diff --git a/graph/push.go b/graph/push.go index 1dc08c6f9d..0ec81a5152 100644 --- a/graph/push.go +++ b/graph/push.go @@ -62,103 +62,145 @@ func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string return imageList, tagsByImage, nil } -func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error { - out = utils.NewWriteFlusher(out) - log.Debugf("Local repo: %s", localRepo) - imgList, tagsByImage, err := s.getImageList(localRepo, tag) - if err != nil { - return err - } - - out.Write(sf.FormatStatus("", "Sending image list")) - - var ( - repoData *registry.RepositoryData - imageIndex []*registry.ImgData - ) - - for _, imgId := range imgList { - if tags, exists := tagsByImage[imgId]; exists { +// createImageIndex returns an index of an image's layer IDs and tags. +func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData { + var imageIndex []*registry.ImgData + for _, id := range images { + if tags, hasTags := tags[id]; hasTags { // If an image has tags you must add an entry in the image index // for each tag for _, tag := range tags { imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, + ID: id, Tag: tag, }) } - } else { - // If the image does not have a tag it still needs to be sent to the - // registry with an empty tag so that it is accociated with the repository - imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, - Tag: "", - }) + continue + } + // If the image does not have a tag it still needs to be sent to the + // registry with an empty tag so that it is accociated with the repository + imageIndex = append(imageIndex, ®istry.ImgData{ + ID: id, + Tag: "", + }) + } + return imageIndex +} +type imagePushData struct { + id string + endpoint string + tokens []string +} + +// lookupImageOnEndpoint checks the specified endpoint to see if an image exists +// and if it is absent then it sends the image id to the channel to be pushed. +func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter, + images chan imagePushData, imagesToPush chan string) { + defer wg.Done() + for image := range images { + if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil { + log.Errorf("Error in LookupRemoteImage: %s", err) + imagesToPush <- image.id + continue + } + out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id))) + } +} + +func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string, + tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error { + workerCount := len(imageIDs) + // start a maximum of 5 workers to check if images exist on the specified endpoint. + if workerCount > 5 { + workerCount = 5 + } + var ( + wg = &sync.WaitGroup{} + imageData = make(chan imagePushData, workerCount*2) + imagesToPush = make(chan string, workerCount*2) + pushes = make(chan map[string]struct{}, 1) + ) + for i := 0; i < workerCount; i++ { + wg.Add(1) + go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush) + } + // start a go routine that consumes the images to push + go func() { + shouldPush := make(map[string]struct{}) + for id := range imagesToPush { + shouldPush[id] = struct{}{} + } + pushes <- shouldPush + }() + for _, id := range imageIDs { + imageData <- imagePushData{ + id: id, + endpoint: endpoint, + tokens: repo.Tokens, } } + // close the channel to notify the workers that there will be no more images to check. + close(imageData) + wg.Wait() + close(imagesToPush) + // wait for all the images that require pushes to be collected into a consumable map. + shouldPush := <-pushes + // finish by pushing any images and tags to the endpoint. The order that the images are pushed + // is very important that is why we are still itterating over the ordered list of imageIDs. + for _, id := range imageIDs { + if _, push := shouldPush[id]; push { + if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil { + // FIXME: Continue on error? + return err + } + } + for _, tag := range tags[id] { + out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag)) + if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil { + return err + } + } + } + return nil +} +// pushRepository pushes layers that do not already exist on the registry. +func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, + repoInfo *registry.RepositoryInfo, localRepo map[string]string, + tag string, sf *utils.StreamFormatter) error { + log.Debugf("Local repo: %s", localRepo) + out = utils.NewWriteFlusher(out) + imgList, tags, err := s.getImageList(localRepo, tag) + if err != nil { + return err + } + out.Write(sf.FormatStatus("", "Sending image list")) + + imageIndex := s.createImageIndex(imgList, tags) log.Debugf("Preparing to push %s with the following images and tags", localRepo) for _, data := range imageIndex { log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag) } - // Register all the images in a repository with the registry // If an image is not in this list it will not be associated with the repository - repoData, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil) + repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil) if err != nil { return err } - nTag := 1 if tag == "" { nTag = len(localRepo) } - var wg sync.WaitGroup - needsPush := make([]bool, len(imgList)) - for _, ep := range repoData.Endpoints { - out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) - - for i, imgId := range imgList { - wg.Add(1) - go func(i int, imgId string) { - defer wg.Done() - if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err == nil { - out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) - needsPush[i] = false - } else { - log.Errorf("Error in LookupRemoteImage: %s", err) - out.Write(sf.FormatStatus("", "Image %s not pushed, adding to queue", utils.TruncateID(imgId))) - needsPush[i] = true - } - }(i, imgId) - } - - wg.Wait() - - for i, imgId := range imgList { - if needsPush[i] { - if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { - // FIXME: Continue on error? - return err - } - } - - for _, tag := range tagsByImage[imgId] { - out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+repoInfo.RemoteName+"/tags/"+tag)) - - if err := r.PushRegistryTag(repoInfo.RemoteName, imgId, tag, ep, repoData.Tokens); err != nil { - return err - } - } + out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) + // push the repository to each of the endpoints only if it does not exist. + for _, endpoint := range repoData.Endpoints { + if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil { + return err } } - - if _, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints); err != nil { - return err - } - - return nil + _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints) + return err } func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {