diff --git a/daemon/health_test.go b/daemon/health_test.go index 545b57beb7..db8b94d582 100644 --- a/daemon/health_test.go +++ b/daemon/health_test.go @@ -39,6 +39,7 @@ func TestNoneHealthcheck(t *testing.T) { } } +// FIXME(vdemeester) This takes around 3s… This is *way* too long func TestHealthStates(t *testing.T) { e := events.New() _, l, _ := e.Subscribe() diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index aa80c1bce8..4370e72e13 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -22,8 +22,9 @@ const maxDownloadAttempts = 5 // registers and downloads those, taking into account dependencies between // layers. type LayerDownloadManager struct { - layerStore layer.Store - tm TransferManager + layerStore layer.Store + tm TransferManager + waitDuration time.Duration } // SetConcurrency sets the max concurrent downloads for each pull @@ -32,11 +33,16 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) { } // NewLayerDownloadManager returns a new LayerDownloadManager. -func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager { - return &LayerDownloadManager{ - layerStore: layerStore, - tm: NewTransferManager(concurrencyLimit), +func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager { + manager := LayerDownloadManager{ + layerStore: layerStore, + tm: NewTransferManager(concurrencyLimit), + waitDuration: time.Second, } + for _, option := range options { + option(&manager) + } + return &manager } type downloadTransfer struct { @@ -269,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, logrus.Errorf("Download failed, retrying: %v", err) delay := retries * 5 - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(ldm.waitDuration) selectLoop: for { diff --git a/distribution/xfer/download_test.go b/distribution/xfer/download_test.go index 6beb161f8d..0d3af82313 100644 --- a/distribution/xfer/download_test.go +++ b/distribution/xfer/download_test.go @@ -265,8 +265,9 @@ func TestSuccessfulDownload(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("Needs fixing on Windows") } + layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)} - ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency) + ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond }) progressChan := make(chan progress.Progress) progressDone := make(chan struct{}) @@ -327,7 +328,7 @@ func TestSuccessfulDownload(t *testing.T) { } func TestCancelledDownload(t *testing.T) { - ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency) + ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond }) progressChan := make(chan progress.Progress) progressDone := make(chan struct{}) diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index 7c1395118d..58422e57a1 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -16,7 +16,8 @@ const maxUploadAttempts = 5 // LayerUploadManager provides task management and progress reporting for // uploads. type LayerUploadManager struct { - tm TransferManager + tm TransferManager + waitDuration time.Duration } // SetConcurrency sets the max concurrent uploads for each push @@ -25,10 +26,15 @@ func (lum *LayerUploadManager) SetConcurrency(concurrency int) { } // NewLayerUploadManager returns a new LayerUploadManager. -func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager { - return &LayerUploadManager{ - tm: NewTransferManager(concurrencyLimit), +func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager { + manager := LayerUploadManager{ + tm: NewTransferManager(concurrencyLimit), + waitDuration: time.Second, } + for _, option := range options { + option(&manager) + } + return &manager } type uploadTransfer struct { @@ -142,7 +148,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun logrus.Errorf("Upload failed, retrying: %v", err) delay := retries * 5 - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(lum.waitDuration) selectLoop: for { diff --git a/distribution/xfer/upload_test.go b/distribution/xfer/upload_test.go index 16bd187336..066019f263 100644 --- a/distribution/xfer/upload_test.go +++ b/distribution/xfer/upload_test.go @@ -79,7 +79,7 @@ func uploadDescriptors(currentUploads *int32) []UploadDescriptor { } func TestSuccessfulUpload(t *testing.T) { - lum := NewLayerUploadManager(maxUploadConcurrency) + lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond }) progressChan := make(chan progress.Progress) progressDone := make(chan struct{}) @@ -105,7 +105,7 @@ func TestSuccessfulUpload(t *testing.T) { } func TestCancelledUpload(t *testing.T) { - lum := NewLayerUploadManager(maxUploadConcurrency) + lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond }) progressChan := make(chan progress.Progress) progressDone := make(chan struct{}) diff --git a/pkg/httputils/resumablerequestreader.go b/pkg/httputils/resumablerequestreader.go index bebc8608cd..de488fb531 100644 --- a/pkg/httputils/resumablerequestreader.go +++ b/pkg/httputils/resumablerequestreader.go @@ -17,19 +17,20 @@ type resumableRequestReader struct { currentResponse *http.Response failures uint32 maxFailures uint32 + waitDuration time.Duration } // ResumableRequestReader makes it possible to resume reading a request's body transparently // maxfail is the number of times we retry to make requests again (not resumes) // totalsize is the total length of the body; auto detect if not provided func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser { - return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize} + return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second} } // ResumableRequestReaderWithInitialResponse makes it possible to resume // reading the body of an already initiated request. func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser { - return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse} + return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second} } func (r *resumableRequestReader) Read(p []byte) (n int, err error) { @@ -40,7 +41,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) { if r.lastRange != 0 && r.currentResponse == nil { readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize) r.request.Header.Set("Range", readRange) - time.Sleep(5 * time.Second) + time.Sleep(r.waitDuration) } if r.currentResponse == nil { r.currentResponse, err = r.client.Do(r.request) @@ -49,7 +50,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) { if err != nil && r.failures+1 != r.maxFailures { r.cleanUpResponse() r.failures++ - time.Sleep(5 * time.Duration(r.failures) * time.Second) + time.Sleep(time.Duration(r.failures) * r.waitDuration) return 0, nil } else if err != nil { r.cleanUpResponse() diff --git a/pkg/httputils/resumablerequestreader_test.go b/pkg/httputils/resumablerequestreader_test.go index 51b7f2cb54..239760c6dd 100644 --- a/pkg/httputils/resumablerequestreader_test.go +++ b/pkg/httputils/resumablerequestreader_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" ) func TestResumableRequestHeaderSimpleErrors(t *testing.T) { @@ -55,10 +56,11 @@ func TestResumableRequestHeaderNotTooMuchFailures(t *testing.T) { } resreq := &resumableRequestReader{ - client: client, - request: badReq, - failures: 0, - maxFailures: 2, + client: client, + request: badReq, + failures: 0, + maxFailures: 2, + waitDuration: 10 * time.Millisecond, } read, err := resreq.Read([]byte{}) if err != nil || read != 0 { diff --git a/pkg/integration/utils_test.go b/pkg/integration/utils_test.go index 0b2ef4aff5..8e8798d42d 100644 --- a/pkg/integration/utils_test.go +++ b/pkg/integration/utils_test.go @@ -234,7 +234,7 @@ func TestConsumeWithSpeed(t *testing.T) { reader := strings.NewReader("1234567890") chunksize := 2 - bytes1, err := ConsumeWithSpeed(reader, chunksize, 1*time.Second, nil) + bytes1, err := ConsumeWithSpeed(reader, chunksize, 10*time.Millisecond, nil) if err != nil { t.Fatal(err) }