diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go index f21076afb2..02930eb852 100644 --- a/libcontainerd/remote/client.go +++ b/libcontainerd/remote/client.go @@ -722,9 +722,12 @@ func (c *client) waitServe(ctx context.Context) bool { // `IsServing` will actually block until the service is ready. // However it can return early, so we'll loop with a delay to handle it. for { - serving, _ := c.client.IsServing(ctx) - if ctx.Err() != nil { - return false + serving, err := c.client.IsServing(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return false + } + logrus.WithError(err).Warn("Error while testing if containerd API is ready") } if serving { @@ -748,9 +751,16 @@ func (c *client) processEventStream(ctx context.Context, ns string) { ei libcontainerdtypes.EventInfo ) + // Create a new context specifically for this subscription. + // The context must be cancelled to cancel the subscription. + // In cases where we have to restart event stream processing, + // we'll need the original context b/c this one will be cancelled + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + // Filter on both namespace *and* topic. To create an "and" filter, // this must be a single, comma-separated string - eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|") + eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|") c.logger.Debug("processing event stream")