1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Do not rely on "live" event anymore

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2016-07-11 08:55:39 -07:00
parent 29b2714580
commit 64483c3bda
4 changed files with 137 additions and 64 deletions

View file

@ -178,7 +178,7 @@ func (daemon *Daemon) restore() error {
rm := c.RestartManager(false) rm := c.RestartManager(false)
if c.IsRunning() || c.IsPaused() { if c.IsRunning() || c.IsPaused() {
if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil { if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil {
logrus.Errorf("Failed to restore with containerd: %q", err) logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
return return
} }
if !c.HostConfig.NetworkMode.IsContainer() && c.IsRunning() { if !c.HostConfig.NetworkMode.IsContainer() && c.IsRunning() {

View file

@ -7,7 +7,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "syscall"
"github.com/docker/docker/pkg/integration/checker" "github.com/docker/docker/pkg/integration/checker"
"github.com/go-check/check" "github.com/go-check/check"
@ -129,7 +129,11 @@ func (s *DockerDaemonSuite) TestDaemonShutdownWithPlugins(c *check.C) {
c.Fatalf("Could not kill daemon: %v", err) c.Fatalf("Could not kill daemon: %v", err)
} }
time.Sleep(5 * time.Second) for {
if err := syscall.Kill(s.d.cmd.Process.Pid, 0); err == syscall.ESRCH {
break
}
}
cmd := exec.Command("pgrep", "-f", "plugin-no-remove") cmd := exec.Command("pgrep", "-f", "plugin-no-remove")
if out, ec, err := runCommandWithOutput(cmd); ec != 1 { if out, ec, err := runCommandWithOutput(cmd); ec != 1 {

View file

@ -281,16 +281,10 @@ func (clnt *client) cleanupOldRootfs(containerID string) {
} }
} }
func (clnt *client) setExited(containerID string) error { func (clnt *client) setExited(containerID string, exitCode uint32) error {
clnt.lock(containerID) clnt.lock(containerID)
defer clnt.unlock(containerID) defer clnt.unlock(containerID)
var exitCode uint32
if event, ok := clnt.remote.pastEvents[containerID]; ok {
exitCode = event.Status
delete(clnt.remote.pastEvents, containerID)
}
err := clnt.backend.StateChanged(containerID, StateInfo{ err := clnt.backend.StateChanged(containerID, StateInfo{
CommonStateInfo: CommonStateInfo{ CommonStateInfo: CommonStateInfo{
State: StateExit, State: StateExit,
@ -393,7 +387,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
return w return w
} }
func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) (err error) { func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
clnt.lock(cont.Id) clnt.lock(cont.Id)
defer clnt.unlock(cont.Id) defer clnt.unlock(cont.Id)
@ -441,66 +435,132 @@ func (clnt *client) restore(cont *containerd.Container, options ...CreateOption)
return err return err
} }
if event, ok := clnt.remote.pastEvents[containerID]; ok { if lastEvent != nil {
// This should only be a pause or resume event // This should only be a pause or resume event
if event.Type == StatePause || event.Type == StateResume { if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
return clnt.backend.StateChanged(containerID, StateInfo{ return clnt.backend.StateChanged(containerID, StateInfo{
CommonStateInfo: CommonStateInfo{ CommonStateInfo: CommonStateInfo{
State: event.Type, State: lastEvent.Type,
Pid: container.systemPid, Pid: container.systemPid,
}}) }})
} }
logrus.Warnf("unexpected backlog event: %#v", event) logrus.Warnf("unexpected backlog event: %#v", lastEvent)
} }
return nil return nil
} }
func (clnt *client) Restore(containerID string, options ...CreateOption) error { func (clnt *client) getContainerLastEvent(containerID string) (*containerd.Event, error) {
if clnt.liveRestore { er := &containerd.EventsRequest{
cont, err := clnt.getContainerdContainer(containerID) Timestamp: clnt.remote.restoreFromTimestamp,
if err == nil && cont.Status != "stopped" { StoredOnly: true,
if err := clnt.restore(cont, options...); err != nil { Id: containerID,
logrus.Errorf("error restoring %s: %v", containerID, err) }
} events, err := clnt.remote.apiClient.Events(context.Background(), er)
return nil if err != nil {
} logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
return clnt.setExited(containerID) return nil, err
} }
var ev *containerd.Event
for {
e, err := events.Recv()
if err != nil {
if err.Error() == "EOF" {
break
}
logrus.Errorf("libcontainerd: failed to get container event for %s: %q", containerID, err)
return nil, err
}
logrus.Debugf("libcontainerd: received past event %#v", e)
switch e.Type {
case StateExit, StatePause, StateResume:
ev = e
}
}
return ev, nil
}
func (clnt *client) Restore(containerID string, options ...CreateOption) error {
// Synchronize with live events
clnt.remote.Lock()
defer clnt.remote.Unlock()
// Check that containerd still knows this container.
//
// In the unlikely event that Restore for this container processes
// its past event before the main loop does, the event will be
// processed twice. However, this is not an issue as all those
// events will do is change the state of the container to be
// exactly the same.
cont, err := clnt.getContainerdContainer(containerID) cont, err := clnt.getContainerdContainer(containerID)
if err == nil && cont.Status != "stopped" { // Get its last event
w := clnt.getOrCreateExitNotifier(containerID) ev, eerr := clnt.getContainerLastEvent(containerID)
clnt.lock(cont.Id) if err != nil || cont.Status == "Stopped" {
container := clnt.newContainer(cont.BundlePath) if err != nil && !strings.Contains(err.Error(), "container not found") {
container.systemPid = systemPid(cont) // Legitimate error
clnt.appendContainer(container) return err
clnt.unlock(cont.Id) }
container.discardFifos() // If ev is nil, then we already consumed all the events of the
// container, including the "exit" one.
// Thus we return to avoid overriding the Exit Code.
if ev == nil {
logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
return nil
}
if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { // get the exit status for this container
logrus.Errorf("error sending sigterm to %v: %v", containerID, err) ec := uint32(0)
if eerr == nil && ev.Type == StateExit {
ec = ev.Status
}
clnt.setExited(containerID, ec)
return nil
}
// container is still alive
if clnt.liveRestore {
if err := clnt.restore(cont, ev, options...); err != nil {
logrus.Errorf("error restoring %s: %v", containerID, err)
}
return nil
}
// Kill the container if liveRestore == false
w := clnt.getOrCreateExitNotifier(containerID)
clnt.lock(cont.Id)
container := clnt.newContainer(cont.BundlePath)
container.systemPid = systemPid(cont)
clnt.appendContainer(container)
clnt.unlock(cont.Id)
container.discardFifos()
if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
logrus.Errorf("error sending sigterm to %v: %v", containerID, err)
}
select {
case <-time.After(10 * time.Second):
if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
logrus.Errorf("error sending sigkill to %v: %v", containerID, err)
} }
select { select {
case <-time.After(10 * time.Second): case <-time.After(2 * time.Second):
if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
logrus.Errorf("error sending sigkill to %v: %v", containerID, err)
}
select {
case <-time.After(2 * time.Second):
case <-w.wait():
return nil
}
case <-w.wait(): case <-w.wait():
return nil return nil
} }
case <-w.wait():
return nil
} }
clnt.deleteContainer(containerID) clnt.deleteContainer(containerID)
return clnt.setExited(containerID) return clnt.setExited(containerID, uint32(255))
} }
type exitNotifier struct { type exitNotifier struct {

View file

@ -21,6 +21,7 @@ import (
sysinfo "github.com/docker/docker/pkg/system" sysinfo "github.com/docker/docker/pkg/system"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -40,22 +41,22 @@ const (
type remote struct { type remote struct {
sync.RWMutex sync.RWMutex
apiClient containerd.APIClient apiClient containerd.APIClient
daemonPid int daemonPid int
stateDir string stateDir string
rpcAddr string rpcAddr string
startDaemon bool startDaemon bool
closeManually bool closeManually bool
debugLog bool debugLog bool
rpcConn *grpc.ClientConn rpcConn *grpc.ClientConn
clients []*client clients []*client
eventTsPath string eventTsPath string
pastEvents map[string]*containerd.Event runtime string
runtime string runtimeArgs []string
runtimeArgs []string daemonWaitCh chan struct{}
daemonWaitCh chan struct{} liveRestore bool
liveRestore bool oomScore int
oomScore int restoreFromTimestamp *timestamp.Timestamp
} }
// New creates a fresh instance of libcontainerd remote. // New creates a fresh instance of libcontainerd remote.
@ -69,7 +70,6 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
stateDir: stateDir, stateDir: stateDir,
daemonPid: -1, daemonPid: -1,
eventTsPath: filepath.Join(stateDir, eventTimestampFilename), eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
pastEvents: make(map[string]*containerd.Event),
} }
for _, option := range options { for _, option := range options {
if err := option.Apply(r); err != nil { if err := option.Apply(r); err != nil {
@ -106,6 +106,14 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
r.rpcConn = conn r.rpcConn = conn
r.apiClient = containerd.NewAPIClient(conn) r.apiClient = containerd.NewAPIClient(conn)
// Get the timestamp to restore from
t := r.getLastEventTimestamp()
tsp, err := ptypes.TimestampProto(t)
if err != nil {
logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
}
r.restoreFromTimestamp = tsp
go r.handleConnectionChange() go r.handleConnectionChange()
if err := r.startEventsMonitor(); err != nil { if err := r.startEventsMonitor(); err != nil {
@ -257,7 +265,8 @@ func (r *remote) getLastEventTimestamp() time.Time {
func (r *remote) startEventsMonitor() error { func (r *remote) startEventsMonitor() error {
// First, get past events // First, get past events
tsp, err := ptypes.TimestampProto(r.getLastEventTimestamp()) t := r.getLastEventTimestamp()
tsp, err := ptypes.TimestampProto(t)
if err != nil { if err != nil {
logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err) logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
} }
@ -299,7 +308,7 @@ func (r *remote) handleEventStream(events containerd.API_EventsClient) {
} }
r.RUnlock() r.RUnlock()
if container == nil { if container == nil {
logrus.Errorf("libcontainerd: %q", err) logrus.Warnf("libcontainerd: unknown container %s", e.Id)
continue continue
} }