mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Refactor registry Push
This commit is contained in:
parent
828d1aa507
commit
49b61af1f8
2 changed files with 56 additions and 62 deletions
|
@ -290,9 +290,8 @@ func (r *Registry) GetRepositoryData(remote string) (*RepositoryData, error) {
|
|||
}
|
||||
|
||||
// Push a local image to the registry
|
||||
func (r *Registry) PushImage(imgData *ImgData, jsonRaw []byte, layer io.Reader, registry string, token []string) error {
|
||||
func (r *Registry) PushImageJsonRegistry(imgData *ImgData, jsonRaw []byte, registry string, token []string) error {
|
||||
registry = "https://" + registry + "/v1"
|
||||
|
||||
client := r.getHttpClient()
|
||||
|
||||
// FIXME: try json with UTF8
|
||||
|
@ -302,8 +301,8 @@ func (r *Registry) PushImage(imgData *ImgData, jsonRaw []byte, layer io.Reader,
|
|||
}
|
||||
req.Header.Add("Content-type", "application/json")
|
||||
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
|
||||
|
||||
req.Header.Set("X-Docker-Checksum", imgData.Checksum)
|
||||
|
||||
utils.Debugf("Setting checksum for %s: %s", imgData.Id, imgData.Checksum)
|
||||
res, err := doWithCookies(client, req)
|
||||
if err != nil {
|
||||
|
@ -328,35 +327,39 @@ func (r *Registry) PushImage(imgData *ImgData, jsonRaw []byte, layer io.Reader,
|
|||
}
|
||||
return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
req3, err := http.NewRequest("PUT", registry+"/images/"+imgData.Id+"/layer", layer)
|
||||
func (r *Registry) PushImageLayerRegistry(imgId string, layer io.Reader, registry string, token []string) error {
|
||||
registry = "https://" + registry + "/v1"
|
||||
client := r.getHttpClient()
|
||||
|
||||
req, err := http.NewRequest("PUT", registry+"/images/"+imgId+"/layer", layer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req3.ContentLength = -1
|
||||
req3.TransferEncoding = []string{"chunked"}
|
||||
req3.Header.Set("Authorization", "Token "+strings.Join(token, ","))
|
||||
res3, err := doWithCookies(client, req3)
|
||||
req.ContentLength = -1
|
||||
req.TransferEncoding = []string{"chunked"}
|
||||
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
|
||||
res, err := doWithCookies(client, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to upload layer: %s", err)
|
||||
}
|
||||
defer res3.Body.Close()
|
||||
defer res.Body.Close()
|
||||
|
||||
if res3.StatusCode != 200 {
|
||||
errBody, err := ioutil.ReadAll(res3.Body)
|
||||
if res.StatusCode != 200 {
|
||||
errBody, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("HTTP code %d while uploading metadata and error when"+
|
||||
" trying to parse response body: %v", res.StatusCode, err)
|
||||
return fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
|
||||
}
|
||||
return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res3.StatusCode, errBody)
|
||||
return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res.StatusCode, errBody)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// push a tag on the registry.
|
||||
// Remote has the format '<user>/<repo>
|
||||
func (r *Registry) pushTag(remote, revision, tag, registry string, token []string) error {
|
||||
func (r *Registry) PushRegistryTag(remote, revision, tag, registry string, token []string) error {
|
||||
// "jsonify" the string
|
||||
revision = "\"" + revision + "\""
|
||||
registry = "https://" + registry + "/v1"
|
||||
|
@ -382,28 +385,7 @@ func (r *Registry) pushTag(remote, revision, tag, registry string, token []strin
|
|||
return nil
|
||||
}
|
||||
|
||||
// FIXME: this should really be PushTag
|
||||
func (r *Registry) PushLayer(remote, tag, imgId, registry string, token []string) error {
|
||||
// Check if the local impage exists
|
||||
img, err := graph.Get(imgId)
|
||||
if err != nil {
|
||||
fmt.Fprintf(stdout, "Skipping tag %s:%s: %s does not exist\r\n", remote, tag, imgId)
|
||||
return nil
|
||||
}
|
||||
fmt.Fprintf(stdout, "Pushing image %s:%s\r\n", remote, tag)
|
||||
// Push the image
|
||||
if err = graph.PushImage(stdout, img, registry, token); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(stdout, "Registering tag %s:%s\r\n", remote, tag)
|
||||
// And then the tag
|
||||
if err = graph.pushTag(remote, imgId, tag, registry, token); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Registry) PushJsonIndex(remote string, imgList []*ImgData, validate bool) (*RepositoryData, error) {
|
||||
func (r *Registry) PushImageJsonIndex(remote string, imgList []*ImgData, validate bool) (*RepositoryData, error) {
|
||||
client := r.getHttpClient()
|
||||
|
||||
imgListJson, err := json.Marshal(imgList)
|
||||
|
|
60
server.go
60
server.go
|
@ -5,10 +5,12 @@ import (
|
|||
"github.com/dotcloud/docker/registry"
|
||||
"github.com/dotcloud/docker/utils"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"strings"
|
||||
)
|
||||
|
@ -304,12 +306,9 @@ func (srv *Server) pullImage(stdout io.Writer, imgId, registry string, token []s
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse json: %s", err)
|
||||
}
|
||||
if img.Id != imgId {
|
||||
return fmt.Errorf("The retrieved image mismatch the requested one. (expected %s, received %s)", imgId, img.Id)
|
||||
}
|
||||
|
||||
// Get the layer
|
||||
fmt.Fprintf(stdout, "Pulling %s fs layer\r\n", imgId)
|
||||
fmt.Fprintf(stdout, "Pulling %s fs layer\r\n", img.Id)
|
||||
layer, err := srv.registry.GetRemoteImageLayer(stdout, img.Id, registry, token)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -354,7 +353,7 @@ func (srv *Server) pullRepository(stdout io.Writer, remote, askedTag string) err
|
|||
success := false
|
||||
for _, ep := range repoData.Endpoints {
|
||||
if err := srv.pullImage(stdout, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil {
|
||||
fmt.Fprintf(stdout, "Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err)
|
||||
fmt.Fprintf(stdout, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err)
|
||||
continue
|
||||
}
|
||||
if err := srv.runtime.repositories.Set(remote, img.Tag, img.Id, true); err != nil {
|
||||
|
@ -414,7 +413,7 @@ func (srv *Server) getChecksum(imageId string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
|
||||
if _, err := os.Stat(layerArchivePath(graph.imageRoot(imageId))); err != nil {
|
||||
if _, err := os.Stat(layerArchivePath(srv.runtime.graph.imageRoot(imageId))); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// TODO: Ask the registry for the checksum
|
||||
// As the archive is not there, it is supposed to come from a pull.
|
||||
|
@ -432,25 +431,25 @@ func (srv *Server) getChecksum(imageId string) (string, error) {
|
|||
|
||||
// Retrieve the all the images to be uploaded in the correct order
|
||||
// Note: we can't use a map as it is not ordered
|
||||
func (srv *Server) getImageList() ([]*registry.ImgData, error) {
|
||||
var imgList []*ImgData
|
||||
func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgData, error) {
|
||||
var imgList []*registry.ImgData
|
||||
|
||||
imageSet := make(map[string]struct{})
|
||||
for tag, id := range localRepo {
|
||||
img, err := graph.Get(id)
|
||||
img, err := srv.runtime.graph.Get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
img.WalkHistory(func(img *Image) error {
|
||||
if _, exists := imageSet[img.Id]; exists {
|
||||
return nil
|
||||
}
|
||||
imageSet[img.Id] = struct{}{}
|
||||
checksum, err := graph.getChecksum(img.Id)
|
||||
checksum, err := srv.getChecksum(img.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
imgList = append([]*ImgData{{
|
||||
imgList = append([]*registry.ImgData{{
|
||||
Id: img.Id,
|
||||
Checksum: checksum,
|
||||
Tag: tag,
|
||||
|
@ -458,47 +457,51 @@ func (srv *Server) getImageList() ([]*registry.ImgData, error) {
|
|||
return nil
|
||||
})
|
||||
}
|
||||
return imgList
|
||||
return imgList, nil
|
||||
}
|
||||
|
||||
func (srv *Server) pushRepository(out io.Writer, name string, localRepo Repository) error {
|
||||
func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[string]string) error {
|
||||
fmt.Fprintf(out, "Processing checksums\n")
|
||||
imgList, err := getImageList()
|
||||
imgList, err := srv.getImageList(localRepo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(stdout, "Sending image list\n")
|
||||
fmt.Fprintf(out, "Sending image list\n")
|
||||
|
||||
repoData, err := srv.registry.PushJsonIndex(name, imgList, false)
|
||||
repoData, err := srv.registry.PushImageJsonIndex(name, imgList, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
repoData.ImgList = imgList
|
||||
|
||||
// FIXME: Send only needed images
|
||||
for _, ep := range repoData.Endpoints {
|
||||
fmt.Fprintf(stdout, "Pushing repository %s to %s (%d tags)\r\n", remote, ep, len(localRepo))
|
||||
fmt.Fprintf(out, "Pushing repository %s to %s (%d tags)\r\n", name, ep, len(localRepo))
|
||||
// For each image within the repo, push them
|
||||
for _, elem := range imgList {
|
||||
if err := srv.pushImage(out, remote, elem.tag, elem.Id, ep, repoData.Tokens); err != nil {
|
||||
if err := srv.pushImage(out, name, elem.Id, ep, repoData.Tokens); err != nil {
|
||||
// FIXME: Continue on error?
|
||||
return err
|
||||
}
|
||||
if err := srv.registry.PushRegistryTag(name, elem.Id, elem.Tag, ep, repoData.Tokens); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := srv.registry.PushJsonIndex(name, imgList, true); err != nil {
|
||||
if _, err := srv.registry.PushImageJsonIndex(name, imgList, true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (srv *Server) pushImage(out io.Writer, remote, tag, imgId, registry string, token []string) error {
|
||||
func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []string) error {
|
||||
jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgId, "json"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err)
|
||||
}
|
||||
fmt.Fprintf(out, "Pushing %s\r\n", imgId)
|
||||
|
||||
// Make sure we have the image's checksum
|
||||
checksum, err := srv.getChecksum(imgId)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -533,7 +536,16 @@ func (srv *Server) pushImage(out io.Writer, remote, tag, imgId, registry string,
|
|||
Size: st.Size(),
|
||||
}
|
||||
}
|
||||
return srv.registry.PushImage(imgData, jsonRaw, ProgressReader(layerData, int(layerData.Size), out, ""), registry, token)
|
||||
|
||||
// Send the json
|
||||
if err := srv.registry.PushImageJsonRegistry(imgData, jsonRaw, ep, token); err != nil {
|
||||
return err
|
||||
}
|
||||
// Send the layer
|
||||
if err := srv.registry.PushImageLayerRegistry(imgData.Id, utils.ProgressReader(layerData, int(layerData.Size), out, ""), ep, token); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (srv *Server) ImagePush(name, registry string, out io.Writer) error {
|
||||
|
@ -551,7 +563,7 @@ func (srv *Server) ImagePush(name, registry string, out io.Writer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := srv.pushImage(out, name, imgData.Tag, registry, nil); err != nil {
|
||||
if err := srv.pushImage(out, name, img.Id, registry, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue