Create extpoint for graphdrivers

Allows people to create out-of-process graphdrivers that can be used
with Docker.

Extensions must be started before Docker otherwise Docker will fail to
start.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2015-06-05 15:09:53 -05:00
parent 9f517fc5bb
commit b78e4216a2
10 changed files with 967 additions and 17 deletions

View File

@ -111,6 +111,9 @@ func GetDriver(name, home string, options []string) (Driver, error) {
if initFunc, exists := drivers[name]; exists {
return initFunc(filepath.Join(home, name), options)
}
if pluginDriver, err := lookupPlugin(name, home, options); err == nil {
return pluginDriver, nil
}
logrus.Errorf("Failed to GetDriver graph %s %s", name, home)
return nil, ErrNotSupported
}

View File

@ -0,0 +1,33 @@
// +build experimental
// +build daemon
package graphdriver
import (
"fmt"
"io"
"github.com/docker/docker/pkg/plugins"
)
type pluginClient interface {
// Call calls the specified method with the specified arguments for the plugin.
Call(string, interface{}, interface{}) error
// Stream calls the specified method with the specified arguments for the plugin and returns the response IO stream
Stream(string, interface{}) (io.ReadCloser, error)
// SendFile calls the specified method, and passes through the IO stream
SendFile(string, io.Reader, interface{}) error
}
func lookupPlugin(name, home string, opts []string) (Driver, error) {
pl, err := plugins.Get(name, "GraphDriver")
if err != nil {
return nil, fmt.Errorf("Error looking up graphdriver plugin %s: %v", name, err)
}
return newPluginDriver(name, home, opts, pl.Client)
}
func newPluginDriver(name, home string, opts []string, c pluginClient) (Driver, error) {
proxy := &graphDriverProxy{name, c}
return proxy, proxy.Init(home, opts)
}

View File

@ -0,0 +1,7 @@
// +build !experimental
package graphdriver
func lookupPlugin(name, home string, opts []string) (Driver, error) {
return nil, ErrNotSupported
}

210
daemon/graphdriver/proxy.go Normal file
View File

