Mirror of https://github.com/moby/moby.git (synced 2022-11-09 12:21:53 -05:00)
Merge pull request #29684 from vdemeester/quick-unit
Enhance pkg/{httputils,integration}, distribution/xfer unit tests
Commit 631f51015e
8 changed files with 42 additions and 25 deletions
@@ -39,6 +39,7 @@ func TestNoneHealthcheck(t *testing.T) {
     }
 }
 
+// FIXME(vdemeester) This takes around 3s… This is *way* too long
 func TestHealthStates(t *testing.T) {
     e := events.New()
     _, l, _ := e.Subscribe()
@@ -22,8 +22,9 @@ const maxDownloadAttempts = 5
 // registers and downloads those, taking into account dependencies between
 // layers.
 type LayerDownloadManager struct {
     layerStore layer.Store
     tm         TransferManager
+    waitDuration time.Duration
 }
 
 // SetConcurrency sets the max concurrent downloads for each pull
@@ -32,11 +33,16 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
 }
 
 // NewLayerDownloadManager returns a new LayerDownloadManager.
-func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager {
-    return &LayerDownloadManager{
+func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
+    manager := LayerDownloadManager{
         layerStore: layerStore,
         tm:         NewTransferManager(concurrencyLimit),
+        waitDuration: time.Second,
     }
+    for _, option := range options {
+        option(&manager)
+    }
+    return &manager
 }
 
 type downloadTransfer struct {
@@ -269,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 
             logrus.Errorf("Download failed, retrying: %v", err)
             delay := retries * 5
-            ticker := time.NewTicker(time.Second)
+            ticker := time.NewTicker(ldm.waitDuration)
 
         selectLoop:
             for {
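The change to NewLayerDownloadManager above introduces the functional-options pattern: the constructor builds the manager with a one-second default waitDuration and then applies any options ...func(*LayerDownloadManager) passed in, so tests in the same package can shrink the retry wait without adding a public setter. A minimal, self-contained sketch of the same pattern follows; the downloader type and names are illustrative stand-ins, not part of the moby code. The upload manager further down gets the identical treatment.

package main

import (
	"fmt"
	"time"
)

// downloader is a stand-in for LayerDownloadManager: the retry wait is an
// unexported field, defaulted in the constructor, and only adjustable
// through an option function.
type downloader struct {
	waitDuration time.Duration
}

// newDownloader mirrors the NewLayerDownloadManager change: build the value
// with defaults, then let each option mutate it before returning a pointer.
func newDownloader(options ...func(*downloader)) *downloader {
	d := downloader{waitDuration: time.Second}
	for _, option := range options {
		option(&d)
	}
	return &d
}

func main() {
	prod := newDownloader()
	test := newDownloader(func(d *downloader) { d.waitDuration = time.Millisecond })
	fmt.Println(prod.waitDuration, test.waitDuration) // prints: 1s 1ms
}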
@@ -265,8 +265,9 @@ func TestSuccessfulDownload(t *testing.T) {
     if runtime.GOOS == "windows" {
         t.Skip("Needs fixing on Windows")
     }
 
     layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
-    ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency)
+    ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
 
     progressChan := make(chan progress.Progress)
     progressDone := make(chan struct{})
@@ -327,7 +328,7 @@ func TestSuccessfulDownload(t *testing.T) {
 }
 
 func TestCancelledDownload(t *testing.T) {
-    ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency)
+    ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
 
     progressChan := make(chan progress.Progress)
     progressDone := make(chan struct{})
@@ -16,7 +16,8 @@ const maxUploadAttempts = 5
 // LayerUploadManager provides task management and progress reporting for
 // uploads.
 type LayerUploadManager struct {
     tm TransferManager
+    waitDuration time.Duration
 }
 
 // SetConcurrency sets the max concurrent uploads for each push
@@ -25,10 +26,15 @@ func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
 }
 
 // NewLayerUploadManager returns a new LayerUploadManager.
-func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
-    return &LayerUploadManager{
+func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
+    manager := LayerUploadManager{
         tm: NewTransferManager(concurrencyLimit),
+        waitDuration: time.Second,
     }
+    for _, option := range options {
+        option(&manager)
+    }
+    return &manager
 }
 
 type uploadTransfer struct {
@@ -142,7 +148,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 
             logrus.Errorf("Upload failed, retrying: %v", err)
             delay := retries * 5
-            ticker := time.NewTicker(time.Second)
+            ticker := time.NewTicker(lum.waitDuration)
 
         selectLoop:
             for {
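The retry path itself only swaps the ticker period: delay := retries * 5 still sets how many ticks to wait, but each tick now lasts waitDuration instead of a hard-coded second. Assuming the selectLoop that follows the hunk simply counts down delay ticks (the loop body is not shown in this diff), the effective backoff scales as in this small sketch:

package main

import (
	"fmt"
	"time"
)

// backoff estimates the wait before the next attempt, assuming the loop
// after the hunk waits for retries*5 ticks of waitDuration. This is an
// inference from the visible lines, not a copy of the real loop.
func backoff(retries int, waitDuration time.Duration) time.Duration {
	return time.Duration(retries*5) * waitDuration
}

func main() {
	for retries := 1; retries < 5; retries++ {
		fmt.Printf("retry %d: production %v, test override %v\n",
			retries,
			backoff(retries, time.Second),      // default waitDuration
			backoff(retries, time.Millisecond), // tests set time.Millisecond
		)
	}
}

With the default, a test that exercises even a single retry pays multiple seconds of wall-clock time; with the millisecond override the same code path finishes almost instantly, which is the point of the change.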
@@ -79,7 +79,7 @@ func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
 }
 
 func TestSuccessfulUpload(t *testing.T) {
-    lum := NewLayerUploadManager(maxUploadConcurrency)
+    lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
 
     progressChan := make(chan progress.Progress)
     progressDone := make(chan struct{})
@@ -105,7 +105,7 @@ func TestSuccessfulUpload(t *testing.T) {
 }
 
 func TestCancelledUpload(t *testing.T) {
-    lum := NewLayerUploadManager(maxUploadConcurrency)
+    lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
 
     progressChan := make(chan progress.Progress)
     progressDone := make(chan struct{})
@@ -17,19 +17,20 @@ type resumableRequestReader struct {
     currentResponse *http.Response
     failures        uint32
     maxFailures     uint32
+    waitDuration    time.Duration
 }
 
 // ResumableRequestReader makes it possible to resume reading a request's body transparently
 // maxfail is the number of times we retry to make requests again (not resumes)
 // totalsize is the total length of the body; auto detect if not provided
 func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
-    return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize}
+    return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second}
 }
 
 // ResumableRequestReaderWithInitialResponse makes it possible to resume
 // reading the body of an already initiated request.
 func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
-    return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse}
+    return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second}
 }
 
 func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
@@ -40,7 +41,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
     if r.lastRange != 0 && r.currentResponse == nil {
         readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
         r.request.Header.Set("Range", readRange)
-        time.Sleep(5 * time.Second)
+        time.Sleep(r.waitDuration)
     }
     if r.currentResponse == nil {
         r.currentResponse, err = r.client.Do(r.request)
@@ -49,7 +50,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
     if err != nil && r.failures+1 != r.maxFailures {
         r.cleanUpResponse()
         r.failures++
-        time.Sleep(5 * time.Duration(r.failures) * time.Second)
+        time.Sleep(time.Duration(r.failures) * r.waitDuration)
         return 0, nil
     } else if err != nil {
         r.cleanUpResponse()
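ResumableRequestReader and ResumableRequestReaderWithInitialResponse keep their exported signatures; the only addition is the five-second waitDuration in the struct literal, which Read's retry sleeps now use, so external callers see no API change. A hedged usage sketch of the exported constructor is below; the URL is a placeholder, the import path is the docker/docker one in use at the time, and error handling is kept minimal.

package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/docker/docker/pkg/httputils"
)

func main() {
	// Build the request ourselves so the reader can re-issue it with a
	// Range header if reading the body fails partway through.
	req, err := http.NewRequest("GET", "https://example.com/layer.tar", nil)
	if err != nil {
		panic(err)
	}

	// maxfail=5 fresh attempts; totalsize=0 lets the reader detect the
	// length from the response. Retries sleep for the default 5s
	// waitDuration set in the constructor above.
	body := httputils.ResumableRequestReader(http.DefaultClient, req, 5, 0)
	defer body.Close()

	n, err := io.Copy(io.Discard, body)
	if err != nil {
		panic(err)
	}
	fmt.Println("read", n, "bytes")
}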
@@ -8,6 +8,7 @@ import (
     "net/http/httptest"
     "strings"
     "testing"
+    "time"
 )
 
 func TestResumableRequestHeaderSimpleErrors(t *testing.T) {
@@ -55,10 +56,11 @@ func TestResumableRequestHeaderNotTooMuchFailures(t *testing.T) {
     }
 
     resreq := &resumableRequestReader{
         client:      client,
         request:     badReq,
         failures:    0,
         maxFailures: 2,
+        waitDuration: 10 * time.Millisecond,
     }
     read, err := resreq.Read([]byte{})
     if err != nil || read != 0 {
@@ -234,7 +234,7 @@ func TestConsumeWithSpeed(t *testing.T) {
     reader := strings.NewReader("1234567890")
     chunksize := 2
 
-    bytes1, err := ConsumeWithSpeed(reader, chunksize, 1*time.Second, nil)
+    bytes1, err := ConsumeWithSpeed(reader, chunksize, 10*time.Millisecond, nil)
     if err != nil {
         t.Fatal(err)
     }
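This last hunk only shortens the pacing interval passed to ConsumeWithSpeed from one second to ten milliseconds. The helper's implementation is not part of this diff; the sketch below is a hypothetical stand-in inferred from the call site (read chunksize bytes, wait interval, repeat until EOF), meant only to show why the interval dominates the test's runtime: ten bytes in two-byte chunks means roughly five intervals, so about five seconds before and about fifty milliseconds after.

package main

import (
	"fmt"
	"io"
	"strings"
	"time"
)

// consumeWithSpeed is a hypothetical stand-in for the helper exercised by
// TestConsumeWithSpeed: read chunksize bytes, sleep interval, repeat until
// EOF or a signal on stop. The real helper's behavior may differ.
func consumeWithSpeed(reader io.Reader, chunksize int, interval time.Duration, stop chan bool) (int, error) {
	buf := make([]byte, chunksize)
	total := 0
	for {
		select {
		case <-stop:
			return total, nil
		default:
		}
		n, err := reader.Read(buf)
		total += n
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	n, err := consumeWithSpeed(strings.NewReader("1234567890"), 2, 10*time.Millisecond, nil)
	if err != nil {
		panic(err)
	}
	// 5 reads of 2 bytes => ~5 sleeps: ~50ms here, ~5s with 1*time.Second.
	fmt.Println(n, "bytes in", time.Since(start))
}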