Begin to implement push with new project structure

This commit is contained in:
Guillaume J. Charmes 2013-05-15 03:27:15 +00:00
parent e7077320ff
commit 828d1aa507
2 changed files with 331 additions and 311 deletions

View File

@ -1,6 +1,7 @@
package registry
import (
"bytes"
"encoding/json"
"fmt"
"github.com/dotcloud/docker/auth"
@ -288,322 +289,200 @@ func (r *Registry) GetRepositoryData(remote string) (*RepositoryData, error) {
}, nil
}
// // Push a local image to the registry
// func (r *Registry) PushImage(stdout io.Writer, img *Image, registry string, token []string) error {
// registry = "https://" + registry + "/v1"
// Push a local image to the registry
func (r *Registry) PushImage(imgData *ImgData, jsonRaw []byte, layer io.Reader, registry string, token []string) error {
registry = "https://" + registry + "/v1"
// client := graph.getHttpClient()
// jsonRaw, err := ioutil.ReadFile(path.Join(graph.Root, img.Id, "json"))
// if err != nil {
// return fmt.Errorf("Error while retreiving the path for {%s}: %s", img.Id, err)
// }
client := r.getHttpClient()
// fmt.Fprintf(stdout, "Pushing %s metadata\r\n", img.Id)
// FIXME: try json with UTF8
req, err := http.NewRequest("PUT", registry+"/images/"+imgData.Id+"/json", strings.NewReader(string(jsonRaw)))
if err != nil {
return err
}
req.Header.Add("Content-type", "application/json")
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
// // FIXME: try json with UTF8
// jsonData := strings.NewReader(string(jsonRaw))
// req, err := http.NewRequest("PUT", registry+"/images/"+img.Id+"/json", jsonData)
// if err != nil {
// return err
// }
// req.Header.Add("Content-type", "application/json")
// req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
req.Header.Set("X-Docker-Checksum", imgData.Checksum)
utils.Debugf("Setting checksum for %s: %s", imgData.Id, imgData.Checksum)
res, err := doWithCookies(client, req)
if err != nil {
return fmt.Errorf("Failed to upload metadata: %s", err)
}
defer res.Body.Close()
if len(res.Cookies()) > 0 {
client.Jar.SetCookies(req.URL, res.Cookies())
}
if res.StatusCode != 200 {
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("HTTP code %d while uploading metadata and error when"+
" trying to parse response body: %v", res.StatusCode, err)
}
var jsonBody map[string]string
if err := json.Unmarshal(errBody, &jsonBody); err != nil {
errBody = []byte(err.Error())
} else if jsonBody["error"] == "Image already exists" {
utils.Debugf("Image %s already uploaded ; skipping\n", imgData.Id)
return nil
}
return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody)
}
// checksum, err := img.Checksum()
// if err != nil {
// return fmt.Errorf("Error while retrieving checksum for %s: %v", img.Id, err)
// }
// req.Header.Set("X-Docker-Checksum", checksum)
// utils.Debugf("Setting checksum for %s: %s", img.ShortId(), checksum)
// res, err := doWithCookies(client, req)
// if err != nil {
// return fmt.Errorf("Failed to upload metadata: %s", err)
// }
// defer res.Body.Close()
// if len(res.Cookies()) > 0 {
// client.Jar.SetCookies(req.URL, res.Cookies())
// }
// if res.StatusCode != 200 {
// errBody, err := ioutil.ReadAll(res.Body)
// if err != nil {
// return fmt.Errorf("HTTP code %d while uploading metadata and error when"+
// " trying to parse response body: %v", res.StatusCode, err)
// }
// var jsonBody map[string]string
// if err := json.Unmarshal(errBody, &jsonBody); err != nil {
// errBody = []byte(err.Error())
// } else if jsonBody["error"] == "Image already exists" {
// fmt.Fprintf(stdout, "Image %v already uploaded ; skipping\n", img.Id)
// return nil
// }
// return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody)
// }
req3, err := http.NewRequest("PUT", registry+"/images/"+imgData.Id+"/layer", layer)
if err != nil {
return err
}
// fmt.Fprintf(stdout, "Pushing %s fs layer\r\n", img.Id)
// root, err := img.root()
// if err != nil {
// return err
// }
req3.ContentLength = -1
req3.TransferEncoding = []string{"chunked"}
req3.Header.Set("Authorization", "Token "+strings.Join(token, ","))
res3, err := doWithCookies(client, req3)
if err != nil {
return fmt.Errorf("Failed to upload layer: %s", err)
}
defer res3.Body.Close()
// var layerData *TempArchive
// // If the archive exists, use it
// file, err := os.Open(layerArchivePath(root))
// if err != nil {
// if os.IsNotExist(err) {
// // If the archive does not exist, create one from the layer
// layerData, err = graph.TempLayerArchive(img.Id, Xz, stdout)
// if err != nil {
// return fmt.Errorf("Failed to generate layer archive: %s", err)
// }
// } else {
// return err
// }
// } else {
// defer file.Close()
// st, err := file.Stat()
// if err != nil {
// return err
// }
// layerData = &TempArchive{file, st.Size()}
// }
if res3.StatusCode != 200 {
errBody, err := ioutil.ReadAll(res3.Body)
if err != nil {
return fmt.Errorf("HTTP code %d while uploading metadata and error when"+
" trying to parse response body: %v", res.StatusCode, err)
}
return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res3.StatusCode, errBody)
}
return nil
}
// req3, err := http.NewRequest("PUT", registry+"/images/"+img.Id+"/layer", utils.ProgressReader(layerData, int(layerData.Size), stdout, ""))
// if err != nil {
// return err
// }
// push a tag on the registry.
// Remote has the format '<user>/<repo>
func (r *Registry) pushTag(remote, revision, tag, registry string, token []string) error {
// "jsonify" the string
revision = "\"" + revision + "\""
registry = "https://" + registry + "/v1"
// req3.ContentLength = -1
// req3.TransferEncoding = []string{"chunked"}
// req3.Header.Set("Authorization", "Token "+strings.Join(token, ","))
// res3, err := doWithCookies(client, req3)
// if err != nil {
// return fmt.Errorf("Failed to upload layer: %s", err)
// }
// defer res3.Body.Close()
utils.Debugf("Pushing tags for rev [%s] on {%s}\n", revision, registry+"/users/"+remote+"/"+tag)
// if res3.StatusCode != 200 {
// errBody, err := ioutil.ReadAll(res3.Body)
// if err != nil {
// return fmt.Errorf("HTTP code %d while uploading metadata and error when"+
// " trying to parse response body: %v", res.StatusCode, err)
// }
// return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res3.StatusCode, errBody)
// }
// return nil
// }
client := r.getHttpClient()
req, err := http.NewRequest("PUT", registry+"/repositories/"+remote+"/tags/"+tag, strings.NewReader(revision))
if err != nil {
return err
}
req.Header.Add("Content-type", "application/json")
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
req.ContentLength = int64(len(revision))
res, err := doWithCookies(client, req)
if err != nil {
return err
}
res.Body.Close()
if res.StatusCode != 200 && res.StatusCode != 201 {
return fmt.Errorf("Internal server error: %d trying to push tag %s on %s", res.StatusCode, tag, remote)
}
return nil
}
// // push a tag on the registry.
// // Remote has the format '<user>/<repo>
// func (r *Registry) pushTag(remote, revision, tag, registry string, token []string) error {
// // "jsonify" the string
// revision = "\"" + revision + "\""
// registry = "https://" + registry + "/v1"
// FIXME: this should really be PushTag
func (r *Registry) PushLayer(remote, tag, imgId, registry string, token []string) error {
// Check if the local impage exists
img, err := graph.Get(imgId)
if err != nil {
fmt.Fprintf(stdout, "Skipping tag %s:%s: %s does not exist\r\n", remote, tag, imgId)
return nil
}
fmt.Fprintf(stdout, "Pushing image %s:%s\r\n", remote, tag)
// Push the image
if err = graph.PushImage(stdout, img, registry, token); err != nil {
return err
}
fmt.Fprintf(stdout, "Registering tag %s:%s\r\n", remote, tag)
// And then the tag
if err = graph.pushTag(remote, imgId, tag, registry, token); err != nil {
return err
}
return nil
}
// utils.Debugf("Pushing tags for rev [%s] on {%s}\n", revision, registry+"/users/"+remote+"/"+tag)
func (r *Registry) PushJsonIndex(remote string, imgList []*ImgData, validate bool) (*RepositoryData, error) {
client := r.getHttpClient()
// client := graph.getHttpClient()
// req, err := http.NewRequest("PUT", registry+"/repositories/"+remote+"/tags/"+tag, strings.NewReader(revision))
// if err != nil {
// return err
// }
// req.Header.Add("Content-type", "application/json")
// req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
// req.ContentLength = int64(len(revision))
// res, err := doWithCookies(client, req)
// if err != nil {
// return err
// }
// res.Body.Close()
// if res.StatusCode != 200 && res.StatusCode != 201 {
// return fmt.Errorf("Internal server error: %d trying to push tag %s on %s", res.StatusCode, tag, remote)
// }
// return nil
// }
imgListJson, err := json.Marshal(imgList)
if err != nil {
return nil, err
}
// // FIXME: this should really be PushTag
// func (r *Registry) pushPrimitive(stdout io.Writer, remote, tag, imgId, registry string, token []string) error {
// // Check if the local impage exists
// img, err := graph.Get(imgId)
// if err != nil {
// fmt.Fprintf(stdout, "Skipping tag %s:%s: %s does not exist\r\n", remote, tag, imgId)
// return nil
// }
// fmt.Fprintf(stdout, "Pushing image %s:%s\r\n", remote, tag)
// // Push the image
// if err = graph.PushImage(stdout, img, registry, token); err != nil {
// return err
// }
// fmt.Fprintf(stdout, "Registering tag %s:%s\r\n", remote, tag)
// // And then the tag
// if err = graph.pushTag(remote, imgId, tag, registry, token); err != nil {
// return err
// }
// return nil
// }
utils.Debugf("json sent: %s\n", imgListJson)
// // Retrieve the checksum of an image
// // Priority:
// // - Check on the stored checksums
// // - Check if the archive exists, if it does not, ask the registry
// // - If the archive does exists, process the checksum from it
// // - If the archive does not exists and not found on registry, process checksum from layer
// func (r *Registry) getChecksum(imageId string) (string, error) {
// // FIXME: Use in-memory map instead of reading the file each time
// if sums, err := graph.getStoredChecksums(); err != nil {
// return "", err
// } else if checksum, exists := sums[imageId]; exists {
// return checksum, nil
// }
req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/", bytes.NewReader(imgListJson))
if err != nil {
return nil, err
}
req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password)
req.ContentLength = int64(len(imgListJson))
req.Header.Set("X-Docker-Token", "true")
// img, err := graph.Get(imageId)
// if err != nil {
// return "", err
// }
res, err := client.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
// if _, err := os.Stat(layerArchivePath(graph.imageRoot(imageId))); err != nil {
// if os.IsNotExist(err) {
// // TODO: Ask the registry for the checksum
// // As the archive is not there, it is supposed to come from a pull.
// } else {
// return "", err
// }
// }
// Redirect if necessary
for res.StatusCode >= 300 && res.StatusCode < 400 {
utils.Debugf("Redirected to %s\n", res.Header.Get("Location"))
req, err = http.NewRequest("PUT", res.Header.Get("Location"), bytes.NewReader(imgListJson))
if err != nil {
return nil, err
}
req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password)
req.ContentLength = int64(len(imgListJson))
req.Header.Set("X-Docker-Token", "true")
// checksum, err := img.Checksum()
// if err != nil {
// return "", err
// }
// return checksum, nil
// }
res, err = client.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
}
// // Push a repository to the registry.
// // Remote has the format '<user>/<repo>
// func (r *Registry) PushRepository(stdout io.Writer, remote string, localRepo Repository, authConfig *auth.AuthConfig) error {
// client := graph.getHttpClient()
// // FIXME: Do not reset the cookie each time? (need to reset it in case updating latest of a repo and repushing)
// client.Jar = cookiejar.NewCookieJar()
// var imgList []*ImgListJson
if res.StatusCode != 200 && res.StatusCode != 201 {
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody)
}
// fmt.Fprintf(stdout, "Processing checksums\n")
// imageSet := make(map[string]struct{})
var tokens []string
if res.Header.Get("X-Docker-Token") != "" {
tokens = res.Header["X-Docker-Token"]
utils.Debugf("Auth token: %v", tokens)
} else {
return nil, fmt.Errorf("Index response didn't contain an access token")
}
// for tag, id := range localRepo {
// img, err := graph.Get(id)
// if err != nil {
// return err
// }
// img.WalkHistory(func(img *Image) error {
// if _, exists := imageSet[img.Id]; exists {
// return nil
// }
// imageSet[img.Id] = struct{}{}
// checksum, err := graph.getChecksum(img.Id)
// if err != nil {
// return err
// }
// imgList = append([]*ImgListJson{{
// Id: img.Id,
// Checksum: checksum,
// tag: tag,
// }}, imgList...)
// return nil
// })
// }
var endpoints []string
if res.Header.Get("X-Docker-Endpoints") != "" {
endpoints = res.Header["X-Docker-Endpoints"]
} else {
return nil, fmt.Errorf("Index response didn't contain any endpoints")
}
// imgListJson, err := json.Marshal(imgList)
// if err != nil {
// return err
// }
if validate {
if res.StatusCode != 204 {
if errBody, err := ioutil.ReadAll(res.Body); err != nil {
return nil, err
} else {
return nil, fmt.Errorf("Error: Status %d trying to push checksums %s: %s", res.StatusCode, remote, errBody)
}
}
}
// utils.Debugf("json sent: %s\n", imgListJson)
// fmt.Fprintf(stdout, "Sending image list\n")
// req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/", bytes.NewReader(imgListJson))
// if err != nil {
// return err
// }
// req.SetBasicAuth(authConfig.Username, authConfig.Password)
// req.ContentLength = int64(len(imgListJson))
// req.Header.Set("X-Docker-Token", "true")
// res, err := client.Do(req)
// if err != nil {
// return err
// }
// defer res.Body.Close()
// for res.StatusCode >= 300 && res.StatusCode < 400 {
// utils.Debugf("Redirected to %s\n", res.Header.Get("Location"))
// req, err = http.NewRequest("PUT", res.Header.Get("Location"), bytes.NewReader(imgListJson))
// if err != nil {
// return err
// }
// req.SetBasicAuth(authConfig.Username, authConfig.Password)
// req.ContentLength = int64(len(imgListJson))
// req.Header.Set("X-Docker-Token", "true")
// res, err = client.Do(req)
// if err != nil {
// return err
// }
// defer res.Body.Close()
// }
// if res.StatusCode != 200 && res.StatusCode != 201 {
// errBody, err := ioutil.ReadAll(res.Body)
// if err != nil {
// return err
// }
// return fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody)
// }
// var token, endpoints []string
// if res.Header.Get("X-Docker-Token") != "" {
// token = res.Header["X-Docker-Token"]
// utils.Debugf("Auth token: %v", token)
// } else {
// return fmt.Errorf("Index response didn't contain an access token")
// }
// if res.Header.Get("X-Docker-Endpoints") != "" {
// endpoints = res.Header["X-Docker-Endpoints"]
// } else {
// return fmt.Errorf("Index response didn't contain any endpoints")
// }
// // FIXME: Send only needed images
// for _, registry := range endpoints {
// fmt.Fprintf(stdout, "Pushing repository %s to %s (%d tags)\r\n", remote, registry, len(localRepo))
// // For each image within the repo, push them
// for _, elem := range imgList {
// if err := graph.pushPrimitive(stdout, remote, elem.tag, elem.Id, registry, token); err != nil {
// // FIXME: Continue on error?
// return err
// }
// }
// }
// req2, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/images", bytes.NewReader(imgListJson))
// if err != nil {
// return err
// }
// req2.SetBasicAuth(authConfig.Username, authConfig.Password)
// req2.Header["X-Docker-Endpoints"] = endpoints
// req2.ContentLength = int64(len(imgListJson))
// res2, err := client.Do(req2)
// if err != nil {
// return err
// }
// defer res2.Body.Close()
// if res2.StatusCode != 204 {
// if errBody, err := ioutil.ReadAll(res2.Body); err != nil {
// return err
// } else {
// return fmt.Errorf("Error: Status %d trying to push checksums %s: %s", res2.StatusCode, remote, errBody)
// }
// }
// return nil
// }
return &RepositoryData{
Tokens: tokens,
Endpoints: endpoints,
}, nil
}
func (r *Registry) SearchRepositories(stdout io.Writer, term string) (*SearchResults, error) {
client := r.getHttpClient()

175
server.go
View File

@ -395,24 +395,165 @@ func (srv *Server) ImagePull(name, tag, registry string, out io.Writer) error {
return nil
}
func (srv *Server) ImagePush(name, registry string, out io.Writer) error {
// img, err := srv.runtime.graph.Get(name)
// if err != nil {
// utils.Debugf("The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name]))
// // If it fails, try to get the repository
// if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists {
// if err := srv.runtime.graph.PushRepository(out, name, localRepo, srv.runtime.authConfig); err != nil {
// return err
// }
// return nil
// }
// Retrieve the checksum of an image
// Priority:
// - Check on the stored checksums
// - Check if the archive exists, if it does not, ask the registry
// - If the archive does exists, process the checksum from it
// - If the archive does not exists and not found on registry, process checksum from layer
func (srv *Server) getChecksum(imageId string) (string, error) {
// FIXME: Use in-memory map instead of reading the file each time
if sums, err := srv.runtime.graph.getStoredChecksums(); err != nil {
return "", err
} else if checksum, exists := sums[imageId]; exists {
return checksum, nil
}
// return err
// }
// err = srv.runtime.graph.PushImage(out, img, registry, nil)
// if err != nil {
// return err
// }
img, err := srv.runtime.graph.Get(imageId)
if err != nil {
return "", err
}
if _, err := os.Stat(layerArchivePath(graph.imageRoot(imageId))); err != nil {
if os.IsNotExist(err) {
// TODO: Ask the registry for the checksum
// As the archive is not there, it is supposed to come from a pull.
} else {
return "", err
}
}
checksum, err := img.Checksum()
if err != nil {
return "", err
}
return checksum, nil
}
// Retrieve the all the images to be uploaded in the correct order
// Note: we can't use a map as it is not ordered
func (srv *Server) getImageList() ([]*registry.ImgData, error) {
var imgList []*ImgData
imageSet := make(map[string]struct{})
for tag, id := range localRepo {
img, err := graph.Get(id)
if err != nil {
return err
}
img.WalkHistory(func(img *Image) error {
if _, exists := imageSet[img.Id]; exists {
return nil
}
imageSet[img.Id] = struct{}{}
checksum, err := graph.getChecksum(img.Id)
if err != nil {
return err
}
imgList = append([]*ImgData{{
Id: img.Id,
Checksum: checksum,
Tag: tag,
}}, imgList...)
return nil
})
}
return imgList
}
func (srv *Server) pushRepository(out io.Writer, name string, localRepo Repository) error {
fmt.Fprintf(out, "Processing checksums\n")
imgList, err := getImageList()
if err != nil {
return err
}
fmt.Fprintf(stdout, "Sending image list\n")
repoData, err := srv.registry.PushJsonIndex(name, imgList, false)
if err != nil {
return err
}
repoData.ImgList = imgList
// FIXME: Send only needed images
for _, ep := range repoData.Endpoints {
fmt.Fprintf(stdout, "Pushing repository %s to %s (%d tags)\r\n", remote, ep, len(localRepo))
// For each image within the repo, push them
for _, elem := range imgList {
if err := srv.pushImage(out, remote, elem.tag, elem.Id, ep, repoData.Tokens); err != nil {
// FIXME: Continue on error?
return err
}
}
}
if _, err := srv.registry.PushJsonIndex(name, imgList, true); err != nil {
return err
}
}
func (srv *Server) pushImage(out io.Writer, remote, tag, imgId, registry string, token []string) error {
jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgId, "json"))
if err != nil {
return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err)
}
fmt.Fprintf(out, "Pushing %s\r\n", imgId)
checksum, err := srv.getChecksum(imgId)
if err != nil {
return err
}
imgData := &registry.ImgData{
Id: imgId,
Checksum: checksum,
}
// Retrieve the tarball to be sent
var layerData *TempArchive
// If the archive exists, use it
file, err := os.Open(layerArchivePath(srv.runtime.graph.imageRoot(imgId)))
if err != nil {
if os.IsNotExist(err) {
// If the archive does not exist, create one from the layer
layerData, err = srv.runtime.graph.TempLayerArchive(imgId, Xz, out)
if err != nil {
return fmt.Errorf("Failed to generate layer archive: %s", err)
}
} else {
return err
}
} else {
defer file.Close()
st, err := file.Stat()
if err != nil {
return err
}
layerData = &TempArchive{
File: file,
Size: st.Size(),
}
}
return srv.registry.PushImage(imgData, jsonRaw, ProgressReader(layerData, int(layerData.Size), out, ""), registry, token)
}
func (srv *Server) ImagePush(name, registry string, out io.Writer) error {
img, err := srv.runtime.graph.Get(name)
if err != nil {
utils.Debugf("The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name]))
// If it fails, try to get the repository
if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists {
if err := srv.pushRepository(out, name, localRepo); err != nil {
return err
}
return nil
}
return err
}
if err := srv.pushImage(out, name, imgData.Tag, registry, nil); err != nil {
return err
}
return nil
}