package resumable // import "github.com/docker/docker/registry/resumable"

import (
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
)

// requestReader is an io.ReadCloser over the body of an HTTP request that
// re-issues the request with a Range header when the body read is interrupted.
type requestReader struct {
	client          *http.Client
	request         *http.Request
	lastRange       int64          // number of body bytes handed to the caller so far
	totalSize       int64          // total body length; 0 means "take it from the first response"
	currentResponse *http.Response // response currently being read, nil when a new request is needed
	failures        uint32         // number of failed attempts to issue the request
	maxFailures     uint32         // failure count at which the request error is returned to the caller
	waitDuration    time.Duration  // base delay used before resuming and between retries
}

// NewRequestReader makes it possible to resume reading a request's body
// transparently.
//
// maxfail bounds the number of failed attempts to issue the request (not the
// number of resumes): the request error is returned to the caller once the
// failure count reaches maxfail. A maxfail of 0 or 1 disables retries.
// totalsize is the total length of the body; pass 0 to have it taken from the
// Content-Length of the first response.
func NewRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
	return &requestReader{
		client:       c,
		request:      r,
		maxFailures:  maxfail,
		totalSize:    totalsize,
		waitDuration: 5 * time.Second,
	}
}

// NewRequestReaderWithInitialResponse makes it possible to resume reading the
// body of an already initiated request. initialResponse is read first; a new
// request is only issued once that response has been discarded. The remaining
// parameters have the same meaning as for NewRequestReader.
func NewRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
	return &requestReader{
		client:          c,
		request:         r,
		maxFailures:     maxfail,
		totalSize:       totalsize,
		currentResponse: initialResponse,
		waitDuration:    5 * time.Second,
	}
}

// Read implements io.Reader. It reads from the current response body, issuing
// the request first when there is no current response.
//
// Behavior worth knowing as a caller:
//   - When issuing the request fails and the failure budget is not exhausted,
//     Read sleeps and returns (0, nil); the next call tries again.
//   - When reading the body fails with anything other than io.EOF, the error
//     is logged and cleared, and the next call resumes with a Range request
//     starting at the number of bytes already returned.
//   - Read sleeps (time.Sleep) before resuming and after a failed request, so
//     a single call may block for a multiple of the wait duration.
func (r *requestReader) Read(p []byte) (n int, err error) {
	if r.client == nil || r.request == nil {
		return 0, fmt.Errorf("client and request can't be nil")
	}

	isFreshRequest := false
	if r.currentResponse == nil {
		if r.lastRange != 0 {
			// Part of the body was already delivered: ask only for the rest
			// and give the server some time before reconnecting.
			readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
			r.request.Header.Set("Range", readRange)
			time.Sleep(r.waitDuration)
		}
		r.currentResponse, err = r.client.Do(r.request)
		isFreshRequest = true
	}

	if err != nil {
		r.cleanUpResponse()
		// "<" rather than "!=": with maxFailures == 0 the sum failures+1 can
		// never equal the limit, which used to retry (practically) forever.
		if r.failures+1 < r.maxFailures {
			r.failures++
			// Back off linearly with the number of failures so far.
			time.Sleep(time.Duration(r.failures) * r.waitDuration)
			return 0, nil
		}
		return 0, err
	}

	status := r.currentResponse.StatusCode
	switch {
	case status == http.StatusRequestedRangeNotSatisfiable &&
		r.lastRange == r.totalSize &&
		r.currentResponse.ContentLength == 0:
		// Every byte was already read; the unsatisfiable range is how the
		// server reports that nothing is left.
		r.cleanUpResponse()
		return 0, io.EOF
	case status != http.StatusPartialContent && r.lastRange != 0 && isFreshRequest:
		// A resume request was answered with something other than partial
		// content, so reading on would not continue where we stopped.
		r.cleanUpResponse()
		return 0, fmt.Errorf("the server doesn't support byte ranges")
	}

	switch {
	case r.totalSize == 0:
		r.totalSize = r.currentResponse.ContentLength
	case r.totalSize < 0:
		r.cleanUpResponse()
		return 0, fmt.Errorf("failed to auto detect content length")
	}

	n, err = r.currentResponse.Body.Read(p)
	r.lastRange += int64(n)
	if err == nil {
		return n, nil
	}

	// Any error, io.EOF included, ends the use of this response.
	r.cleanUpResponse()
	if err != io.EOF {
		logrus.Infof("encountered error during pull and clearing it before resume: %s", err)
		err = nil
	}
	return n, err
}

// Close implements io.Closer. It closes the current response body, if any, and
// drops the client and request so that later Read calls return an error.
// It always returns nil.
func (r *requestReader) Close() error {
	r.cleanUpResponse()
	r.client = nil
	r.request = nil
	return nil
}

// cleanUpResponse closes the body of the current response, if there is one,
// and forgets the response so that the next Read issues a new request.
func (r *requestReader) cleanUpResponse() {
	if r.currentResponse == nil {
		return
	}
	r.currentResponse.Body.Close()
	r.currentResponse = nil
}