mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			186 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			186 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package distribution
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"compress/gzip"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
 | 
						|
	"github.com/docker/distribution/reference"
 | 
						|
	"github.com/docker/docker/distribution/metadata"
 | 
						|
	"github.com/docker/docker/pkg/progress"
 | 
						|
	"github.com/docker/docker/registry"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
	"golang.org/x/net/context"
 | 
						|
)
 | 
						|
 | 
						|
// Pusher is an interface that abstracts pushing for different API versions.
 | 
						|
type Pusher interface {
 | 
						|
	// Push tries to push the image configured at the creation of Pusher.
 | 
						|
	// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
 | 
						|
	//
 | 
						|
	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
 | 
						|
	Push(ctx context.Context) error
 | 
						|
}
 | 
						|
 | 
						|
const compressionBufSize = 32768
 | 
						|
 | 
						|
// NewPusher creates a new Pusher interface that will push to either a v1 or v2
 | 
						|
// registry. The endpoint argument contains a Version field that determines
 | 
						|
// whether a v1 or v2 pusher will be created. The other parameters are passed
 | 
						|
// through to the underlying pusher implementation for use during the actual
 | 
						|
// push operation.
 | 
						|
func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
 | 
						|
	switch endpoint.Version {
 | 
						|
	case registry.APIVersion2:
 | 
						|
		return &v2Pusher{
 | 
						|
			v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
 | 
						|
			ref:               ref,
 | 
						|
			endpoint:          endpoint,
 | 
						|
			repoInfo:          repoInfo,
 | 
						|
			config:            imagePushConfig,
 | 
						|
		}, nil
 | 
						|
	case registry.APIVersion1:
 | 
						|
		return &v1Pusher{
 | 
						|
			v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
 | 
						|
			ref:         ref,
 | 
						|
			endpoint:    endpoint,
 | 
						|
			repoInfo:    repoInfo,
 | 
						|
			config:      imagePushConfig,
 | 
						|
		}, nil
 | 
						|
	}
 | 
						|
	return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
 | 
						|
}
 | 
						|
 | 
						|
// Push initiates a push operation on ref.
 | 
						|
// ref is the specific variant of the image to be pushed.
 | 
						|
// If no tag is provided, all tags will be pushed.
 | 
						|
func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
 | 
						|
	// FIXME: Allow to interrupt current push when new push of same image is done.
 | 
						|
 | 
						|
	// Resolve the Repository name from fqn to RepositoryInfo
 | 
						|
	repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.Name.Name())
 | 
						|
 | 
						|
	associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
 | 
						|
	if len(associations) == 0 {
 | 
						|
		return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		lastErr error
 | 
						|
 | 
						|
		// confirmedV2 is set to true if a push attempt managed to
 | 
						|
		// confirm that it was talking to a v2 registry. This will
 | 
						|
		// prevent fallback to the v1 protocol.
 | 
						|
		confirmedV2 bool
 | 
						|
 | 
						|
		// confirmedTLSRegistries is a map indicating which registries
 | 
						|
		// are known to be using TLS. There should never be a plaintext
 | 
						|
		// retry for any of these.
 | 
						|
		confirmedTLSRegistries = make(map[string]struct{})
 | 
						|
	)
 | 
						|
 | 
						|
	for _, endpoint := range endpoints {
 | 
						|
		if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if confirmedV2 && endpoint.Version == registry.APIVersion1 {
 | 
						|
			logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if endpoint.URL.Scheme != "https" {
 | 
						|
			if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
 | 
						|
				logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
 | 
						|
 | 
						|
		pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
 | 
						|
		if err != nil {
 | 
						|
			lastErr = err
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if err := pusher.Push(ctx); err != nil {
 | 
						|
			// Was this push cancelled? If so, don't try to fall
 | 
						|
			// back.
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
			default:
 | 
						|
				if fallbackErr, ok := err.(fallbackError); ok {
 | 
						|
					confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
 | 
						|
					if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
 | 
						|
						confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
 | 
						|
					}
 | 
						|
					err = fallbackErr.err
 | 
						|
					lastErr = err
 | 
						|
					logrus.Infof("Attempting next endpoint for push after error: %v", err)
 | 
						|
					continue
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			logrus.Errorf("Not continuing with push after error: %v", err)
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if lastErr == nil {
 | 
						|
		lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
 | 
						|
	}
 | 
						|
	return lastErr
 | 
						|
}
 | 
						|
 | 
						|
// compress returns an io.ReadCloser which will supply a compressed version of
 | 
						|
// the provided Reader. The caller must close the ReadCloser after reading the
 | 
						|
// compressed data.
 | 
						|
//
 | 
						|
// Note that this function returns a reader instead of taking a writer as an
 | 
						|
// argument so that it can be used with httpBlobWriter's ReadFrom method.
 | 
						|
// Using httpBlobWriter's Write method would send a PATCH request for every
 | 
						|
// Write call.
 | 
						|
//
 | 
						|
// The second return value is a channel that gets closed when the goroutine
 | 
						|
// is finished. This allows the caller to make sure the goroutine finishes
 | 
						|
// before it releases any resources connected with the reader that was
 | 
						|
// passed in.
 | 
						|
func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
 | 
						|
	compressionDone := make(chan struct{})
 | 
						|
 | 
						|
	pipeReader, pipeWriter := io.Pipe()
 | 
						|
	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
 | 
						|
	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
 | 
						|
	compressor := gzip.NewWriter(bufWriter)
 | 
						|
 | 
						|
	go func() {
 | 
						|
		_, err := io.Copy(compressor, in)
 | 
						|
		if err == nil {
 | 
						|
			err = compressor.Close()
 | 
						|
		}
 | 
						|
		if err == nil {
 | 
						|
			err = bufWriter.Flush()
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			pipeWriter.CloseWithError(err)
 | 
						|
		} else {
 | 
						|
			pipeWriter.Close()
 | 
						|
		}
 | 
						|
		close(compressionDone)
 | 
						|
	}()
 | 
						|
 | 
						|
	return pipeReader, compressionDone
 | 
						|
}
 |