package distribution

import (


	distreference ""

// Size thresholds, in bytes, used to classify a layer as small, middle-sized
// or big when deciding how many mount attempts and existence checks to make.
const (
	smallLayerMaximumSize  = 100 * (1 << 10) // 100KB
	middleLayerMaximumSize = 10 * (1 << 20)  // 10MB

// PushResult contains the tag, manifest digest, and manifest size from the
// push. It's used to signal this information to the trust code in the client
// so it can sign the manifest if necessary.
type PushResult struct {
	// Tag is the tag that was pushed.
	Tag    string
	// Digest is the digest computed over the canonical manifest bytes.
	Digest digest.Digest
	// Size is the length, in bytes, of the canonical manifest.
	Size   int

// v2Pusher carries everything needed to push one reference to one registry
// endpoint over the v2 protocol.
type v2Pusher struct {
	// v2MetadataService is handed to every push descriptor for looking up
	// and recording DiffID-to-digest mappings.
	v2MetadataService metadata.V2MetadataService
	// ref is the reference being pushed (tagged, or name-only for all tags).
	ref               reference.Named
	// endpoint, repoInfo and config are the inputs used to create repo.
	endpoint          registry.APIEndpoint
	repoInfo          *registry.RepositoryInfo
	config            *ImagePushConfig
	// repo is the repository client; it is assigned by Push.
	repo              distribution.Repository

	// pushState is state built by the Upload functions.
	pushState pushState

// pushState is the mutable state shared, by pointer, between a v2Pusher and
// the push descriptors it creates.
type pushState struct {
	// remoteLayers is the set of layers known to exist on the remote side.
	// This avoids redundant queries when pushing multiple tags that
	// involve the same layers. It is also used to fill in digest and size
	// information when building the manifest.
	remoteLayers map[layer.DiffID]distribution.Descriptor
	// confirmedV2 is set to true if we confirm we're talking to a v2
	// registry. This is used to limit fallbacks to the v1 protocol.
	confirmedV2 bool

// Push resets the remote-layer cache, creates the v2 repository client for
// p.endpoint (passing "push" and "pull" to NewV2Repository) and then pushes
// p.ref through it.
//
// A failure to create the repository client is logged at debug level and
// returned unchanged. When pushV2Repository fails with an error for which
// continueOnError reports true, the error is returned wrapped in a
// fallbackError that records whether a v2 registry was confirmed.
func (p *v2Pusher) Push(ctx context.Context) (err error) {
	p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)

	p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
	if err != nil {
		logrus.Debugf("Error getting v2 registry: %v", err)
		return err

	if err = p.pushV2Repository(ctx); err != nil {
		if continueOnError(err) {
			return fallbackError{
				err:         err,
				confirmedV2: p.pushState.confirmedV2,
				transportOK: true,
	return err

// pushV2Repository pushes whatever p.ref designates:
//   - a tagged reference is resolved through the reference store and pushed
//     as a single tag;
//   - a reference that is neither tagged nor name-only is rejected with
//     "cannot push a digest reference";
//   - for a name-only reference, every tagged reference the store associates
//     with that name is pushed, stopping at the first failure.
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
	if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
		imageID, err := p.config.ReferenceStore.Get(p.ref)
		if err != nil {
			// The lookup error itself is not included in the returned message.
			return fmt.Errorf("tag does not exist: %s", p.ref.String())

		return p.pushV2Tag(ctx, namedTagged, imageID)

	if !reference.IsNameOnly(p.ref) {
		return errors.New("cannot push a digest reference")

	// Push all tags
	// NOTE(review): no increment of pushed is visible in this view; if it is
	// never incremented the "no tags to push" error below is always
	// returned — confirm against the complete file.
	pushed := 0
	for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
		if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
			if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
				return err

	if pushed == 0 {
		return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())

	return nil

// pushV2Tag pushes the image identified by id under the tag carried by ref.
//
// It uploads the image's layers through the upload manager, then builds and
// puts a schema2 manifest. If that put fails on a non-Windows host it builds
// a schema1 manifest and puts that instead; on Windows the schema2 failure is
// returned. On success it reports the manifest digest and size through the
// progress output, records a digest reference in the reference store and
// emits a PushResult as an auxiliary progress message.
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
	logrus.Debugf("Pushing repository: %s", ref.String())

	img, err := p.config.ImageStore.Get(image.IDFromDigest(id))
	if err != nil {
		return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)

	var l layer.Layer

	// An image whose RootFS chain ID is empty is represented by
	// layer.EmptyLayer; otherwise the top layer is fetched from the layer
	// store and released again when this function returns.
	topLayerID := img.RootFS.ChainID()
	if topLayerID == "" {
		l = layer.EmptyLayer
	} else {
		l, err = p.config.LayerStore.Get(topLayerID)
		if err != nil {
			return fmt.Errorf("failed to get top layer from image: %v", err)
		defer layer.ReleaseAndLog(p.config.LayerStore, l)

	hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
	if err != nil {
		return fmt.Errorf("failed to compute hmac key of auth config: %v", err)

	var descriptors []xfer.UploadDescriptor

	// Fields shared by every layer's descriptor; the layer and the set of
	// checked digests are filled in per layer below.
	descriptorTemplate := v2PushDescriptor{
		v2MetadataService: p.v2MetadataService,
		hmacKey:           hmacKey,
		repoInfo:          p.repoInfo,
		ref:               p.ref,
		repo:              p.repo,
		pushState:         &p.pushState,

	// Walk from the top layer down through its parents, creating one
	// descriptor per DiffID, so descriptors ends up ordered top layer first.
	// NOTE(review): this comment previously said the loop bounds avoid
	// pushing the base layer on Windows, but the loop as written covers every
	// DiffID — confirm whether that special case still exists.
	for i := 0; i < len(img.RootFS.DiffIDs); i++ {
		descriptor := descriptorTemplate
		descriptor.layer = l
		descriptor.checkedDigests = make(map[digest.Digest]struct{})
		descriptors = append(descriptors, &descriptor)

		l = l.Parent()

	if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
		return err

	// Try schema2 first
	builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
	manifest, err := manifestFromBuilder(ctx, builder, descriptors)
	if err != nil {
		return err

	manSvc, err := p.repo.Manifests(ctx)
	if err != nil {
		return err

	putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
	if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
		// No schema1 fallback is attempted on Windows.
		if runtime.GOOS == "windows" {
			logrus.Warnf("failed to upload schema2 manifest: %v", err)
			return err

		logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)

		manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
		if err != nil {
			return err
		builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
		manifest, err = manifestFromBuilder(ctx, builder, descriptors)
		if err != nil {
			return err

		if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
			return err

	// Extract the canonical bytes of whichever manifest type was pushed; the
	// digest and size reported below are computed from these bytes.
	var canonicalManifest []byte

	switch v := manifest.(type) {
	case *schema1.SignedManifest:
		canonicalManifest = v.Canonical
	case *schema2.DeserializedManifest:
		_, canonicalManifest, err = v.Payload()
		if err != nil {
			return err

	manifestDigest := digest.FromBytes(canonicalManifest)
	progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))

	if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
		return err

	// Signal digest to the trust client so it can sign the
	// push, if appropriate.
	progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})

	return nil

// manifestFromBuilder appends every descriptor to builder as a reference and
// returns the manifest the builder produces. The first AppendReference error
// aborts the build and is returned.
//
// Each element of descriptors is type-asserted to *v2PushDescriptor without a
// check, so any other concrete type causes a panic.
func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
	// descriptors is in reverse order; iterate backwards to get references
	// appended in the right order.
	for i := len(descriptors) - 1; i >= 0; i-- {
		if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
			return nil, err

	return builder.Build(ctx)

// v2PushDescriptor describes the upload of a single layer to a v2 registry.
type v2PushDescriptor struct {
	// layer is the layer this descriptor uploads.
	layer             layer.Layer
	// v2MetadataService looks up and records DiffID-to-digest mappings.
	v2MetadataService metadata.V2MetadataService
	// hmacKey is passed along when recording metadata and when checking the
	// HMAC of existing metadata entries.
	hmacKey           []byte
	// repoInfo names the target repository.
	repoInfo          reference.Named
	// ref is the reference being pushed; its full name is part of Key.
	ref               reference.Named
	// repo is the client for the target repository.
	repo              distribution.Repository
	// pushState points at the state shared with the owning pusher.
	pushState         *pushState
	// remoteDescriptor is stored by SetRemoteDescriptor and returned by
	// Descriptor.
	remoteDescriptor  distribution.Descriptor
	// a set of digests whose presence has been checked in a target repository
	checkedDigests map[digest.Digest]struct{}

// Key returns a string built from the fixed prefix "v2push:", the full name
// of the pushed reference and the layer's DiffID.
func (pd *v2PushDescriptor) Key() string {
	return "v2push:" + pd.ref.FullName() + " " + pd.layer.DiffID().String()

// ID returns the layer's DiffID shortened by stringid.TruncateID; it is the
// identifier used for this layer's progress messages.
func (pd *v2PushDescriptor) ID() string {
	return stringid.TruncateID(pd.layer.DiffID().String())

// DiffID returns the DiffID of the layer this descriptor uploads.
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
	return pd.layer.DiffID()

// Upload makes sure the layer's blob is available in the target repository
// and returns its descriptor. It tries the cheapest options first:
//
//  1. a layer whose own descriptor lists URLs is skipped as a foreign layer;
//  2. a layer already recorded in pushState.remoteLayers is reported as
//     existing without contacting the registry;
//  3. if metadata exists for the DiffID, one existence check is made against
//     the target repository;
//  4. a cross-repository mount is attempted from each mount candidate;
//  5. further existence checks are made if the budget allows;
//  6. otherwise the blob is uploaded.
//
// The mount-attempt and existence-check budgets come from
// getMaxMountAndExistenceCheckAttempts.
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
	if fs, ok := pd.layer.(distribution.Describable); ok {
		if d := fs.Descriptor(); len(d.URLs) > 0 {
			progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
			return d, nil

	diffID := pd.DiffID()

	if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
		// it is already known that the push is not needed and
		// therefore doing a stat is unnecessary
		progress.Update(progressOutput, pd.ID(), "Layer already exists")
		return descriptor, nil

	maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)

	// Do we have any metadata associated with this layer's DiffID?
	v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
	if err == nil {
		// check for blob existence in the target repository if we have a mapping with it
		descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, false, 1, v2Metadata)
		if exists || err != nil {
			return descriptor, err

	// if digest was empty or not saved, or if blob does not exist on the remote repository,
	// then push the blob.
	bs := pd.repo.Blobs(ctx)

	var layerUpload distribution.BlobWriter

	// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
	candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
	for _, mountCandidate := range candidates {
		logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
		createOpts := []distribution.BlobCreateOption{}

		if len(mountCandidate.SourceRepository) > 0 {
			namedRef, err := reference.WithName(mountCandidate.SourceRepository)
			if err != nil {
				// NOTE(review): namedRef is dereferenced on the error path; if
				// WithName returns a nil reference together with the error this
				// call panics. Logging mountCandidate.SourceRepository would
				// avoid that — confirm. The handling that follows this log call
				// is not visible in this view.
				logrus.Errorf("failed to parse source repository reference %v: %v", namedRef.String(), err)

			// TODO (brianbland): We need to construct a reference where the Name is
			// only the full remote name, so clean this up when distribution has a
			// richer reference package
			remoteRef, err := distreference.WithName(namedRef.RemoteName())
			if err != nil {
				// NOTE(review): the second format argument repeats the remote
				// name, so err is never logged; it looks like err was intended.
				logrus.Errorf("failed to make remote reference out of %q: %v", namedRef.RemoteName(), namedRef.RemoteName())

			canonicalRef, err := distreference.WithDigest(remoteRef, mountCandidate.Digest)
			if err != nil {
				logrus.Errorf("failed to make canonical reference: %v", err)

			createOpts = append(createOpts, client.WithMountFrom(canonicalRef))

		// send the layer
		lu, err := bs.Create(ctx, createOpts...)
		switch err := err.(type) {
		case nil:
			// noop
		case distribution.ErrBlobMounted:
			// The registry mounted the blob from another repository, so no
			// upload is needed: record the result and return.
			progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())

			err.Descriptor.MediaType = schema2.MediaTypeLayer

			pd.pushState.confirmedV2 = true
			pd.pushState.remoteLayers[diffID] = err.Descriptor

			// Cache mapping from this layer's DiffID to the blobsum
			if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
				Digest:           err.Descriptor.Digest,
				SourceRepository: pd.repoInfo.FullName(),
			}); err != nil {
				return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
			return err.Descriptor, nil
			// NOTE(review): the log call below follows an unconditional return;
			// presumably it belongs to a separate case whose label is not
			// visible in this view — confirm.
			logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)

		// The mount did not succeed for a candidate that has a source
		// repository and whose HMAC either matches our key or is empty.
		// NOTE(review): only the log statement is visible here; the actual
		// removal of the association is not shown in this view.
		if len(mountCandidate.SourceRepository) > 0 &&
			(metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
				len(mountCandidate.HMAC) == 0) {
			cause := "blob mount failure"
			if err != nil {
				cause = fmt.Sprintf("an error: %v", err.Error())
			logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)

		// Create handed back an upload session: keep it for the final upload
		// and cancel any session kept from an earlier candidate.
		if lu != nil {
			// cancel previous upload
			cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
			layerUpload = lu

	if maxExistenceChecks-len(pd.checkedDigests) > 0 {
		// do additional layer existence checks with other known digests if any
		descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
		if exists || err != nil {
			return descriptor, err

	logrus.Debugf("Pushing layer: %s", diffID)
	// Start a fresh upload session only if no mount attempt left one open.
	if layerUpload == nil {
		layerUpload, err = bs.Create(ctx)
		if err != nil {
			return distribution.Descriptor{}, retryOnError(err)
	defer layerUpload.Close()

	// upload the blob
	desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
	if err != nil {
		return desc, err

	return desc, nil

// SetRemoteDescriptor stores descriptor so that Descriptor can return it later.
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
	pd.remoteDescriptor = descriptor

// Descriptor returns the descriptor last stored by SetRemoteDescriptor, or the
// zero Descriptor if none has been stored.
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
	return pd.remoteDescriptor

// uploadUsingSession streams the layer's tar archive, passed through compress,
// into layerUpload and commits the blob under the digest computed over the
// bytes read from the compressed stream. It then records the mapping from
// diffID to that digest and caches the resulting descriptor in
// pushState.remoteLayers.
//
// Failures to open the tar stream or to record the metadata are returned as
// xfer.DoNotRetry; failures while sending or committing the blob are passed
// through retryOnError.
func (pd *v2PushDescriptor) uploadUsingSession(
	ctx context.Context,
	progressOutput progress.Output,
	diffID layer.DiffID,
	layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
	arch, err := pd.layer.TarStream()
	if err != nil {
		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}

	// don't care if this fails; best effort
	size, _ := pd.layer.DiffSize()

	// Reads are cancellable through ctx and reported as "Pushing" progress.
	reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
	compressedReader, compressionDone := compress(reader)
	// NOTE(review): the body of this deferred function is not visible in this
	// view; presumably it closes compressedReader and waits on
	// compressionDone — confirm.
	defer func() {

	// Everything the upload reads is also fed to the digester.
	digester := digest.Canonical.New()
	tee := io.TeeReader(compressedReader, digester.Hash())

	nn, err := layerUpload.ReadFrom(tee)
	if err != nil {
		return distribution.Descriptor{}, retryOnError(err)

	pushDigest := digester.Digest()
	if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
		return distribution.Descriptor{}, retryOnError(err)

	logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
	progress.Update(progressOutput, pd.ID(), "Pushed")

	// Cache mapping from this layer's DiffID to the blobsum
	if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
		Digest:           pushDigest,
		SourceRepository: pd.repoInfo.FullName(),
	}); err != nil {
		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}

	// Size is the number of bytes ReadFrom reported sending.
	desc := distribution.Descriptor{
		Digest:    pushDigest,
		MediaType: schema2.MediaTypeLayer,
		Size:      nn,

	// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
	pd.pushState.confirmedV2 = true
	pd.pushState.remoteLayers[diffID] = desc

	return desc, nil

// layerAlreadyExists checks whether the target repository already holds a blob
// for any of the digests in v2Metadata. Each selected digest is looked up with
// a Stat call and recorded in pd.checkedDigests. When a blob is found, its
// descriptor is returned with exists set to true and is cached in
// pushState.remoteLayers.
//
// If checkOtherRepositories is true, digests mapped to any other repository
// (not just the target one) are checked as well. maxExistenceCheckAttempts
// limits how many distinct digests are selected for checking.
//
// NOTE(review): several control statements of this function (the bodies of
// the filtering checks and of some switch cases) are not visible in this view;
// the inline notes mark where behavior could not be confirmed.
func (pd *v2PushDescriptor) layerAlreadyExists(
	ctx context.Context,
	progressOutput progress.Output,
	diffID layer.DiffID,
	checkOtherRepositories bool,
	maxExistenceCheckAttempts int,
	v2Metadata []metadata.V2Metadata,
) (desc distribution.Descriptor, exists bool, err error) {
	// filter the metadata
	candidates := []metadata.V2Metadata{}
	for _, meta := range v2Metadata {
		// Entries mapped to a different repository are singled out unless
		// checkOtherRepositories is set (presumably skipped — body not visible).
		if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.FullName() {
		candidates = append(candidates, meta)
	// sort the candidates by similarity
	sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)

	digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
	// an array of unique blob digests ordered from the best mount candidates to worst
	layerDigests := []digest.Digest{}
	for i := 0; i < len(candidates); i++ {
		// Presumably stops collecting once the attempt limit is reached (body
		// not visible).
		if len(layerDigests) >= maxExistenceCheckAttempts {
		meta := &candidates[i]
		if _, exists := digestToMetadata[meta.Digest]; exists {
			// keep reference just to the first mapping (the best mount candidate)
		if _, exists := pd.checkedDigests[meta.Digest]; exists {
			// existence of this digest has already been tested
		digestToMetadata[meta.Digest] = meta
		layerDigests = append(layerDigests, meta.Digest)

	for _, dgst := range layerDigests {
		meta := digestToMetadata[dgst]
		logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName())
		desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
		pd.checkedDigests[meta.Digest] = struct{}{}
		switch err {
		case nil:
			// The blob exists. Record a mapping to the target repository unless
			// an entry for this digest already maps there with a matching HMAC.
			if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.FullName() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
				// cache mapping from this layer's DiffID to the blobsum
				if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
					Digest:           desc.Digest,
					SourceRepository: pd.repoInfo.FullName(),
				}); err != nil {
					return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
			desc.MediaType = schema2.MediaTypeLayer
			exists = true
		case distribution.ErrBlobUnknown:
			if meta.SourceRepository == pd.repoInfo.FullName() {
				// remove the mapping to the target repository
			// NOTE(review): the two statements below report a failed push and
			// return; presumably they belong to a separate case for unexpected
			// errors whose label is not visible in this view — confirm.
			progress.Update(progressOutput, pd.ID(), "Image push failed")
			return desc, false, retryOnError(err)

	if exists {
		progress.Update(progressOutput, pd.ID(), "Layer already exists")
		pd.pushState.remoteLayers[diffID] = desc

	return desc, exists, nil

// getMaxMountAndExistenceCheckAttempts returns, for the given layer, the
// maximum number of cross-repository mount attempts, the maximum number of
// layer existence checks performed on the target repository, and whether
// those checks should also use digests mapped to other repositories.
//
// The decision is based on the layer's diff size: the smaller the layer, the
// fewer attempts are made, because for a small upload the extra round trips
// cost more than they save. A layer whose size cannot be determined is
// treated as middle-sized.
func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
	size, err := layer.DiffSize()
	switch {
	// big blob
	case size > middleLayerMaximumSize:
		// 1st attempt to mount the blob few times
		// 2nd few existence checks with digests associated to any repository
		// then fallback to upload
		return 4, 3, true

	// middle sized blobs; if we could not get the size, assume we deal with middle sized blob
	case size > smallLayerMaximumSize, err != nil:
		// 1st attempt to mount blobs of average size few times
		// 2nd try at most 1 existence check if there's an existing mapping to the target repository
		// then fallback to upload
		return 3, 1, false

	// small blobs, do a minimum number of checks
	// NOTE(review): no case label for this branch is visible in this view;
	// presumably it is the default case — confirm.
		return 1, 1, false

// getRepositoryMountCandidates selects, from v2Metadata, the entries that can
// serve as sources for a cross-repository mount into repoInfo. Each entry's
// SourceRepository is parsed and its hostname compared with that of repoInfo;
// entries naming the target repository itself are not viable candidates.
//
// The selected entries are ordered by sortV2MetadataByLikenessAndAge. If max
// is non-negative and more than max entries remain, only the first max are
// returned.
//
// NOTE(review): the bodies of the two filtering checks are not visible in
// this view; presumably both skip the entry — confirm.
func getRepositoryMountCandidates(
	repoInfo reference.Named,
	hmacKey []byte,
	max int,
	v2Metadata []metadata.V2Metadata,
) []metadata.V2Metadata {
	candidates := []metadata.V2Metadata{}
	for _, meta := range v2Metadata {
		sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
		if err != nil || repoInfo.Hostname() != sourceRepo.Hostname() {
		// target repository is not a viable candidate
		if meta.SourceRepository == repoInfo.FullName() {
		candidates = append(candidates, meta)

	sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
	if max >= 0 && len(candidates) > max {
		// select the youngest metadata
		candidates = candidates[:max]

	return candidates

// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
// candidate "a" is preferred over "b":
//  1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
//     "b" was not
//  2. if a number of its repository path components exactly matching path components of target repository is higher
type byLikeness struct {
	// arr holds the candidates being ordered; Swap reorders it in place.
	arr            []metadata.V2Metadata
	// hmacKey is the key each candidate's HMAC is checked against.
	hmacKey        []byte
	// pathComponents are the path components candidates are compared with.
	pathComponents []string

// Less reports whether candidate i is preferred over candidate j. A candidate
// whose HMAC checks out against hmacKey wins over one whose HMAC does not;
// when both agree on that, the candidate sharing more leading path components
// with pathComponents wins.
func (bla byLikeness) Less(i, j int) bool {
	aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
	bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
	if aMacMatch != bMacMatch {
		return aMacMatch
	aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
	bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
	return aMatch > bMatch
// Swap exchanges the candidates at positions i and j.
func (bla byLikeness) Swap(i, j int) {
	bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
// Len returns the number of candidates held in the container.
func (bla byLikeness) Len() int {
	return len(bla.arr)
}

// sortV2MetadataByLikenessAndAge reorders marr in place. It first reverses
// the slice so that the newest entries come first, and then orders the
// entries by likeness to repoInfo as defined by byLikeness.
//
// NOTE(review): the statement that performs the sort is not visible in this
// view — only the fields of the byLikeness value are. The comment about
// keeping equal entries ordered implies a stable sort; confirm.
func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
	// reverse the metadata array to shift the newest entries to the beginning
	for i := 0; i < len(marr)/2; i++ {
		marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
	// keep equal entries ordered from the youngest to the oldest
		arr:            marr,
		hmacKey:        hmacKey,
		pathComponents: getPathComponents(repoInfo.FullName()),

// numOfMatchingPathComponents returns the number of leading path components
// of "pth" that exactly match the corresponding entries of "matchComponents".
// Counting stops at the first mismatch or at the end of the shorter list.
func numOfMatchingPathComponents(pth string, matchComponents []string) int {
	pthComponents := getPathComponents(pth)
	i := 0
	for ; i < len(pthComponents) && i < len(matchComponents); i++ {
		if matchComponents[i] != pthComponents[i] {
			return i
	return i

// getPathComponents splits path on "/" and returns the pieces. If path parses
// as a named reference, the reference's full name is split instead of the
// raw input; a path that fails to parse is split as given.
func getPathComponents(path string) []string {
	// make sure to add prefix to the path
	// (presumably FullName supplies a default registry prefix — confirm)
	named, err := reference.ParseNamed(path)
	if err == nil {
		path = named.FullName()
	return strings.Split(path, "/")

// cancelLayerUpload cancels layerUpload if it is non-nil. dgst is used only
// in the debug log message. A failure to cancel is logged as a warning and
// not returned to the caller.
func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
	if layerUpload != nil {
		logrus.Debugf("cancelling upload of blob %s", dgst)
		err := layerUpload.Cancel(ctx)
		if err != nil {
			logrus.Warnf("failed to cancel upload: %v", err)