diff --git a/pkg/plugins/client.go b/pkg/plugins/client.go new file mode 100644 index 0000000000..ab429f0689 --- /dev/null +++ b/pkg/plugins/client.go @@ -0,0 +1,100 @@ +package plugins + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +const ( + versionMimetype = "appplication/vnd.docker.plugins.v1+json" + defaultTimeOut = 30 +) + +func NewClient(addr string) *Client { + tr := &http.Transport{} + protoAndAddr := strings.Split(addr, "://") + configureTCPTransport(tr, protoAndAddr[0], protoAndAddr[1]) + return &Client{&http.Client{Transport: tr}, protoAndAddr[1]} +} + +type Client struct { + http *http.Client + addr string +} + +func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(args); err != nil { + return err + } + + req, err := http.NewRequest("POST", "/"+serviceMethod, &buf) + if err != nil { + return err + } + req.Header.Add("Accept", versionMimetype) + req.URL.Scheme = "http" + req.URL.Host = c.addr + + var retries int + start := time.Now() + + for { + resp, err := c.http.Do(req) + if err != nil { + timeOff := backoff(retries) + if timeOff+time.Since(start) > defaultTimeOut { + return err + } + retries++ + logrus.Warn("Unable to connect to plugin: %s, retrying in %ds\n", c.addr, timeOff) + time.Sleep(timeOff) + continue + } + + if resp.StatusCode != http.StatusOK { + remoteErr, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil + } + return fmt.Errorf("Plugin Error: %s", remoteErr) + } + + return json.NewDecoder(resp.Body).Decode(&ret) + } +} + +func backoff(retries int) time.Duration { + b, max := float64(1), float64(defaultTimeOut) + for b < max && retries > 0 { + b *= 2 + retries-- + } + if b > max { + b = max + } + return time.Duration(b) +} + +func configureTCPTransport(tr *http.Transport, proto, addr string) { + // Why 32? See https://github.com/docker/docker/pull/8035. + timeout := 32 * time.Second + if proto == "unix" { + // No need for compression in local communications. + tr.DisableCompression = true + tr.Dial = func(_, _ string) (net.Conn, error) { + return net.DialTimeout(proto, addr, timeout) + } + } else { + tr.Proxy = http.ProxyFromEnvironment + tr.Dial = (&net.Dialer{Timeout: timeout}).Dial + } +} diff --git a/pkg/plugins/client_test.go b/pkg/plugins/client_test.go new file mode 100644 index 0000000000..b414ecb5d9 --- /dev/null +++ b/pkg/plugins/client_test.go @@ -0,0 +1,63 @@ +package plugins + +import ( + "io" + "net/http" + "net/http/httptest" + "reflect" + "testing" +) + +var ( + mux *http.ServeMux + server *httptest.Server +) + +func setupRemotePluginServer() string { + mux = http.NewServeMux() + server = httptest.NewServer(mux) + return server.URL +} + +func teardownRemotePluginServer() { + if server != nil { + server.Close() + } +} + +func TestFailedConnection(t *testing.T) { + c := NewClient("tcp://127.0.0.1:1") + err := c.Call("Service.Method", nil, nil) + if err == nil { + t.Fatal("Unexpected successful connection") + } +} + +func TestEchoInputOutput(t *testing.T) { + addr := setupRemotePluginServer() + defer teardownRemotePluginServer() + + m := Manifest{[]string{"VolumeDriver", "NetworkDriver"}} + + mux.HandleFunc("/Test.Echo", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Fatalf("Expected POST, got %s\n", r.Method) + } + + header := w.Header() + header.Set("Content-Type", versionMimetype) + + io.Copy(w, r.Body) + }) + + c := NewClient(addr) + var output Manifest + err := c.Call("Test.Echo", m, &output) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(output, m) { + t.Fatalf("Expected %v, was %v\n", m, output) + } +} diff --git a/pkg/plugins/discovery.go b/pkg/plugins/discovery.go new file mode 100644 index 0000000000..3a42ba6d17 --- /dev/null +++ b/pkg/plugins/discovery.go @@ -0,0 +1,78 @@ +package plugins + +import ( + "errors" + "fmt" + "io/ioutil" + "net/url" + "os" + "path/filepath" + "strings" +) + +const defaultLocalRegistry = "/usr/share/docker/plugins" + +var ( + ErrNotFound = errors.New("Plugin not found") +) + +type Registry interface { + Plugins() ([]*Plugin, error) + Plugin(name string) (*Plugin, error) +} + +type LocalRegistry struct { + path string +} + +func newLocalRegistry(path string) *LocalRegistry { + if len(path) == 0 { + path = defaultLocalRegistry + } + + return &LocalRegistry{path} +} + +func (l *LocalRegistry) Plugin(name string) (*Plugin, error) { + filepath := filepath.Join(l.path, name) + specpath := filepath + ".spec" + if fi, err := os.Stat(specpath); err == nil { + return readPluginInfo(specpath, fi) + } + socketpath := filepath + ".sock" + if fi, err := os.Stat(socketpath); err == nil { + return readPluginInfo(socketpath, fi) + } + return nil, ErrNotFound +} + +func readPluginInfo(path string, fi os.FileInfo) (*Plugin, error) { + name := strings.Split(fi.Name(), ".")[0] + + if fi.Mode()&os.ModeSocket != 0 { + return &Plugin{ + Name: name, + Addr: "unix://" + path, + }, nil + } + + content, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + addr := strings.TrimSpace(string(content)) + + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + + if len(u.Scheme) == 0 { + return nil, fmt.Errorf("Unknown protocol") + } + + return &Plugin{ + Name: name, + Addr: addr, + }, nil +} diff --git a/pkg/plugins/discovery_test.go b/pkg/plugins/discovery_test.go new file mode 100644 index 0000000000..b6e66e289c --- /dev/null +++ b/pkg/plugins/discovery_test.go @@ -0,0 +1,108 @@ +package plugins + +import ( + "fmt" + "io/ioutil" + "net" + "os" + "path" + "path/filepath" + "reflect" + "testing" +) + +func TestUnknownLocalPath(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "docker-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpdir) + + l := newLocalRegistry(filepath.Join(tmpdir, "unknown")) + _, err = l.Plugin("foo") + if err == nil || err != ErrNotFound { + t.Fatalf("Expected error for unknown directory") + } +} + +func TestLocalSocket(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "docker-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpdir) + l, err := net.Listen("unix", filepath.Join(tmpdir, "echo.sock")) + if err != nil { + t.Fatal(err) + } + defer l.Close() + + r := newLocalRegistry(tmpdir) + p, err := r.Plugin("echo") + if err != nil { + t.Fatal(err) + } + + pp, err := r.Plugin("echo") + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(p, pp) { + t.Fatalf("Expected %v, was %v\n", p, pp) + } + + if p.Name != "echo" { + t.Fatalf("Expected plugin `echo`, got %s\n", p.Name) + } + + addr := fmt.Sprintf("unix://%s/echo.sock", tmpdir) + if p.Addr != addr { + t.Fatalf("Expected plugin addr `%s`, got %s\n", addr, p.Addr) + } +} + +func TestFileSpecPlugin(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "docker-test") + if err != nil { + t.Fatal(err) + } + + cases := []struct { + path string + name string + addr string + fail bool + }{ + {filepath.Join(tmpdir, "echo.spec"), "echo", "unix://var/lib/docker/plugins/echo.sock", false}, + {filepath.Join(tmpdir, "foo.spec"), "foo", "tcp://localhost:8080", false}, + {filepath.Join(tmpdir, "bar.spec"), "bar", "localhost:8080", true}, // unknown transport + } + + for _, c := range cases { + if err = os.MkdirAll(path.Dir(c.path), 0755); err != nil { + t.Fatal(err) + } + if err = ioutil.WriteFile(c.path, []byte(c.addr), 0644); err != nil { + t.Fatal(err) + } + + r := newLocalRegistry(tmpdir) + p, err := r.Plugin(c.name) + if c.fail && err == nil { + continue + } + + if err != nil { + t.Fatal(err) + } + + if p.Name != c.name { + t.Fatalf("Expected plugin `%s`, got %s\n", c.name, p.Name) + } + + if p.Addr != c.addr { + t.Fatalf("Expected plugin addr `%s`, got %s\n", c.addr, p.Addr) + } + os.Remove(c.path) + } +} diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go new file mode 100644 index 0000000000..47519486bd --- /dev/null +++ b/pkg/plugins/plugins.go @@ -0,0 +1,100 @@ +package plugins + +import ( + "errors" + "sync" + + "github.com/Sirupsen/logrus" +) + +var ( + ErrNotImplements = errors.New("Plugin does not implement the requested driver") +) + +type plugins struct { + sync.Mutex + plugins map[string]*Plugin +} + +var ( + storage = plugins{plugins: make(map[string]*Plugin)} + extpointHandlers = make(map[string]func(string, *Client)) +) + +type Manifest struct { + Implements []string +} + +type Plugin struct { + Name string + Addr string + Client *Client + Manifest *Manifest +} + +func (p *Plugin) activate() error { + m := new(Manifest) + p.Client = NewClient(p.Addr) + err := p.Client.Call("Plugin.Activate", nil, m) + if err != nil { + return err + } + + logrus.Debugf("%s's manifest: %v", p.Name, m) + p.Manifest = m + for _, iface := range m.Implements { + handler, handled := extpointHandlers[iface] + if !handled { + continue + } + handler(p.Name, p.Client) + } + return nil +} + +func load(name string) (*Plugin, error) { + registry := newLocalRegistry("") + pl, err := registry.Plugin(name) + if err != nil { + return nil, err + } + if err := pl.activate(); err != nil { + return nil, err + } + return pl, nil +} + +func get(name string) (*Plugin, error) { + storage.Lock() + defer storage.Unlock() + pl, ok := storage.plugins[name] + if ok { + return pl, nil + } + pl, err := load(name) + if err != nil { + return nil, err + } + + logrus.Debugf("Plugin: %v", pl) + storage.plugins[name] = pl + return pl, nil +} + +func Get(name, imp string) (*Plugin, error) { + pl, err := get(name) + if err != nil { + return nil, err + } + for _, driver := range pl.Manifest.Implements { + logrus.Debugf("%s implements: %s", name, driver) + if driver == imp { + return pl, nil + } + } + return nil, ErrNotImplements +} + +func Handle(iface string, fn func(string, *Client)) { + extpointHandlers[iface] = fn +}