1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Do not fall back to the V1 protocol when we know we are talking to a V2 registry

If we detect a Docker-Distribution-Api-Version header indicating that
the registry speaks the V2 protocol, no fallback to V1 should take
place.

The same applies if a V2 registry operation succeeds while attempting a
push or pull.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-12-04 13:42:33 -08:00
parent 905f3336b2
commit a57478d65f
11 changed files with 163 additions and 112 deletions

View file

@ -49,7 +49,7 @@ type Puller interface {
// Pull tries to pull the image referenced by `tag`
// Pull returns an error if any, as well as a boolean that determines whether to retry Pull on the next configured endpoint.
//
Pull(ctx context.Context, ref reference.Named) (fallback bool, err error)
Pull(ctx context.Context, ref reference.Named) error
}
// newPuller returns a Puller interface that will pull from either a v1 or v2
@ -108,8 +108,17 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
// returned and displayed, but if there was a v2 endpoint which supports pull-by-digest, then the last relevant
// error is the ones from v2 endpoints not v1.
discardNoSupportErrors bool
// confirmedV2 is set to true if a pull attempt managed to
// confirm that it was talking to a v2 registry. This will
// prevent fallback to the v1 protocol.
confirmedV2 bool
)
for _, endpoint := range endpoints {
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue
}
logrus.Debugf("Trying to pull %s from %s %s", repoInfo.Name(), endpoint.URL, endpoint.Version)
puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
@ -117,13 +126,18 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
errors = append(errors, err.Error())
continue
}
if fallback, err := puller.Pull(ctx, ref); err != nil {
if err := puller.Pull(ctx, ref); err != nil {
// Was this pull cancelled? If so, don't try to fall
// back.
fallback := false
select {
case <-ctx.Done():
fallback = false
default:
if fallbackErr, ok := err.(fallbackError); ok {
fallback = true
confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
err = fallbackErr.err
}
}
if fallback {
if _, ok := err.(registry.ErrNoSupport); !ok {

View file

@ -33,15 +33,15 @@ type v1Puller struct {
session *registry.Session
}
func (p *v1Puller) Pull(ctx context.Context, ref reference.Named) (fallback bool, err error) {
func (p *v1Puller) Pull(ctx context.Context, ref reference.Named) error {
if _, isCanonical := ref.(reference.Canonical); isCanonical {
// Allowing fallback, because HTTPS v1 is before HTTP v2
return true, registry.ErrNoSupport{Err: errors.New("Cannot pull by digest with v1 registry")}
return fallbackError{err: registry.ErrNoSupport{Err: errors.New("Cannot pull by digest with v1 registry")}}
}
tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
if err != nil {
return false, err
return err
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
@ -53,21 +53,21 @@ func (p *v1Puller) Pull(ctx context.Context, ref reference.Named) (fallback bool
v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
if err != nil {
logrus.Debugf("Could not get v1 endpoint: %v", err)
return true, err
return fallbackError{err: err}
}
p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
if err != nil {
// TODO(dmcgowan): Check if should fallback
logrus.Debugf("Fallback from error: %s", err)
return true, err
return fallbackError{err: err}
}
if err := p.pullRepository(ctx, ref); err != nil {
// TODO(dmcgowan): Check if should fallback
return false, err
return err
}
progress.Message(p.config.ProgressOutput, "", p.repoInfo.FullName()+": this image was pulled from a legacy registry. Important: This registry version will not be supported in future versions of docker.")
return false, nil
return nil
}
func (p *v1Puller) pullRepository(ctx context.Context, ref reference.Named) error {

View file

@ -32,24 +32,26 @@ type v2Puller struct {
config *ImagePullConfig
repoInfo *registry.RepositoryInfo
repo distribution.Repository
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool
}
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (fallback bool, err error) {
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
// TODO(tiborvass): was ReceiveTimeout
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
p.repo, p.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
logrus.Warnf("Error getting v2 registry: %v", err)
return true, err
return fallbackError{err: err, confirmedV2: p.confirmedV2}
}
if err := p.pullV2Repository(ctx, ref); err != nil {
if err = p.pullV2Repository(ctx, ref); err != nil {
if registry.ContinueOnError(err) {
logrus.Debugf("Error trying v2 registry: %v", err)
return true, err
return fallbackError{err: err, confirmedV2: p.confirmedV2}
}
return false, err
}
return false, nil
return err
}
func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) {
@ -67,6 +69,10 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
return err
}
// If this call succeeded, we can be confident that the
// registry on the other side speaks the v2 protocol.
p.confirmedV2 = true
// This probably becomes a lot nicer after the manifest
// refactor...
for _, tag := range tags {
@ -208,6 +214,11 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
if unverifiedManifest == nil {
return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
}
// If GetByTag succeeded, we can be confident that the registry on
// the other side speaks the v2 protocol.
p.confirmedV2 = true
var verifiedManifest *schema1.Manifest
verifiedManifest, err = verifyManifest(unverifiedManifest, ref)
if err != nil {

View file

@ -59,7 +59,7 @@ type Pusher interface {
// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
//
// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
Push(ctx context.Context) (fallback bool, err error)
Push(ctx context.Context) error
}
const compressionBufSize = 32768
@ -116,8 +116,21 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
return fmt.Errorf("Repository does not exist: %s", repoInfo.Name())
}
var lastErr error
var (
lastErr error
// confirmedV2 is set to true if a push attempt managed to
// confirm that it was talking to a v2 registry. This will
// prevent fallback to the v1 protocol.
confirmedV2 bool
)
for _, endpoint := range endpoints {
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue
}
logrus.Debugf("Trying to push %s to %s %s", repoInfo.FullName(), endpoint.URL, endpoint.Version)
pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
@ -125,22 +138,22 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
lastErr = err
continue
}
if fallback, err := pusher.Push(ctx); err != nil {
if err := pusher.Push(ctx); err != nil {
// Was this push cancelled? If so, don't try to fall
// back.
select {
case <-ctx.Done():
fallback = false
default:
}
if fallback {
if fallbackErr, ok := err.(fallbackError); ok {
confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
err = fallbackErr.err
lastErr = err
continue
}
}
logrus.Debugf("Not continuing with error: %v", err)
return err
}
imagePushConfig.EventsService.Log("push", repoInfo.Name(), "")

View file

@ -29,10 +29,10 @@ type v1Pusher struct {
session *registry.Session
}
func (p *v1Pusher) Push(ctx context.Context) (fallback bool, err error) {
func (p *v1Pusher) Push(ctx context.Context) error {
tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
if err != nil {
return false, err
return err
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
@ -44,18 +44,18 @@ func (p *v1Pusher) Push(ctx context.Context) (fallback bool, err error) {
v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
if err != nil {
logrus.Debugf("Could not get v1 endpoint: %v", err)
return true, err
return fallbackError{err: err}
}
p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
if err != nil {
// TODO(dmcgowan): Check if should fallback
return true, err
return fallbackError{err: err}
}
if err := p.pushRepository(ctx); err != nil {
// TODO(dmcgowan): Check if should fallback
return false, err
return err
}
return false, nil
return nil
}
// v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an

View file

@ -34,6 +34,10 @@ type v2Pusher struct {
config *ImagePushConfig
repo distribution.Repository
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool
// layersPushed is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers.
@ -45,18 +49,27 @@ type pushMap struct {
layersPushed map[digest.Digest]bool
}
func (p *v2Pusher) Push(ctx context.Context) (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
func (p *v2Pusher) Push(ctx context.Context) (err error) {
p.repo, p.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
return fallbackError{err: err, confirmedV2: p.confirmedV2}
}
if err = p.pushV2Repository(ctx); err != nil {
if registry.ContinueOnError(err) {
return fallbackError{err: err, confirmedV2: p.confirmedV2}
}
}
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
var associations []reference.Association
if _, isTagged := p.ref.(reference.NamedTagged); isTagged {
imageID, err := p.config.ReferenceStore.Get(p.ref)
if err != nil {
return false, fmt.Errorf("tag does not exist: %s", p.ref.String())
return fmt.Errorf("tag does not exist: %s", p.ref.String())
}
associations = []reference.Association{
@ -70,19 +83,19 @@ func (p *v2Pusher) Push(ctx context.Context) (fallback bool, err error) {
associations = p.config.ReferenceStore.ReferencesByName(p.ref)
}
if err != nil {
return false, fmt.Errorf("error getting tags for %s: %s", p.repoInfo.Name(), err)
return fmt.Errorf("error getting tags for %s: %s", p.repoInfo.Name(), err)
}
if len(associations) == 0 {
return false, fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
}
for _, association := range associations {
if err := p.pushV2Tag(ctx, association); err != nil {
return false, err
return err
}
}
return false, nil
return nil
}
func (p *v2Pusher) pushV2Tag(ctx context.Context, association reference.Association) error {
@ -109,30 +122,28 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, association reference.Associat
var descriptors []xfer.UploadDescriptor
// Push empty layer if necessary
for _, h := range img.History {
if h.EmptyLayer {
descriptors = []xfer.UploadDescriptor{
&v2PushDescriptor{
layer: layer.EmptyLayer,
descriptorTemplate := v2PushDescriptor{
blobSumService: p.blobSumService,
repo: p.repo,
layersPushed: &p.layersPushed,
},
confirmedV2: &p.confirmedV2,
}
// Push empty layer if necessary
for _, h := range img.History {
if h.EmptyLayer {
descriptor := descriptorTemplate
descriptor.layer = layer.EmptyLayer
descriptors = []xfer.UploadDescriptor{&descriptor}
break
}
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
for i := 0; i < len(img.RootFS.DiffIDs); i++ {
descriptor := &v2PushDescriptor{
layer: l,
blobSumService: p.blobSumService,
repo: p.repo,
layersPushed: &p.layersPushed,
}
descriptors = append(descriptors, descriptor)
descriptor := descriptorTemplate
descriptor.layer = l
descriptors = append(descriptors, &descriptor)
l = l.Parent()
}
@ -181,6 +192,7 @@ type v2PushDescriptor struct {
blobSumService *metadata.BlobSumService
repo distribution.Repository
layersPushed *pushMap
confirmedV2 *bool
}
func (pd *v2PushDescriptor) Key() string {
@ -251,6 +263,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
return "", retryOnError(err)
}
// If Commit succeded, that's an indication that the remote registry
// speaks the v2 protocol.
*pd.confirmedV2 = true
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")

View file

@ -1,7 +1,6 @@
package distribution
import (
"errors"
"fmt"
"net"
"net/http"
@ -24,6 +23,22 @@ import (
"golang.org/x/net/context"
)
// fallbackError wraps an error that can possibly allow fallback to a different
// endpoint.
type fallbackError struct {
// err is the error being wrapped.
err error
// confirmedV2 is set to true if it was confirmed that the registry
// supports the v2 protocol. This is used to limit fallbacks to the v1
// protocol.
confirmedV2 bool
}
// Error renders the FallbackError as a string.
func (f fallbackError) Error() string {
return f.err.Error()
}
type dumbCredentialStore struct {
auth *types.AuthConfig
}
@ -35,9 +50,7 @@ func (dcs dumbCredentialStore) Basic(*url.URL) (string, string) {
// NewV2Repository returns a repository (v2 only). It creates a HTTP transport
// providing timeout settings and authentication support, and also verifies the
// remote API version.
func NewV2Repository(repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint, metaHeaders http.Header, authConfig *types.AuthConfig, actions ...string) (distribution.Repository, error) {
ctx := context.Background()
func NewV2Repository(ctx context.Context, repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint, metaHeaders http.Header, authConfig *types.AuthConfig, actions ...string) (repo distribution.Repository, foundVersion bool, err error) {
repoName := repoInfo.FullName()
// If endpoint does not support CanonicalName, use the RemoteName instead
if endpoint.TrimHostname {
@ -67,32 +80,34 @@ func NewV2Repository(repoInfo *registry.RepositoryInfo, endpoint registry.APIEnd
endpointStr := strings.TrimRight(endpoint.URL, "/") + "/v2/"
req, err := http.NewRequest("GET", endpointStr, nil)
if err != nil {
return nil, err
return nil, false, err
}
resp, err := pingClient.Do(req)
if err != nil {
return nil, err
return nil, false, err
}
defer resp.Body.Close()
versions := auth.APIVersions(resp, endpoint.VersionHeader)
if endpoint.VersionHeader != "" && len(endpoint.Versions) > 0 {
var foundVersion bool
for _, version := range endpoint.Versions {
v2Version := auth.APIVersion{
Type: "registry",
Version: "2.0",
}
versions := auth.APIVersions(resp, registry.DefaultRegistryVersionHeader)
for _, pingVersion := range versions {
if version == pingVersion {
if pingVersion == v2Version {
// The version header indicates we're definitely
// talking to a v2 registry. So don't allow future
// fallbacks to the v1 protocol.
foundVersion = true
}
}
}
if !foundVersion {
return nil, errors.New("endpoint does not support v2 API")
break
}
}
challengeManager := auth.NewSimpleChallengeManager()
if err := challengeManager.AddResponse(resp); err != nil {
return nil, err
return nil, foundVersion, err
}
if authConfig.RegistryToken != "" {
@ -106,7 +121,8 @@ func NewV2Repository(repoInfo *registry.RepositoryInfo, endpoint registry.APIEnd
}
tr := transport.NewTransport(base, modifiers...)
return client.NewRepository(ctx, repoName, endpoint.URL, tr)
repo, err = client.NewRepository(ctx, repoName, endpoint.URL, tr)
return repo, foundVersion, err
}
func digestFromManifest(m *schema1.SignedManifest, name reference.Named) (digest.Digest, int, error) {
@ -147,7 +163,8 @@ func retryOnError(err error) error {
case errcode.ErrorCodeUnauthorized, errcode.ErrorCodeUnsupported, errcode.ErrorCodeDenied:
return xfer.DoNotRetry{Err: err}
}
case *client.UnexpectedHTTPResponseError:
return xfer.DoNotRetry{Err: err}
}
// let's be nice and fallback if the error is a completely
// unexpected one.

View file

@ -8,7 +8,6 @@ import (
"testing"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/docker/api/types"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/reference"
@ -49,12 +48,6 @@ func TestTokenPassThru(t *testing.T) {
TrimHostname: false,
TLSConfig: nil,
//VersionHeader: "verheader",
Versions: []auth.APIVersion{
{
Type: "registry",
Version: "2",
},
},
}
n, _ := reference.ParseNamed("testremotename")
repoInfo := &registry.RepositoryInfo{
@ -76,7 +69,8 @@ func TestTokenPassThru(t *testing.T) {
t.Fatal(err)
}
p := puller.(*v2Puller)
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
ctx := context.Background()
p.repo, _, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
t.Fatal(err)
}
@ -84,7 +78,7 @@ func TestTokenPassThru(t *testing.T) {
logrus.Debug("About to pull")
// We expect it to fail, since we haven't mock'd the full registry exchange in our handler above
tag, _ := reference.WithTag(n, "tag_goes_here")
_ = p.pullV2Repository(context.Background(), tag)
_ = p.pullV2Repository(ctx, tag)
if !gotToken {
t.Fatal("Failed to receive registry token")

View file

@ -1,7 +1,6 @@
package main
import (
"fmt"
"regexp"
"strings"
"time"
@ -54,7 +53,8 @@ func (s *DockerHubPullSuite) TestPullNonExistingImage(c *check.C) {
} {
out, err := s.CmdWithError("pull", e.Alias)
c.Assert(err, checker.NotNil, check.Commentf("expected non-zero exit status when pulling non-existing image: %s", out))
c.Assert(out, checker.Contains, fmt.Sprintf("Error: image %s not found", e.Repo), check.Commentf("expected image not found error messages"))
// Hub returns 401 rather than 404 for nonexistent library/ repos.
c.Assert(out, checker.Contains, "unauthorized: access to the requested resource is not authorized", check.Commentf("expected unauthorized error message"))
}
}

View file

@ -6,7 +6,6 @@ import (
"net/url"
"strings"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/docker/api/types"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/reference"
@ -127,8 +126,6 @@ type APIEndpoint struct {
Official bool
TrimHostname bool
TLSConfig *tls.Config
VersionHeader string
Versions []auth.APIVersion
}
// ToV1Endpoint returns a V1 API endpoint based on the APIEndpoint

View file

@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/docker/pkg/tlsconfig"
"github.com/docker/docker/reference"
)
@ -52,20 +51,12 @@ func (s *Service) lookupV2Endpoints(repoName reference.Named) (endpoints []APIEn
return nil, err
}
v2Versions := []auth.APIVersion{
{
Type: "registry",
Version: "2.0",
},
}
endpoints = []APIEndpoint{
{
URL: "https://" + hostname,
Version: APIVersion2,
TrimHostname: true,
TLSConfig: tlsConfig,
VersionHeader: DefaultRegistryVersionHeader,
Versions: v2Versions,
},
}
@ -76,8 +67,6 @@ func (s *Service) lookupV2Endpoints(repoName reference.Named) (endpoints []APIEn
TrimHostname: true,
// used to check if supposed to be secure via InsecureSkipVerify
TLSConfig: tlsConfig,
VersionHeader: DefaultRegistryVersionHeader,
Versions: v2Versions,
})
}