From eaad3ee3cf0d69aa22d1d1eca10c4ae46ff92f6e Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Wed, 9 Jan 2019 10:24:03 -0800
Subject: [PATCH] Make sure timers are stopped after use.

`time.After` keeps a timer running until the specified duration is
completed. It also allocates a new timer on each call. This can wind up
leaving lots of unnecessary timers running in the background, consuming
resources.

Instead of `time.After`, use `time.NewTimer` so the timer can actually
be stopped. In some of these cases it's not a big deal since the
duration is really short, but in others it is much worse.
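For illustration only (not part of this change), the pattern applied
throughout is roughly the following sketch, where `wait`, `ch`, and `d`
are placeholder names rather than identifiers from the tree:

    package example

    import (
        "errors"
        "time"
    )

    // wait blocks until ch receives or until d elapses. Unlike
    // `case <-time.After(d):`, the explicit timer is stopped as soon as
    // wait returns, so its resources are released immediately instead
    // of lingering until d has fully elapsed.
    func wait(ch <-chan struct{}, d time.Duration) error {
        timer := time.NewTimer(d)
        defer timer.Stop()

        select {
        case <-ch:
            return nil
        case <-timer.C:
            return errors.New("timeout")
        }
    }

The call sites that fire repeatedly (daemon/health.go, pkg/filenotify/poller.go,
libcontainerd/supervisor/remote_daemon.go) reuse a single timer via Reset
instead of allocating a new one per iteration.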
Signed-off-by: Brian Goff
---
 api/server/router/system/system_routes.go | 4 +-
 cmd/dockerd/daemon.go                      | 6 ++-
 container/monitor.go                       | 5 ++-
 daemon/cluster/cluster.go                  | 5 ++-
 daemon/cluster/swarm.go                    | 5 ++-
 daemon/daemon.go                           | 22 +++++++++--
 daemon/discovery/discovery.go              | 6 ++-
 daemon/exec.go                             | 10 +++--
 daemon/health.go                           | 8 +++-
 daemon/resize.go                           | 7 +++-
 libcontainerd/supervisor/remote_daemon.go  | 47 +++++++++++++++--------
 pkg/filenotify/poller.go                   | 11 +++++-
 pkg/pubsub/publisher.go                    | 5 ++-
 plugin/manager_linux.go                    | 15 ++++++--
 restartmanager/restartmanager.go           | 5 ++-
 15 files changed, 121 insertions(+), 40 deletions(-)

diff --git a/api/server/router/system/system_routes.go b/api/server/router/system/system_routes.go
index a2ff692de3..9e43d8ad25 100644
--- a/api/server/router/system/system_routes.go
+++ b/api/server/router/system/system_routes.go
@@ -165,7 +165,9 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
 
 		if !onlyPastEvents {
 			dur := until.Sub(now)
-			timeout = time.After(dur)
+			timer := time.NewTimer(dur)
+			defer timer.Stop()
+			timeout = timer.C
 		}
 	}
diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go
index 0daf197270..252caa6504 100644
--- a/cmd/dockerd/daemon.go
+++ b/cmd/dockerd/daemon.go
@@ -378,10 +378,14 @@ func shutdownDaemon(d *daemon.Daemon) {
 		logrus.Debug("Clean shutdown succeeded")
 		return
 	}
+
+	timeout := time.NewTimer(time.Duration(shutdownTimeout) * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ch:
 		logrus.Debug("Clean shutdown succeeded")
-	case <-time.After(time.Duration(shutdownTimeout) * time.Second):
+	case <-timeout.C:
 		logrus.Error("Force shutdown daemon")
 	}
 }
diff --git a/container/monitor.go b/container/monitor.go
index 1735e3487e..ff4b3439e5 100644
--- a/container/monitor.go
+++ b/container/monitor.go
@@ -33,8 +33,11 @@ func (container *Container) Reset(lock bool) {
 			container.LogCopier.Wait()
 			close(exit)
 		}()
+
+		timer := time.NewTimer(loggerCloseTimeout)
+		defer timer.Stop()
 		select {
-		case <-time.After(loggerCloseTimeout):
+		case <-timer.C:
 			logrus.Warn("Logger didn't exit in time: logs may be truncated")
 		case <-exit:
 		}
diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go
index 828748bc5c..52cbaed772 100644
--- a/daemon/cluster/cluster.go
+++ b/daemon/cluster/cluster.go
@@ -186,8 +186,11 @@ func (c *Cluster) Start() error {
 	}
 	c.nr = nr
 
+	timer := time.NewTimer(swarmConnectTimeout)
+	defer timer.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timer.C:
 		logrus.Error("swarm component could not be started before timeout was reached")
 	case err := <-nr.Ready():
 		if err != nil {
diff --git a/daemon/cluster/swarm.go b/daemon/cluster/swarm.go
index 8cc172e9ce..700c3b1eef 100644
--- a/daemon/cluster/swarm.go
+++ b/daemon/cluster/swarm.go
@@ -194,8 +194,11 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 	c.nr = nr
 	c.mu.Unlock()
 
+	timeout := time.NewTimer(swarmConnectTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(swarmConnectTimeout):
+	case <-timeout.C:
 		return errSwarmJoinTimeoutReached
 	case err := <-nr.Ready():
 		if err != nil {
diff --git a/daemon/daemon.go b/daemon/daemon.go
index 651a57a139..f62d050abd 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -42,6 +42,7 @@ import (
 	"github.com/moby/buildkit/util/resolver"
 	"github.com/moby/buildkit/util/tracing"
 	"github.com/sirupsen/logrus"
+
 	// register graph drivers
 	_ "github.com/docker/docker/daemon/graphdriver/register"
 	"github.com/docker/docker/daemon/stats"
@@ -479,12 +480,14 @@ func (daemon *Daemon) restore() error {
 			// ignore errors here as this is a best effort to wait for children to be
 			// running before we try to start the container
 			children := daemon.children(c)
-			timeout := time.After(5 * time.Second)
+			timeout := time.NewTimer(5 * time.Second)
+			defer timeout.Stop()
+
 			for _, child := range children {
 				if notifier, exists := restartContainers[child]; exists {
 					select {
 					case <-notifier:
-					case <-timeout:
+					case <-timeout.C:
 					}
 				}
 			}
@@ -602,6 +605,7 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 	if daemon.discoveryWatcher == nil {
 		return
 	}
+
 	// Make sure if the container has a network that requires discovery that the discovery service is available before starting
 	for netName := range c.NetworkSettings.Networks {
 		// If we get `ErrNoSuchNetwork` here, we can assume that it is due to discovery not being ready
@@ -610,13 +614,19 @@ func (daemon *Daemon) waitForNetworks(c *container.Container) {
 		if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
 			continue
 		}
+
 		// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
 		// FIXME: why is this slow???
+		dur := 60 * time.Second
+		timer := time.NewTimer(dur)
+
 		logrus.Debugf("Container %s waiting for network to be ready", c.Name)
 		select {
 		case <-daemon.discoveryWatcher.ReadyCh():
-		case <-time.After(60 * time.Second):
+		case <-timer.C:
 		}
+		timer.Stop()
+
 		return
 	}
 }
@@ -666,10 +676,14 @@ func (daemon *Daemon) DaemonLeavesCluster() {
 	// This is called also on graceful daemon shutdown. We need to
 	// wait, because the ingress release has to happen before the
 	// network controller is stopped.
+
 	if done, err := daemon.ReleaseIngress(); err == nil {
+		timeout := time.NewTimer(5 * time.Second)
+		defer timeout.Stop()
+
 		select {
 		case <-done:
-		case <-time.After(5 * time.Second):
+		case <-timeout.C:
 			logrus.Warn("timeout while waiting for ingress network removal")
 		}
 	} else {
diff --git a/daemon/discovery/discovery.go b/daemon/discovery/discovery.go
index 092c57638a..985ab6a3a7 100644
--- a/daemon/discovery/discovery.go
+++ b/daemon/discovery/discovery.go
@@ -148,12 +148,14 @@ func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
 	// Setup a short ticker until the first heartbeat has succeeded
 	t := time.NewTicker(500 * time.Millisecond)
 	defer t.Stop()
+
 	// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
-	timeout := time.After(60 * time.Second)
+	timeout := time.NewTimer(60 * time.Second)
+	defer timeout.Stop()
 
 	for {
 		select {
-		case <-timeout:
+		case <-timeout.C:
 			return errors.New("timeout waiting for initial discovery")
 		case <-d.term:
 			return errors.New("terminated")
diff --git a/daemon/exec.go b/daemon/exec.go
index f0b43d7253..eac2746b5c 100644
--- a/daemon/exec.go
+++ b/daemon/exec.go
@@ -22,7 +22,7 @@ import (
 )
 
 // Seconds to wait after sending TERM before trying KILL
-const termProcessTimeout = 10
+const termProcessTimeout = 10 * time.Second
 
 func (d *Daemon) registerExecCommand(container *container.Container, config *exec.Config) {
 	// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
@@ -265,9 +265,13 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 	case <-ctx.Done():
 		logrus.Debugf("Sending TERM signal to process %v in container %v", name, c.ID)
 		d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["TERM"]))
+
+		timeout := time.NewTimer(termProcessTimeout)
+		defer timeout.Stop()
+
 		select {
-		case <-time.After(termProcessTimeout * time.Second):
-			logrus.Infof("Container %v, process %v failed to exit within %d seconds of signal TERM - using the force", c.ID, name, termProcessTimeout)
+		case <-timeout.C:
+			logrus.Infof("Container %v, process %v failed to exit within %v of signal TERM - using the force", c.ID, name, termProcessTimeout)
 			d.containerd.SignalProcess(ctx, c.ID, name, int(signal.SignalMap["KILL"]))
 		case <-attachErr:
 			// TERM signal worked
diff --git a/daemon/health.go b/daemon/health.go
index ae0d7f8921..12bb9ee7f4 100644
--- a/daemon/health.go
+++ b/daemon/health.go
@@ -187,12 +187,18 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch
 func monitor(d *Daemon, c *container.Container, stop chan struct{}, probe probe) {
 	probeTimeout := timeoutWithDefault(c.Config.Healthcheck.Timeout, defaultProbeTimeout)
 	probeInterval := timeoutWithDefault(c.Config.Healthcheck.Interval, defaultProbeInterval)
+
+	intervalTimer := time.NewTimer(probeInterval)
+	defer intervalTimer.Stop()
+
 	for {
+		intervalTimer.Reset(probeInterval)
+
 		select {
 		case <-stop:
 			logrus.Debugf("Stop healthcheck monitoring for container %s (received while idle)", c.ID)
 			return
-		case <-time.After(probeInterval):
+		case <-intervalTimer.C:
 			logrus.Debugf("Running health check for container %s ...", c.ID)
 			startTime := time.Now()
 			ctx, cancelProbe := context.WithTimeout(context.Background(), probeTimeout)
diff --git a/daemon/resize.go b/daemon/resize.go
index 21240650f8..78c78ba061 100644
--- a/daemon/resize.go
+++ b/daemon/resize.go
@@ -38,13 +38,16 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
 	if err != nil {
 		return err
 	}
+
 	// TODO: the timeout is hardcoded here, it would be more flexible to make it
 	// a parameter in resize request context, which would need API changes.
-	timeout := 10 * time.Second
+	timeout := time.NewTimer(10 * time.Second)
+	defer timeout.Stop()
+
 	select {
 	case <-ec.Started:
 		return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height)
-	case <-time.After(timeout):
+	case <-timeout.C:
 		return fmt.Errorf("timeout waiting for exec session ready")
 	}
 }
diff --git a/libcontainerd/supervisor/remote_daemon.go b/libcontainerd/supervisor/remote_daemon.go
index 2acca6f33f..d1901f208a 100644
--- a/libcontainerd/supervisor/remote_daemon.go
+++ b/libcontainerd/supervisor/remote_daemon.go
@@ -89,8 +89,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 
 	go r.monitorDaemon(ctx)
 
+	timeout := time.NewTimer(startupTimeout)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(startupTimeout):
+	case <-timeout.C:
 		return nil, errors.New("timeout waiting for containerd to start")
 	case err := <-r.daemonStartCh:
 		if err != nil {
@@ -101,8 +104,11 @@ func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Da
 	return r, nil
 }
 
 func (r *remote) WaitTimeout(d time.Duration) error {
+	timeout := time.NewTimer(d)
+	defer timeout.Stop()
+
 	select {
-	case <-time.After(d):
+	case <-timeout.C:
 		return errors.New("timeout waiting for containerd to stop")
 	case <-r.daemonStopCh:
 	}
@@ -230,7 +236,8 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		transientFailureCount = 0
 		client                *containerd.Client
 		err                   error
-		delay                 <-chan time.Time
+		delay                 time.Duration
+		timer                 = time.NewTimer(0)
 		started               bool
 	)
 
@@ -245,19 +252,25 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 		r.platformCleanup()
 
 		close(r.daemonStopCh)
+		timer.Stop()
 	}()
 
+	// ensure no races on sending to timer.C even though there is a 0 duration.
+	if !timer.Stop() {
+		<-timer.C
+	}
+
 	for {
-		if delay != nil {
-			select {
-			case <-ctx.Done():
-				r.logger.Info("stopping healthcheck following graceful shutdown")
-				if client != nil {
-					client.Close()
-				}
-				return
-			case <-delay:
+		timer.Reset(delay)
+
+		select {
+		case <-ctx.Done():
+			r.logger.Info("stopping healthcheck following graceful shutdown")
+			if client != nil {
+				client.Close()
 			}
+			return
+		case <-timer.C:
 		}
 
 		if r.daemonPid == -1 {
@@ -277,14 +290,14 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 				return
 			}
 			r.logger.WithError(err).Error("failed restarting containerd")
-			delay = time.After(50 * time.Millisecond)
+			delay = 50 * time.Millisecond
 			continue
 		}
 
 		client, err = containerd.New(r.GRPC.Address, containerd.WithTimeout(60*time.Second))
 		if err != nil {
 			r.logger.WithError(err).Error("failed connecting to containerd")
-			delay = time.After(100 * time.Millisecond)
+			delay = 100 * time.Millisecond
 			continue
 		}
 	}
@@ -300,7 +313,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 			}
 
 			transientFailureCount = 0
-			delay = time.After(500 * time.Millisecond)
+			delay = 500 * time.Millisecond
 			continue
 		}
 
@@ -308,7 +321,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 
 		transientFailureCount++
 		if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
-			delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond)
+			delay = time.Duration(transientFailureCount) * 200 * time.Millisecond
 			continue
 		}
 	}
@@ -321,7 +334,7 @@ func (r *remote) monitorDaemon(ctx context.Context) {
 			client.Close()
 			client = nil
 			r.daemonPid = -1
-			delay = nil
+			delay = 0
 			transientFailureCount = 0
 		}
 	}
diff --git a/pkg/filenotify/poller.go b/pkg/filenotify/poller.go
index 6161d4ab73..01ef057981 100644
--- a/pkg/filenotify/poller.go
+++ b/pkg/filenotify/poller.go
@@ -146,9 +146,18 @@ func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
 // upon finding changes to a file or errors, sendEvent/sendErr is called
 func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
 	defer f.Close()
+
+	timer := time.NewTimer(watchWaitTime)
+	if !timer.Stop() {
+		<-timer.C
+	}
+	defer timer.Stop()
+
 	for {
+		timer.Reset(watchWaitTime)
+
 		select {
-		case <-time.After(watchWaitTime):
+		case <-timer.C:
 		case <-chClose:
 			logrus.Debugf("watch for %s closed", f.Name())
 			return
diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go
index 76033ed9e4..32b2f18925 100644
--- a/pkg/pubsub/publisher.go
+++ b/pkg/pubsub/publisher.go
@@ -107,9 +107,12 @@ func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg
 
 	// send under a select as to not block if the receiver is unavailable
 	if p.timeout > 0 {
+		timeout := time.NewTimer(p.timeout)
+		defer timeout.Stop()
+
 		select {
 		case sub <- v:
-		case <-time.After(p.timeout):
+		case <-timeout.C:
 		}
 		return
 	}
diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go
index 86ada8d02f..23fa462865 100644
--- a/plugin/manager_linux.go
+++ b/plugin/manager_linux.go
@@ -146,6 +146,8 @@ func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
 	return nil
 }
 
+const shutdownTimeout = 10 * time.Second
+
 func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	pluginID := p.GetID()
 
@@ -153,19 +155,26 @@ func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	if err != nil {
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 	} else {
+
+		timeout := time.NewTimer(shutdownTimeout)
+		defer timeout.Stop()
+
 		select {
 		case <-ec:
 			logrus.Debug("Clean shutdown of plugin")
-		case <-time.After(time.Second * 10):
+		case <-timeout.C:
 			logrus.Debug("Force shutdown plugin")
 			if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil {
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
 			}
+
+			timeout.Reset(shutdownTimeout)
+
 			select {
 			case <-ec:
 				logrus.Debug("SIGKILL plugin shutdown")
-			case <-time.After(time.Second * 10):
-				logrus.Debug("Force shutdown plugin FAILED")
+			case <-timeout.C:
+				logrus.WithField("plugin", p.Name).Warn("Force shutdown plugin FAILED")
 			}
 		}
 	}
diff --git a/restartmanager/restartmanager.go b/restartmanager/restartmanager.go
index 6468ccf7e6..12094def60 100644
--- a/restartmanager/restartmanager.go
+++ b/restartmanager/restartmanager.go
@@ -107,11 +107,14 @@ func (rm *restartManager) ShouldRestart(exitCode uint32, hasBeenManuallyStopped
 
 	ch := make(chan error)
 	go func() {
+		timeout := time.NewTimer(rm.timeout)
+		defer timeout.Stop()
+
 		select {
 		case <-rm.cancel:
 			ch <- ErrRestartCanceled
 			close(ch)
-		case <-time.After(rm.timeout):
+		case <-timeout.C:
 			rm.Lock()
 			close(ch)
 			rm.active = false