@ -0,0 +1,210 @@
// +build experimental
// +build daemon
package graphdriver
import (
"errors"
"fmt"
"github.com/docker/docker/pkg/archive"
)
type graphDriverProxy struct {
name string
client pluginClient
}
type graphDriverRequest struct {
ID string `json:",omitempty"`
Parent string `json:",omitempty"`
MountLabel string `json:",omitempty"`
}
type graphDriverResponse struct {
Err string `json:",omitempty"`
Dir string `json:",omitempty"`
Exists bool `json:",omitempty"`
Status [][2]string `json:",omitempty"`
Changes []archive.Change `json:",omitempty"`
Size int64 `json:",omitempty"`
Metadata map[string]string `json:",omitempty"`
}
type graphDriverInitRequest struct {
Home string
Opts []string
}
func (d *graphDriverProxy) Init(home string, opts []string) error {
args := &graphDriverInitRequest{
Home: home,
Opts: opts,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Init", args, &ret); err != nil {
return err
}
if ret.Err != "" {
return errors.New(ret.Err)
}
return nil
}
func (d *graphDriverProxy) String() string {
return d.name
}
func (d *graphDriverProxy) Create(id, parent string) error {
args := &graphDriverRequest{
ID: id,
Parent: parent,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Create", args, &ret); err != nil {
return err
}
if ret.Err != "" {
return errors.New(ret.Err)
}
return nil
}
func (d *graphDriverProxy) Remove(id string) error {
args := &graphDriverRequest{ID: id}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Remove", args, &ret); err != nil {
return err
}
if ret.Err != "" {
return errors.New(ret.Err)
}
return nil
}
func (d *graphDriverProxy) Get(id, mountLabel string) (string, error) {
args := &graphDriverRequest{
ID: id,
MountLabel: mountLabel,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Get", args, &ret); err != nil {
return "", err
}
var err error
if ret.Err != "" {
err = errors.New(ret.Err)
}
return ret.Dir, err
}
func (d *graphDriverProxy) Put(id string) error {
args := &graphDriverRequest{ID: id}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Put", args, &ret); err != nil {
return err
}
if ret.Err != "" {
return errors.New(ret.Err)
}
return nil
}
func (d *graphDriverProxy) Exists(id string) bool {
args := &graphDriverRequest{ID: id}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Exists", args, &ret); err != nil {
return false
}
return ret.Exists
}
func (d *graphDriverProxy) Status() [][2]string {
args := &graphDriverRequest{}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Status", args, &ret); err != nil {
return nil
}
return ret.Status
}
func (d *graphDriverProxy) GetMetadata(id string) (map[string]string, error) {
args := &graphDriverRequest{
ID: id,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.GetMetadata", args, &ret); err != nil {
return nil, err
}
if ret.Err != "" {
return nil, errors.New(ret.Err)
}
return ret.Metadata, nil
}
func (d *graphDriverProxy) Cleanup() error {
args := &graphDriverRequest{}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Cleanup", args, &ret); err != nil {
return nil
}
if ret.Err != "" {
return errors.New(ret.Err)
}
return nil
}
func (d *graphDriverProxy) Diff(id, parent string) (archive.Archive, error) {
args := &graphDriverRequest{
ID: id,
Parent: parent,
}
body, err := d.client.Stream("GraphDriver.Diff", args)
if err != nil {
body.Close()
return nil, err
}
return archive.Archive(body), nil
}
func (d *graphDriverProxy) Changes(id, parent string) ([]archive.Change, error) {
args := &graphDriverRequest{
ID: id,
Parent: parent,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.Changes", args, &ret); err != nil {
return nil, err
}
if ret.Err != "" {
return nil, errors.New(ret.Err)
}
return ret.Changes, nil
}
func (d *graphDriverProxy) ApplyDiff(id, parent string, diff archive.Reader) (int64, error) {
var ret graphDriverResponse
if err := d.client.SendFile(fmt.Sprintf("GraphDriver.ApplyDiff?id=%s&parent=%s", id, parent), diff, &ret); err != nil {
return -1, err
}
if ret.Err != "" {
return -1, errors.New(ret.Err)
}
return ret.Size, nil
}
func (d *graphDriverProxy) DiffSize(id, parent string) (int64, error) {
args := &graphDriverRequest{
ID: id,
Parent: parent,
}
var ret graphDriverResponse
if err := d.client.Call("GraphDriver.DiffSize", args, &ret); err != nil {
return -1, err
}
if ret.Err != "" {
return -1, errors.New(ret.Err)
}
return ret.Size, nil
}

View File

