diff --git a/pkg/plugins/client.go b/pkg/plugins/client.go index 9ee1f89976..63bdc81ea1 100644 --- a/pkg/plugins/client.go +++ b/pkg/plugins/client.go @@ -2,6 +2,7 @@ package plugins import ( "bytes" + "context" "encoding/json" "io" "io/ioutil" @@ -9,6 +10,7 @@ import ( "net/url" "time" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/plugins/transport" "github.com/docker/go-connections/sockets" "github.com/docker/go-connections/tlsconfig" @@ -82,16 +84,33 @@ type Client struct { requestFactory transport.RequestFactory } +// RequestOpts is the set of options that can be passed into a request +type RequestOpts struct { + Timeout time.Duration +} + +// WithRequestTimeout sets a timeout duration for plugin requests +func WithRequestTimeout(t time.Duration) func(*RequestOpts) { + return func(o *RequestOpts) { + o.Timeout = t + } +} + // Call calls the specified method with the specified arguments for the plugin. // It will retry for 30 seconds if a failure occurs when calling. -func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error { +func (c *Client) Call(serviceMethod string, args, ret interface{}) error { + return c.CallWithOptions(serviceMethod, args, ret) +} + +// CallWithOptions is just like call except it takes options +func (c *Client) CallWithOptions(serviceMethod string, args interface{}, ret interface{}, opts ...func(*RequestOpts)) error { var buf bytes.Buffer if args != nil { if err := json.NewEncoder(&buf).Encode(args); err != nil { return err } } - body, err := c.callWithRetry(serviceMethod, &buf, true) + body, err := c.callWithRetry(serviceMethod, &buf, true, opts...) if err != nil { return err } @@ -128,18 +147,31 @@ func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{}) return nil } -func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) { +func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool, reqOpts ...func(*RequestOpts)) (io.ReadCloser, error) { var retries int start := time.Now() + var opts RequestOpts + for _, o := range reqOpts { + o(&opts) + } + for { req, err := c.requestFactory.NewRequest(serviceMethod, data) if err != nil { return nil, err } + cancelRequest := func() {} + if opts.Timeout > 0 { + var ctx context.Context + ctx, cancelRequest = context.WithTimeout(req.Context(), opts.Timeout) + req = req.WithContext(ctx) + } + resp, err := c.http.Do(req) if err != nil { + cancelRequest() if !retry { return nil, err } @@ -157,6 +189,7 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) if resp.StatusCode != http.StatusOK { b, err := ioutil.ReadAll(resp.Body) resp.Body.Close() + cancelRequest() if err != nil { return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()} } @@ -176,7 +209,11 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) // old way... return nil, &statusError{resp.StatusCode, serviceMethod, string(b)} } - return resp.Body, nil + return ioutils.NewReadCloserWrapper(resp.Body, func() error { + err := resp.Body.Close() + cancelRequest() + return err + }), nil } } diff --git a/volume/drivers/proxy.go b/volume/drivers/proxy.go index b23db6258f..f4020d600f 100644 --- a/volume/drivers/proxy.go +++ b/volume/drivers/proxy.go @@ -4,12 +4,19 @@ package volumedrivers import ( "errors" + "time" + "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/volume" ) +const ( + longTimeout = 2 * time.Minute + shortTimeout = 1 * time.Minute +) + type client interface { - Call(string, interface{}, interface{}) error + CallWithOptions(string, interface{}, interface{}, ...func(*plugins.RequestOpts)) error } type volumeDriverProxy struct { @@ -33,7 +40,8 @@ func (pp *volumeDriverProxy) Create(name string, opts map[string]string) (err er req.Name = name req.Opts = opts - if err = pp.Call("VolumeDriver.Create", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Create", req, &ret, plugins.WithRequestTimeout(longTimeout)); err != nil { return } @@ -59,7 +67,8 @@ func (pp *volumeDriverProxy) Remove(name string) (err error) { ) req.Name = name - if err = pp.Call("VolumeDriver.Remove", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Remove", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return } @@ -86,7 +95,8 @@ func (pp *volumeDriverProxy) Path(name string) (mountpoint string, err error) { ) req.Name = name - if err = pp.Call("VolumeDriver.Path", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Path", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return } @@ -117,7 +127,8 @@ func (pp *volumeDriverProxy) Mount(name string, id string) (mountpoint string, e req.Name = name req.ID = id - if err = pp.Call("VolumeDriver.Mount", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Mount", req, &ret, plugins.WithRequestTimeout(longTimeout)); err != nil { return } @@ -147,7 +158,8 @@ func (pp *volumeDriverProxy) Unmount(name string, id string) (err error) { req.Name = name req.ID = id - if err = pp.Call("VolumeDriver.Unmount", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Unmount", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return } @@ -172,7 +184,7 @@ func (pp *volumeDriverProxy) List() (volumes []*proxyVolume, err error) { ret volumeDriverProxyListResponse ) - if err = pp.Call("VolumeDriver.List", req, &ret); err != nil { + if err = pp.CallWithOptions("VolumeDriver.List", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return } @@ -201,7 +213,8 @@ func (pp *volumeDriverProxy) Get(name string) (volume *proxyVolume, err error) { ) req.Name = name - if err = pp.Call("VolumeDriver.Get", req, &ret); err != nil { + + if err = pp.CallWithOptions("VolumeDriver.Get", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return } @@ -228,7 +241,7 @@ func (pp *volumeDriverProxy) Capabilities() (capabilities volume.Capability, err ret volumeDriverProxyCapabilitiesResponse ) - if err = pp.Call("VolumeDriver.Capabilities", req, &ret); err != nil { + if err = pp.CallWithOptions("VolumeDriver.Capabilities", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil { return }