From b7687cc6737cc11759b5f61df401e6c65ae260de Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Mon, 11 Jul 2016 08:55:39 -0700 Subject: [PATCH] Do not rely on "live" event anymore Signed-off-by: Kenfe-Mickael Laventure (cherry picked from commit 64483c3bdaa1887b8b932e0564362fbbff025dc0) Signed-off-by: Tibor Vass --- daemon/daemon.go | 2 +- .../docker_cli_daemon_experimental_test.go | 8 +- libcontainerd/client_linux.go | 144 +++++++++++++----- libcontainerd/remote_linux.go | 47 +++--- 4 files changed, 137 insertions(+), 64 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index 7c345c30dc..332cd60b81 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -176,7 +176,7 @@ func (daemon *Daemon) restore() error { rm := c.RestartManager(false) if c.IsRunning() || c.IsPaused() { if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil { - logrus.Errorf("Failed to restore with containerd: %q", err) + logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err) return } if !c.HostConfig.NetworkMode.IsContainer() && c.IsRunning() { diff --git a/integration-cli/docker_cli_daemon_experimental_test.go b/integration-cli/docker_cli_daemon_experimental_test.go index ad0055ccce..d6cc0f71ad 100644 --- a/integration-cli/docker_cli_daemon_experimental_test.go +++ b/integration-cli/docker_cli_daemon_experimental_test.go @@ -7,7 +7,7 @@ import ( "os/exec" "path/filepath" "strings" - "time" + "syscall" "github.com/docker/docker/pkg/integration/checker" "github.com/go-check/check" @@ -129,7 +129,11 @@ func (s *DockerDaemonSuite) TestDaemonShutdownWithPlugins(c *check.C) { c.Fatalf("Could not kill daemon: %v", err) } - time.Sleep(5 * time.Second) + for { + if err := syscall.Kill(s.d.cmd.Process.Pid, 0); err == syscall.ESRCH { + break + } + } cmd := exec.Command("pgrep", "-f", "plugin-no-remove") if out, ec, err := runCommandWithOutput(cmd); ec != 1 { diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index 425842a51c..5e4857e3ee 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -281,16 +281,10 @@ func (clnt *client) cleanupOldRootfs(containerID string) { } } -func (clnt *client) setExited(containerID string) error { +func (clnt *client) setExited(containerID string, exitCode uint32) error { clnt.lock(containerID) defer clnt.unlock(containerID) - var exitCode uint32 - if event, ok := clnt.remote.pastEvents[containerID]; ok { - exitCode = event.Status - delete(clnt.remote.pastEvents, containerID) - } - err := clnt.backend.StateChanged(containerID, StateInfo{ CommonStateInfo: CommonStateInfo{ State: StateExit, @@ -393,7 +387,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { return w } -func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) (err error) { +func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) { clnt.lock(cont.Id) defer clnt.unlock(cont.Id) @@ -441,66 +435,132 @@ func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) return err } - if event, ok := clnt.remote.pastEvents[containerID]; ok { + if lastEvent != nil { // This should only be a pause or resume event - if event.Type == StatePause || event.Type == StateResume { + if lastEvent.Type == StatePause || lastEvent.Type == StateResume { return clnt.backend.StateChanged(containerID, StateInfo{ CommonStateInfo: CommonStateInfo{ - State: event.Type, + State: lastEvent.Type, Pid: container.systemPid, }}) } - logrus.Warnf("unexpected backlog event: %#v", event) + logrus.Warnf("unexpected backlog event: %#v", lastEvent) } return nil } -func (clnt *client) Restore(containerID string, options ...CreateOption) error { - if clnt.liveRestore { - cont, err := clnt.getContainerdContainer(containerID) - if err == nil && cont.Status != "stopped" { - if err := clnt.restore(cont, options...); err != nil { - logrus.Errorf("error restoring %s: %v", containerID, err) - } - return nil - } - return clnt.setExited(containerID) +func (clnt *client) getContainerLastEvent(containerID string) (*containerd.Event, error) { + er := &containerd.EventsRequest{ + Timestamp: clnt.remote.restoreFromTimestamp, + StoredOnly: true, + Id: containerID, + } + events, err := clnt.remote.apiClient.Events(context.Background(), er) + if err != nil { + logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err) + return nil, err } + var ev *containerd.Event + for { + e, err := events.Recv() + if err != nil { + if err.Error() == "EOF" { + break + } + logrus.Errorf("libcontainerd: failed to get container event for %s: %q", containerID, err) + return nil, err + } + + logrus.Debugf("libcontainerd: received past event %#v", e) + + switch e.Type { + case StateExit, StatePause, StateResume: + ev = e + } + } + + return ev, nil +} + +func (clnt *client) Restore(containerID string, options ...CreateOption) error { + // Synchronize with live events + clnt.remote.Lock() + defer clnt.remote.Unlock() + // Check that containerd still knows this container. + // + // In the unlikely event that Restore for this container process + // the its past event before the main loop, the event will be + // processed twice. However, this is not an issue as all those + // events will do is change the state of the container to be + // exactly the same. cont, err := clnt.getContainerdContainer(containerID) - if err == nil && cont.Status != "stopped" { - w := clnt.getOrCreateExitNotifier(containerID) - clnt.lock(cont.Id) - container := clnt.newContainer(cont.BundlePath) - container.systemPid = systemPid(cont) - clnt.appendContainer(container) - clnt.unlock(cont.Id) + // Get its last event + ev, eerr := clnt.getContainerLastEvent(containerID) + if err != nil || cont.Status == "Stopped" { + if err != nil && !strings.Contains(err.Error(), "container not found") { + // Legitimate error + return err + } - container.discardFifos() + // If ev is nil, then we already consumed all the event of the + // container, included the "exit" one. + // Thus we return to avoid overriding the Exit Code. + if ev == nil { + logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID) + return nil + } - if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { - logrus.Errorf("error sending sigterm to %v: %v", containerID, err) + // get the exit status for this container + ec := uint32(0) + if eerr == nil && ev.Type == StateExit { + ec = ev.Status + } + clnt.setExited(containerID, ec) + + return nil + } + + // container is still alive + if clnt.liveRestore { + if err := clnt.restore(cont, ev, options...); err != nil { + logrus.Errorf("error restoring %s: %v", containerID, err) + } + return nil + } + + // Kill the container if liveRestore == false + w := clnt.getOrCreateExitNotifier(containerID) + clnt.lock(cont.Id) + container := clnt.newContainer(cont.BundlePath) + container.systemPid = systemPid(cont) + clnt.appendContainer(container) + clnt.unlock(cont.Id) + + container.discardFifos() + + if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { + logrus.Errorf("error sending sigterm to %v: %v", containerID, err) + } + select { + case <-time.After(10 * time.Second): + if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { + logrus.Errorf("error sending sigkill to %v: %v", containerID, err) } select { - case <-time.After(10 * time.Second): - if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { - logrus.Errorf("error sending sigkill to %v: %v", containerID, err) - } - select { - case <-time.After(2 * time.Second): - case <-w.wait(): - return nil - } + case <-time.After(2 * time.Second): case <-w.wait(): return nil } + case <-w.wait(): + return nil } clnt.deleteContainer(containerID) - return clnt.setExited(containerID) + return clnt.setExited(containerID, uint32(255)) } type exitNotifier struct { diff --git a/libcontainerd/remote_linux.go b/libcontainerd/remote_linux.go index f6b6cbf770..4a5ea4f887 100644 --- a/libcontainerd/remote_linux.go +++ b/libcontainerd/remote_linux.go @@ -21,6 +21,7 @@ import ( sysinfo "github.com/docker/docker/pkg/system" "github.com/docker/docker/utils" "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/timestamp" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" @@ -40,22 +41,22 @@ const ( type remote struct { sync.RWMutex - apiClient containerd.APIClient - daemonPid int - stateDir string - rpcAddr string - startDaemon bool - closeManually bool - debugLog bool - rpcConn *grpc.ClientConn - clients []*client - eventTsPath string - pastEvents map[string]*containerd.Event - runtime string - runtimeArgs []string - daemonWaitCh chan struct{} - liveRestore bool - oomScore int + apiClient containerd.APIClient + daemonPid int + stateDir string + rpcAddr string + startDaemon bool + closeManually bool + debugLog bool + rpcConn *grpc.ClientConn + clients []*client + eventTsPath string + runtime string + runtimeArgs []string + daemonWaitCh chan struct{} + liveRestore bool + oomScore int + restoreFromTimestamp *timestamp.Timestamp } // New creates a fresh instance of libcontainerd remote. @@ -69,7 +70,6 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) { stateDir: stateDir, daemonPid: -1, eventTsPath: filepath.Join(stateDir, eventTimestampFilename), - pastEvents: make(map[string]*containerd.Event), } for _, option := range options { if err := option.Apply(r); err != nil { @@ -106,6 +106,14 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) { r.rpcConn = conn r.apiClient = containerd.NewAPIClient(conn) + // Get the timestamp to restore from + t := r.getLastEventTimestamp() + tsp, err := ptypes.TimestampProto(t) + if err != nil { + logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err) + } + r.restoreFromTimestamp = tsp + go r.handleConnectionChange() if err := r.startEventsMonitor(); err != nil { @@ -257,7 +265,8 @@ func (r *remote) getLastEventTimestamp() time.Time { func (r *remote) startEventsMonitor() error { // First, get past events - tsp, err := ptypes.TimestampProto(r.getLastEventTimestamp()) + t := r.getLastEventTimestamp() + tsp, err := ptypes.TimestampProto(t) if err != nil { logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err) } @@ -299,7 +308,7 @@ func (r *remote) handleEventStream(events containerd.API_EventsClient) { } r.RUnlock() if container == nil { - logrus.Errorf("libcontainerd: %q", err) + logrus.Warnf("libcontainerd: unknown container %s", e.Id) continue }