1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #37234 from kolyshkin/plugin-panic

Fix daemon panic on restart when a plugin is running
This commit is contained in:
Vincent Demeester 2018-06-08 09:45:25 +02:00 committed by GitHub
commit 712dd62a34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 199 additions and 55 deletions

View file

@ -58,6 +58,19 @@ type Executor struct {
exitHandler ExitHandler exitHandler ExitHandler
} }
// deleteTaskAndContainer deletes plugin task and then plugin container from containerd
func deleteTaskAndContainer(ctx context.Context, cli Client, id string) {
_, _, err := cli.DeleteTask(ctx, id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
}
err = cli.Delete(ctx, id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
}
}
// Create creates a new container // Create creates a new container
func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
opts := runctypes.RuncOptions{ opts := runctypes.RuncOptions{
@ -87,34 +100,21 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
if err != nil { if err != nil {
if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { deleteTaskAndContainer(ctx, e.client, id)
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
}
if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
}
} }
return err return err
} }
// Restore restores a container // Restore restores a container
func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error { func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr)) alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
if err != nil && !errdefs.IsNotFound(err) { if err != nil && !errdefs.IsNotFound(err) {
return err return false, err
} }
if !alive { if !alive {
_, _, err = e.client.DeleteTask(context.Background(), id) deleteTaskAndContainer(context.Background(), e.client, id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
}
err = e.client.Delete(context.Background(), id)
if err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
}
} }
return nil return alive, nil
} }
// IsRunning returns if the container with the given id is running // IsRunning returns if the container with the given id is running
@ -133,14 +133,7 @@ func (e *Executor) Signal(id string, signal int) error {
func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error { func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error {
switch et { switch et {
case libcontainerd.EventExit: case libcontainerd.EventExit:
// delete task and container deleteTaskAndContainer(context.Background(), e.client, id)
if _, _, err := e.client.DeleteTask(context.Background(), id); err != nil {
logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
}
if err := e.client.Delete(context.Background(), id); err != nil {
logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
}
return e.exitHandler.HandleExitEvent(ei.ContainerID) return e.exitHandler.HandleExitEvent(ei.ContainerID)
} }
return nil return nil

View file

@ -37,14 +37,14 @@ var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
// Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
type Executor interface { type Executor interface {
Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
Restore(id string, stdout, stderr io.WriteCloser) error
IsRunning(id string) (bool, error) IsRunning(id string) (bool, error)
Restore(id string, stdout, stderr io.WriteCloser) (alive bool, err error)
Signal(id string, signal int) error Signal(id string, signal int) error
} }
func (pm *Manager) restorePlugin(p *v2.Plugin) error { func (pm *Manager) restorePlugin(p *v2.Plugin, c *controller) error {
if p.IsEnabled() { if p.IsEnabled() {
return pm.restore(p) return pm.restore(p, c)
} }
return nil return nil
} }
@ -143,12 +143,15 @@ func (pm *Manager) HandleExitEvent(id string) error {
return err return err
} }
os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)) if err := os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)); err != nil && !os.IsNotExist(err) {
logrus.WithError(err).WithField("id", id).Error("Could not remove plugin bundle dir")
}
pm.mu.RLock() pm.mu.RLock()
c := pm.cMap[p] c := pm.cMap[p]
if c.exitChan != nil { if c.exitChan != nil {
close(c.exitChan) close(c.exitChan)
c.exitChan = nil // ignore duplicate events (containerd issue #2299)
} }
restart := c.restart restart := c.restart
pm.mu.RUnlock() pm.mu.RUnlock()
@ -205,12 +208,15 @@ func (pm *Manager) reload() error { // todo: restore
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(plugins)) wg.Add(len(plugins))
for _, p := range plugins { for _, p := range plugins {
c := &controller{} // todo: remove this c := &controller{exitChan: make(chan bool)}
pm.mu.Lock()
pm.cMap[p] = c pm.cMap[p] = c
pm.mu.Unlock()
go func(p *v2.Plugin) { go func(p *v2.Plugin) {
defer wg.Done() defer wg.Done()
if err := pm.restorePlugin(p); err != nil { if err := pm.restorePlugin(p, c); err != nil {
logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err) logrus.WithError(err).WithField("id", p.GetID()).Error("Failed to restore plugin")
return return
} }
@ -248,7 +254,7 @@ func (pm *Manager) reload() error { // todo: restore
if requiresManualRestore { if requiresManualRestore {
// if liveRestore is not enabled, the plugin will be stopped now so we should enable it // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
if err := pm.enable(p, c, true); err != nil { if err := pm.enable(p, c, true); err != nil {
logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err) logrus.WithError(err).WithField("id", p.GetID()).Error("failed to enable plugin")
} }
} }
}(p) }(p)

View file

@ -79,7 +79,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, p.Timeout()) client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, p.Timeout())
if err != nil { if err != nil {
c.restart = false c.restart = false
shutdownPlugin(p, c, pm.executor) shutdownPlugin(p, c.exitChan, pm.executor)
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -106,7 +106,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
c.restart = false c.restart = false
// While restoring plugins, we need to explicitly set the state to disabled // While restoring plugins, we need to explicitly set the state to disabled
pm.config.Store.SetState(p, false) pm.config.Store.SetState(p, false)
shutdownPlugin(p, c, pm.executor) shutdownPlugin(p, c.exitChan, pm.executor)
return err return err
} }
@ -117,16 +117,15 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
return pm.save(p) return pm.save(p)
} }
func (pm *Manager) restore(p *v2.Plugin) error { func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
stdout, stderr := makeLoggerStreams(p.GetID()) stdout, stderr := makeLoggerStreams(p.GetID())
if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil { alive, err := pm.executor.Restore(p.GetID(), stdout, stderr)
if err != nil {
return err return err
} }
if pm.config.LiveRestoreEnabled { if pm.config.LiveRestoreEnabled {
c := &controller{} if !alive {
if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning {
// plugin is not running, so follow normal startup procedure
return pm.enable(p, c, true) return pm.enable(p, c, true)
} }
@ -138,10 +137,16 @@ func (pm *Manager) restore(p *v2.Plugin) error {
return pm.pluginPostStart(p, c) return pm.pluginPostStart(p, c)
} }
if alive {
// TODO(@cpuguy83): Should we always just re-attach to the running plugin instead of doing this?
c.restart = false
shutdownPlugin(p, c.exitChan, pm.executor)
}
return nil return nil
} }
func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) { func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
pluginID := p.GetID() pluginID := p.GetID()
err := executor.Signal(pluginID, int(unix.SIGTERM)) err := executor.Signal(pluginID, int(unix.SIGTERM))
@ -149,7 +154,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err) logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
} else { } else {
select { select {
case <-c.exitChan: case <-ec:
logrus.Debug("Clean shutdown of plugin") logrus.Debug("Clean shutdown of plugin")
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
logrus.Debug("Force shutdown plugin") logrus.Debug("Force shutdown plugin")
@ -157,7 +162,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err) logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
} }
select { select {
case <-c.exitChan: case <-ec:
logrus.Debug("SIGKILL plugin shutdown") logrus.Debug("SIGKILL plugin shutdown")
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
logrus.Debug("Force shutdown plugin FAILED") logrus.Debug("Force shutdown plugin FAILED")
@ -172,7 +177,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
} }
c.restart = false c.restart = false
shutdownPlugin(p, c, pm.executor) shutdownPlugin(p, c.exitChan, pm.executor)
pm.config.Store.SetState(p, false) pm.config.Store.SetState(p, false)
return pm.save(p) return pm.save(p)
} }
@ -191,7 +196,7 @@ func (pm *Manager) Shutdown() {
} }
if pm.executor != nil && p.IsEnabled() { if pm.executor != nil && p.IsEnabled() {
c.restart = false c.restart = false
shutdownPlugin(p, c, pm.executor) shutdownPlugin(p, c.exitChan, pm.executor)
} }
} }
if err := mount.RecursiveUnmount(pm.config.Root); err != nil { if err := mount.RecursiveUnmount(pm.config.Root); err != nil {

View file

@ -3,12 +3,14 @@ package plugin // import "github.com/docker/docker/plugin"
import ( import (
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/mount" "github.com/docker/docker/pkg/mount"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/system"
"github.com/docker/docker/plugin/v2" "github.com/docker/docker/plugin/v2"
"github.com/gotestyourself/gotestyourself/skip" "github.com/gotestyourself/gotestyourself/skip"
@ -59,7 +61,7 @@ func TestManagerWithPluginMounts(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if err := m.Remove(p1.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil { if err := m.Remove(p1.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if mounted, err := mount.Mounted(p2Mount); !mounted || err != nil { if mounted, err := mount.Mounted(p2Mount); !mounted || err != nil {
@ -68,17 +70,18 @@ func TestManagerWithPluginMounts(t *testing.T) {
} }
func newTestPlugin(t *testing.T, name, cap, root string) *v2.Plugin { func newTestPlugin(t *testing.T, name, cap, root string) *v2.Plugin {
rootfs := filepath.Join(root, name) id := stringid.GenerateNonCryptoID()
rootfs := filepath.Join(root, id)
if err := os.MkdirAll(rootfs, 0755); err != nil { if err := os.MkdirAll(rootfs, 0755); err != nil {
t.Fatal(err) t.Fatal(err)
} }
p := v2.Plugin{PluginObj: types.Plugin{Name: name}} p := v2.Plugin{PluginObj: types.Plugin{ID: id, Name: name}}
p.Rootfs = rootfs p.Rootfs = rootfs
iType := types.PluginInterfaceType{Capability: cap, Prefix: "docker", Version: "1.0"} iType := types.PluginInterfaceType{Capability: cap, Prefix: "docker", Version: "1.0"}
i := types.PluginConfigInterface{Socket: "plugins.sock", Types: []types.PluginInterfaceType{iType}} i := types.PluginConfigInterface{Socket: "plugin.sock", Types: []types.PluginInterfaceType{iType}}
p.PluginObj.Config.Interface = i p.PluginObj.Config.Interface = i
p.PluginObj.ID = name p.PluginObj.ID = id
return &p return &p
} }
@ -90,8 +93,8 @@ func (e *simpleExecutor) Create(id string, spec specs.Spec, stdout, stderr io.Wr
return errors.New("Create failed") return errors.New("Create failed")
} }
func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) error { func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
return nil return false, nil
} }
func (e *simpleExecutor) IsRunning(id string) (bool, error) { func (e *simpleExecutor) IsRunning(id string) (bool, error) {
@ -133,7 +136,144 @@ func TestCreateFailed(t *testing.T) {
t.Fatalf("expected Create failed error, got %v", err) t.Fatalf("expected Create failed error, got %v", err)
} }
if err := m.Remove(p.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil { if err := m.Remove(p.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
type executorWithRunning struct {
m *Manager
root string
exitChans map[string]chan struct{}
}
func (e *executorWithRunning) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
sockAddr := filepath.Join(e.root, id, "plugin.sock")
ch := make(chan struct{})
if e.exitChans == nil {
e.exitChans = make(map[string]chan struct{})
}
e.exitChans[id] = ch
listenTestPlugin(sockAddr, ch)
return nil
}
func (e *executorWithRunning) IsRunning(id string) (bool, error) {
return true, nil
}
func (e *executorWithRunning) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
return true, nil
}
func (e *executorWithRunning) Signal(id string, signal int) error {
ch := e.exitChans[id]
ch <- struct{}{}
<-ch
e.m.HandleExitEvent(id)
return nil
}
func TestPluginAlreadyRunningOnStartup(t *testing.T) {
t.Parallel()
root, err := ioutil.TempDir("", t.Name())
if err != nil {
t.Fatal(err)
}
defer system.EnsureRemoveAll(root)
for _, test := range []struct {
desc string
config ManagerConfig
}{
{
desc: "live-restore-disabled",
config: ManagerConfig{
LogPluginEvent: func(_, _, _ string) {},
},
},
{
desc: "live-restore-enabled",
config: ManagerConfig{
LogPluginEvent: func(_, _, _ string) {},
LiveRestoreEnabled: true,
},
},
} {
t.Run(test.desc, func(t *testing.T) {
config := test.config
desc := test.desc
t.Parallel()
p := newTestPlugin(t, desc, desc, config.Root)
p.PluginObj.Enabled = true
// Need a short-ish path here so we don't run into unix socket path length issues.
config.ExecRoot, err = ioutil.TempDir("", "plugintest")
executor := &executorWithRunning{root: config.ExecRoot}
config.CreateExecutor = func(m *Manager) (Executor, error) { executor.m = m; return executor, nil }
if err := executor.Create(p.GetID(), specs.Spec{}, nil, nil); err != nil {
t.Fatal(err)
}
root := filepath.Join(root, desc)
config.Root = filepath.Join(root, "manager")
if err := os.MkdirAll(filepath.Join(config.Root, p.GetID()), 0755); err != nil {
t.Fatal(err)
}
if !p.IsEnabled() {
t.Fatal("plugin should be enabled")
}
if err := (&Manager{config: config}).save(p); err != nil {
t.Fatal(err)
}
s := NewStore()
config.Store = s
if err != nil {
t.Fatal(err)
}
defer system.EnsureRemoveAll(config.ExecRoot)
m, err := NewManager(config)
if err != nil {
t.Fatal(err)
}
defer m.Shutdown()
p = s.GetAll()[p.GetID()] // refresh `p` with what the manager knows
if p.Client() == nil {
t.Fatal("plugin client should not be nil")
}
})
}
}
func listenTestPlugin(sockAddr string, exit chan struct{}) (net.Listener, error) {
if err := os.MkdirAll(filepath.Dir(sockAddr), 0755); err != nil {
return nil, err
}
l, err := net.Listen("unix", sockAddr)
if err != nil {
return nil, err
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
return
}
conn.Close()
}
}()
go func() {
<-exit
l.Close()
os.Remove(sockAddr)
exit <- struct{}{}
}()
return l, nil
}

View file

@ -19,7 +19,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
return fmt.Errorf("Not implemented") return fmt.Errorf("Not implemented")
} }
func (pm *Manager) restore(p *v2.Plugin) error { func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
return fmt.Errorf("Not implemented") return fmt.Errorf("Not implemented")
} }