diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go
index f4b97ce660..720b8447fc 100644
--- a/daemon/cluster/executor/container/adapter.go
+++ b/daemon/cluster/executor/container/adapter.go
@@ -32,6 +32,9 @@ import (
 	"golang.org/x/time/rate"
 )
 
+// nodeAttachmentReadyInterval is the interval to poll for node network attachments to be ready
+const nodeAttachmentReadyInterval = 100 * time.Millisecond
+
 // containerAdapter conducts remote operations for a container. All calls
 // are mostly naked calls to the client API, seeded with information from
 // containerConfig.
@@ -146,6 +149,55 @@ func (c *containerAdapter) pullImage(ctx context.Context) error {
 	return nil
 }
 
+// waitNodeAttachments validates that NetworkAttachments exist on this node
+// for every network in use by this task. It blocks until the network
+// attachments are ready, or the context times out. If it returns nil, then the
+// node's network attachments are all there.
+func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
+	// to do this, we're going to get the attachment store and try getting the
+	// IP address for each network. if any network comes back not existing,
+	// we'll wait and try again.
+	attachmentStore := c.backend.GetAttachmentStore()
+	if attachmentStore == nil {
+		return fmt.Errorf("error getting attachment store")
+	}
+
+	// essentially, we're long-polling here. this is really sub-optimal, but a
+	// better solution based off signaling channels would require a more
+	// substantial rearchitecture and probably not be worth our time in terms
+	// of performance gains.
+	poll := time.NewTicker(nodeAttachmentReadyInterval)
+	defer poll.Stop()
+	for {
+		// set a flag ready to true. if we try to get a network IP that doesn't
+		// exist yet, we will set this flag to "false"
+		ready := true
+		for _, attachment := range c.container.networksAttachments {
+			// we only need node attachments (IP address) for overlay networks
+			// TODO(dperny): unsure if this will work with other network
+			// drivers, but i also don't think other network drivers use the
+			// node attachment IP address.
+			if attachment.Network.DriverState.Name == "overlay" {
+				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
+					ready = false
+				}
+			}
+		}
+
+		// if everything is ready here, then we can just return no error
+		if ready {
+			return nil
+		}
+
+		// otherwise, try polling again, or wait for context canceled.
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
+		case <-poll.C:
+		}
+	}
+}
+
 func (c *containerAdapter) createNetworks(ctx context.Context) error {
 	for name := range c.container.networksAttachments {
 		ncr, err := c.container.networkCreateRequest(name)
diff --git a/daemon/cluster/executor/container/adapter_test.go b/daemon/cluster/executor/container/adapter_test.go
new file mode 100644
index 0000000000..c4ef2affbb
--- /dev/null
+++ b/daemon/cluster/executor/container/adapter_test.go
@@ -0,0 +1,139 @@
+package container // import "github.com/docker/docker/daemon/cluster/executor/container"
+
+import (
+	"testing"
+
+	"context"
+	"time"
+
+	"github.com/docker/docker/daemon"
+	"github.com/docker/swarmkit/api"
+)
+
+// TestWaitNodeAttachment tests that the waitNodeAttachment method successfully
+// blocks until the required node attachment becomes available.
+func TestWaitNodeAttachment(t *testing.T) {
+	emptyDaemon := &daemon.Daemon{}
+
+	// the daemon creates an attachment store as an object, which means it's
+	// initialized to an empty store by default. get that attachment store here
+	// and add some attachments to it
+	attachmentStore := emptyDaemon.GetAttachmentStore()
+
+	// create a set of attachments to put into the attachment store
+	attachments := map[string]string{
+		"network1": "10.1.2.3/24",
+	}
+
+	// this shouldn't fail, but check it anyway just in case
+	err := attachmentStore.ResetAttachments(attachments)
+	if err != nil {
+		t.Fatalf("error resetting attachments: %v", err)
+	}
+
+	// create a containerConfig to put in the adapter. we don't need the task,
+	// actually; only the networkAttachments are needed.
+	container := &containerConfig{
+		task: nil,
+		networksAttachments: map[string]*api.NetworkAttachment{
+			// network1 is already present in the attachment store.
+			"network1": {
+				Network: &api.Network{
+					ID: "network1",
+					DriverState: &api.Driver{
+						Name: "overlay",
+					},
+				},
+			},
+			// network2 is not yet present in the attachment store, and we
+			// should block while waiting for it.
+			"network2": {
+				Network: &api.Network{
+					ID: "network2",
+					DriverState: &api.Driver{
+						Name: "overlay",
+					},
+				},
+			},
+			// localnetwork is not and will never be in the attachment store,
+			// but we should not block on it, because it is not an overlay
+			// network
+			"localnetwork": {
+				Network: &api.Network{
+					ID: "localnetwork",
+					DriverState: &api.Driver{
+						Name: "bridge",
+					},
+				},
+			},
+		},
+	}
+
+	// we don't create an adapter using the newContainerAdapter package,
+	// because it does a bunch of checks and validations. instead, create one
+	// "from scratch" so we only have the fields we need.
+	adapter := &containerAdapter{
+		backend:   emptyDaemon,
+		container: container,
+	}
+
+	// create a context to call the method with
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// create a channel to allow the goroutine that we run the method call in
+	// to signal that it's done.
+	doneChan := make(chan struct{})
+
+	// store the error return value of waitNodeAttachments in this variable
+	var waitNodeAttachmentsErr error
+	// NOTE(dperny): be careful running goroutines in test code. if a test
+	// terminates with e.g. t.Fatalf or a failed requirement, runtime.Goexit gets
+	// called, which does run defers but does not clean up child goroutines.
+	// we defer canceling the context here, which should stop this goroutine
+	// from leaking
+	go func() {
+		waitNodeAttachmentsErr = adapter.waitNodeAttachments(ctx)
+		// signal that we've completed
+		close(doneChan)
+	}()
+
+	// wait 200ms to allow the waitNodeAttachments call to spin for a bit
+	time.Sleep(200 * time.Millisecond)
+	select {
+	case <-doneChan:
+		if waitNodeAttachmentsErr == nil {
+			t.Fatalf("waitNodeAttachments exited early with no error")
+		} else {
+			t.Fatalf(
+				"waitNodeAttachments exited early with an error: %v",
+				waitNodeAttachmentsErr,
+			)
+		}
+	default:
+		// allow falling through; this is the desired case
+	}
+
+	// now update the node attachments to include another network attachment
+	attachments["network2"] = "10.3.4.5/24"
+	err = attachmentStore.ResetAttachments(attachments)
+	if err != nil {
+		t.Fatalf("error resetting attachments: %v", err)
+	}
+
+	// now wait 200 ms for waitNodeAttachments to pick up the change
+	time.Sleep(200 * time.Millisecond)
+
+	// and check that waitNodeAttachments has exited with no error
+	select {
+	case <-doneChan:
+		if waitNodeAttachmentsErr != nil {
+			t.Fatalf(
+				"waitNodeAttachments returned an error: %v",
+				waitNodeAttachmentsErr,
+			)
+		}
+	default:
+		t.Fatalf("waitNodeAttachments did not exit yet, but should have")
+	}
+}
diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go
index bcd426e73d..8d070799f3 100644
--- a/daemon/cluster/executor/container/controller.go
+++ b/daemon/cluster/executor/container/controller.go
@@ -23,6 +23,10 @@ import (
 
 const defaultGossipConvergeDelay = 2 * time.Second
 
+// waitNodeAttachmentsTimeout defines the total period of time we should wait
+// for node attachments to be ready before giving up on starting a task
+const waitNodeAttachmentsTimeout = 30 * time.Second
+
 // controller implements agent.Controller against docker's API.
 //
 // Most operations against docker's API are done through the container name,
@@ -98,6 +102,25 @@ func (r *controller) Prepare(ctx context.Context) error {
 		return err
 	}
 
+	// Before we create networks, we need to make sure that the node has all of
+	// the network attachments that the task needs. This will block until that
+	// is the case or the context has expired.
+	// NOTE(dperny): Prepare doesn't time out on its own (that is, the context
+	// passed in does not expire after any period of time), which means if the
+	// node attachment never arrives (for example, if the network's IP address
+	// space is exhausted), then the tasks on the node will park in PREPARING
+	// forever (or until the node dies). To avoid this case, we create a new
+	// context with a fixed deadline, and give up. In normal operation, a node
+	// update with the node IP address should come in hot on the tail of the
+	// task being assigned to the node, and this should exit on the order of
+	// milliseconds, but to be extra conservative we'll give it 30 seconds to
+	// time out before giving up.
+	waitNodeAttachmentsContext, waitCancel := context.WithTimeout(ctx, waitNodeAttachmentsTimeout)
+	defer waitCancel()
+	if err := r.adapter.waitNodeAttachments(waitNodeAttachmentsContext); err != nil {
+		return err
+	}
+
 	// Make sure all the networks that the task needs are created.
 	if err := r.adapter.createNetworks(ctx); err != nil {
 		return err
diff --git a/daemon/network/settings.go b/daemon/network/settings.go
index b0460ed6ae..7696d40201 100644
--- a/daemon/network/settings.go
+++ b/daemon/network/settings.go
@@ -2,6 +2,7 @@ package network // import "github.com/docker/docker/daemon/network"
 
 import (
 	"net"
+	"sync"
 
 	networktypes "github.com/docker/docker/api/types/network"
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
@@ -37,6 +38,7 @@ type EndpointSettings struct {
 
 // AttachmentStore stores the load balancer IP address for a network id.
 type AttachmentStore struct {
+	sync.Mutex
 	//key: networkd id
 	//value: load balancer ip address
 	networkToNodeLBIP map[string]net.IP
@@ -45,7 +47,9 @@ type AttachmentStore struct {
 // ResetAttachments clears any existing load balancer IP to network mapping and
 // sets the mapping to the given attachments.
 func (store *AttachmentStore) ResetAttachments(attachments map[string]string) error {
-	store.ClearAttachments()
+	store.Lock()
+	defer store.Unlock()
+	store.clearAttachments()
 	for nid, nodeIP := range attachments {
 		ip, _, err := net.ParseCIDR(nodeIP)
 		if err != nil {
@@ -59,11 +63,19 @@ func (store *AttachmentStore) ResetAttachments(attachments map[string]string) er
 
 // ClearAttachments clears all the mappings of network to load balancer IP Address.
 func (store *AttachmentStore) ClearAttachments() {
+	store.Lock()
+	defer store.Unlock()
+	store.clearAttachments()
+}
+
+func (store *AttachmentStore) clearAttachments() {
 	store.networkToNodeLBIP = make(map[string]net.IP)
 }
 
 // GetIPForNetwork return the load balancer IP address for the given network.
 func (store *AttachmentStore) GetIPForNetwork(networkID string) (net.IP, bool) {
+	store.Lock()
+	defer store.Unlock()
 	ip, exists := store.networkToNodeLBIP[networkID]
 	return ip, exists
 }
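
Reviewer note (not part of the patch): `waitNodeAttachments` and `Prepare` together implement a bounded long-poll — a fixed-interval ticker raced against a context deadline. The sketch below is a minimal, standalone Go illustration of that pattern only; the `waitUntilReady` function, the `resourceReady` predicate, and the intervals are hypothetical stand-ins for the attachment-store lookup, `nodeAttachmentReadyInterval`, and `waitNodeAttachmentsTimeout`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntilReady polls the ready predicate at pollInterval until it reports
// true, or until ctx expires. Structurally this mirrors waitNodeAttachments:
// a ticker sets the retry cadence and ctx.Done() provides the hard stop.
func waitUntilReady(ctx context.Context, pollInterval time.Duration, ready func() bool) error {
	poll := time.NewTicker(pollInterval)
	defer poll.Stop()
	for {
		if ready() {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for the resource to become ready")
		case <-poll.C:
		}
	}
}

func main() {
	// The caller bounds the total wait with a context deadline, the way
	// Prepare does with waitNodeAttachmentsTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	start := time.Now()
	// resourceReady is a hypothetical stand-in for "the attachment store has
	// an IP for every overlay network"; here it simply flips to true after 500ms.
	resourceReady := func() bool { return time.Since(start) > 500*time.Millisecond }

	if err := waitUntilReady(ctx, 100*time.Millisecond, resourceReady); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("ready after", time.Since(start).Round(time.Millisecond))
}
```

As the patch's own comments note, a channel-based signal would avoid polling entirely, but the 100ms ticker keeps the change small, while the 30-second deadline in Prepare prevents tasks from parking in PREPARING forever when an attachment never arrives.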