From dbeb4329655e91dbe0e6574405937f03fabf3f2f Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 20 Apr 2018 10:48:54 -0400 Subject: [PATCH] Fix panic on daemon restart with running plugin Scenario: Daemon is ungracefully shutdown and leaves plugins running (no live-restore). Daemon comes back up. The next time a container tries to use that plugin it will cause a daemon panic because the plugin client is not set. This fixes that by ensuring that the plugin does get shutdown. Note, I do not think there would be any harm in just re-attaching to the running plugin instead of shutting it down, however historically we shut down plugins and containers when live-restore is not enabled. [kir@: consolidate code to deleteTaskAndContainer, a few minor nits] Signed-off-by: Brian Goff Signed-off-by: Kir Kolyshkin --- plugin/executor/containerd/containerd.go | 45 +++---- plugin/manager.go | 22 ++-- plugin/manager_linux.go | 29 +++-- plugin/manager_linux_test.go | 156 +++++++++++++++++++++-- plugin/manager_windows.go | 2 +- 5 files changed, 199 insertions(+), 55 deletions(-) diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index e490ef0a9e..e0267582ec 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -58,6 +58,19 @@ type Executor struct { exitHandler ExitHandler } +// deleteTaskAndContainer deletes plugin task and then plugin container from containerd +func deleteTaskAndContainer(ctx context.Context, cli Client, id string) { + _, _, err := cli.DeleteTask(ctx, id) + if err != nil && !errdefs.IsNotFound(err) { + logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") + } + + err = cli.Delete(ctx, id) + if err != nil && !errdefs.IsNotFound(err) { + logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd") + } +} + // Create creates a new container func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { opts := runctypes.RuncOptions{ @@ -87,34 +100,21 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) if err != nil { - if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { - logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start") - } - if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { - logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start") - } + deleteTaskAndContainer(ctx, e.client, id) } return err } // Restore restores a container -func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error { +func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) { alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr)) if err != nil && !errdefs.IsNotFound(err) { - return err + return false, err } if !alive { - _, _, err = e.client.DeleteTask(context.Background(), id) - if err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id) - } - - err = e.client.Delete(context.Background(), id) - if err != nil && !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id) - } + deleteTaskAndContainer(context.Background(), e.client, id) } - return nil + return alive, nil } // IsRunning returns if the container with the given id is running @@ -133,14 +133,7 @@ func (e *Executor) Signal(id string, signal int) error { func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error { switch et { case libcontainerd.EventExit: - // delete task and container - if _, _, err := e.client.DeleteTask(context.Background(), id); err != nil { - logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id) - } - - if err := e.client.Delete(context.Background(), id); err != nil { - logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id) - } + deleteTaskAndContainer(context.Background(), e.client, id) return e.exitHandler.HandleExitEvent(ei.ContainerID) } return nil diff --git a/plugin/manager.go b/plugin/manager.go index 9c674f9545..c6f896129b 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -37,14 +37,14 @@ var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`) // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins type Executor interface { Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error - Restore(id string, stdout, stderr io.WriteCloser) error IsRunning(id string) (bool, error) + Restore(id string, stdout, stderr io.WriteCloser) (alive bool, err error) Signal(id string, signal int) error } -func (pm *Manager) restorePlugin(p *v2.Plugin) error { +func (pm *Manager) restorePlugin(p *v2.Plugin, c *controller) error { if p.IsEnabled() { - return pm.restore(p) + return pm.restore(p, c) } return nil } @@ -143,12 +143,15 @@ func (pm *Manager) HandleExitEvent(id string) error { return err } - os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)) + if err := os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)); err != nil && !os.IsNotExist(err) { + logrus.WithError(err).WithField("id", id).Error("Could not remove plugin bundle dir") + } pm.mu.RLock() c := pm.cMap[p] if c.exitChan != nil { close(c.exitChan) + c.exitChan = nil // ignore duplicate events (containerd issue #2299) } restart := c.restart pm.mu.RUnlock() @@ -205,12 +208,15 @@ func (pm *Manager) reload() error { // todo: restore var wg sync.WaitGroup wg.Add(len(plugins)) for _, p := range plugins { - c := &controller{} // todo: remove this + c := &controller{exitChan: make(chan bool)} + pm.mu.Lock() pm.cMap[p] = c + pm.mu.Unlock() + go func(p *v2.Plugin) { defer wg.Done() - if err := pm.restorePlugin(p); err != nil { - logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err) + if err := pm.restorePlugin(p, c); err != nil { + logrus.WithError(err).WithField("id", p.GetID()).Error("Failed to restore plugin") return } @@ -248,7 +254,7 @@ func (pm *Manager) reload() error { // todo: restore if requiresManualRestore { // if liveRestore is not enabled, the plugin will be stopped now so we should enable it if err := pm.enable(p, c, true); err != nil { - logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err) + logrus.WithError(err).WithField("id", p.GetID()).Error("failed to enable plugin") } } }(p) diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index 0029ff7868..3c6f9c553a 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -79,7 +79,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, p.Timeout()) if err != nil { c.restart = false - shutdownPlugin(p, c, pm.executor) + shutdownPlugin(p, c.exitChan, pm.executor) return errors.WithStack(err) } @@ -106,7 +106,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { c.restart = false // While restoring plugins, we need to explicitly set the state to disabled pm.config.Store.SetState(p, false) - shutdownPlugin(p, c, pm.executor) + shutdownPlugin(p, c.exitChan, pm.executor) return err } @@ -117,16 +117,15 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { return pm.save(p) } -func (pm *Manager) restore(p *v2.Plugin) error { +func (pm *Manager) restore(p *v2.Plugin, c *controller) error { stdout, stderr := makeLoggerStreams(p.GetID()) - if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil { + alive, err := pm.executor.Restore(p.GetID(), stdout, stderr) + if err != nil { return err } if pm.config.LiveRestoreEnabled { - c := &controller{} - if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning { - // plugin is not running, so follow normal startup procedure + if !alive { return pm.enable(p, c, true) } @@ -138,10 +137,16 @@ func (pm *Manager) restore(p *v2.Plugin) error { return pm.pluginPostStart(p, c) } + if alive { + // TODO(@cpuguy83): Should we always just re-attach to the running plugin instead of doing this? + c.restart = false + shutdownPlugin(p, c.exitChan, pm.executor) + } + return nil } -func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) { +func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) { pluginID := p.GetID() err := executor.Signal(pluginID, int(unix.SIGTERM)) @@ -149,7 +154,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) { logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err) } else { select { - case <-c.exitChan: + case <-ec: logrus.Debug("Clean shutdown of plugin") case <-time.After(time.Second * 10): logrus.Debug("Force shutdown plugin") @@ -157,7 +162,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) { logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err) } select { - case <-c.exitChan: + case <-ec: logrus.Debug("SIGKILL plugin shutdown") case <-time.After(time.Second * 10): logrus.Debug("Force shutdown plugin FAILED") @@ -172,7 +177,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error { } c.restart = false - shutdownPlugin(p, c, pm.executor) + shutdownPlugin(p, c.exitChan, pm.executor) pm.config.Store.SetState(p, false) return pm.save(p) } @@ -191,7 +196,7 @@ func (pm *Manager) Shutdown() { } if pm.executor != nil && p.IsEnabled() { c.restart = false - shutdownPlugin(p, c, pm.executor) + shutdownPlugin(p, c.exitChan, pm.executor) } } if err := mount.RecursiveUnmount(pm.config.Root); err != nil { diff --git a/plugin/manager_linux_test.go b/plugin/manager_linux_test.go index d4199c80da..740efd7a3a 100644 --- a/plugin/manager_linux_test.go +++ b/plugin/manager_linux_test.go @@ -3,12 +3,14 @@ package plugin // import "github.com/docker/docker/plugin" import ( "io" "io/ioutil" + "net" "os" "path/filepath" "testing" "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/mount" + "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/system" "github.com/docker/docker/plugin/v2" "github.com/gotestyourself/gotestyourself/skip" @@ -59,7 +61,7 @@ func TestManagerWithPluginMounts(t *testing.T) { t.Fatal(err) } - if err := m.Remove(p1.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil { + if err := m.Remove(p1.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil { t.Fatal(err) } if mounted, err := mount.Mounted(p2Mount); !mounted || err != nil { @@ -68,17 +70,18 @@ func TestManagerWithPluginMounts(t *testing.T) { } func newTestPlugin(t *testing.T, name, cap, root string) *v2.Plugin { - rootfs := filepath.Join(root, name) + id := stringid.GenerateNonCryptoID() + rootfs := filepath.Join(root, id) if err := os.MkdirAll(rootfs, 0755); err != nil { t.Fatal(err) } - p := v2.Plugin{PluginObj: types.Plugin{Name: name}} + p := v2.Plugin{PluginObj: types.Plugin{ID: id, Name: name}} p.Rootfs = rootfs iType := types.PluginInterfaceType{Capability: cap, Prefix: "docker", Version: "1.0"} - i := types.PluginConfigInterface{Socket: "plugins.sock", Types: []types.PluginInterfaceType{iType}} + i := types.PluginConfigInterface{Socket: "plugin.sock", Types: []types.PluginInterfaceType{iType}} p.PluginObj.Config.Interface = i - p.PluginObj.ID = name + p.PluginObj.ID = id return &p } @@ -90,8 +93,8 @@ func (e *simpleExecutor) Create(id string, spec specs.Spec, stdout, stderr io.Wr return errors.New("Create failed") } -func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) error { - return nil +func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) { + return false, nil } func (e *simpleExecutor) IsRunning(id string) (bool, error) { @@ -133,7 +136,144 @@ func TestCreateFailed(t *testing.T) { t.Fatalf("expected Create failed error, got %v", err) } - if err := m.Remove(p.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil { + if err := m.Remove(p.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil { t.Fatal(err) } } + +type executorWithRunning struct { + m *Manager + root string + exitChans map[string]chan struct{} +} + +func (e *executorWithRunning) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { + sockAddr := filepath.Join(e.root, id, "plugin.sock") + ch := make(chan struct{}) + if e.exitChans == nil { + e.exitChans = make(map[string]chan struct{}) + } + e.exitChans[id] = ch + listenTestPlugin(sockAddr, ch) + return nil +} + +func (e *executorWithRunning) IsRunning(id string) (bool, error) { + return true, nil +} +func (e *executorWithRunning) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) { + return true, nil +} + +func (e *executorWithRunning) Signal(id string, signal int) error { + ch := e.exitChans[id] + ch <- struct{}{} + <-ch + e.m.HandleExitEvent(id) + return nil +} + +func TestPluginAlreadyRunningOnStartup(t *testing.T) { + t.Parallel() + + root, err := ioutil.TempDir("", t.Name()) + if err != nil { + t.Fatal(err) + } + defer system.EnsureRemoveAll(root) + + for _, test := range []struct { + desc string + config ManagerConfig + }{ + { + desc: "live-restore-disabled", + config: ManagerConfig{ + LogPluginEvent: func(_, _, _ string) {}, + }, + }, + { + desc: "live-restore-enabled", + config: ManagerConfig{ + LogPluginEvent: func(_, _, _ string) {}, + LiveRestoreEnabled: true, + }, + }, + } { + t.Run(test.desc, func(t *testing.T) { + config := test.config + desc := test.desc + t.Parallel() + + p := newTestPlugin(t, desc, desc, config.Root) + p.PluginObj.Enabled = true + + // Need a short-ish path here so we don't run into unix socket path length issues. + config.ExecRoot, err = ioutil.TempDir("", "plugintest") + + executor := &executorWithRunning{root: config.ExecRoot} + config.CreateExecutor = func(m *Manager) (Executor, error) { executor.m = m; return executor, nil } + + if err := executor.Create(p.GetID(), specs.Spec{}, nil, nil); err != nil { + t.Fatal(err) + } + + root := filepath.Join(root, desc) + config.Root = filepath.Join(root, "manager") + if err := os.MkdirAll(filepath.Join(config.Root, p.GetID()), 0755); err != nil { + t.Fatal(err) + } + + if !p.IsEnabled() { + t.Fatal("plugin should be enabled") + } + if err := (&Manager{config: config}).save(p); err != nil { + t.Fatal(err) + } + + s := NewStore() + config.Store = s + if err != nil { + t.Fatal(err) + } + defer system.EnsureRemoveAll(config.ExecRoot) + + m, err := NewManager(config) + if err != nil { + t.Fatal(err) + } + defer m.Shutdown() + + p = s.GetAll()[p.GetID()] // refresh `p` with what the manager knows + if p.Client() == nil { + t.Fatal("plugin client should not be nil") + } + }) + } +} + +func listenTestPlugin(sockAddr string, exit chan struct{}) (net.Listener, error) { + if err := os.MkdirAll(filepath.Dir(sockAddr), 0755); err != nil { + return nil, err + } + l, err := net.Listen("unix", sockAddr) + if err != nil { + return nil, err + } + go func() { + for { + conn, err := l.Accept() + if err != nil { + return + } + conn.Close() + } + }() + go func() { + <-exit + l.Close() + os.Remove(sockAddr) + exit <- struct{}{} + }() + return l, nil +} diff --git a/plugin/manager_windows.go b/plugin/manager_windows.go index 9fafea5c22..90cc52c992 100644 --- a/plugin/manager_windows.go +++ b/plugin/manager_windows.go @@ -19,7 +19,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error { return fmt.Errorf("Not implemented") } -func (pm *Manager) restore(p *v2.Plugin) error { +func (pm *Manager) restore(p *v2.Plugin, c *controller) error { return fmt.Errorf("Not implemented") }