diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go
index 67895eb751..eef3a4863d 100644
--- a/libcontainerd/client_daemon.go
+++ b/libcontainerd/client_daemon.go
@@ -114,8 +114,21 @@ type client struct {
 	containers map[string]*container
 }
 
+func (c *client) setRemote(remote *containerd.Client) {
+	c.Lock()
+	c.remote = remote
+	c.Unlock()
+}
+
+func (c *client) getRemote() *containerd.Client {
+	c.RLock()
+	remote := c.remote
+	c.RUnlock()
+	return remote
+}
+
 func (c *client) Version(ctx context.Context) (containerd.Version, error) {
-	return c.remote.Version(ctx)
+	return c.getRemote().Version(ctx)
 }
 
 func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
@@ -187,7 +200,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
 
 	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
 
-	cdCtr, err := c.remote.NewContainer(ctx, id,
+	cdCtr, err := c.getRemote().NewContainer(ctx, id,
 		containerd.WithSpec(ociSpec),
 		// TODO(mlaventure): when containerd support lcow, revisit runtime value
 		containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
@@ -230,7 +243,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
 	// remove the checkpoint when we're done
 	defer func() {
 		if cp != nil {
-			err := c.remote.ContentStore().Delete(context.Background(), cp.Digest)
+			err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
 			if err != nil {
 				c.logger.WithError(err).WithFields(logrus.Fields{
 					"ref": checkpointDir,
@@ -528,14 +541,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 	}
 	// Whatever happens, delete the checkpoint from containerd
 	defer func() {
-		err := c.remote.ImageService().Delete(context.Background(), img.Name())
+		err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
 		if err != nil {
 			c.logger.WithError(err).WithField("digest", img.Target().Digest).
 				Warnf("failed to delete checkpoint image")
 		}
 	}()
 
-	b, err := content.ReadBlob(ctx, c.remote.ContentStore(), img.Target().Digest)
+	b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target().Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
 	}
@@ -555,7 +568,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
 		return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
 	}
 
-	rat, err := c.remote.ContentStore().ReaderAt(ctx, cpDesc.Digest)
+	rat, err := c.getRemote().ContentStore().ReaderAt(ctx, cpDesc.Digest)
 	if err != nil {
 		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
 	}
@@ -708,7 +721,7 @@ func (c *client) processEventStream(ctx context.Context) {
 		}
 	}()
 
-	eventStream, err = c.remote.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
+	eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
 		Filters: []string{
 			// Filter on both namespace *and* topic. To create an "and" filter,
 			// this must be a single, comma-separated string
@@ -719,6 +732,8 @@ func (c *client) processEventStream(ctx context.Context) {
 		return
 	}
 
+	c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
+
 	var oomKilled bool
 	for {
 		ev, err = eventStream.Recv()
@@ -822,7 +837,7 @@ func (c *client) processEventStream(ctx context.Context) {
 }
 
 func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
-	writer, err := c.remote.ContentStore().Writer(ctx, ref, 0, "")
+	writer, err := c.getRemote().ContentStore().Writer(ctx, ref, 0, "")
 	if err != nil {
 		return nil, err
 	}
diff --git a/libcontainerd/remote_daemon.go b/libcontainerd/remote_daemon.go
index 1cc1ebb81e..fb3e0bdda8 100644
--- a/libcontainerd/remote_daemon.go
+++ b/libcontainerd/remote_daemon.go
@@ -260,7 +260,7 @@ func (r *remote) startContainerd() error {
 	return nil
 }
 
-func (r *remote) monitorConnection(client *containerd.Client) {
+func (r *remote) monitorConnection(monitor *containerd.Client) {
 	var transientFailureCount = 0
 
 	ticker := time.NewTicker(500 * time.Millisecond)
@@ -269,7 +269,7 @@
 	for {
 		<-ticker.C
 		ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
-		_, err := client.IsServing(ctx)
+		_, err := monitor.IsServing(ctx)
 		cancel()
 		if err == nil {
 			transientFailureCount = 0
@@ -279,39 +279,69 @@
 		select {
 		case <-r.shutdownContext.Done():
 			r.logger.Info("stopping healthcheck following graceful shutdown")
-			client.Close()
+			monitor.Close()
 			return
 		default:
 		}
 
 		r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
 
-		if r.daemonPid != -1 {
-			transientFailureCount++
-			if transientFailureCount >= maxConnectionRetryCount || !system.IsProcessAlive(r.daemonPid) {
-				transientFailureCount = 0
-				if system.IsProcessAlive(r.daemonPid) {
-					r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
-					// Try to get a stack trace
-					syscall.Kill(r.daemonPid, syscall.SIGUSR1)
-					<-time.After(100 * time.Millisecond)
-					system.KillProcess(r.daemonPid)
+		if r.daemonPid == -1 {
+			continue
+		}
+
+		transientFailureCount++
+		if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
+			continue
+		}
+
+		transientFailureCount = 0
+		if system.IsProcessAlive(r.daemonPid) {
+			r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
+			// Try to get a stack trace
+			syscall.Kill(r.daemonPid, syscall.SIGUSR1)
+			<-time.After(100 * time.Millisecond)
+			system.KillProcess(r.daemonPid)
+		}
+		<-r.daemonWaitCh
+
+		monitor.Close()
+		os.Remove(r.GRPC.Address)
+		if err := r.startContainerd(); err != nil {
+			r.logger.WithError(err).Error("failed restarting containerd")
+			continue
+		}
+
+		newMonitor, err := containerd.New(r.GRPC.Address)
+		if err != nil {
+			r.logger.WithError(err).Error("failed connect to containerd")
+			continue
+		}
+
+		monitor = newMonitor
+		var wg sync.WaitGroup
+
+		for _, c := range r.clients {
+			wg.Add(1)
+
+			go func(c *client) {
+				defer wg.Done()
+				c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
+				c.remote.Close()
+
+				remote, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(c.namespace))
+				if err != nil {
+					r.logger.WithError(err).Error("failed to connect to containerd")
+					// TODO: Better way to handle this?
+					// This *shouldn't* happen, but this could wind up where the daemon
+					// is not able to communicate with an eventually up containerd
+					return
 				}
-				<-r.daemonWaitCh
-				var err error
-				client.Close()
-				os.Remove(r.GRPC.Address)
-				if err = r.startContainerd(); err != nil {
-					r.logger.WithError(err).Error("failed restarting containerd")
-				} else {
-					newClient, err := containerd.New(r.GRPC.Address)
-					if err != nil {
-						r.logger.WithError(err).Error("failed connect to containerd")
-					} else {
-						client = newClient
-					}
-				}
-			}
-		}
+
+				c.setRemote(remote)
+			}(c)
+
+			wg.Wait()
+		}
 	}
 }