1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Refactor push and pull to move code out of cmd function

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
Derek McGowan 2015-01-13 15:48:49 -08:00
parent 92d5eafe03
commit 750b41ced4
2 changed files with 139 additions and 136 deletions

View file

@ -82,20 +82,14 @@ func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
log.Errorf("error updating trust base graph: %s", err)
}
auth, err := r.GetV2Authorization(repoInfo.RemoteName, true)
if err != nil {
log.Errorf("error getting authorization: %s", err)
} else {
log.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel"), auth); err == nil {
if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
log.Errorf("Error logging event 'pull' for %s: %s", logName, err)
}
return engine.StatusOK
} else if err != registry.ErrDoesNotExist {
log.Errorf("Error from V2 registry: %s", err)
log.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil {
if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
log.Errorf("Error logging event 'pull' for %s: %s", logName, err)
}
return engine.StatusOK
} else if err != registry.ErrDoesNotExist {
log.Errorf("Error from V2 registry: %s", err)
}
log.Debug("image does not exist on v2 registry, falling back to v1")
@ -384,7 +378,11 @@ type downloadInfo struct {
err chan error
}
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool, auth *registry.RequestAuthorization) error {
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool) error {
auth, err := r.GetV2Authorization(repoInfo.RemoteName, true)
if err != nil {
return fmt.Errorf("error getting authorization: %s", err)
}
var layersDownloaded bool
if tag == "" {
log.Debugf("Pulling tag list from V2 registry for %s", repoInfo.CanonicalName)

View file

@ -252,6 +252,105 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin
return imgData.Checksum, nil
}
func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out io.Writer, repoInfo *registry.RepositoryInfo, manifestBytes, tag string, sf *utils.StreamFormatter) error {
if repoInfo.Official {
j := eng.Job("trust_update_base")
if err := j.Run(); err != nil {
log.Errorf("error updating trust base graph: %s", err)
}
}
auth, err := r.GetV2Authorization(repoInfo.RemoteName, false)
if err != nil {
return fmt.Errorf("error getting authorization: %s", err)
}
// if no manifest is given, generate and sign with the key associated with the local tag store
if len(manifestBytes) == 0 {
mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag)
if err != nil {
return err
}
js, err := libtrust.NewJSONSignature(mBytes)
if err != nil {
return err
}
if err = js.Sign(s.trustKey); err != nil {
return err
}
signedBody, err := js.PrettySignature("signatures")
if err != nil {
return err
}
log.Infof("Signed manifest using daemon's key: %s", s.trustKey.KeyID())
manifestBytes = string(signedBody)
}
manifest, verified, err := s.verifyManifest(eng, []byte(manifestBytes))
if err != nil {
return fmt.Errorf("error verifying manifest: %s", err)
}
if err := checkValidManifest(manifest); err != nil {
return fmt.Errorf("invalid manifest: %s", err)
}
if !verified {
log.Debugf("Pushing unverified image")
}
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
var (
sumStr = manifest.FSLayers[i].BlobSum
imgJSON = []byte(manifest.History[i].V1Compatibility)
)
sumParts := strings.SplitN(sumStr, ":", 2)
if len(sumParts) < 2 {
return fmt.Errorf("Invalid checksum: %s", sumStr)
}
manifestSum := sumParts[1]
img, err := image.NewImgJSON(imgJSON)
if err != nil {
return fmt.Errorf("Failed to parse json: %s", err)
}
img, err = s.graph.Get(img.ID)
if err != nil {
return err
}
arch, err := img.TarLayer()
if err != nil {
return fmt.Errorf("Could not get tar layer: %s", err)
}
// Call mount blob
exists, err := r.PostV2ImageMountBlob(repoInfo.RemoteName, sumParts[0], manifestSum, auth)
if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
return err
}
if !exists {
err = r.PutV2ImageBlob(repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth)
if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
return err
}
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
} else {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil))
}
}
// push the manifest
return r.PutV2ImageManifest(repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth)
}
// FIXME: Allow to interrupt current push when new push of same image is done.
func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {
@ -296,129 +395,35 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
}
if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
if repoInfo.Official {
j := job.Eng.Job("trust_update_base")
if err = j.Run(); err != nil {
return job.Errorf("error updating trust base graph: %s", err)
}
err := s.pushV2Repository(r, job.Eng, job.Stdout, repoInfo, manifestBytes, tag, sf)
if err == nil {
return engine.StatusOK
}
auth, err := r.GetV2Authorization(repoInfo.RemoteName, false)
if err != nil {
return job.Errorf("error getting authorization: %s", err)
}
if len(manifestBytes) == 0 {
mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag)
if err != nil {
return job.Error(err)
}
js, err := libtrust.NewJSONSignature(mBytes)
if err != nil {
return job.Error(err)
}
if err = js.Sign(s.trustKey); err != nil {
return job.Error(err)
}
signedBody, err := js.PrettySignature("signatures")
if err != nil {
return job.Error(err)
}
log.Infof("Signed manifest using daemon's key: %s", s.trustKey.KeyID())
manifestBytes = string(signedBody)
}
manifest, verified, err := s.verifyManifest(job.Eng, []byte(manifestBytes))
if err != nil {
return job.Errorf("error verifying manifest: %s", err)
}
if err := checkValidManifest(manifest); err != nil {
return job.Errorf("invalid manifest: %s", err)
}
if !verified {
log.Debugf("Pushing unverified image")
}
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
var (
sumStr = manifest.FSLayers[i].BlobSum
imgJSON = []byte(manifest.History[i].V1Compatibility)
)
sumParts := strings.SplitN(sumStr, ":", 2)
if len(sumParts) < 2 {
return job.Errorf("Invalid checksum: %s", sumStr)
}
manifestSum := sumParts[1]
img, err := image.NewImgJSON(imgJSON)
if err != nil {
return job.Errorf("Failed to parse json: %s", err)
}
img, err = s.graph.Get(img.ID)
if err != nil {
return job.Error(err)
}
arch, err := img.TarLayer()
if err != nil {
return job.Errorf("Could not get tar layer: %s", err)
}
// Call mount blob
exists, err := r.PostV2ImageMountBlob(repoInfo.RemoteName, sumParts[0], manifestSum, auth)
if err != nil {
job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
return job.Error(err)
}
if !exists {
err = r.PutV2ImageBlob(repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), job.Stdout, sf, false, utils.TruncateID(img.ID), "Pushing"), auth)
if err != nil {
job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
return job.Error(err)
}
job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
} else {
job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil))
}
}
// push the manifest
err = r.PutV2ImageManifest(repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth)
if err != nil {
return job.Error(err)
}
// done, no fallback to V1
return engine.StatusOK
} else {
if err != nil {
reposLen := 1
if tag == "" {
reposLen = len(s.Repositories[repoInfo.LocalName])
}
job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
// If it fails, try to get the repository
if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists {
if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
return job.Error(err)
}
var token []string
job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName))
if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil {
return job.Error(err)
}
return engine.StatusOK
// error out, no fallback to V1
return job.Error(err)
}
if err != nil {
reposLen := 1
if tag == "" {
reposLen = len(s.Repositories[repoInfo.LocalName])
}
job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
// If it fails, try to get the repository
if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists {
if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
return job.Error(err)
}
return engine.StatusOK
}
return job.Error(err)
}
var token []string
job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName))
if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil {
return job.Error(err)
}
return engine.StatusOK
}