1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Use channels and set workers to make image lookup safe

This refactors the starting work by the prior commits to make this safe
for access.  A maximum of 5 worker go routines are started to lookup
images on the endpoint.  Another go routine consumes the images that are
required to be pushed into a map for quick lookups.  The map is required
because the pushing of the image json and layer have to be done in the
correct order or the registry will explode in fire.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-01-13 15:24:51 -08:00
parent 3328658929
commit 476cf1b906

View file

@ -62,103 +62,145 @@ func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string
return imageList, tagsByImage, nil return imageList, tagsByImage, nil
} }
func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error { // createImageIndex returns an index of an image's layer IDs and tags.
out = utils.NewWriteFlusher(out) func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
log.Debugf("Local repo: %s", localRepo) var imageIndex []*registry.ImgData
imgList, tagsByImage, err := s.getImageList(localRepo, tag) for _, id := range images {
if err != nil { if tags, hasTags := tags[id]; hasTags {
return err
}
out.Write(sf.FormatStatus("", "Sending image list"))
var (
repoData *registry.RepositoryData
imageIndex []*registry.ImgData
)
for _, imgId := range imgList {
if tags, exists := tagsByImage[imgId]; exists {
// If an image has tags you must add an entry in the image index // If an image has tags you must add an entry in the image index
// for each tag // for each tag
for _, tag := range tags { for _, tag := range tags {
imageIndex = append(imageIndex, &registry.ImgData{ imageIndex = append(imageIndex, &registry.ImgData{
ID: imgId, ID: id,
Tag: tag, Tag: tag,
}) })
} }
} else { continue
// If the image does not have a tag it still needs to be sent to the }
// registry with an empty tag so that it is accociated with the repository // If the image does not have a tag it still needs to be sent to the
imageIndex = append(imageIndex, &registry.ImgData{ // registry with an empty tag so that it is accociated with the repository
ID: imgId, imageIndex = append(imageIndex, &registry.ImgData{
Tag: "", ID: id,
}) Tag: "",
})
}
return imageIndex
}
type imagePushData struct {
id string
endpoint string
tokens []string
}
// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
// and if it is absent then it sends the image id to the channel to be pushed.
func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter,
images chan imagePushData, imagesToPush chan string) {
defer wg.Done()
for image := range images {
if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
log.Errorf("Error in LookupRemoteImage: %s", err)
imagesToPush <- image.id
continue
}
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id)))
}
}
func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error {
workerCount := len(imageIDs)
// start a maximum of 5 workers to check if images exist on the specified endpoint.
if workerCount > 5 {
workerCount = 5
}
var (
wg = &sync.WaitGroup{}
imageData = make(chan imagePushData, workerCount*2)
imagesToPush = make(chan string, workerCount*2)
pushes = make(chan map[string]struct{}, 1)
)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
}
// start a go routine that consumes the images to push
go func() {
shouldPush := make(map[string]struct{})
for id := range imagesToPush {
shouldPush[id] = struct{}{}
}
pushes <- shouldPush
}()
for _, id := range imageIDs {
imageData <- imagePushData{
id: id,
endpoint: endpoint,
tokens: repo.Tokens,
} }
} }
// close the channel to notify the workers that there will be no more images to check.
close(imageData)
wg.Wait()
close(imagesToPush)
// wait for all the images that require pushes to be collected into a consumable map.
shouldPush := <-pushes
// finish by pushing any images and tags to the endpoint. The order that the images are pushed
// is very important that is why we are still itterating over the ordered list of imageIDs.
for _, id := range imageIDs {
if _, push := shouldPush[id]; push {
if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
// FIXME: Continue on error?
return err
}
}
for _, tag := range tags[id] {
out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
return err
}
}
}
return nil
}
// pushRepository pushes layers that do not already exist on the registry.
func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
repoInfo *registry.RepositoryInfo, localRepo map[string]string,
tag string, sf *utils.StreamFormatter) error {
log.Debugf("Local repo: %s", localRepo)
out = utils.NewWriteFlusher(out)
imgList, tags, err := s.getImageList(localRepo, tag)
if err != nil {
return err
}
out.Write(sf.FormatStatus("", "Sending image list"))
imageIndex := s.createImageIndex(imgList, tags)
log.Debugf("Preparing to push %s with the following images and tags", localRepo) log.Debugf("Preparing to push %s with the following images and tags", localRepo)
for _, data := range imageIndex { for _, data := range imageIndex {
log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag) log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
} }
// Register all the images in a repository with the registry // Register all the images in a repository with the registry
// If an image is not in this list it will not be associated with the repository // If an image is not in this list it will not be associated with the repository
repoData, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil) repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
if err != nil { if err != nil {
return err return err
} }
nTag := 1 nTag := 1
if tag == "" { if tag == "" {
nTag = len(localRepo) nTag = len(localRepo)
} }
var wg sync.WaitGroup out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
needsPush := make([]bool, len(imgList)) // push the repository to each of the endpoints only if it does not exist.
for _, ep := range repoData.Endpoints { for _, endpoint := range repoData.Endpoints {
out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag)) if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
return err
for i, imgId := range imgList {
wg.Add(1)
go func(i int, imgId string) {
defer wg.Done()
if err := r.LookupRemoteImage(imgId, ep, repoData.Tokens); err == nil {
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
needsPush[i] = false
} else {
log.Errorf("Error in LookupRemoteImage: %s", err)
out.Write(sf.FormatStatus("", "Image %s not pushed, adding to queue", utils.TruncateID(imgId)))
needsPush[i] = true
}
}(i, imgId)
}
wg.Wait()
for i, imgId := range imgList {
if needsPush[i] {
if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
// FIXME: Continue on error?
return err
}
}
for _, tag := range tagsByImage[imgId] {
out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+repoInfo.RemoteName+"/tags/"+tag))
if err := r.PushRegistryTag(repoInfo.RemoteName, imgId, tag, ep, repoData.Tokens); err != nil {
return err
}
}
} }
} }
_, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
if _, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints); err != nil { return err
return err
}
return nil
} }
func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) { func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {