diff --git a/buildfile.go b/buildfile.go index 97cf35c406..a2b6da7347 100644 --- a/buildfile.go +++ b/buildfile.go @@ -488,7 +488,7 @@ func (b *buildFile) CmdAdd(args string) error { } b.tmpContainers[container.ID] = struct{}{} - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return err } defer container.Unmount() @@ -610,7 +610,7 @@ func (b *buildFile) commit(id string, autoCmd []string, comment string) error { b.tmpContainers[container.ID] = struct{}{} fmt.Fprintf(b.outStream, " ---> Running in %s\n", utils.TruncateID(container.ID)) id = container.ID - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return err } defer container.Unmount() diff --git a/container.go b/container.go index 5b068aa987..7ebfb3e397 100644 --- a/container.go +++ b/container.go @@ -201,9 +201,10 @@ func (settings *NetworkSettings) PortMappingAPI() []APIPort { // Inject the io.Reader at the given path. Note: do not close the reader func (container *Container) Inject(file io.Reader, pth string) error { - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return fmt.Errorf("inject: error mounting container %s: %s", container.ID, err) } + defer container.Unmount() // Return error if path exists destPath := path.Join(container.RootfsPath(), pth) @@ -505,7 +506,7 @@ func (container *Container) Start() (err error) { container.cleanup() } }() - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return err } if container.runtime.networkManager.disabled { @@ -1287,21 +1288,27 @@ func (container *Container) Resize(h, w int) error { } func (container *Container) ExportRw() (archive.Archive, error) { - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return nil, err } if container.runtime == nil { return nil, fmt.Errorf("Can't load storage driver for unregistered container %s", container.ID) } + defer container.Unmount() return container.runtime.Diff(container) } func (container *Container) Export() (archive.Archive, error) { - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return nil, err } - return archive.Tar(container.RootfsPath(), archive.Uncompressed) + + archive, err := archive.Tar(container.RootfsPath(), archive.Uncompressed) + if err != nil { + return nil, err + } + return EofReader(archive, func() { container.Unmount() }), nil } func (container *Container) WaitTimeout(timeout time.Duration) error { @@ -1319,12 +1326,6 @@ func (container *Container) WaitTimeout(timeout time.Duration) error { } } -func (container *Container) EnsureMounted() error { - // FIXME: EnsureMounted is deprecated because drivers are now responsible - // for re-entrant mounting in their Get() method. - return container.Mount() -} - func (container *Container) Mount() error { return container.runtime.Mount(container) } @@ -1422,10 +1423,11 @@ func (container *Container) GetSize() (int64, int64) { driver = container.runtime.driver ) - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { utils.Errorf("Warning: failed to compute size of container rootfs %s: %s", container.ID, err) return sizeRw, sizeRootfs } + defer container.Unmount() if differ, ok := container.runtime.driver.(graphdriver.Differ); ok { sizeRw, err = differ.DiffSize(container.ID) @@ -1453,13 +1455,14 @@ func (container *Container) GetSize() (int64, int64) { } func (container *Container) Copy(resource string) (archive.Archive, error) { - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return nil, err } var filter []string basePath := path.Join(container.RootfsPath(), resource) stat, err := os.Stat(basePath) if err != nil { + container.Unmount() return nil, err } if !stat.IsDir() { @@ -1470,10 +1473,15 @@ func (container *Container) Copy(resource string) (archive.Archive, error) { filter = []string{path.Base(basePath)} basePath = path.Dir(basePath) } - return archive.TarFilter(basePath, &archive.TarOptions{ + + archive, err := archive.TarFilter(basePath, &archive.TarOptions{ Compression: archive.Uncompressed, Includes: filter, }) + if err != nil { + return nil, err + } + return EofReader(archive, func() { container.Unmount() }), nil } // Returns true if the container exposes a certain port diff --git a/graph.go b/graph.go index 176626d60a..42da42c8af 100644 --- a/graph.go +++ b/graph.go @@ -97,6 +97,7 @@ func (graph *Graph) Get(name string) (*Image, error) { if err != nil { return nil, fmt.Errorf("Driver %s failed to get image rootfs %s: %s", graph.driver, img.ID, err) } + defer graph.driver.Put(img.ID) var size int64 if img.Parent == "" { @@ -193,6 +194,7 @@ func (graph *Graph) Register(jsonData []byte, layerData archive.Archive, img *Im if err != nil { return fmt.Errorf("Driver %s failed to get image rootfs %s: %s", graph.driver, img.ID, err) } + defer graph.driver.Put(img.ID) img.graph = graph if err := StoreImage(img, jsonData, layerData, tmp, rootfs); err != nil { return err diff --git a/graphdriver/aufs/aufs.go b/graphdriver/aufs/aufs.go index 81cdd88134..8ab544eeba 100644 --- a/graphdriver/aufs/aufs.go +++ b/graphdriver/aufs/aufs.go @@ -31,6 +31,7 @@ import ( "os/exec" "path" "strings" + "sync" ) func init() { @@ -38,7 +39,9 @@ func init() { } type Driver struct { - root string + root string + sync.Mutex // Protects concurrent modification to active + active map[string]int } // New returns a new AUFS driver. @@ -54,12 +57,17 @@ func Init(root string) (graphdriver.Driver, error) { "layers", } + a := &Driver{ + root: root, + active: make(map[string]int), + } + // Create the root aufs driver dir and return // if it already exists // If not populate the dir structure if err := os.MkdirAll(root, 0755); err != nil { if os.IsExist(err) { - return &Driver{root}, nil + return a, nil } return nil, err } @@ -69,7 +77,7 @@ func Init(root string) (graphdriver.Driver, error) { return nil, err } } - return &Driver{root}, nil + return a, nil } // Return a nil error if the kernel supports aufs @@ -167,6 +175,14 @@ func (a *Driver) createDirsFor(id string) error { // Unmount and remove the dir information func (a *Driver) Remove(id string) error { + // Protect the a.active from concurrent access + a.Lock() + defer a.Unlock() + + if a.active[id] != 0 { + utils.Errorf("Warning: removing active id %s\n", id) + } + // Make sure the dir is umounted first if err := a.unmount(id); err != nil { return err @@ -210,18 +226,47 @@ func (a *Driver) Get(id string) (string, error) { ids = []string{} } + // Protect the a.active from concurrent access + a.Lock() + defer a.Unlock() + + count := a.active[id] + // If a dir does not have a parent ( no layers )do not try to mount // just return the diff path to the data out := path.Join(a.rootPath(), "diff", id) if len(ids) > 0 { out = path.Join(a.rootPath(), "mnt", id) - if err := a.mount(id); err != nil { - return "", err + + if count == 0 { + if err := a.mount(id); err != nil { + return "", err + } } } + + a.active[id] = count + 1 + return out, nil } +func (a *Driver) Put(id string) { + // Protect the a.active from concurrent access + a.Lock() + defer a.Unlock() + + if count := a.active[id]; count > 1 { + a.active[id] = count - 1 + } else { + ids, _ := getParentIds(a.rootPath(), id) + // We only mounted if there are any parents + if ids != nil && len(ids) > 0 { + a.unmount(id) + } + delete(a.active, id) + } +} + // Returns an archive of the contents for the id func (a *Driver) Diff(id string) (archive.Archive, error) { return archive.TarFilter(path.Join(a.rootPath(), "diff", id), &archive.TarOptions{ diff --git a/graphdriver/devmapper/driver.go b/graphdriver/devmapper/driver.go index 10ac172562..dae712b9b5 100644 --- a/graphdriver/devmapper/driver.go +++ b/graphdriver/devmapper/driver.go @@ -5,8 +5,10 @@ package devmapper import ( "fmt" "github.com/dotcloud/docker/graphdriver" + "github.com/dotcloud/docker/utils" "io/ioutil" "path" + "sync" ) func init() { @@ -20,7 +22,9 @@ func init() { type Driver struct { *DeviceSet - home string + home string + sync.Mutex // Protects concurrent modification to active + active map[string]int } var Init = func(home string) (graphdriver.Driver, error) { @@ -31,6 +35,7 @@ var Init = func(home string) (graphdriver.Driver, error) { d := &Driver{ DeviceSet: deviceSet, home: home, + active: make(map[string]int), } return d, nil } @@ -82,6 +87,14 @@ func (d *Driver) Create(id, parent string) error { } func (d *Driver) Remove(id string) error { + // Protect the d.active from concurrent access + d.Lock() + defer d.Unlock() + + if d.active[id] != 0 { + utils.Errorf("Warning: removing active id %s\n", id) + } + mp := path.Join(d.home, "mnt", id) if err := d.unmount(id, mp); err != nil { return err @@ -90,13 +103,38 @@ func (d *Driver) Remove(id string) error { } func (d *Driver) Get(id string) (string, error) { + // Protect the d.active from concurrent access + d.Lock() + defer d.Unlock() + + count := d.active[id] + mp := path.Join(d.home, "mnt", id) - if err := d.mount(id, mp); err != nil { - return "", err + if count == 0 { + if err := d.mount(id, mp); err != nil { + return "", err + } } + + d.active[id] = count + 1 + return path.Join(mp, "rootfs"), nil } +func (d *Driver) Put(id string) { + // Protect the d.active from concurrent access + d.Lock() + defer d.Unlock() + + if count := d.active[id]; count > 1 { + d.active[id] = count - 1 + } else { + mp := path.Join(d.home, "mnt", id) + d.unmount(id, mp) + delete(d.active, id) + } +} + func (d *Driver) mount(id, mountPoint string) error { // Create the target directories if they don't exist if err := osMkdirAll(mountPoint, 0755); err != nil && !osIsExist(err) { diff --git a/graphdriver/driver.go b/graphdriver/driver.go index 1d5995dffc..2be3f05f3a 100644 --- a/graphdriver/driver.go +++ b/graphdriver/driver.go @@ -17,6 +17,7 @@ type Driver interface { Remove(id string) error Get(id string) (dir string, err error) + Put(id string) Exists(id string) bool Status() [][2]string diff --git a/graphdriver/vfs/driver.go b/graphdriver/vfs/driver.go index 12230f463a..21da63878a 100644 --- a/graphdriver/vfs/driver.go +++ b/graphdriver/vfs/driver.go @@ -84,6 +84,11 @@ func (d *Driver) Get(id string) (string, error) { return dir, nil } +func (d *Driver) Put(id string) { + // The vfs driver has no runtime resources (e.g. mounts) + // to clean up, so we don't need anything here +} + func (d *Driver) Exists(id string) bool { _, err := os.Stat(d.dir(id)) return err == nil diff --git a/image.go b/image.go index f062910ef8..7652824d49 100644 --- a/image.go +++ b/image.go @@ -104,6 +104,7 @@ func StoreImage(img *Image, jsonData []byte, layerData archive.Archive, root, la if err != nil { return err } + defer driver.Put(img.Parent) changes, err := archive.ChangesDirs(layer, parent) if err != nil { return err @@ -147,7 +148,7 @@ func jsonPath(root string) string { } // TarLayer returns a tar archive of the image's filesystem layer. -func (img *Image) TarLayer() (archive.Archive, error) { +func (img *Image) TarLayer() (arch archive.Archive, err error) { if img.graph == nil { return nil, fmt.Errorf("Can't load storage driver for unregistered image %s", img.ID) } @@ -160,19 +161,35 @@ func (img *Image) TarLayer() (archive.Archive, error) { if err != nil { return nil, err } + + defer func() { + if err == nil { + driver.Put(img.ID) + } + }() + if img.Parent == "" { - return archive.Tar(imgFs, archive.Uncompressed) - } else { - parentFs, err := driver.Get(img.Parent) + archive, err := archive.Tar(imgFs, archive.Uncompressed) if err != nil { return nil, err } - changes, err := archive.ChangesDirs(imgFs, parentFs) - if err != nil { - return nil, err - } - return archive.ExportChanges(imgFs, changes) + return EofReader(archive, func() { driver.Put(img.ID) }), nil } + + parentFs, err := driver.Get(img.Parent) + if err != nil { + return nil, err + } + defer driver.Put(img.Parent) + changes, err := archive.ChangesDirs(imgFs, parentFs) + if err != nil { + return nil, err + } + archive, err := archive.ExportChanges(imgFs, changes) + if err != nil { + return nil, err + } + return EofReader(archive, func() { driver.Put(img.ID) }), nil } func ValidateID(id string) error { diff --git a/integration/utils_test.go b/integration/utils_test.go index a7734fb257..d7a2814472 100644 --- a/integration/utils_test.go +++ b/integration/utils_test.go @@ -71,9 +71,10 @@ func containerRun(eng *engine.Engine, id string, t utils.Fataler) { func containerFileExists(eng *engine.Engine, id, dir string, t utils.Fataler) bool { c := getContainer(eng, id, t) - if err := c.EnsureMounted(); err != nil { + if err := c.Mount(); err != nil { t.Fatal(err) } + defer c.Unmount() if _, err := os.Stat(path.Join(c.RootfsPath(), dir)); err != nil { if os.IsNotExist(err) { return false diff --git a/runtime.go b/runtime.go index d95fc11b45..1b17bac973 100644 --- a/runtime.go +++ b/runtime.go @@ -134,6 +134,7 @@ func (runtime *Runtime) Register(container *Container) error { if err != nil { return fmt.Errorf("Error getting container filesystem %s from driver %s: %s", container.ID, runtime.driver, err) } + defer runtime.driver.Put(container.ID) container.rootfs = rootfs container.runtime = runtime @@ -466,6 +467,8 @@ func (runtime *Runtime) Create(config *Config, name string) (*Container, []strin if err != nil { return nil, nil, err } + defer runtime.driver.Put(initID) + if err := setupInitLayer(initPath); err != nil { return nil, nil, err } @@ -523,9 +526,10 @@ func (runtime *Runtime) Create(config *Config, name string) (*Container, []strin func (runtime *Runtime) Commit(container *Container, repository, tag, comment, author string, config *Config) (*Image, error) { // FIXME: freeze the container before copying it to avoid data corruption? // FIXME: this shouldn't be in commands. - if err := container.EnsureMounted(); err != nil { + if err := container.Mount(); err != nil { return nil, err } + defer container.Unmount() rwTar, err := container.ExportRw() if err != nil { @@ -769,8 +773,7 @@ func (runtime *Runtime) Mount(container *Container) error { } func (runtime *Runtime) Unmount(container *Container) error { - // FIXME: Unmount is deprecated because drivers are responsible for mounting - // and unmounting when necessary. Use driver.Remove() instead. + runtime.driver.Put(container.ID) return nil } @@ -782,10 +785,12 @@ func (runtime *Runtime) Changes(container *Container) ([]archive.Change, error) if err != nil { return nil, fmt.Errorf("Error getting container rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err) } + defer runtime.driver.Put(container.ID) initDir, err := runtime.driver.Get(container.ID + "-init") if err != nil { return nil, fmt.Errorf("Error getting container init rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err) } + defer runtime.driver.Put(container.ID + "-init") return archive.ChangesDirs(cDir, initDir) } @@ -804,7 +809,11 @@ func (runtime *Runtime) Diff(container *Container) (archive.Archive, error) { return nil, fmt.Errorf("Error getting container rootfs %s from driver %s: %s", container.ID, container.runtime.driver, err) } - return archive.ExportChanges(cDir, changes) + archive, err := archive.ExportChanges(cDir, changes) + if err != nil { + return nil, err + } + return EofReader(archive, func() { runtime.driver.Put(container.ID) }), nil } func (runtime *Runtime) Run(c *Container, startCallback execdriver.StartCallback) (int, error) { diff --git a/utils.go b/utils.go index f0591158a4..e3ba08d51c 100644 --- a/utils.go +++ b/utils.go @@ -5,8 +5,10 @@ import ( "github.com/dotcloud/docker/archive" "github.com/dotcloud/docker/pkg/namesgenerator" "github.com/dotcloud/docker/utils" + "io" "strconv" "strings" + "sync/atomic" ) type Change struct { @@ -339,3 +341,28 @@ func (c *checker) Exists(name string) bool { func generateRandomName(runtime *Runtime) (string, error) { return namesgenerator.GenerateRandomName(&checker{runtime}) } + +// Read an io.Reader and call a function when it returns EOF +func EofReader(r io.Reader, callback func()) *eofReader { + return &eofReader{ + Reader: r, + callback: callback, + } +} + +type eofReader struct { + io.Reader + gotEOF int32 + callback func() +} + +func (r *eofReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + if err == io.EOF { + // Use atomics to make the gotEOF check threadsafe + if atomic.CompareAndSwapInt32(&r.gotEOF, 0, 1) { + r.callback() + } + } + return +}