From 828d1aa50769999ee50bbac893f4e9ef6d49f10c Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Wed, 15 May 2013 03:27:15 +0000 Subject: [PATCH] Begin to implement push with new project structure --- registry/registry.go | 467 ++++++++++++++++--------------------------- server.go | 175 ++++++++++++++-- 2 files changed, 331 insertions(+), 311 deletions(-) diff --git a/registry/registry.go b/registry/registry.go index 8900f27550..7254b49efc 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,6 +1,7 @@ package registry import ( + "bytes" "encoding/json" "fmt" "github.com/dotcloud/docker/auth" @@ -288,322 +289,200 @@ func (r *Registry) GetRepositoryData(remote string) (*RepositoryData, error) { }, nil } -// // Push a local image to the registry -// func (r *Registry) PushImage(stdout io.Writer, img *Image, registry string, token []string) error { -// registry = "https://" + registry + "/v1" +// Push a local image to the registry +func (r *Registry) PushImage(imgData *ImgData, jsonRaw []byte, layer io.Reader, registry string, token []string) error { + registry = "https://" + registry + "/v1" -// client := graph.getHttpClient() -// jsonRaw, err := ioutil.ReadFile(path.Join(graph.Root, img.Id, "json")) -// if err != nil { -// return fmt.Errorf("Error while retreiving the path for {%s}: %s", img.Id, err) -// } + client := r.getHttpClient() -// fmt.Fprintf(stdout, "Pushing %s metadata\r\n", img.Id) + // FIXME: try json with UTF8 + req, err := http.NewRequest("PUT", registry+"/images/"+imgData.Id+"/json", strings.NewReader(string(jsonRaw))) + if err != nil { + return err + } + req.Header.Add("Content-type", "application/json") + req.Header.Set("Authorization", "Token "+strings.Join(token, ",")) -// // FIXME: try json with UTF8 -// jsonData := strings.NewReader(string(jsonRaw)) -// req, err := http.NewRequest("PUT", registry+"/images/"+img.Id+"/json", jsonData) -// if err != nil { -// return err -// } -// req.Header.Add("Content-type", "application/json") -// req.Header.Set("Authorization", "Token "+strings.Join(token, ",")) + req.Header.Set("X-Docker-Checksum", imgData.Checksum) + utils.Debugf("Setting checksum for %s: %s", imgData.Id, imgData.Checksum) + res, err := doWithCookies(client, req) + if err != nil { + return fmt.Errorf("Failed to upload metadata: %s", err) + } + defer res.Body.Close() + if len(res.Cookies()) > 0 { + client.Jar.SetCookies(req.URL, res.Cookies()) + } + if res.StatusCode != 200 { + errBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("HTTP code %d while uploading metadata and error when"+ + " trying to parse response body: %v", res.StatusCode, err) + } + var jsonBody map[string]string + if err := json.Unmarshal(errBody, &jsonBody); err != nil { + errBody = []byte(err.Error()) + } else if jsonBody["error"] == "Image already exists" { + utils.Debugf("Image %s already uploaded ; skipping\n", imgData.Id) + return nil + } + return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody) + } -// checksum, err := img.Checksum() -// if err != nil { -// return fmt.Errorf("Error while retrieving checksum for %s: %v", img.Id, err) -// } -// req.Header.Set("X-Docker-Checksum", checksum) -// utils.Debugf("Setting checksum for %s: %s", img.ShortId(), checksum) -// res, err := doWithCookies(client, req) -// if err != nil { -// return fmt.Errorf("Failed to upload metadata: %s", err) -// } -// defer res.Body.Close() -// if len(res.Cookies()) > 0 { -// client.Jar.SetCookies(req.URL, res.Cookies()) -// } -// if res.StatusCode != 200 { -// errBody, err := ioutil.ReadAll(res.Body) -// if err != nil { -// return fmt.Errorf("HTTP code %d while uploading metadata and error when"+ -// " trying to parse response body: %v", res.StatusCode, err) -// } -// var jsonBody map[string]string -// if err := json.Unmarshal(errBody, &jsonBody); err != nil { -// errBody = []byte(err.Error()) -// } else if jsonBody["error"] == "Image already exists" { -// fmt.Fprintf(stdout, "Image %v already uploaded ; skipping\n", img.Id) -// return nil -// } -// return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody) -// } + req3, err := http.NewRequest("PUT", registry+"/images/"+imgData.Id+"/layer", layer) + if err != nil { + return err + } -// fmt.Fprintf(stdout, "Pushing %s fs layer\r\n", img.Id) -// root, err := img.root() -// if err != nil { -// return err -// } + req3.ContentLength = -1 + req3.TransferEncoding = []string{"chunked"} + req3.Header.Set("Authorization", "Token "+strings.Join(token, ",")) + res3, err := doWithCookies(client, req3) + if err != nil { + return fmt.Errorf("Failed to upload layer: %s", err) + } + defer res3.Body.Close() -// var layerData *TempArchive -// // If the archive exists, use it -// file, err := os.Open(layerArchivePath(root)) -// if err != nil { -// if os.IsNotExist(err) { -// // If the archive does not exist, create one from the layer -// layerData, err = graph.TempLayerArchive(img.Id, Xz, stdout) -// if err != nil { -// return fmt.Errorf("Failed to generate layer archive: %s", err) -// } -// } else { -// return err -// } -// } else { -// defer file.Close() -// st, err := file.Stat() -// if err != nil { -// return err -// } -// layerData = &TempArchive{file, st.Size()} -// } + if res3.StatusCode != 200 { + errBody, err := ioutil.ReadAll(res3.Body) + if err != nil { + return fmt.Errorf("HTTP code %d while uploading metadata and error when"+ + " trying to parse response body: %v", res.StatusCode, err) + } + return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res3.StatusCode, errBody) + } + return nil +} -// req3, err := http.NewRequest("PUT", registry+"/images/"+img.Id+"/layer", utils.ProgressReader(layerData, int(layerData.Size), stdout, "")) -// if err != nil { -// return err -// } +// push a tag on the registry. +// Remote has the format '/ +func (r *Registry) pushTag(remote, revision, tag, registry string, token []string) error { + // "jsonify" the string + revision = "\"" + revision + "\"" + registry = "https://" + registry + "/v1" -// req3.ContentLength = -1 -// req3.TransferEncoding = []string{"chunked"} -// req3.Header.Set("Authorization", "Token "+strings.Join(token, ",")) -// res3, err := doWithCookies(client, req3) -// if err != nil { -// return fmt.Errorf("Failed to upload layer: %s", err) -// } -// defer res3.Body.Close() + utils.Debugf("Pushing tags for rev [%s] on {%s}\n", revision, registry+"/users/"+remote+"/"+tag) -// if res3.StatusCode != 200 { -// errBody, err := ioutil.ReadAll(res3.Body) -// if err != nil { -// return fmt.Errorf("HTTP code %d while uploading metadata and error when"+ -// " trying to parse response body: %v", res.StatusCode, err) -// } -// return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res3.StatusCode, errBody) -// } -// return nil -// } + client := r.getHttpClient() + req, err := http.NewRequest("PUT", registry+"/repositories/"+remote+"/tags/"+tag, strings.NewReader(revision)) + if err != nil { + return err + } + req.Header.Add("Content-type", "application/json") + req.Header.Set("Authorization", "Token "+strings.Join(token, ",")) + req.ContentLength = int64(len(revision)) + res, err := doWithCookies(client, req) + if err != nil { + return err + } + res.Body.Close() + if res.StatusCode != 200 && res.StatusCode != 201 { + return fmt.Errorf("Internal server error: %d trying to push tag %s on %s", res.StatusCode, tag, remote) + } + return nil +} -// // push a tag on the registry. -// // Remote has the format '/ -// func (r *Registry) pushTag(remote, revision, tag, registry string, token []string) error { -// // "jsonify" the string -// revision = "\"" + revision + "\"" -// registry = "https://" + registry + "/v1" +// FIXME: this should really be PushTag +func (r *Registry) PushLayer(remote, tag, imgId, registry string, token []string) error { + // Check if the local impage exists + img, err := graph.Get(imgId) + if err != nil { + fmt.Fprintf(stdout, "Skipping tag %s:%s: %s does not exist\r\n", remote, tag, imgId) + return nil + } + fmt.Fprintf(stdout, "Pushing image %s:%s\r\n", remote, tag) + // Push the image + if err = graph.PushImage(stdout, img, registry, token); err != nil { + return err + } + fmt.Fprintf(stdout, "Registering tag %s:%s\r\n", remote, tag) + // And then the tag + if err = graph.pushTag(remote, imgId, tag, registry, token); err != nil { + return err + } + return nil +} -// utils.Debugf("Pushing tags for rev [%s] on {%s}\n", revision, registry+"/users/"+remote+"/"+tag) +func (r *Registry) PushJsonIndex(remote string, imgList []*ImgData, validate bool) (*RepositoryData, error) { + client := r.getHttpClient() -// client := graph.getHttpClient() -// req, err := http.NewRequest("PUT", registry+"/repositories/"+remote+"/tags/"+tag, strings.NewReader(revision)) -// if err != nil { -// return err -// } -// req.Header.Add("Content-type", "application/json") -// req.Header.Set("Authorization", "Token "+strings.Join(token, ",")) -// req.ContentLength = int64(len(revision)) -// res, err := doWithCookies(client, req) -// if err != nil { -// return err -// } -// res.Body.Close() -// if res.StatusCode != 200 && res.StatusCode != 201 { -// return fmt.Errorf("Internal server error: %d trying to push tag %s on %s", res.StatusCode, tag, remote) -// } -// return nil -// } + imgListJson, err := json.Marshal(imgList) + if err != nil { + return nil, err + } -// // FIXME: this should really be PushTag -// func (r *Registry) pushPrimitive(stdout io.Writer, remote, tag, imgId, registry string, token []string) error { -// // Check if the local impage exists -// img, err := graph.Get(imgId) -// if err != nil { -// fmt.Fprintf(stdout, "Skipping tag %s:%s: %s does not exist\r\n", remote, tag, imgId) -// return nil -// } -// fmt.Fprintf(stdout, "Pushing image %s:%s\r\n", remote, tag) -// // Push the image -// if err = graph.PushImage(stdout, img, registry, token); err != nil { -// return err -// } -// fmt.Fprintf(stdout, "Registering tag %s:%s\r\n", remote, tag) -// // And then the tag -// if err = graph.pushTag(remote, imgId, tag, registry, token); err != nil { -// return err -// } -// return nil -// } + utils.Debugf("json sent: %s\n", imgListJson) -// // Retrieve the checksum of an image -// // Priority: -// // - Check on the stored checksums -// // - Check if the archive exists, if it does not, ask the registry -// // - If the archive does exists, process the checksum from it -// // - If the archive does not exists and not found on registry, process checksum from layer -// func (r *Registry) getChecksum(imageId string) (string, error) { -// // FIXME: Use in-memory map instead of reading the file each time -// if sums, err := graph.getStoredChecksums(); err != nil { -// return "", err -// } else if checksum, exists := sums[imageId]; exists { -// return checksum, nil -// } + req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/", bytes.NewReader(imgListJson)) + if err != nil { + return nil, err + } + req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password) + req.ContentLength = int64(len(imgListJson)) + req.Header.Set("X-Docker-Token", "true") -// img, err := graph.Get(imageId) -// if err != nil { -// return "", err -// } + res, err := client.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() -// if _, err := os.Stat(layerArchivePath(graph.imageRoot(imageId))); err != nil { -// if os.IsNotExist(err) { -// // TODO: Ask the registry for the checksum -// // As the archive is not there, it is supposed to come from a pull. -// } else { -// return "", err -// } -// } + // Redirect if necessary + for res.StatusCode >= 300 && res.StatusCode < 400 { + utils.Debugf("Redirected to %s\n", res.Header.Get("Location")) + req, err = http.NewRequest("PUT", res.Header.Get("Location"), bytes.NewReader(imgListJson)) + if err != nil { + return nil, err + } + req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password) + req.ContentLength = int64(len(imgListJson)) + req.Header.Set("X-Docker-Token", "true") -// checksum, err := img.Checksum() -// if err != nil { -// return "", err -// } -// return checksum, nil -// } + res, err = client.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + } -// // Push a repository to the registry. -// // Remote has the format '/ -// func (r *Registry) PushRepository(stdout io.Writer, remote string, localRepo Repository, authConfig *auth.AuthConfig) error { -// client := graph.getHttpClient() -// // FIXME: Do not reset the cookie each time? (need to reset it in case updating latest of a repo and repushing) -// client.Jar = cookiejar.NewCookieJar() -// var imgList []*ImgListJson + if res.StatusCode != 200 && res.StatusCode != 201 { + errBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + return nil, fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody) + } -// fmt.Fprintf(stdout, "Processing checksums\n") -// imageSet := make(map[string]struct{}) + var tokens []string + if res.Header.Get("X-Docker-Token") != "" { + tokens = res.Header["X-Docker-Token"] + utils.Debugf("Auth token: %v", tokens) + } else { + return nil, fmt.Errorf("Index response didn't contain an access token") + } -// for tag, id := range localRepo { -// img, err := graph.Get(id) -// if err != nil { -// return err -// } -// img.WalkHistory(func(img *Image) error { -// if _, exists := imageSet[img.Id]; exists { -// return nil -// } -// imageSet[img.Id] = struct{}{} -// checksum, err := graph.getChecksum(img.Id) -// if err != nil { -// return err -// } -// imgList = append([]*ImgListJson{{ -// Id: img.Id, -// Checksum: checksum, -// tag: tag, -// }}, imgList...) -// return nil -// }) -// } + var endpoints []string + if res.Header.Get("X-Docker-Endpoints") != "" { + endpoints = res.Header["X-Docker-Endpoints"] + } else { + return nil, fmt.Errorf("Index response didn't contain any endpoints") + } -// imgListJson, err := json.Marshal(imgList) -// if err != nil { -// return err -// } + if validate { + if res.StatusCode != 204 { + if errBody, err := ioutil.ReadAll(res.Body); err != nil { + return nil, err + } else { + return nil, fmt.Errorf("Error: Status %d trying to push checksums %s: %s", res.StatusCode, remote, errBody) + } + } + } -// utils.Debugf("json sent: %s\n", imgListJson) - -// fmt.Fprintf(stdout, "Sending image list\n") -// req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/", bytes.NewReader(imgListJson)) -// if err != nil { -// return err -// } -// req.SetBasicAuth(authConfig.Username, authConfig.Password) -// req.ContentLength = int64(len(imgListJson)) -// req.Header.Set("X-Docker-Token", "true") - -// res, err := client.Do(req) -// if err != nil { -// return err -// } -// defer res.Body.Close() - -// for res.StatusCode >= 300 && res.StatusCode < 400 { -// utils.Debugf("Redirected to %s\n", res.Header.Get("Location")) -// req, err = http.NewRequest("PUT", res.Header.Get("Location"), bytes.NewReader(imgListJson)) -// if err != nil { -// return err -// } -// req.SetBasicAuth(authConfig.Username, authConfig.Password) -// req.ContentLength = int64(len(imgListJson)) -// req.Header.Set("X-Docker-Token", "true") - -// res, err = client.Do(req) -// if err != nil { -// return err -// } -// defer res.Body.Close() -// } - -// if res.StatusCode != 200 && res.StatusCode != 201 { -// errBody, err := ioutil.ReadAll(res.Body) -// if err != nil { -// return err -// } -// return fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody) -// } - -// var token, endpoints []string -// if res.Header.Get("X-Docker-Token") != "" { -// token = res.Header["X-Docker-Token"] -// utils.Debugf("Auth token: %v", token) -// } else { -// return fmt.Errorf("Index response didn't contain an access token") -// } -// if res.Header.Get("X-Docker-Endpoints") != "" { -// endpoints = res.Header["X-Docker-Endpoints"] -// } else { -// return fmt.Errorf("Index response didn't contain any endpoints") -// } - -// // FIXME: Send only needed images -// for _, registry := range endpoints { -// fmt.Fprintf(stdout, "Pushing repository %s to %s (%d tags)\r\n", remote, registry, len(localRepo)) -// // For each image within the repo, push them -// for _, elem := range imgList { -// if err := graph.pushPrimitive(stdout, remote, elem.tag, elem.Id, registry, token); err != nil { -// // FIXME: Continue on error? -// return err -// } -// } -// } - -// req2, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/images", bytes.NewReader(imgListJson)) -// if err != nil { -// return err -// } -// req2.SetBasicAuth(authConfig.Username, authConfig.Password) -// req2.Header["X-Docker-Endpoints"] = endpoints -// req2.ContentLength = int64(len(imgListJson)) -// res2, err := client.Do(req2) -// if err != nil { -// return err -// } -// defer res2.Body.Close() -// if res2.StatusCode != 204 { -// if errBody, err := ioutil.ReadAll(res2.Body); err != nil { -// return err -// } else { -// return fmt.Errorf("Error: Status %d trying to push checksums %s: %s", res2.StatusCode, remote, errBody) -// } -// } - -// return nil -// } + return &RepositoryData{ + Tokens: tokens, + Endpoints: endpoints, + }, nil +} func (r *Registry) SearchRepositories(stdout io.Writer, term string) (*SearchResults, error) { client := r.getHttpClient() diff --git a/server.go b/server.go index 680d13d3ef..e954c57399 100644 --- a/server.go +++ b/server.go @@ -395,24 +395,165 @@ func (srv *Server) ImagePull(name, tag, registry string, out io.Writer) error { return nil } -func (srv *Server) ImagePush(name, registry string, out io.Writer) error { - // img, err := srv.runtime.graph.Get(name) - // if err != nil { - // utils.Debugf("The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) - // // If it fails, try to get the repository - // if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists { - // if err := srv.runtime.graph.PushRepository(out, name, localRepo, srv.runtime.authConfig); err != nil { - // return err - // } - // return nil - // } +// Retrieve the checksum of an image +// Priority: +// - Check on the stored checksums +// - Check if the archive exists, if it does not, ask the registry +// - If the archive does exists, process the checksum from it +// - If the archive does not exists and not found on registry, process checksum from layer +func (srv *Server) getChecksum(imageId string) (string, error) { + // FIXME: Use in-memory map instead of reading the file each time + if sums, err := srv.runtime.graph.getStoredChecksums(); err != nil { + return "", err + } else if checksum, exists := sums[imageId]; exists { + return checksum, nil + } - // return err - // } - // err = srv.runtime.graph.PushImage(out, img, registry, nil) - // if err != nil { - // return err - // } + img, err := srv.runtime.graph.Get(imageId) + if err != nil { + return "", err + } + + if _, err := os.Stat(layerArchivePath(graph.imageRoot(imageId))); err != nil { + if os.IsNotExist(err) { + // TODO: Ask the registry for the checksum + // As the archive is not there, it is supposed to come from a pull. + } else { + return "", err + } + } + + checksum, err := img.Checksum() + if err != nil { + return "", err + } + return checksum, nil +} + +// Retrieve the all the images to be uploaded in the correct order +// Note: we can't use a map as it is not ordered +func (srv *Server) getImageList() ([]*registry.ImgData, error) { + var imgList []*ImgData + + imageSet := make(map[string]struct{}) + for tag, id := range localRepo { + img, err := graph.Get(id) + if err != nil { + return err + } + img.WalkHistory(func(img *Image) error { + if _, exists := imageSet[img.Id]; exists { + return nil + } + imageSet[img.Id] = struct{}{} + checksum, err := graph.getChecksum(img.Id) + if err != nil { + return err + } + imgList = append([]*ImgData{{ + Id: img.Id, + Checksum: checksum, + Tag: tag, + }}, imgList...) + return nil + }) + } + return imgList +} + +func (srv *Server) pushRepository(out io.Writer, name string, localRepo Repository) error { + fmt.Fprintf(out, "Processing checksums\n") + imgList, err := getImageList() + if err != nil { + return err + } + fmt.Fprintf(stdout, "Sending image list\n") + + repoData, err := srv.registry.PushJsonIndex(name, imgList, false) + if err != nil { + return err + } + repoData.ImgList = imgList + + // FIXME: Send only needed images + for _, ep := range repoData.Endpoints { + fmt.Fprintf(stdout, "Pushing repository %s to %s (%d tags)\r\n", remote, ep, len(localRepo)) + // For each image within the repo, push them + for _, elem := range imgList { + if err := srv.pushImage(out, remote, elem.tag, elem.Id, ep, repoData.Tokens); err != nil { + // FIXME: Continue on error? + return err + } + } + } + + if _, err := srv.registry.PushJsonIndex(name, imgList, true); err != nil { + return err + } +} + +func (srv *Server) pushImage(out io.Writer, remote, tag, imgId, registry string, token []string) error { + jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgId, "json")) + if err != nil { + return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err) + } + fmt.Fprintf(out, "Pushing %s\r\n", imgId) + + checksum, err := srv.getChecksum(imgId) + if err != nil { + return err + } + imgData := ®istry.ImgData{ + Id: imgId, + Checksum: checksum, + } + + // Retrieve the tarball to be sent + var layerData *TempArchive + // If the archive exists, use it + file, err := os.Open(layerArchivePath(srv.runtime.graph.imageRoot(imgId))) + if err != nil { + if os.IsNotExist(err) { + // If the archive does not exist, create one from the layer + layerData, err = srv.runtime.graph.TempLayerArchive(imgId, Xz, out) + if err != nil { + return fmt.Errorf("Failed to generate layer archive: %s", err) + } + } else { + return err + } + } else { + defer file.Close() + st, err := file.Stat() + if err != nil { + return err + } + layerData = &TempArchive{ + File: file, + Size: st.Size(), + } + } + return srv.registry.PushImage(imgData, jsonRaw, ProgressReader(layerData, int(layerData.Size), out, ""), registry, token) +} + +func (srv *Server) ImagePush(name, registry string, out io.Writer) error { + img, err := srv.runtime.graph.Get(name) + if err != nil { + utils.Debugf("The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) + // If it fails, try to get the repository + if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists { + if err := srv.pushRepository(out, name, localRepo); err != nil { + return err + } + return nil + } + + return err + } + + if err := srv.pushImage(out, name, imgData.Tag, registry, nil); err != nil { + return err + } return nil }