mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Change push to use manifest builder
Currently this always uses the schema1 manifest builder. Later, it will be changed to attempt schema2 first, and fall back when necessary. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
c168a0059f
commit
f33fa1b8d3
7 changed files with 103 additions and 448 deletions
|
|
@ -5,7 +5,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/docker/layer"
|
||||
"github.com/docker/docker/pkg/progress"
|
||||
"golang.org/x/net/context"
|
||||
|
|
@ -30,7 +29,6 @@ type uploadTransfer struct {
|
|||
Transfer
|
||||
|
||||
diffID layer.DiffID
|
||||
digest digest.Digest
|
||||
err error
|
||||
}
|
||||
|
||||
|
|
@ -43,16 +41,15 @@ type UploadDescriptor interface {
|
|||
// DiffID should return the DiffID for this layer.
|
||||
DiffID() layer.DiffID
|
||||
// Upload is called to perform the Upload.
|
||||
Upload(ctx context.Context, progressOutput progress.Output) (digest.Digest, error)
|
||||
Upload(ctx context.Context, progressOutput progress.Output) error
|
||||
}
|
||||
|
||||
// Upload is a blocking function which ensures the listed layers are present on
|
||||
// the remote registry. It uses the string returned by the Key method to
|
||||
// deduplicate uploads.
|
||||
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) (map[layer.DiffID]digest.Digest, error) {
|
||||
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
|
||||
var (
|
||||
uploads []*uploadTransfer
|
||||
digests = make(map[layer.DiffID]digest.Digest)
|
||||
dedupDescriptors = make(map[string]struct{})
|
||||
)
|
||||
|
||||
|
|
@ -74,16 +71,15 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
|
|||
for _, upload := range uploads {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
return ctx.Err()
|
||||
case <-upload.Transfer.Done():
|
||||
if upload.err != nil {
|
||||
return nil, upload.err
|
||||
return upload.err
|
||||
}
|
||||
digests[upload.diffID] = upload.digest
|
||||
}
|
||||
}
|
||||
|
||||
return digests, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
|
||||
|
|
@ -109,9 +105,8 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
|
|||
|
||||
retries := 0
|
||||
for {
|
||||
digest, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
|
||||
err := descriptor.Upload(u.Transfer.Context(), progressOutput)
|
||||
if err == nil {
|
||||
u.digest = digest
|
||||
break
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,12 +36,12 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID {
|
|||
}
|
||||
|
||||
// Upload is called to perform the upload.
|
||||
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (digest.Digest, error) {
|
||||
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
|
||||
if u.currentUploads != nil {
|
||||
defer atomic.AddInt32(u.currentUploads, -1)
|
||||
|
||||
if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
|
||||
return "", errors.New("concurrency limit exceeded")
|
||||
return errors.New("concurrency limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -49,7 +49,7 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
|
|||
for i := int64(0); i <= 10; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
return ctx.Err()
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
|
||||
}
|
||||
|
|
@ -57,12 +57,10 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
|
|||
|
||||
if u.simulateRetries != 0 {
|
||||
u.simulateRetries--
|
||||
return "", errors.New("simulating retry")
|
||||
return errors.New("simulating retry")
|
||||
}
|
||||
|
||||
// For the mock implementation, use SHA256(DiffID) as the returned
|
||||
// digest.
|
||||
return digest.FromBytes([]byte(u.diffID.String())), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
|
||||
|
|
@ -101,26 +99,13 @@ func TestSuccessfulUpload(t *testing.T) {
|
|||
var currentUploads int32
|
||||
descriptors := uploadDescriptors(¤tUploads)
|
||||
|
||||
digests, err := lum.Upload(context.Background(), descriptors, progress.ChanOutput(progressChan))
|
||||
err := lum.Upload(context.Background(), descriptors, progress.ChanOutput(progressChan))
|
||||
if err != nil {
|
||||
t.Fatalf("upload error: %v", err)
|
||||
}
|
||||
|
||||
close(progressChan)
|
||||
<-progressDone
|
||||
|
||||
if len(digests) != len(expectedDigests) {
|
||||
t.Fatal("wrong number of keys in digests map")
|
||||
}
|
||||
|
||||
for key, val := range expectedDigests {
|
||||
if digests[key] != val {
|
||||
t.Fatalf("mismatch in digest array for key %v (expected %v, got %v)", key, val, digests[key])
|
||||
}
|
||||
if receivedProgress[key.String()] != 10 {
|
||||
t.Fatalf("missing or wrong progress output for %v", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelledUpload(t *testing.T) {
|
||||
|
|
@ -143,7 +128,7 @@ func TestCancelledUpload(t *testing.T) {
|
|||
}()
|
||||
|
||||
descriptors := uploadDescriptors(nil)
|
||||
_, err := lum.Upload(ctx, descriptors, progress.ChanOutput(progressChan))
|
||||
err := lum.Upload(ctx, descriptors, progress.ChanOutput(progressChan))
|
||||
if err != context.Canceled {
|
||||
t.Fatal("expected upload to be cancelled")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue