From 750b41ced42bda0ccda405c1aa7c43ded5821e40 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 13 Jan 2015 15:48:49 -0800 Subject: [PATCH] Refactor push and pull to move code out of cmd function Signed-off-by: Derek McGowan (github: dmcgowan) --- graph/pull.go | 26 +++--- graph/push.go | 249 +++++++++++++++++++++++++------------------------- 2 files changed, 139 insertions(+), 136 deletions(-) diff --git a/graph/pull.go b/graph/pull.go index 1c4bb9d88c..c70b220cc9 100644 --- a/graph/pull.go +++ b/graph/pull.go @@ -82,20 +82,14 @@ func (s *TagStore) CmdPull(job *engine.Job) engine.Status { log.Errorf("error updating trust base graph: %s", err) } - auth, err := r.GetV2Authorization(repoInfo.RemoteName, true) - if err != nil { - log.Errorf("error getting authorization: %s", err) - } else { - - log.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName) - if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel"), auth); err == nil { - if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { - log.Errorf("Error logging event 'pull' for %s: %s", logName, err) - } - return engine.StatusOK - } else if err != registry.ErrDoesNotExist { - log.Errorf("Error from V2 registry: %s", err) + log.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName) + if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil { + if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { + log.Errorf("Error logging event 'pull' for %s: %s", logName, err) } + return engine.StatusOK + } else if err != registry.ErrDoesNotExist { + log.Errorf("Error from V2 registry: %s", err) } log.Debug("image does not exist on v2 registry, falling back to v1") @@ -384,7 +378,11 @@ type downloadInfo struct { err chan error } -func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool, auth *registry.RequestAuthorization) error { +func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool) error { + auth, err := r.GetV2Authorization(repoInfo.RemoteName, true) + if err != nil { + return fmt.Errorf("error getting authorization: %s", err) + } var layersDownloaded bool if tag == "" { log.Debugf("Pulling tag list from V2 registry for %s", repoInfo.CanonicalName) diff --git a/graph/push.go b/graph/push.go index 4d6b1e0838..5b5011243b 100644 --- a/graph/push.go +++ b/graph/push.go @@ -252,6 +252,105 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin return imgData.Checksum, nil } +func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out io.Writer, repoInfo *registry.RepositoryInfo, manifestBytes, tag string, sf *utils.StreamFormatter) error { + if repoInfo.Official { + j := eng.Job("trust_update_base") + if err := j.Run(); err != nil { + log.Errorf("error updating trust base graph: %s", err) + } + } + + auth, err := r.GetV2Authorization(repoInfo.RemoteName, false) + if err != nil { + return fmt.Errorf("error getting authorization: %s", err) + } + + // if no manifest is given, generate and sign with the key associated with the local tag store + if len(manifestBytes) == 0 { + mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag) + if err != nil { + return err + } + js, err := libtrust.NewJSONSignature(mBytes) + if err != nil { + return err + } + + if err = js.Sign(s.trustKey); err != nil { + return err + } + + signedBody, err := js.PrettySignature("signatures") + if err != nil { + return err + } + log.Infof("Signed manifest using daemon's key: %s", s.trustKey.KeyID()) + + manifestBytes = string(signedBody) + } + + manifest, verified, err := s.verifyManifest(eng, []byte(manifestBytes)) + if err != nil { + return fmt.Errorf("error verifying manifest: %s", err) + } + + if err := checkValidManifest(manifest); err != nil { + return fmt.Errorf("invalid manifest: %s", err) + } + + if !verified { + log.Debugf("Pushing unverified image") + } + + for i := len(manifest.FSLayers) - 1; i >= 0; i-- { + var ( + sumStr = manifest.FSLayers[i].BlobSum + imgJSON = []byte(manifest.History[i].V1Compatibility) + ) + + sumParts := strings.SplitN(sumStr, ":", 2) + if len(sumParts) < 2 { + return fmt.Errorf("Invalid checksum: %s", sumStr) + } + manifestSum := sumParts[1] + + img, err := image.NewImgJSON(imgJSON) + if err != nil { + return fmt.Errorf("Failed to parse json: %s", err) + } + + img, err = s.graph.Get(img.ID) + if err != nil { + return err + } + + arch, err := img.TarLayer() + if err != nil { + return fmt.Errorf("Could not get tar layer: %s", err) + } + + // Call mount blob + exists, err := r.PostV2ImageMountBlob(repoInfo.RemoteName, sumParts[0], manifestSum, auth) + if err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) + return err + } + if !exists { + err = r.PutV2ImageBlob(repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth) + if err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) + return err + } + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil)) + } else { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil)) + } + } + + // push the manifest + return r.PutV2ImageManifest(repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth) +} + // FIXME: Allow to interrupt current push when new push of same image is done. func (s *TagStore) CmdPush(job *engine.Job) engine.Status { if n := len(job.Args); n != 1 { @@ -296,129 +395,35 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status { } if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 { - if repoInfo.Official { - j := job.Eng.Job("trust_update_base") - if err = j.Run(); err != nil { - return job.Errorf("error updating trust base graph: %s", err) - } + err := s.pushV2Repository(r, job.Eng, job.Stdout, repoInfo, manifestBytes, tag, sf) + if err == nil { + return engine.StatusOK } - auth, err := r.GetV2Authorization(repoInfo.RemoteName, false) - if err != nil { - return job.Errorf("error getting authorization: %s", err) - } - - if len(manifestBytes) == 0 { - mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag) - if err != nil { - return job.Error(err) - } - js, err := libtrust.NewJSONSignature(mBytes) - if err != nil { - return job.Error(err) - } - - if err = js.Sign(s.trustKey); err != nil { - return job.Error(err) - } - - signedBody, err := js.PrettySignature("signatures") - if err != nil { - return job.Error(err) - } - log.Infof("Signed manifest using daemon's key: %s", s.trustKey.KeyID()) - - manifestBytes = string(signedBody) - } - - manifest, verified, err := s.verifyManifest(job.Eng, []byte(manifestBytes)) - if err != nil { - return job.Errorf("error verifying manifest: %s", err) - } - - if err := checkValidManifest(manifest); err != nil { - return job.Errorf("invalid manifest: %s", err) - } - - if !verified { - log.Debugf("Pushing unverified image") - } - - for i := len(manifest.FSLayers) - 1; i >= 0; i-- { - var ( - sumStr = manifest.FSLayers[i].BlobSum - imgJSON = []byte(manifest.History[i].V1Compatibility) - ) - - sumParts := strings.SplitN(sumStr, ":", 2) - if len(sumParts) < 2 { - return job.Errorf("Invalid checksum: %s", sumStr) - } - manifestSum := sumParts[1] - - img, err := image.NewImgJSON(imgJSON) - if err != nil { - return job.Errorf("Failed to parse json: %s", err) - } - - img, err = s.graph.Get(img.ID) - if err != nil { - return job.Error(err) - } - - arch, err := img.TarLayer() - if err != nil { - return job.Errorf("Could not get tar layer: %s", err) - } - - // Call mount blob - exists, err := r.PostV2ImageMountBlob(repoInfo.RemoteName, sumParts[0], manifestSum, auth) - if err != nil { - job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) - return job.Error(err) - } - if !exists { - err = r.PutV2ImageBlob(repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), job.Stdout, sf, false, utils.TruncateID(img.ID), "Pushing"), auth) - if err != nil { - job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) - return job.Error(err) - } - job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil)) - } else { - job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil)) - } - } - - // push the manifest - err = r.PutV2ImageManifest(repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth) - if err != nil { - return job.Error(err) - } - - // done, no fallback to V1 - return engine.StatusOK - } else { - if err != nil { - reposLen := 1 - if tag == "" { - reposLen = len(s.Repositories[repoInfo.LocalName]) - } - job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen)) - // If it fails, try to get the repository - if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists { - if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil { - return job.Error(err) - } - return engine.StatusOK - } - return job.Error(err) - } - - var token []string - job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName)) - if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil { - return job.Error(err) - } - return engine.StatusOK + // error out, no fallback to V1 + return job.Error(err) } + + if err != nil { + reposLen := 1 + if tag == "" { + reposLen = len(s.Repositories[repoInfo.LocalName]) + } + job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen)) + // If it fails, try to get the repository + if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists { + if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil { + return job.Error(err) + } + return engine.StatusOK + } + return job.Error(err) + } + + var token []string + job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName)) + if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil { + return job.Error(err) + } + return engine.StatusOK }