@ -0,0 +1,321 @@
# Experimental: Docker graph driver plugins
Docker graph driver plugins enable admins to use an external/out-of-process
graph driver for use with Docker engine. This is an alternative to using the
built-in storage drivers, such as aufs/overlay/devicemapper/btrfs.
A graph driver plugin is used for image and container fs storage, as such
the plugin must be started and available for connections prior to Docker Engine
being started.
# Write a graph driver plugin
See the [plugin documentation](/docs/extend/plugins.md) for detailed information
on the underlying plugin protocol.
## Graph Driver plugin protocol
If a plugin registers itself as a `GraphDriver` when activated, then it is
expected to provide the rootfs for containers as well as image layer storage.
### /GraphDriver.Init
**Request**:
```
{
"Home": "/graph/home/path",
"Opts": []
}
```
Initialize the graph driver plugin with a home directory and array of options.
Plugins are not required to accept these options as the Docker Engine does not
require that the plugin use this path or options, they are only being passed
through from the user.
**Response**:
```
{
"Err": null
}
```
Respond with a string error if an error occurred.
### /GraphDriver.Create
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187",
"Parent": "2cd9c322cb78a55e8212aa3ea8425a4180236d7106938ec921d0935a4b8ca142"
}
```
Create a new, empty, filesystem layer with the specified `ID` and `Parent`.
`Parent` may be an empty string, which would indicate that there is no parent
layer.
**Response**:
```
{
"Err: null
}
```
Respond with a string error if an error occurred.
### /GraphDriver.Remove
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187"
}
```
Remove the filesystem layer with this given `ID`.
**Response**:
```
{
"Err: null
}
```
Respond with a string error if an error occurred.
### /GraphDriver.Get
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187"
"MountLabel": ""
}
```
Get the mountpoint for the layered filesystem referred to by the given `ID`.
**Response**:
```
{
"Dir": "/var/mygraph/46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187",
"Err": ""
}
```
Respond with the absolute path to the mounted layered filesystem.
Respond with a string error if an error occurred.
### /GraphDriver.Put
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187"
}
```
Release the system resources for the specified `ID`, such as unmounting the
filesystem layer.
**Response**:
```
{
"Err: null
}
```
Respond with a string error if an error occurred.
### /GraphDriver.Exists
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187"
}
```
Determine if a filesystem layer with the specified `ID` exists.
**Response**:
```
{
"Exists": true
}
```
Respond with a boolean for whether or not the filesystem layer with the specified
`ID` exists.
### /GraphDriver.Status
**Request**:
```
{}
```
Get low-level diagnostic information about the graph driver.
**Response**:
```
{
"Status": [[]]
}
```
Respond with a 2-D array with key/value pairs for the underlying status
information.
### /GraphDriver.GetMetadata
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187"
}
```
Get low-level diagnostic information about the layered filesystem with the
with the specified `ID`
**Response**:
```
{
"Metadata": {},
"Err": null
}
```
Respond with a set of key/value pairs containing the low-level diagnostic
information about the layered filesystem.
Respond with a string error if an error occurred.
### /GraphDriver.Cleanup
**Request**:
```
{}
```
Perform neccessary tasks to release resources help by the plugin, for example
unmounting all the layered file systems.
**Response**:
```
{
"Err: null
}
```
Respond with a string error if an error occurred.
### /GraphDriver.Diff
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187",
"Parent": "2cd9c322cb78a55e8212aa3ea8425a4180236d7106938ec921d0935a4b8ca142"
}
```
Get an archive of the changes between the filesystem layers specified by the `ID`
and `Parent`. `Parent` may be an empty string, in which case there is no parent.
**Response**:
```
{{ TAR STREAM }}
```
### /GraphDriver.Changes
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187",
"Parent": "2cd9c322cb78a55e8212aa3ea8425a4180236d7106938ec921d0935a4b8ca142"
}
```
Get a list of changes between the filesystem layers specified by the `ID` and
`Parent`. `Parent` may be an empty string, in which case there is no parent.
**Response**:
```
{
"Changes": [{}],
"Err": null
}
```
Responds with a list of changes. The structure of a change is:
```
"Path": "/some/path",
"Kind": 0,
```
Where teh `Path` is the filesystem path within the layered filesystem that is
changed and `Kind` is an integer specifying the type of change that occurred:
- 0 - Modified
- 1 - Added
- 2 - Deleted
Respond with a string error if an error occurred.
### /GraphDriver.ApplyDiff
**Request**:
```
{{ TAR STREAM }}
```
Extract the changeset from the given diff into the layer with the specified `ID`
and `Parent`
**Query Parameters**:
- id (required)- the `ID` of the new filesystem layer to extract the diff to
- parent (required)- the `Parent` of the given `ID`
**Response**:
```
{
"Size": 512366,
"Err": null
}
```
Respond with the size of the new layer in bytes.
Respond with a string error if an error occurred.
### /GraphDriver.DiffSize
**Request**:
```
{
"ID": "46fe8644f2572fd1e505364f7581e0c9dbc7f14640bd1fb6ce97714fb6fc5187",
"Parent": "2cd9c322cb78a55e8212aa3ea8425a4180236d7106938ec921d0935a4b8ca142"
}
```
Calculate the changes between the specified `ID`
**Response**:
```
{
"Size": 512366,
"Err": null
}
```
Respond with the size changes between the specified `ID` and `Parent`
Respond with a string error if an error occurred.

View File

