From 5a52917e4d099eecb18a9ede260469e38df9933a Mon Sep 17 00:00:00 2001
From: Aleksa Sarai
Date: Wed, 5 Dec 2018 03:44:45 +1100
Subject: [PATCH] daemon: switch to semaphore-gated WaitGroup for startup tasks

Many startup tasks have to run for each container, and thus using a
WaitGroup (which doesn't have a limit to the number of parallel tasks)
can result in Docker exceeding the NOFILE limit quite trivially. A
better solution is to bound the parallelism with a semaphore.

In addition, several startup tasks were not parallelised previously,
which resulted in very long startup times. According to my testing, 20K
dead containers resulted in ~6 minute startup times (during which time
Docker is completely unusable).

This patch fixes both issues, and the parallelism limit chosen
(128 * NumCPU) is based on my own significant testing of the 20K
container case. On my machines it reduces the startup time from 6
minutes to less than a minute (ideally this could be further reduced by
removing the need to scan all dead containers on startup -- but that's
beyond the scope of this patchset).

To avoid hitting the NOFILE limit anyway, we also detect on startup
whether NOFILE < 2*128*NumCPU, and if so we reduce the parallelism
factor accordingly (but also emit a warning, since this is almost
certainly a misconfiguration).

Signed-off-by: Aleksa Sarai
---
 daemon/daemon.go         | 212 +++++++++++++++++++++++++--------------
 daemon/daemon_unix.go    |  35 +++++++
 daemon/daemon_windows.go |   5 +
 3 files changed, 179 insertions(+), 73 deletions(-)

diff --git a/daemon/daemon.go b/daemon/daemon.go
index 14b31e7667..e8f482f6f0 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -67,6 +67,7 @@ import (
 	"github.com/docker/libnetwork/cluster"
 	nwconfig "github.com/docker/libnetwork/config"
 	"github.com/pkg/errors"
+	"golang.org/x/sync/semaphore"
 )
 
 // ContainersNamespace is the name of the namespace used for users containers
@@ -198,6 +199,7 @@ func (daemon *Daemon) NewResolveOptionsFunc() resolver.ResolveOptionsFunc {
 }
 
 func (daemon *Daemon) restore() error {
+	var mapLock sync.Mutex
 	containers := make(map[string]*container.Container)
 
 	logrus.Info("Loading containers: start.")
@@ -207,68 +209,99 @@ func (daemon *Daemon) restore() error {
 		return err
 	}
 
-	for _, v := range dir {
-		id := v.Name()
-		container, err := daemon.load(id)
-		if err != nil {
-			logrus.Errorf("Failed to load container %v: %v", id, err)
-			continue
-		}
-		if !system.IsOSSupported(container.OS) {
-			logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
-			continue
-		}
-		// Ignore the container if it does not support the current driver being used by the graph
-		currentDriverForContainerOS := daemon.graphDrivers[container.OS]
-		if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
-			rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
-			if err != nil {
-				logrus.Errorf("Failed to load container mount %v: %v", id, err)
-				continue
-			}
-			container.RWLayer = rwlayer
-			logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
+	// parallelLimit is the maximum number of parallel startup jobs that we
+	// allow (this is the limit used for all startup semaphores). The multiplier
+	// (128) was chosen after some fairly significant benchmarking -- don't change
+	// it unless you've tested it significantly (this value is adjusted if
+	// RLIMIT_NOFILE is small to avoid EMFILE).
+	parallelLimit := adjustParallelLimit(len(dir), 128*runtime.NumCPU())
-			containers[container.ID] = container
-		} else {
-			logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
-		}
+	// Re-used for all parallel startup jobs.
+	var group sync.WaitGroup
+	sem := semaphore.NewWeighted(int64(parallelLimit))
+
+	for _, v := range dir {
+		group.Add(1)
+		go func(id string) {
+			defer group.Done()
+			_ = sem.Acquire(context.Background(), 1)
+			defer sem.Release(1)
+
+			container, err := daemon.load(id)
+			if err != nil {
+				logrus.Errorf("Failed to load container %v: %v", id, err)
+				return
+			}
+			if !system.IsOSSupported(container.OS) {
+				logrus.Errorf("Failed to load container %v: %s (%q)", id, system.ErrNotSupportedOperatingSystem, container.OS)
+				return
+			}
+			// Ignore the container if it does not support the current driver being used by the graph
+			currentDriverForContainerOS := daemon.graphDrivers[container.OS]
+			if (container.Driver == "" && currentDriverForContainerOS == "aufs") || container.Driver == currentDriverForContainerOS {
+				rwlayer, err := daemon.imageService.GetLayerByID(container.ID, container.OS)
+				if err != nil {
+					logrus.Errorf("Failed to load container mount %v: %v", id, err)
+					return
+				}
+				container.RWLayer = rwlayer
+				logrus.Debugf("Loaded container %v, isRunning: %v", container.ID, container.IsRunning())
+
+				mapLock.Lock()
+				containers[container.ID] = container
+				mapLock.Unlock()
+			} else {
+				logrus.Debugf("Cannot load container %s because it was created with another graph driver.", container.ID)
+			}
+		}(v.Name())
 	}
+	group.Wait()
 
 	removeContainers := make(map[string]*container.Container)
 	restartContainers := make(map[*container.Container]chan struct{})
 	activeSandboxes := make(map[string]interface{})
+
-	for id, c := range containers {
-		if err := daemon.registerName(c); err != nil {
-			logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
-			delete(containers, id)
-			continue
-		}
-		if err := daemon.Register(c); err != nil {
-			logrus.Errorf("Failed to register container %s: %s", c.ID, err)
-			delete(containers, id)
-			continue
-		}
-
-		// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
-		// We should rewrite it to use the daemon defaults.
-		// Fixes https://github.com/docker/docker/issues/22536
-		if c.HostConfig.LogConfig.Type == "" {
-			if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
-				logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
-				continue
-			}
-		}
-	}
-
-	var (
-		wg      sync.WaitGroup
-		mapLock sync.Mutex
-	)
-	for _, c := range containers {
-		wg.Add(1)
+	for _, c := range containers {
+		group.Add(1)
 		go func(c *container.Container) {
-			defer wg.Done()
+			defer group.Done()
+			_ = sem.Acquire(context.Background(), 1)
+			defer sem.Release(1)
+
+			if err := daemon.registerName(c); err != nil {
+				logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
+				mapLock.Lock()
+				delete(containers, c.ID)
+				mapLock.Unlock()
+				return
+			}
+			if err := daemon.Register(c); err != nil {
+				logrus.Errorf("Failed to register container %s: %s", c.ID, err)
+				mapLock.Lock()
+				delete(containers, c.ID)
+				mapLock.Unlock()
+				return
+			}
+
+			// The LogConfig.Type is empty if the container was created before docker 1.12 with default log driver.
+			// We should rewrite it to use the daemon defaults.
+			// Fixes https://github.com/docker/docker/issues/22536
+			if c.HostConfig.LogConfig.Type == "" {
+				if err := daemon.mergeAndVerifyLogConfig(&c.HostConfig.LogConfig); err != nil {
+					logrus.Errorf("Failed to verify log config for container %s: %q", c.ID, err)
+				}
+			}
+		}(c)
+	}
+	group.Wait()
+
+	for _, c := range containers {
+		group.Add(1)
+		go func(c *container.Container) {
+			defer group.Done()
+			_ = sem.Acquire(context.Background(), 1)
+			defer sem.Release(1)
+
 			daemon.backportMountSpec(c)
 			if err := daemon.checkpointAndSave(c); err != nil {
 				logrus.WithError(err).WithField("container", c.ID).Error("error saving backported mountspec to disk")
@@ -415,7 +448,8 @@ func (daemon *Daemon) restore() error {
 			c.Unlock()
 		}(c)
 	}
-	wg.Wait()
+	group.Wait()
+
 	daemon.netController, err = daemon.initNetworkController(daemon.configStore, activeSandboxes)
 	if err != nil {
 		return fmt.Errorf("Error initializing network controller: %v", err)
@@ -423,18 +457,24 @@ func (daemon *Daemon) restore() error {
 
 	// Now that all the containers are registered, register the links
 	for _, c := range containers {
-		if err := daemon.registerLinks(c, c.HostConfig); err != nil {
-			logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
-		}
-	}
+		group.Add(1)
+		go func(c *container.Container) {
+			_ = sem.Acquire(context.Background(), 1)
+
+			if err := daemon.registerLinks(c, c.HostConfig); err != nil {
+				logrus.Errorf("failed to register link for container %s: %v", c.ID, err)
+			}
+
+			sem.Release(1)
+			group.Done()
+		}(c)
+	}
+	group.Wait()
 
-	group := sync.WaitGroup{}
 	for c, notifier := range restartContainers {
 		group.Add(1)
 		go func(c *container.Container, chNotify chan struct{}) {
-			defer group.Done()
-
+			_ = sem.Acquire(context.Background(), 1)
 			logrus.Debugf("Starting container %s", c.ID)
 
 			// ignore errors here as this is a best effort to wait for children to be
@@ -456,22 +496,27 @@ func (daemon *Daemon) restore() error {
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
 			}
 			close(chNotify)
-		}(c, notifier)
+			sem.Release(1)
+			group.Done()
+		}(c, notifier)
 	}
 	group.Wait()
 
-	removeGroup := sync.WaitGroup{}
 	for id := range removeContainers {
-		removeGroup.Add(1)
+		group.Add(1)
 		go func(cid string) {
+			_ = sem.Acquire(context.Background(), 1)
+
 			if err := daemon.ContainerRm(cid, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
 				logrus.Errorf("Failed to remove container %s: %s", cid, err)
 			}
-			removeGroup.Done()
+
+			sem.Release(1)
+			group.Done()
 		}(id)
 	}
-	removeGroup.Wait()
+	group.Wait()
 
 	// any containers that were started above would already have had this done,
 	// however we need to now prepare the mountpoints for the rest of the containers as well.
@@ -492,13 +537,16 @@ func (daemon *Daemon) restore() error {
 		group.Add(1)
 		go func(c *container.Container) {
-			defer group.Done()
+			_ = sem.Acquire(context.Background(), 1)
+
 			if err := daemon.prepareMountPoints(c); err != nil {
 				logrus.Error(err)
 			}
+
+			sem.Release(1)
+			group.Done()
 		}(c)
 	}
-	group.Wait()
 
 	logrus.Info("Loading containers: done.")
@@ -509,7 +557,18 @@ func (daemon *Daemon) restore() error {
 // RestartSwarmContainers restarts any autostart container which has a
 // swarm endpoint.
 func (daemon *Daemon) RestartSwarmContainers() {
-	group := sync.WaitGroup{}
+	ctx := context.Background()
+
+	// parallelLimit is the maximum number of parallel startup jobs that we
+	// allow (this is the limit used for all startup semaphores). The multiplier
+	// (128) was chosen after some fairly significant benchmarking -- don't change
+	// it unless you've tested it significantly (this value is adjusted if
+	// RLIMIT_NOFILE is small to avoid EMFILE).
+	parallelLimit := adjustParallelLimit(len(daemon.List()), 128*runtime.NumCPU())
+
+	var group sync.WaitGroup
+	sem := semaphore.NewWeighted(int64(parallelLimit))
+
 	for _, c := range daemon.List() {
 		if !c.IsRunning() && !c.IsPaused() {
 			// Autostart all the containers which has a
@@ -518,14 +577,21 @@ func (daemon *Daemon) RestartSwarmContainers() {
 			if daemon.configStore.AutoRestart && c.ShouldRestart() && c.NetworkSettings.HasSwarmEndpoint && c.HasBeenStartedBefore {
 				group.Add(1)
 				go func(c *container.Container) {
-					defer group.Done()
+					if err := sem.Acquire(ctx, 1); err != nil {
+						// ctx is done.
+						group.Done()
+						return
+					}
+
 					if err := daemon.containerStart(c, "", "", true); err != nil {
 						logrus.Error(err)
 					}
+
+					sem.Release(1)
+					group.Done()
 				}(c)
 			}
 		}
-
 	}
 	group.Wait()
 }
diff --git a/daemon/daemon_unix.go b/daemon/daemon_unix.go
index b18fca17c7..3148b48840 100644
--- a/daemon/daemon_unix.go
+++ b/daemon/daemon_unix.go
@@ -257,6 +257,41 @@ func getBlkioThrottleDevices(devs []*blkiodev.ThrottleDevice) ([]specs.LinuxThro
 	return throttleDevices, nil
 }
 
+// adjustParallelLimit takes a number of objects and a proposed limit and
+// figures out if it's reasonable (and adjusts it accordingly). This is only
+// used for daemon startup, which does a lot of parallel loading of containers
+// (and if we exceed RLIMIT_NOFILE then we're in trouble).
+func adjustParallelLimit(n int, limit int) int {
+	// Rule-of-thumb overhead factor (how many files will each goroutine open
+	// simultaneously). Yes, this is ugly but to be frank this whole thing is
+	// ugly.
+	const overhead = 2
+
+	// On Linux, we need to ensure that parallelStartupJobs doesn't cause us to
+	// exceed RLIMIT_NOFILE. If parallelStartupJobs is too large, we reduce it
+	// and give a warning (since in theory the user should increase their
+	// ulimits to the largest possible value for dockerd).
+	var rlim unix.Rlimit
+	if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rlim); err != nil {
+		logrus.Warnf("Couldn't find dockerd's RLIMIT_NOFILE to double-check startup parallelism factor: %v", err)
+		return limit
+	}
+	softRlimit := int(rlim.Cur)
+
+	// Much fewer containers than RLIMIT_NOFILE. No need to adjust anything.
+	if softRlimit > overhead*n {
+		return limit
+	}
+
+	// RLIMIT_NOFILE big enough, no need to adjust anything.
+	if softRlimit > overhead*limit {
+		return limit
+	}
+
+	logrus.Warnf("Found dockerd's open file ulimit (%v) is far too small -- consider increasing it significantly (at least %v)", softRlimit, overhead*limit)
+	return softRlimit / overhead
+}
+
 func checkKernel() error {
 	// Check for unsupported kernel versions
 	// FIXME: it would be cleaner to not test for specific versions, but rather
diff --git a/daemon/daemon_windows.go b/daemon/daemon_windows.go
index e534d7eccc..38b19eaf89 100644
--- a/daemon/daemon_windows.go
+++ b/daemon/daemon_windows.go
@@ -40,6 +40,11 @@ const (
 	windowsMaxCPUPercent = 100
 )
 
+// Windows doesn't really have rlimits.
+func adjustParallelLimit(n int, limit int) int {
+	return limit
+}
+
 // Windows has no concept of an execution state directory. So use config.Root here.
 func getPluginExecRoot(root string) string {
 	return filepath.Join(root, "plugins")
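
The core pattern this patch introduces (a sync.WaitGroup to track completion plus a golang.org/x/sync/semaphore.Weighted to cap how many startup goroutines run at once) reduces to the short standalone sketch below. The sketch is illustrative only and is not part of the patch; the job slice, the 2*NumCPU limit and the log line are made-up stand-ins for the per-container startup work.

// Semaphore-gated WaitGroup: every job gets a goroutine, but at most
// `limit` of them do work at the same time.
package main

import (
	"context"
	"log"
	"runtime"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	jobs := []string{"a", "b", "c", "d", "e", "f", "g", "h"}

	// Cap on in-flight work, analogous to parallelLimit in the patch
	// (128 * NumCPU there; kept small here for readability).
	limit := int64(2 * runtime.NumCPU())
	sem := semaphore.NewWeighted(limit)

	var group sync.WaitGroup
	for _, j := range jobs {
		group.Add(1)
		go func(j string) {
			defer group.Done()
			// With context.Background() the only way Acquire returns is
			// with a slot held, so the error can be ignored (the patch
			// does the same in restore()).
			_ = sem.Acquire(context.Background(), 1)
			defer sem.Release(1)

			log.Println("processing", j) // stand-in for loading one container
		}(j)
	}
	group.Wait()
}

For a feel of the adjustParallelLimit safety valve: with 20K containers on an 8-core machine the proposed limit is 128*8 = 1024. If dockerd's soft RLIMIT_NOFILE is only 1024, neither 1024 > 2*20000 nor 1024 > 2*1024 holds, so the function logs a warning and returns 1024/2 = 512, keeping roughly two file descriptors available per startup goroutine.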