mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Daemon to take care of ingress cleanup on leave & shutdown
Signed-off-by: Alessandro Boch <aboch@docker.com>
This commit is contained in:
parent
bf5bebdfa0
commit
6f4bb796dd
4 changed files with 44 additions and 12 deletions
|
@ -27,8 +27,8 @@ type Backend interface {
|
||||||
CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
|
CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
|
||||||
DeleteManagedNetwork(name string) error
|
DeleteManagedNetwork(name string) error
|
||||||
FindNetwork(idName string) (libnetwork.Network, error)
|
FindNetwork(idName string) (libnetwork.Network, error)
|
||||||
SetupIngress(req clustertypes.NetworkCreateRequest, nodeIP string) error
|
SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)
|
||||||
ReleaseIngress() error
|
ReleaseIngress() (<-chan struct{}, error)
|
||||||
PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error
|
PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error
|
||||||
CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
|
CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
|
||||||
ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
|
ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
|
||||||
|
|
|
@ -139,13 +139,15 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {
|
||||||
options.IPAM.Config = append(options.IPAM.Config, c)
|
options.IPAM.Config = append(options.IPAM.Config, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
|
_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
|
||||||
ID: na.Network.ID,
|
ID: na.Network.ID,
|
||||||
NetworkCreateRequest: types.NetworkCreateRequest{
|
NetworkCreateRequest: types.NetworkCreateRequest{
|
||||||
Name: na.Network.Spec.Annotations.Name,
|
Name: na.Network.Spec.Annotations.Name,
|
||||||
NetworkCreate: options,
|
NetworkCreate: options,
|
||||||
},
|
},
|
||||||
}, na.Addresses[0])
|
}, na.Addresses[0])
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Controller returns a docker container runner.
|
// Controller returns a docker container runner.
|
||||||
|
|
|
@ -445,7 +445,25 @@ func (daemon *Daemon) DaemonLeavesCluster() {
|
||||||
// Daemon is in charge of removing the attachable networks with
|
// Daemon is in charge of removing the attachable networks with
|
||||||
// connected containers when the node leaves the swarm
|
// connected containers when the node leaves the swarm
|
||||||
daemon.clearAttachableNetworks()
|
daemon.clearAttachableNetworks()
|
||||||
|
// We no longer need the cluster provider, stop it now so that
|
||||||
|
// the network agent will stop listening to cluster events.
|
||||||
daemon.setClusterProvider(nil)
|
daemon.setClusterProvider(nil)
|
||||||
|
// Wait for the networking cluster agent to stop
|
||||||
|
daemon.netController.AgentStopWait()
|
||||||
|
// Daemon is in charge of removing the ingress network when the
|
||||||
|
// node leaves the swarm. Wait for job to be done or timeout.
|
||||||
|
// This is called also on graceful daemon shutdown. We need to
|
||||||
|
// wait, because the ingress release has to happen before the
|
||||||
|
// network controller is stopped.
|
||||||
|
if done, err := daemon.ReleaseIngress(); err == nil {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
logrus.Warnf("timeout while waiting for ingress network removal")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warnf("failed to initiate ingress network removal: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setClusterProvider sets a component for querying the current cluster state.
|
// setClusterProvider sets a component for querying the current cluster state.
|
||||||
|
@ -832,6 +850,12 @@ func (daemon *Daemon) Shutdown() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we are part of a cluster, clean up cluster's stuff
|
||||||
|
if daemon.clusterProvider != nil {
|
||||||
|
logrus.Debugf("start clean shutdown of cluster resources...")
|
||||||
|
daemon.DaemonLeavesCluster()
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown plugins after containers and layerstore. Don't change the order.
|
// Shutdown plugins after containers and layerstore. Don't change the order.
|
||||||
daemon.pluginShutdown()
|
daemon.pluginShutdown()
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,7 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
|
||||||
type ingressJob struct {
|
type ingressJob struct {
|
||||||
create *clustertypes.NetworkCreateRequest
|
create *clustertypes.NetworkCreateRequest
|
||||||
ip net.IP
|
ip net.IP
|
||||||
|
jobDone chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -124,6 +125,7 @@ func (daemon *Daemon) startIngressWorker() {
|
||||||
daemon.releaseIngress(ingressID)
|
daemon.releaseIngress(ingressID)
|
||||||
ingressID = ""
|
ingressID = ""
|
||||||
}
|
}
|
||||||
|
close(r.jobDone)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -137,19 +139,23 @@ func (daemon *Daemon) enqueueIngressJob(job *ingressJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetupIngress setups ingress networking.
|
// SetupIngress setups ingress networking.
|
||||||
func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) error {
|
// The function returns a channel which will signal the caller when the programming is completed.
|
||||||
|
func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) (<-chan struct{}, error) {
|
||||||
ip, _, err := net.ParseCIDR(nodeIP)
|
ip, _, err := net.ParseCIDR(nodeIP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
daemon.enqueueIngressJob(&ingressJob{&create, ip})
|
done := make(chan struct{})
|
||||||
return nil
|
daemon.enqueueIngressJob(&ingressJob{&create, ip, done})
|
||||||
|
return done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReleaseIngress releases the ingress networking.
|
// ReleaseIngress releases the ingress networking.
|
||||||
func (daemon *Daemon) ReleaseIngress() error {
|
// The function returns a channel which will signal the caller when the programming is completed.
|
||||||
daemon.enqueueIngressJob(&ingressJob{nil, nil})
|
func (daemon *Daemon) ReleaseIngress() (<-chan struct{}, error) {
|
||||||
return nil
|
done := make(chan struct{})
|
||||||
|
daemon.enqueueIngressJob(&ingressJob{nil, nil, done})
|
||||||
|
return done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {
|
func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {
|
||||||
|
|
Loading…
Reference in a new issue