@ -1,12 +1,22 @@
package main
import (
"fmt"
"testing"
"github.com/docker/docker/pkg/reexec"
"github.com/go-check/check"
)
func Test(t *testing.T) {
reexec.Init() // This is required for external graphdriver tests
if !isLocalDaemon {
fmt.Println("INFO: Testing against a remote daemon")
} else {
fmt.Println("INFO: Testing against a local daemon")
}
check.TestingT(t)
}

View File

@ -0,0 +1,348 @@
// +build experimental
// +build !windows
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"strings"
"github.com/docker/docker/daemon/graphdriver"
"github.com/docker/docker/daemon/graphdriver/vfs"
"github.com/docker/docker/pkg/archive"
"github.com/go-check/check"
)
func init() {
check.Suite(&DockerExternalGraphdriverSuite{
ds: &DockerSuite{},
})
}
type DockerExternalGraphdriverSuite struct {
server *httptest.Server
ds *DockerSuite
d *Daemon
ec *graphEventsCounter
}
type graphEventsCounter struct {
activations int
creations int
removals int
gets int
puts int
stats int
cleanups int
exists int
init int
metadata int
diff int
applydiff int
changes int
diffsize int
}
func (s *DockerExternalGraphdriverSuite) SetUpTest(c *check.C) {
s.d = NewDaemon(c)
s.ec = &graphEventsCounter{}
}
func (s *DockerExternalGraphdriverSuite) TearDownTest(c *check.C) {
s.d.Stop()
s.ds.TearDownTest(c)
}
func (s *DockerExternalGraphdriverSuite) SetUpSuite(c *check.C) {
mux := http.NewServeMux()
s.server = httptest.NewServer(mux)
type graphDriverRequest struct {
ID string `json:",omitempty"`
Parent string `json:",omitempty"`
MountLabel string `json:",omitempty"`
}
type graphDriverResponse struct {
Err error `json:",omitempty"`
Dir string `json:",omitempty"`
Exists bool `json:",omitempty"`
Status [][2]string `json:",omitempty"`
Metadata map[string]string `json:",omitempty"`
Changes []archive.Change `json:",omitempty"`
Size int64 `json:",omitempty"`
}
respond := func(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "appplication/vnd.docker.plugins.v1+json")
switch t := data.(type) {
case error:
fmt.Fprintln(w, fmt.Sprintf(`{"Err": %s}`, t.Error()))
case string:
fmt.Fprintln(w, t)
default:
json.NewEncoder(w).Encode(&data)
}
}
base, err := ioutil.TempDir("", "external-graph-test")
c.Assert(err, check.IsNil)
vfsProto, err := vfs.Init(base, []string{})
if err != nil {
c.Fatalf("error initializing graph driver: %v", err)
}
driver := graphdriver.NaiveDiffDriver(vfsProto)
mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
s.ec.activations++
respond(w, `{"Implements": ["GraphDriver"]}`)
})
mux.HandleFunc("/GraphDriver.Init", func(w http.ResponseWriter, r *http.Request) {
s.ec.init++
respond(w, "{}")
})
mux.HandleFunc("/GraphDriver.Create", func(w http.ResponseWriter, r *http.Request) {
s.ec.creations++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
if err := driver.Create(req.ID, req.Parent); err != nil {
respond(w, err)
return
}
respond(w, "{}")
})
mux.HandleFunc("/GraphDriver.Remove", func(w http.ResponseWriter, r *http.Request) {
s.ec.removals++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
if err := driver.Remove(req.ID); err != nil {
respond(w, err)
return
}
respond(w, "{}")
})
mux.HandleFunc("/GraphDriver.Get", func(w http.ResponseWriter, r *http.Request) {
s.ec.gets++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
}
dir, err := driver.Get(req.ID, req.MountLabel)
if err != nil {
respond(w, err)
return
}
respond(w, &graphDriverResponse{Dir: dir})
})
mux.HandleFunc("/GraphDriver.Put", func(w http.ResponseWriter, r *http.Request) {
s.ec.puts++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
if err := driver.Put(req.ID); err != nil {
respond(w, err)
return
}
respond(w, "{}")
})
mux.HandleFunc("/GraphDriver.Exists", func(w http.ResponseWriter, r *http.Request) {
s.ec.exists++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
respond(w, &graphDriverResponse{Exists: driver.Exists(req.ID)})
})
mux.HandleFunc("/GraphDriver.Status", func(w http.ResponseWriter, r *http.Request) {
s.ec.stats++
respond(w, `{"Status":{}}`)
})
mux.HandleFunc("/GraphDriver.Cleanup", func(w http.ResponseWriter, r *http.Request) {
s.ec.cleanups++
err := driver.Cleanup()
if err != nil {
respond(w, err)
return
}
respond(w, `{}`)
})
mux.HandleFunc("/GraphDriver.GetMetadata", func(w http.ResponseWriter, r *http.Request) {
s.ec.metadata++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
data, err := driver.GetMetadata(req.ID)
if err != nil {
respond(w, err)
return
}
respond(w, &graphDriverResponse{Metadata: data})
})
mux.HandleFunc("/GraphDriver.Diff", func(w http.ResponseWriter, r *http.Request) {
s.ec.diff++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
diff, err := driver.Diff(req.ID, req.Parent)
if err != nil {
respond(w, err)
return
}
io.Copy(w, diff)
})
mux.HandleFunc("/GraphDriver.Changes", func(w http.ResponseWriter, r *http.Request) {
s.ec.changes++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
changes, err := driver.Changes(req.ID, req.Parent)
if err != nil {
respond(w, err)
return
}
respond(w, &graphDriverResponse{Changes: changes})
})
mux.HandleFunc("/GraphDriver.ApplyDiff", func(w http.ResponseWriter, r *http.Request) {
s.ec.applydiff++
id := r.URL.Query().Get("id")
parent := r.URL.Query().Get("parent")
size, err := driver.ApplyDiff(id, parent, r.Body)
if err != nil {
respond(w, err)
return
}
respond(w, &graphDriverResponse{Size: size})
})
mux.HandleFunc("/GraphDriver.DiffSize", func(w http.ResponseWriter, r *http.Request) {
s.ec.diffsize++
var req graphDriverRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), 500)
return
}
size, err := driver.DiffSize(req.ID, req.Parent)
if err != nil {
respond(w, err)
return
}
respond(w, &graphDriverResponse{Size: size})
})
if err := os.MkdirAll("/etc/docker/plugins", 0755); err != nil {
c.Fatal(err)
}
if err := ioutil.WriteFile("/etc/docker/plugins/test-external-graph-driver.spec", []byte(s.server.URL), 0644); err != nil {
c.Fatal(err)
}
}
func (s *DockerExternalGraphdriverSuite) TearDownSuite(c *check.C) {
s.server.Close()
if err := os.RemoveAll("/etc/docker/plugins"); err != nil {
c.Fatal(err)
}
}
func (s *DockerExternalGraphdriverSuite) TestExternalGraphDriver(c *check.C) {
c.Assert(s.d.StartWithBusybox("-s", "test-external-graph-driver"), check.IsNil)
out, err := s.d.Cmd("run", "-d", "--name=graphtest", "busybox", "sh", "-c", "echo hello > /hello")
c.Assert(err, check.IsNil, check.Commentf(out))
err = s.d.Restart("-s", "test-external-graph-driver")
out, err = s.d.Cmd("inspect", "--format='{{.GraphDriver.Name}}'", "graphtest")
c.Assert(err, check.IsNil, check.Commentf(out))
c.Assert(strings.TrimSpace(out), check.Equals, "test-external-graph-driver")
out, err = s.d.Cmd("diff", "graphtest")
c.Assert(err, check.IsNil, check.Commentf(out))
c.Assert(strings.Contains(out, "A /hello"), check.Equals, true)
out, err = s.d.Cmd("rm", "-f", "graphtest")
c.Assert(err, check.IsNil, check.Commentf(out))
out, err = s.d.Cmd("info")
c.Assert(err, check.IsNil, check.Commentf(out))
err = s.d.Stop()
c.Assert(err, check.IsNil)
c.Assert(s.ec.activations, check.Equals, 2)
c.Assert(s.ec.init, check.Equals, 2)
c.Assert(s.ec.creations >= 1, check.Equals, true)
c.Assert(s.ec.removals >= 1, check.Equals, true)
c.Assert(s.ec.gets >= 1, check.Equals, true)
c.Assert(s.ec.puts >= 1, check.Equals, true)
c.Assert(s.ec.stats, check.Equals, 1)
c.Assert(s.ec.cleanups, check.Equals, 2)
c.Assert(s.ec.exists >= 1, check.Equals, true)
c.Assert(s.ec.applydiff >= 1, check.Equals, true)
c.Assert(s.ec.changes, check.Equals, 1)
c.Assert(s.ec.diffsize, check.Equals, 0)
c.Assert(s.ec.diff, check.Equals, 0)
c.Assert(s.ec.metadata, check.Equals, 1)
}
func (s *DockerExternalGraphdriverSuite) TestExternalGraphDriverPull(c *check.C) {
testRequires(c, Network)
c.Assert(s.d.Start(), check.IsNil)
out, err := s.d.Cmd("pull", "busybox:latest")
c.Assert(err, check.IsNil, check.Commentf(out))
out, err = s.d.Cmd("run", "-d", "busybox", "top")
c.Assert(err, check.IsNil, check.Commentf(out))
}

