mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #29733 from cpuguy83/fix_v1plugin_deadlock
Fix race/deadlock in v1 plugin handlers
This commit is contained in:
commit
48ed4f0639
3 changed files with 92 additions and 23 deletions
|
@ -57,6 +57,10 @@ func (s *DockerExternalGraphdriverSuite) SetUpTest(c *check.C) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *DockerExternalGraphdriverSuite) OnTimeout(c *check.C) {
|
||||||
|
s.d.DumpStackAndQuit()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *DockerExternalGraphdriverSuite) TearDownTest(c *check.C) {
|
func (s *DockerExternalGraphdriverSuite) TearDownTest(c *check.C) {
|
||||||
if s.d != nil {
|
if s.d != nil {
|
||||||
s.d.Stop(c)
|
s.d.Stop(c)
|
||||||
|
|
37
pkg/plugins/plugin_test.go
Normal file
37
pkg/plugins/plugin_test.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// regression test for deadlock in handlers
|
||||||
|
func TestPluginAddHandler(t *testing.T) {
|
||||||
|
// make a plugin which is pre-activated
|
||||||
|
p := &Plugin{activateWait: sync.NewCond(&sync.Mutex{})}
|
||||||
|
p.Manifest = &Manifest{Implements: []string{"bananas"}}
|
||||||
|
storage.plugins["qwerty"] = p
|
||||||
|
|
||||||
|
testActive(t, p)
|
||||||
|
Handle("bananas", func(_ string, _ *Client) {})
|
||||||
|
testActive(t, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testActive(t *testing.T, p *Plugin) {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
p.waitActive()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
_, f, l, _ := runtime.Caller(1)
|
||||||
|
t.Fatalf("%s:%d: deadlock in waitActive", filepath.Base(f), l)
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -70,12 +70,12 @@ type Plugin struct {
|
||||||
// Manifest of the plugin (see above)
|
// Manifest of the plugin (see above)
|
||||||
Manifest *Manifest `json:"-"`
|
Manifest *Manifest `json:"-"`
|
||||||
|
|
||||||
// error produced by activation
|
|
||||||
activateErr error
|
|
||||||
// specifies if the activation sequence is completed (not if it is successful or not)
|
|
||||||
activated bool
|
|
||||||
// wait for activation to finish
|
// wait for activation to finish
|
||||||
activateWait *sync.Cond
|
activateWait *sync.Cond
|
||||||
|
// error produced by activation
|
||||||
|
activateErr error
|
||||||
|
// keeps track of callback handlers run against this plugin
|
||||||
|
handlersRun bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// BasePath returns the path to which all paths returned by the plugin are relative to.
|
// BasePath returns the path to which all paths returned by the plugin are relative to.
|
||||||
|
@ -112,19 +112,51 @@ func NewLocalPlugin(name, addr string) *Plugin {
|
||||||
|
|
||||||
func (p *Plugin) activate() error {
|
func (p *Plugin) activate() error {
|
||||||
p.activateWait.L.Lock()
|
p.activateWait.L.Lock()
|
||||||
if p.activated {
|
|
||||||
|
if p.activated() {
|
||||||
|
p.runHandlers()
|
||||||
p.activateWait.L.Unlock()
|
p.activateWait.L.Unlock()
|
||||||
return p.activateErr
|
return p.activateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
p.activateErr = p.activateWithLock()
|
p.activateErr = p.activateWithLock()
|
||||||
p.activated = true
|
|
||||||
|
|
||||||
|
p.runHandlers()
|
||||||
p.activateWait.L.Unlock()
|
p.activateWait.L.Unlock()
|
||||||
p.activateWait.Broadcast()
|
p.activateWait.Broadcast()
|
||||||
return p.activateErr
|
return p.activateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runHandlers runs the registered handlers for the implemented plugin types
|
||||||
|
// This should only be run after activation, and while the activation lock is held.
|
||||||
|
func (p *Plugin) runHandlers() {
|
||||||
|
if !p.activated() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers.RLock()
|
||||||
|
if !p.handlersRun {
|
||||||
|
for _, iface := range p.Manifest.Implements {
|
||||||
|
hdlrs, handled := handlers.extpointHandlers[iface]
|
||||||
|
if !handled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, handler := range hdlrs {
|
||||||
|
handler(p.name, p.client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.handlersRun = true
|
||||||
|
}
|
||||||
|
handlers.RUnlock()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// activated returns if the plugin has already been activated.
|
||||||
|
// This should only be called with the activation lock held
|
||||||
|
func (p *Plugin) activated() bool {
|
||||||
|
return p.Manifest != nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Plugin) activateWithLock() error {
|
func (p *Plugin) activateWithLock() error {
|
||||||
c, err := NewClient(p.Addr, p.TLSConfig)
|
c, err := NewClient(p.Addr, p.TLSConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -138,24 +170,12 @@ func (p *Plugin) activateWithLock() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Manifest = m
|
p.Manifest = m
|
||||||
|
|
||||||
handlers.RLock()
|
|
||||||
for _, iface := range m.Implements {
|
|
||||||
hdlrs, handled := handlers.extpointHandlers[iface]
|
|
||||||
if !handled {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, handler := range hdlrs {
|
|
||||||
handler(p.name, p.client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
handlers.RUnlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plugin) waitActive() error {
|
func (p *Plugin) waitActive() error {
|
||||||
p.activateWait.L.Lock()
|
p.activateWait.L.Lock()
|
||||||
for !p.activated {
|
for !p.activated() {
|
||||||
p.activateWait.Wait()
|
p.activateWait.Wait()
|
||||||
}
|
}
|
||||||
p.activateWait.L.Unlock()
|
p.activateWait.L.Unlock()
|
||||||
|
@ -163,7 +183,7 @@ func (p *Plugin) waitActive() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plugin) implements(kind string) bool {
|
func (p *Plugin) implements(kind string) bool {
|
||||||
if err := p.waitActive(); err != nil {
|
if p.Manifest == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for _, driver := range p.Manifest.Implements {
|
for _, driver := range p.Manifest.Implements {
|
||||||
|
@ -232,7 +252,7 @@ func Get(name, imp string) (*Plugin, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pl.implements(imp) {
|
if err := pl.waitActive(); err == nil && pl.implements(imp) {
|
||||||
logrus.Debugf("%s implements: %s", name, imp)
|
logrus.Debugf("%s implements: %s", name, imp)
|
||||||
return pl, nil
|
return pl, nil
|
||||||
}
|
}
|
||||||
|
@ -249,9 +269,17 @@ func Handle(iface string, fn func(string, *Client)) {
|
||||||
|
|
||||||
hdlrs = append(hdlrs, fn)
|
hdlrs = append(hdlrs, fn)
|
||||||
handlers.extpointHandlers[iface] = hdlrs
|
handlers.extpointHandlers[iface] = hdlrs
|
||||||
|
|
||||||
|
storage.Lock()
|
||||||
for _, p := range storage.plugins {
|
for _, p := range storage.plugins {
|
||||||
p.activated = false
|
p.activateWait.L.Lock()
|
||||||
|
if p.activated() && p.implements(iface) {
|
||||||
|
p.handlersRun = false
|
||||||
|
}
|
||||||
|
p.activateWait.L.Unlock()
|
||||||
}
|
}
|
||||||
|
storage.Unlock()
|
||||||
|
|
||||||
handlers.Unlock()
|
handlers.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +320,7 @@ func GetAll(imp string) ([]*Plugin, error) {
|
||||||
logrus.Error(pl.err)
|
logrus.Error(pl.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if pl.pl.implements(imp) {
|
if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
|
||||||
out = append(out, pl.pl)
|
out = append(out, pl.pl)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue