From 2dce79e05ab8f8fd22ca7b2f73121b2d7723f7cf Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Wed, 4 May 2016 10:13:23 -0400 Subject: [PATCH] Wait for discovery on container start error This gives discovery a chance to initialize, particularly if the K/V store being used is in a container. Signed-off-by: Brian Goff --- daemon/daemon.go | 30 +++++++++++++++++++++ daemon/daemon_test.go | 26 ++++++++++++++++++ daemon/discovery.go | 61 +++++++++++++++++++++++++++++++++---------- 3 files changed, 103 insertions(+), 14 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index 1c7ad12700..116536bdec 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -380,6 +380,9 @@ func (daemon *Daemon) restore() error { } } } + + // Make sure networks are available before starting + daemon.waitForNetworks(c) if err := daemon.containerStart(c); err != nil { logrus.Errorf("Failed to start container %s: %s", c.ID, err) } @@ -423,6 +426,33 @@ func (daemon *Daemon) restore() error { return nil } +// waitForNetworks is used during daemon initialization when starting up containers. +// It ensures that all of a container's networks are available before the daemon tries to start the container. +// In practice it just makes sure the discovery service is available for containers which use a network that requires discovery. 
+func (daemon *Daemon) waitForNetworks(c *container.Container) { + if daemon.discoveryWatcher == nil { + return + } + // If the container has a network that requires discovery, make sure the discovery service is available before starting + for netName := range c.NetworkSettings.Networks { + // If we get `ErrNoSuchNetwork` here, it can be assumed that it is due to discovery not being ready. + // Most likely this is because the K/V store used for discovery is in a container and needs to be started + if _, err := daemon.netController.NetworkByName(netName); err != nil { + if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok { + continue + } + // use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host + // FIXME: why is this slow??? + logrus.Debugf("Container %s waiting for network to be ready", c.Name) + select { + case <-daemon.discoveryWatcher.ReadyCh(): + case <-time.After(60 * time.Second): + } + return + } + } +} + func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error { if img != nil && img.Config != nil { if err := merge(config, img.Config); err != nil { diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index da6bc25d88..609ed95258 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -381,6 +381,12 @@ func TestDaemonDiscoveryReload(t *testing.T) { &discovery.Entry{Host: "127.0.0.1", Port: "3333"}, } + select { + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for discovery") + case <-daemon.discoveryWatcher.ReadyCh(): + } + stopCh := make(chan struct{}) defer close(stopCh) ch, errCh := daemon.discoveryWatcher.Watch(stopCh) @@ -414,6 +420,13 @@ func TestDaemonDiscoveryReload(t *testing.T) { if err := daemon.Reload(newConfig); err != nil { t.Fatal(err) } + + select { + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for discovery") + case <-daemon.discoveryWatcher.ReadyCh(): + } + ch, errCh = 
daemon.discoveryWatcher.Watch(stopCh) select { @@ -450,6 +463,13 @@ func TestDaemonDiscoveryReloadFromEmptyDiscovery(t *testing.T) { if err := daemon.Reload(newConfig); err != nil { t.Fatal(err) } + + select { + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for discovery") + case <-daemon.discoveryWatcher.ReadyCh(): + } + stopCh := make(chan struct{}) defer close(stopCh) ch, errCh := daemon.discoveryWatcher.Watch(stopCh) @@ -488,6 +508,12 @@ func TestDaemonDiscoveryReloadOnlyClusterAdvertise(t *testing.T) { if err := daemon.Reload(newConfig); err != nil { t.Fatal(err) } + + select { + case <-daemon.discoveryWatcher.ReadyCh(): + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for discovery") + } stopCh := make(chan struct{}) defer close(stopCh) ch, errCh := daemon.discoveryWatcher.Watch(stopCh) diff --git a/daemon/discovery.go b/daemon/discovery.go index 13469464d4..30d2e02a71 100644 --- a/daemon/discovery.go +++ b/daemon/discovery.go @@ -27,18 +27,24 @@ type discoveryReloader interface { discovery.Watcher Stop() Reload(backend, address string, clusterOpts map[string]string) error + ReadyCh() <-chan struct{} } type daemonDiscoveryReloader struct { backend discovery.Backend ticker *time.Ticker term chan bool + readyCh chan struct{} } func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { return d.backend.Watch(stopCh) } +func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} { + return d.readyCh +} + func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) { var ( heartbeat = defaultDiscoveryHeartbeat @@ -87,38 +93,64 @@ func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[stri backend: backend, ticker: time.NewTicker(heartbeat), term: make(chan bool), + readyCh: make(chan struct{}), } // We call Register() on the discovery backend in a loop for the whole lifetime of the daemon, // but we never actually Watch() for nodes 
appearing and disappearing for the moment. - reloader.advertise(advertiseAddress) + go reloader.advertiseHeartbeat(advertiseAddress) return reloader, nil } -func (d *daemonDiscoveryReloader) advertise(address string) { - d.registerAddr(address) - go d.advertiseHeartbeat(address) -} - -func (d *daemonDiscoveryReloader) registerAddr(addr string) { - if err := d.backend.Register(addr); err != nil { - log.Warnf("Registering as %q in discovery failed: %v", addr, err) - } -} - // advertiseHeartbeat registers the current node against the discovery backend using the specified // address. The function never returns, as registration against the backend comes with a TTL and // requires regular heartbeats. func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) { + var ready bool + if err := d.initHeartbeat(address); err == nil { + ready = true + close(d.readyCh) + } + for { select { case <-d.ticker.C: - d.registerAddr(address) + if err := d.backend.Register(address); err != nil { + log.Warnf("Registering as %q in discovery failed: %v", address, err) + } else { + if !ready { + close(d.readyCh) + ready = true + } + } case <-d.term: return } } } +// initHeartbeat is used to do the first heartbeat. It retries on a short ticker until the heartbeat +// succeeds, the timeout period is reached, or a value arrives on d.term, and then returns. NOTE(review): after a d.term return, advertiseHeartbeat still enters its loop and waits on d.term again; confirm Stop() signals in a way that ends that loop. 
+func (d *daemonDiscoveryReloader) initHeartbeat(address string) error { + // Set up a short (500ms) ticker that drives retries until the first heartbeat has succeeded + t := time.NewTicker(500 * time.Millisecond) + defer t.Stop() + // timeout makes sure that after a period of time (60s) we stop being so aggressive trying to reach the discovery service + timeout := time.After(60 * time.Second) + + for { + select { + case <-timeout: + return errors.New("timeout waiting for initial discovery") + case <-d.term: + return errors.New("terminated") + case <-t.C: + if err := d.backend.Register(address); err == nil { + return nil + } + } + } +} + // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address. func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error { d.Stop() @@ -130,8 +162,9 @@ func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string d.backend = backend d.ticker = time.NewTicker(heartbeat) + d.readyCh = make(chan struct{}) - d.advertise(advertiseAddress) + go d.advertiseHeartbeat(advertiseAddress) return nil }