View File

@ -68,11 +68,8 @@ func init() {
// Similarly, it will be perfectly valid to also run CLI tests from
// a Linux CLI (built with the daemon tag) against a Windows daemon.
if len(os.Getenv("DOCKER_REMOTE_DAEMON")) > 0 {
fmt.Println("INFO: Testing against a remote daemon")
isLocalDaemon = false
} else {
fmt.Println("INFO: Testing against a local daemon")
isLocalDaemon = true
}
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
@ -52,19 +53,41 @@ type Client struct {
// Call calls the specified method with the specified arguments for the plugin.
// It will retry for 30 seconds if a failure occurs when calling.
func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
return c.callWithRetry(serviceMethod, args, ret, true)
}
func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret interface{}, retry bool) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(args); err != nil {
return err
}
req, err := http.NewRequest("POST", "/"+serviceMethod, &buf)
body, err := c.callWithRetry(serviceMethod, &buf, true)
if err != nil {
return err
}
defer body.Close()
return json.NewDecoder(body).Decode(&ret)
}
// Stream calls the specified method with the specified arguments for the plugin and returns the response body
func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(args); err != nil {
return nil, err
}
return c.callWithRetry(serviceMethod, &buf, true)
}
// SendFile calls the specified method, and passes through the IO stream
func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{}) error {
body, err := c.callWithRetry(serviceMethod, data, true)
if err != nil {
return err
}
return json.NewDecoder(body).Decode(&ret)
}
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
req, err := http.NewRequest("POST", "/"+serviceMethod, data)
if err != nil {
return nil, err
}
req.Header.Add("Accept", versionMimetype)
req.URL.Scheme = "http"
req.URL.Host = c.addr
@ -76,12 +99,12 @@ func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret inter
resp, err := c.http.Do(req)
if err != nil {
if !retry {
return err
return nil, err
}
timeOff := backoff(retries)
if abort(start, timeOff) {
return err
return nil, err
}
retries++
logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", c.addr, timeOff)
@ -89,16 +112,14 @@ func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret inter
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
remoteErr, err := ioutil.ReadAll(resp.Body)
if err != nil {
return &remoteError{err.Error(), serviceMethod}
return nil, &remoteError{err.Error(), serviceMethod}
}
return &remoteError{string(remoteErr), serviceMethod}
return nil, &remoteError{string(remoteErr), serviceMethod}
}
return json.NewDecoder(resp.Body).Decode(&ret)
return resp.Body, nil
}
}

View File

@ -30,7 +30,7 @@ func teardownRemotePluginServer() {
func TestFailedConnection(t *testing.T) {
c, _ := NewClient("tcp://127.0.0.1:1", tlsconfig.Options{InsecureSkipVerify: true})
err := c.callWithRetry("Service.Method", nil, nil, false)
_, err := c.callWithRetry("Service.Method", nil, false)
if err == nil {
t.Fatal("Unexpected successful connection")
}