Merge pull request #25523 from dmcgowan/fsync-layer-filestore

Update layer store to sync transaction files before committing
This commit is contained in:
Aaron Lehmann 2016-09-08 10:03:12 -07:00 committed by GitHub
commit bc06542a17
3 changed files with 189 additions and 16 deletions

View File

@ -34,7 +34,7 @@ type fileMetadataStore struct {
type fileMetadataTransaction struct { type fileMetadataTransaction struct {
store *fileMetadataStore store *fileMetadataStore
root string ws *ioutils.AtomicWriteSet
} }
// NewFSMetadataStore returns an instance of a metadata store // NewFSMetadataStore returns an instance of a metadata store
@ -71,33 +71,32 @@ func (fms *fileMetadataStore) StartTransaction() (MetadataTransaction, error) {
if err := os.MkdirAll(tmpDir, 0755); err != nil { if err := os.MkdirAll(tmpDir, 0755); err != nil {
return nil, err return nil, err
} }
ws, err := ioutils.NewAtomicWriteSet(tmpDir)
td, err := ioutil.TempDir(tmpDir, "layer-")
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Create a new tempdir
return &fileMetadataTransaction{ return &fileMetadataTransaction{
store: fms, store: fms,
root: td, ws: ws,
}, nil }, nil
} }
func (fm *fileMetadataTransaction) SetSize(size int64) error { func (fm *fileMetadataTransaction) SetSize(size int64) error {
content := fmt.Sprintf("%d", size) content := fmt.Sprintf("%d", size)
return ioutil.WriteFile(filepath.Join(fm.root, "size"), []byte(content), 0644) return fm.ws.WriteFile("size", []byte(content), 0644)
} }
func (fm *fileMetadataTransaction) SetParent(parent ChainID) error { func (fm *fileMetadataTransaction) SetParent(parent ChainID) error {
return ioutil.WriteFile(filepath.Join(fm.root, "parent"), []byte(digest.Digest(parent).String()), 0644) return fm.ws.WriteFile("parent", []byte(digest.Digest(parent).String()), 0644)
} }
func (fm *fileMetadataTransaction) SetDiffID(diff DiffID) error { func (fm *fileMetadataTransaction) SetDiffID(diff DiffID) error {
return ioutil.WriteFile(filepath.Join(fm.root, "diff"), []byte(digest.Digest(diff).String()), 0644) return fm.ws.WriteFile("diff", []byte(digest.Digest(diff).String()), 0644)
} }
func (fm *fileMetadataTransaction) SetCacheID(cacheID string) error { func (fm *fileMetadataTransaction) SetCacheID(cacheID string) error {
return ioutil.WriteFile(filepath.Join(fm.root, "cache-id"), []byte(cacheID), 0644) return fm.ws.WriteFile("cache-id", []byte(cacheID), 0644)
} }
func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) error { func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) error {
@ -105,11 +104,11 @@ func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) er
if err != nil { if err != nil {
return err return err
} }
return ioutil.WriteFile(filepath.Join(fm.root, "descriptor.json"), jsonRef, 0644) return fm.ws.WriteFile("descriptor.json", jsonRef, 0644)
} }
func (fm *fileMetadataTransaction) TarSplitWriter(compressInput bool) (io.WriteCloser, error) { func (fm *fileMetadataTransaction) TarSplitWriter(compressInput bool) (io.WriteCloser, error) {
f, err := os.OpenFile(filepath.Join(fm.root, "tar-split.json.gz"), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) f, err := fm.ws.FileWriter("tar-split.json.gz", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -131,15 +130,16 @@ func (fm *fileMetadataTransaction) Commit(layer ChainID) error {
if err := os.MkdirAll(filepath.Dir(finalDir), 0755); err != nil { if err := os.MkdirAll(filepath.Dir(finalDir), 0755); err != nil {
return err return err
} }
return os.Rename(fm.root, finalDir)
return fm.ws.Commit(finalDir)
} }
func (fm *fileMetadataTransaction) Cancel() error { func (fm *fileMetadataTransaction) Cancel() error {
return os.RemoveAll(fm.root) return fm.ws.Cancel()
} }
func (fm *fileMetadataTransaction) String() string { func (fm *fileMetadataTransaction) String() string {
return fm.root return fm.ws.String()
} }
func (fms *fileMetadataStore) GetSize(layer ChainID) (int64, error) { func (fms *fileMetadataStore) GetSize(layer ChainID) (int64, error) {

View File

@ -80,3 +80,83 @@ func (w *atomicFileWriter) Close() (retErr error) {
} }
return nil return nil
} }
// AtomicWriteSet is used to atomically write a set
// of files and ensure they are visible at the same time.
// Must be committed to a new directory.
type AtomicWriteSet struct {
root string
}
// NewAtomicWriteSet creates a new atomic write set to
// atomically create a set of files. The given directory
// is used as the base directory for storing files before
// commit. If no temporary directory is given the system
// default is used.
func NewAtomicWriteSet(tmpDir string) (*AtomicWriteSet, error) {
td, err := ioutil.TempDir(tmpDir, "write-set-")
if err != nil {
return nil, err
}
return &AtomicWriteSet{
root: td,
}, nil
}
// WriteFile writes a file to the set, guaranteeing the file
// has been synced.
func (ws *AtomicWriteSet) WriteFile(filename string, data []byte, perm os.FileMode) error {
f, err := ws.FileWriter(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}
type syncFileCloser struct {
*os.File
}
func (w syncFileCloser) Close() error {
err := w.File.Sync()
if err1 := w.File.Close(); err == nil {
err = err1
}
return err
}
// FileWriter opens a file writer inside the set. The file
// should be synced and closed before calling commit.
func (ws *AtomicWriteSet) FileWriter(name string, flag int, perm os.FileMode) (io.WriteCloser, error) {
f, err := os.OpenFile(filepath.Join(ws.root, name), flag, perm)
if err != nil {
return nil, err
}
return syncFileCloser{f}, nil
}
// Cancel cancels the set and removes all temporary data
// created in the set.
func (ws *AtomicWriteSet) Cancel() error {
return os.RemoveAll(ws.root)
}
// Commit moves all created files to the target directory. The
// target directory must not exist and the parent of the target
// directory must exist.
func (ws *AtomicWriteSet) Commit(target string) error {
return os.Rename(ws.root, target)
}
// String returns the location the set is writing to.
func (ws *AtomicWriteSet) String() string {
return ws.root
}

View File

@ -5,9 +5,21 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"testing" "testing"
) )
var (
testMode os.FileMode = 0640
)
func init() {
// Windows does not support full Linux file mode
if runtime.GOOS == "windows" {
testMode = 0666
}
}
func TestAtomicWriteToFile(t *testing.T) { func TestAtomicWriteToFile(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writers-test") tmpDir, err := ioutil.TempDir("", "atomic-writers-test")
if err != nil { if err != nil {
@ -16,7 +28,7 @@ func TestAtomicWriteToFile(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
expected := []byte("barbaz") expected := []byte("barbaz")
if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, 0666); err != nil { if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err) t.Fatalf("Error writing to file: %v", err)
} }
@ -33,7 +45,88 @@ func TestAtomicWriteToFile(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Error statting file: %v", err) t.Fatalf("Error statting file: %v", err)
} }
if expected := os.FileMode(0666); st.Mode() != expected { if expected := os.FileMode(testMode); st.Mode() != expected {
t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode()) t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
} }
} }
func TestAtomicWriteSetCommit(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
if err != nil {
t.Fatalf("Error when creating temporary directory: %s", err)
}
defer os.RemoveAll(tmpDir)
if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
t.Fatalf("Error creating tmp directory: %s", err)
}
targetDir := filepath.Join(tmpDir, "target")
ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
if err != nil {
t.Fatalf("Error creating atomic write set: %s", err)
}
expected := []byte("barbaz")
if err := ws.WriteFile("foo", expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err)
}
if _, err := ioutil.ReadFile(filepath.Join(targetDir, "foo")); err == nil {
t.Fatalf("Expected error reading file where should not exist")
}
if err := ws.Commit(targetDir); err != nil {
t.Fatalf("Error committing file: %s", err)
}
actual, err := ioutil.ReadFile(filepath.Join(targetDir, "foo"))
if err != nil {
t.Fatalf("Error reading from file: %v", err)
}
if bytes.Compare(actual, expected) != 0 {
t.Fatalf("Data mismatch, expected %q, got %q", expected, actual)
}
st, err := os.Stat(filepath.Join(targetDir, "foo"))
if err != nil {
t.Fatalf("Error statting file: %v", err)
}
if expected := os.FileMode(testMode); st.Mode() != expected {
t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
}
}
func TestAtomicWriteSetCancel(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
if err != nil {
t.Fatalf("Error when creating temporary directory: %s", err)
}
defer os.RemoveAll(tmpDir)
if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
t.Fatalf("Error creating tmp directory: %s", err)
}
ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
if err != nil {
t.Fatalf("Error creating atomic write set: %s", err)
}
expected := []byte("barbaz")
if err := ws.WriteFile("foo", expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err)
}
if err := ws.Cancel(); err != nil {
t.Fatalf("Error committing file: %s", err)
}
if _, err := ioutil.ReadFile(filepath.Join(tmpDir, "target", "foo")); err == nil {
t.Fatalf("Expected error reading file where should not exist")
} else if !os.IsNotExist(err) {
t.Fatalf("Unexpected error reading file: %s", err)
}
}