rebase master

This commit is contained in:
Victor Vieux 2013-07-30 16:51:50 +00:00
commit 6166380d76
9 changed files with 326 additions and 276 deletions

View File

@ -18,7 +18,7 @@ const CONFIGFILE = ".dockercfg"
// Only used for user auth + account creation // Only used for user auth + account creation
const INDEXSERVER = "https://index.docker.io/v1/" const INDEXSERVER = "https://index.docker.io/v1/"
//const INDEXSERVER = "http://indexstaging-docker.dotcloud.com/" //const INDEXSERVER = "https://indexstaging-docker.dotcloud.com/v1/"
var ( var (
ErrConfigFileMissing = errors.New("The Auth config file is missing") ErrConfigFileMissing = errors.New("The Auth config file is missing")

View File

@ -124,6 +124,10 @@ func (builder *Builder) Create(config *Config) (*Container, error) {
func (builder *Builder) Commit(container *Container, repository, tag, comment, author string, config *Config) (*Image, error) { func (builder *Builder) Commit(container *Container, repository, tag, comment, author string, config *Config) (*Image, error) {
// FIXME: freeze the container before copying it to avoid data corruption? // FIXME: freeze the container before copying it to avoid data corruption?
// FIXME: this shouldn't be in commands. // FIXME: this shouldn't be in commands.
if err := container.EnsureMounted(); err != nil {
return nil, err
}
rwTar, err := container.ExportRw() rwTar, err := container.ExportRw()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -1,9 +1,7 @@
package docker package docker
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/dotcloud/docker/registry"
"github.com/dotcloud/docker/utils" "github.com/dotcloud/docker/utils"
"io" "io"
"io/ioutil" "io/ioutil"
@ -11,17 +9,13 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
) )
// A Graph is a store for versioned filesystem images and the relationship between them. // A Graph is a store for versioned filesystem images and the relationship between them.
type Graph struct { type Graph struct {
Root string Root string
idIndex *utils.TruncIndex idIndex *utils.TruncIndex
checksumLock map[string]*sync.Mutex
lockSumFile *sync.Mutex
lockSumMap *sync.Mutex
} }
// NewGraph instantiates a new graph at the given root path in the filesystem. // NewGraph instantiates a new graph at the given root path in the filesystem.
@ -36,11 +30,8 @@ func NewGraph(root string) (*Graph, error) {
return nil, err return nil, err
} }
graph := &Graph{ graph := &Graph{
Root: abspath, Root: abspath,
idIndex: utils.NewTruncIndex(), idIndex: utils.NewTruncIndex(),
checksumLock: make(map[string]*sync.Mutex),
lockSumFile: &sync.Mutex{},
lockSumMap: &sync.Mutex{},
} }
if err := graph.restore(); err != nil { if err := graph.restore(); err != nil {
return nil, err return nil, err
@ -99,11 +90,6 @@ func (graph *Graph) Get(name string) (*Image, error) {
return nil, err return nil, err
} }
} }
graph.lockSumMap.Lock()
defer graph.lockSumMap.Unlock()
if _, exists := graph.checksumLock[img.ID]; !exists {
graph.checksumLock[img.ID] = &sync.Mutex{}
}
return img, nil return img, nil
} }
@ -123,16 +109,15 @@ func (graph *Graph) Create(layerData Archive, container *Container, comment, aut
img.Container = container.ID img.Container = container.ID
img.ContainerConfig = *container.Config img.ContainerConfig = *container.Config
} }
if err := graph.Register(layerData, layerData != nil, img); err != nil { if err := graph.Register(nil, layerData, img); err != nil {
return nil, err return nil, err
} }
go img.Checksum()
return img, nil return img, nil
} }
// Register imports a pre-existing image into the graph. // Register imports a pre-existing image into the graph.
// FIXME: pass img as first argument // FIXME: pass img as first argument
func (graph *Graph) Register(layerData Archive, store bool, img *Image) error { func (graph *Graph) Register(jsonData []byte, layerData Archive, img *Image) error {
if err := ValidateID(img.ID); err != nil { if err := ValidateID(img.ID); err != nil {
return err return err
} }
@ -145,7 +130,7 @@ func (graph *Graph) Register(layerData Archive, store bool, img *Image) error {
if err != nil { if err != nil {
return fmt.Errorf("Mktemp failed: %s", err) return fmt.Errorf("Mktemp failed: %s", err)
} }
if err := StoreImage(img, layerData, tmp, store); err != nil { if err := StoreImage(img, jsonData, layerData, tmp); err != nil {
return err return err
} }
// Commit // Commit
@ -154,7 +139,6 @@ func (graph *Graph) Register(layerData Archive, store bool, img *Image) error {
} }
img.graph = graph img.graph = graph
graph.idIndex.Add(img.ID) graph.idIndex.Add(img.ID)
graph.checksumLock[img.ID] = &sync.Mutex{}
return nil return nil
} }
@ -311,40 +295,3 @@ func (graph *Graph) Heads() (map[string]*Image, error) {
func (graph *Graph) imageRoot(id string) string { func (graph *Graph) imageRoot(id string) string {
return path.Join(graph.Root, id) return path.Join(graph.Root, id)
} }
func (graph *Graph) getStoredChecksums() (map[string]string, error) {
checksums := make(map[string]string)
// FIXME: Store the checksum in memory
if checksumDict, err := ioutil.ReadFile(path.Join(graph.Root, "checksums")); err == nil {
if err := json.Unmarshal(checksumDict, &checksums); err != nil {
return nil, err
}
}
return checksums, nil
}
func (graph *Graph) storeChecksums(checksums map[string]string) error {
checksumJSON, err := json.Marshal(checksums)
if err != nil {
return err
}
if err := ioutil.WriteFile(path.Join(graph.Root, "checksums"), checksumJSON, 0600); err != nil {
return err
}
return nil
}
func (graph *Graph) UpdateChecksums(newChecksums map[string]*registry.ImgData) error {
graph.lockSumFile.Lock()
defer graph.lockSumFile.Unlock()
localChecksums, err := graph.getStoredChecksums()
if err != nil {
return err
}
for id, elem := range newChecksums {
localChecksums[id] = elem.Checksum
}
return graph.storeChecksums(localChecksums)
}

View File

@ -38,7 +38,7 @@ func TestInterruptedRegister(t *testing.T) {
Comment: "testing", Comment: "testing",
Created: time.Now(), Created: time.Now(),
} }
go graph.Register(badArchive, false, image) go graph.Register(nil, badArchive, image)
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
w.CloseWithError(errors.New("But I'm not a tarball!")) // (Nobody's perfect, darling) w.CloseWithError(errors.New("But I'm not a tarball!")) // (Nobody's perfect, darling)
if _, err := graph.Get(image.ID); err == nil { if _, err := graph.Get(image.ID); err == nil {
@ -49,7 +49,7 @@ func TestInterruptedRegister(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := graph.Register(goodArchive, false, image); err != nil { if err := graph.Register(nil, goodArchive, image); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -95,7 +95,7 @@ func TestRegister(t *testing.T) {
Comment: "testing", Comment: "testing",
Created: time.Now(), Created: time.Now(),
} }
err = graph.Register(archive, false, image) err = graph.Register(nil, archive, image)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -225,7 +225,7 @@ func TestDelete(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Test delete twice (pull -> rm -> pull -> rm) // Test delete twice (pull -> rm -> pull -> rm)
if err := graph.Register(archive, false, img1); err != nil { if err := graph.Register(nil, archive, img1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := graph.Delete(img1.ID); err != nil { if err := graph.Delete(img1.ID); err != nil {

142
image.go
View File

@ -2,7 +2,6 @@ package docker
import ( import (
"crypto/rand" "crypto/rand"
"crypto/sha256"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -14,6 +13,7 @@ import (
"os/exec" "os/exec"
"path" "path"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"time" "time"
) )
@ -47,6 +47,19 @@ func LoadImage(root string) (*Image, error) {
if err := ValidateID(img.ID); err != nil { if err := ValidateID(img.ID); err != nil {
return nil, err return nil, err
} }
if buf, err := ioutil.ReadFile(path.Join(root, "layersize")); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
if size, err := strconv.Atoi(string(buf)); err != nil {
return nil, err
} else {
img.Size = int64(size)
}
}
// Check that the filesystem layer exists // Check that the filesystem layer exists
if stat, err := os.Stat(layerPath(root)); err != nil { if stat, err := os.Stat(layerPath(root)); err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -59,7 +72,7 @@ func LoadImage(root string) (*Image, error) {
return img, nil return img, nil
} }
func StoreImage(img *Image, layerData Archive, root string, store bool) error { func StoreImage(img *Image, jsonData []byte, layerData Archive, root string) error {
// Check that root doesn't already exist // Check that root doesn't already exist
if _, err := os.Stat(root); err == nil { if _, err := os.Stat(root); err == nil {
return fmt.Errorf("Image %s already exists", img.ID) return fmt.Errorf("Image %s already exists", img.ID)
@ -72,26 +85,6 @@ func StoreImage(img *Image, layerData Archive, root string, store bool) error {
return err return err
} }
if store {
layerArchive := layerArchivePath(root)
file, err := os.OpenFile(layerArchive, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
// FIXME: Retrieve the image layer size from here?
if _, err := io.Copy(file, layerData); err != nil {
return err
}
// FIXME: Don't close/open, read/write instead of Copy
file.Close()
file, err = os.Open(layerArchive)
if err != nil {
return err
}
defer file.Close()
layerData = file
}
// If layerData is not nil, unpack it into the new layer // If layerData is not nil, unpack it into the new layer
if layerData != nil { if layerData != nil {
start := time.Now() start := time.Now()
@ -102,25 +95,36 @@ func StoreImage(img *Image, layerData Archive, root string, store bool) error {
utils.Debugf("Untar time: %vs\n", time.Now().Sub(start).Seconds()) utils.Debugf("Untar time: %vs\n", time.Now().Sub(start).Seconds())
} }
// If raw json is provided, then use it
if jsonData != nil {
return ioutil.WriteFile(jsonPath(root), jsonData, 0600)
} else { // Otherwise, unmarshal the image
jsonData, err := json.Marshal(img)
if err != nil {
return err
}
if err := ioutil.WriteFile(jsonPath(root), jsonData, 0600); err != nil {
return err
}
}
return StoreSize(img, root) return StoreSize(img, root)
} }
func StoreSize(img *Image, root string) error { func StoreSize(img *Image, root string) error {
layer := layerPath(root) layer := layerPath(root)
var totalSize int64 = 0
filepath.Walk(layer, func(path string, fileInfo os.FileInfo, err error) error { filepath.Walk(layer, func(path string, fileInfo os.FileInfo, err error) error {
img.Size += fileInfo.Size() totalSize += fileInfo.Size()
return nil return nil
}) })
img.Size = totalSize
// Store the json ball if err := ioutil.WriteFile(path.Join(root, "layersize"), []byte(strconv.Itoa(int(totalSize))), 0600); err != nil {
jsonData, err := json.Marshal(img) return nil
if err != nil {
return err
}
if err := ioutil.WriteFile(jsonPath(root), jsonData, 0600); err != nil {
return err
} }
return nil return nil
} }
@ -128,10 +132,6 @@ func layerPath(root string) string {
return path.Join(root, "layer") return path.Join(root, "layer")
} }
func layerArchivePath(root string) string {
return path.Join(root, "layer.tar.xz")
}
func jsonPath(root string) string { func jsonPath(root string) string {
return path.Join(root, "json") return path.Join(root, "json")
} }
@ -308,80 +308,6 @@ func (img *Image) layer() (string, error) {
return layerPath(root), nil return layerPath(root), nil
} }
func (img *Image) Checksum() (string, error) {
img.graph.checksumLock[img.ID].Lock()
defer img.graph.checksumLock[img.ID].Unlock()
root, err := img.root()
if err != nil {
return "", err
}
checksums, err := img.graph.getStoredChecksums()
if err != nil {
return "", err
}
if checksum, ok := checksums[img.ID]; ok {
return checksum, nil
}
layer, err := img.layer()
if err != nil {
return "", err
}
jsonData, err := ioutil.ReadFile(jsonPath(root))
if err != nil {
return "", err
}
var layerData io.Reader
if file, err := os.Open(layerArchivePath(root)); err != nil {
if os.IsNotExist(err) {
layerData, err = Tar(layer, Xz)
if err != nil {
return "", err
}
} else {
return "", err
}
} else {
defer file.Close()
layerData = file
}
h := sha256.New()
if _, err := h.Write(jsonData); err != nil {
return "", err
}
if _, err := h.Write([]byte("\n")); err != nil {
return "", err
}
if _, err := io.Copy(h, layerData); err != nil {
return "", err
}
hash := "sha256:" + hex.EncodeToString(h.Sum(nil))
// Reload the json file to make sure not to overwrite faster sums
img.graph.lockSumFile.Lock()
defer img.graph.lockSumFile.Unlock()
checksums, err = img.graph.getStoredChecksums()
if err != nil {
return "", err
}
checksums[img.ID] = hash
// Dump the checksums to disc
if err := img.graph.storeChecksums(checksums); err != nil {
return hash, err
}
return hash, nil
}
func (img *Image) getParentsSize(size int64) int64 { func (img *Image) getParentsSize(size int64) int64 {
parentImage, err := img.GetParent() parentImage, err := img.GetParent()
if err != nil || parentImage == nil { if err != nil || parentImage == nil {

View File

@ -17,8 +17,10 @@ import (
"strings" "strings"
) )
var ErrAlreadyExists = errors.New("Image already exists") var (
var ErrInvalidRepositoryName = errors.New("Invalid repository name (ex: \"registry.domain.tld/myrepos\")") ErrAlreadyExists = errors.New("Image already exists")
ErrInvalidRepositoryName = errors.New("Invalid repository name (ex: \"registry.domain.tld/myrepos\")")
)
func pingRegistryEndpoint(endpoint string) error { func pingRegistryEndpoint(endpoint string) error {
if endpoint == auth.IndexServerAddress() { if endpoint == auth.IndexServerAddress() {
@ -266,8 +268,11 @@ func (r *Registry) GetRemoteTags(registries []string, repository string, token [
} }
func (r *Registry) GetRepositoryData(indexEp, remote string) (*RepositoryData, error) { func (r *Registry) GetRepositoryData(indexEp, remote string) (*RepositoryData, error) {
repositoryTarget := fmt.Sprintf("%srepositories/%s/images", indexEp, remote) repositoryTarget := fmt.Sprintf("%srepositories/%s/images", indexEp, remote)
utils.Debugf("[registry] Calling GET %s", repositoryTarget)
req, err := r.opaqueRequest("GET", repositoryTarget, nil) req, err := r.opaqueRequest("GET", repositoryTarget, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -330,19 +335,54 @@ func (r *Registry) GetRepositoryData(indexEp, remote string) (*RepositoryData, e
}, nil }, nil
} }
func (r *Registry) PushImageChecksumRegistry(imgData *ImgData, registry string, token []string) error {
utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/checksum")
req, err := http.NewRequest("PUT", registry+"images/"+imgData.ID+"/checksum", nil)
if err != nil {
return err
}
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
req.Header.Set("X-Docker-Checksum", imgData.Checksum)
res, err := doWithCookies(r.client, req)
if err != nil {
return fmt.Errorf("Failed to upload metadata: %s", err)
}
defer res.Body.Close()
if len(res.Cookies()) > 0 {
r.client.Jar.SetCookies(req.URL, res.Cookies())
}
if res.StatusCode != 200 {
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
}
var jsonBody map[string]string
if err := json.Unmarshal(errBody, &jsonBody); err != nil {
errBody = []byte(err.Error())
} else if jsonBody["error"] == "Image already exists" {
return ErrAlreadyExists
}
return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody)
}
return nil
}
// Push a local image to the registry // Push a local image to the registry
func (r *Registry) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, registry string, token []string) error { func (r *Registry) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, registry string, token []string) error {
// FIXME: try json with UTF8
utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/json")
req, err := http.NewRequest("PUT", registry+"images/"+imgData.ID+"/json", bytes.NewReader(jsonRaw)) req, err := http.NewRequest("PUT", registry+"images/"+imgData.ID+"/json", bytes.NewReader(jsonRaw))
if err != nil { if err != nil {
return err return err
} }
req.Header.Add("Content-type", "application/json") req.Header.Add("Content-type", "application/json")
req.Header.Set("Authorization", "Token "+strings.Join(token, ",")) req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
req.Header.Set("X-Docker-Checksum", imgData.Checksum)
r.setUserAgent(req) r.setUserAgent(req)
utils.Debugf("Setting checksum for %s: %s", imgData.ID, imgData.Checksum)
res, err := doWithCookies(r.client, req) res, err := doWithCookies(r.client, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to upload metadata: %s", err) return fmt.Errorf("Failed to upload metadata: %s", err)
@ -364,10 +404,15 @@ func (r *Registry) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, regis
return nil return nil
} }
func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, token []string) error { func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, token []string, jsonRaw []byte) (checksum string, err error) {
req, err := http.NewRequest("PUT", registry+"images/"+imgID+"/layer", layer)
utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgID+"/layer")
tarsumLayer := &utils.TarSum{Reader: layer}
req, err := http.NewRequest("PUT", registry+"images/"+imgID+"/layer", tarsumLayer)
if err != nil { if err != nil {
return err return "", err
} }
req.ContentLength = -1 req.ContentLength = -1
req.TransferEncoding = []string{"chunked"} req.TransferEncoding = []string{"chunked"}
@ -375,18 +420,18 @@ func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registr
r.setUserAgent(req) r.setUserAgent(req)
res, err := doWithCookies(r.client, req) res, err := doWithCookies(r.client, req)
if err != nil { if err != nil {
return fmt.Errorf("Failed to upload layer: %s", err) return "", fmt.Errorf("Failed to upload layer: %s", err)
} }
defer res.Body.Close() defer res.Body.Close()
if res.StatusCode != 200 { if res.StatusCode != 200 {
errBody, err := ioutil.ReadAll(res.Body) errBody, err := ioutil.ReadAll(res.Body)
if err != nil { if err != nil {
return fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err) return "", fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
} }
return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res.StatusCode, errBody) return "", fmt.Errorf("Received HTTP code %d while uploading layer: %s", res.StatusCode, errBody)
} }
return nil return tarsumLayer.Sum(jsonRaw), nil
} }
func (r *Registry) opaqueRequest(method, urlStr string, body io.Reader) (*http.Request, error) { func (r *Registry) opaqueRequest(method, urlStr string, body io.Reader) (*http.Request, error) {
@ -424,7 +469,19 @@ func (r *Registry) PushRegistryTag(remote, revision, tag, registry string, token
} }
func (r *Registry) PushImageJSONIndex(indexEp, remote string, imgList []*ImgData, validate bool, regs []string) (*RepositoryData, error) { func (r *Registry) PushImageJSONIndex(indexEp, remote string, imgList []*ImgData, validate bool, regs []string) (*RepositoryData, error) {
imgListJSON, err := json.Marshal(imgList) cleanImgList := []*ImgData{}
if validate {
for _, elem := range imgList {
if elem.Checksum != "" {
cleanImgList = append(cleanImgList, elem)
}
}
} else {
cleanImgList = imgList
}
imgListJSON, err := json.Marshal(cleanImgList)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -434,7 +491,7 @@ func (r *Registry) PushImageJSONIndex(indexEp, remote string, imgList []*ImgData
} }
u := fmt.Sprintf("%srepositories/%s/%s", indexEp, remote, suffix) u := fmt.Sprintf("%srepositories/%s/%s", indexEp, remote, suffix)
utils.Debugf("PUT %s", u) utils.Debugf("[registry] PUT %s", u)
utils.Debugf("Image list pushed to index:\n%s\n", imgListJSON) utils.Debugf("Image list pushed to index:\n%s\n", imgListJSON)
req, err := r.opaqueRequest("PUT", u, bytes.NewReader(imgListJSON)) req, err := r.opaqueRequest("PUT", u, bytes.NewReader(imgListJSON))
if err != nil { if err != nil {

115
server.go
View File

@ -438,7 +438,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
return err return err
} }
defer layer.Close() defer layer.Close()
if err := srv.runtime.graph.Register(utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf), false, img); err != nil { if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf), img); err != nil {
return err return err
} }
} }
@ -454,12 +454,6 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
return err return err
} }
utils.Debugf("Updating checksums")
// Reload the json file to make sure not to overwrite faster sums
if err := srv.runtime.graph.UpdateChecksums(repoData.ImgList); err != nil {
return err
}
utils.Debugf("Retrieving the tag list") utils.Debugf("Retrieving the tag list")
tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens) tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
if err != nil { if err != nil {
@ -618,41 +612,6 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
return nil return nil
} }
// Retrieve the checksum of an image
// Priority:
// - Check on the stored checksums
// - Check if the archive exists, if it does not, ask the registry
// - If the archive does exists, process the checksum from it
// - If the archive does not exists and not found on registry, process checksum from layer
func (srv *Server) getChecksum(imageID string) (string, error) {
// FIXME: Use in-memory map instead of reading the file each time
if sums, err := srv.runtime.graph.getStoredChecksums(); err != nil {
return "", err
} else if checksum, exists := sums[imageID]; exists {
return checksum, nil
}
img, err := srv.runtime.graph.Get(imageID)
if err != nil {
return "", err
}
if _, err := os.Stat(layerArchivePath(srv.runtime.graph.imageRoot(imageID))); err != nil {
if os.IsNotExist(err) {
// TODO: Ask the registry for the checksum
// As the archive is not there, it is supposed to come from a pull.
} else {
return "", err
}
}
checksum, err := img.Checksum()
if err != nil {
return "", err
}
return checksum, nil
}
// Retrieve the all the images to be uploaded in the correct order // Retrieve the all the images to be uploaded in the correct order
// Note: we can't use a map as it is not ordered // Note: we can't use a map as it is not ordered
func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgData, error) { func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgData, error) {
@ -669,14 +628,10 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat
return nil return nil
} }
imageSet[img.ID] = struct{}{} imageSet[img.ID] = struct{}{}
checksum, err := srv.getChecksum(img.ID)
if err != nil {
return err
}
imgList = append([]*registry.ImgData{{ imgList = append([]*registry.ImgData{{
ID: img.ID, ID: img.ID,
Checksum: checksum, Tag: tag,
Tag: tag,
}}, imgList...) }}, imgList...)
return nil return nil
}) })
@ -686,7 +641,6 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat
func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, indexEp string, sf *utils.StreamFormatter) error { func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, indexEp string, sf *utils.StreamFormatter) error {
out = utils.NewWriteFlusher(out) out = utils.NewWriteFlusher(out)
out.Write(sf.FormatStatus("", "Processing checksums"))
imgList, err := srv.getImageList(localRepo) imgList, err := srv.getImageList(localRepo)
if err != nil { if err != nil {
return err return err
@ -710,9 +664,11 @@ func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", elem.ID)) out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", elem.ID))
continue continue
} }
if err := srv.pushImage(r, out, remoteName, elem.ID, ep, repoData.Tokens, sf); err != nil { if checksum, err := srv.pushImage(r, out, remoteName, elem.ID, ep, repoData.Tokens, sf); err != nil {
// FIXME: Continue on error? // FIXME: Continue on error?
return err return err
} else {
elem.Checksum = checksum
} }
out.Write(sf.FormatStatus("", "Pushing tags for rev [%s] on {%s}", elem.ID, ep+"repositories/"+remoteName+"/tags/"+elem.Tag)) out.Write(sf.FormatStatus("", "Pushing tags for rev [%s] on {%s}", elem.ID, ep+"repositories/"+remoteName+"/tags/"+elem.Tag))
if err := r.PushRegistryTag(remoteName, elem.ID, elem.Tag, ep, repoData.Tokens); err != nil { if err := r.PushRegistryTag(remoteName, elem.ID, elem.Tag, ep, repoData.Tokens); err != nil {
@ -728,64 +684,45 @@ func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName
return nil return nil
} }
func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) error { func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
out = utils.NewWriteFlusher(out) out = utils.NewWriteFlusher(out)
jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgID, "json")) jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgID, "json"))
if err != nil { if err != nil {
return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgID, err) return "", fmt.Errorf("Error while retreiving the path for {%s}: %s", imgID, err)
} }
out.Write(sf.FormatStatus("", "Pushing %s", imgID)) out.Write(sf.FormatStatus("", "Pushing %s", imgID))
// Make sure we have the image's checksum
checksum, err := srv.getChecksum(imgID)
if err != nil {
return err
}
imgData := &registry.ImgData{ imgData := &registry.ImgData{
ID: imgID, ID: imgID,
Checksum: checksum,
} }
// Send the json // Send the json
if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil { if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
if err == registry.ErrAlreadyExists { if err == registry.ErrAlreadyExists {
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", imgData.ID)) out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", imgData.ID))
return nil return "", nil
} }
return err return "", err
} }
// Retrieve the tarball to be sent layerData, err := srv.runtime.graph.TempLayerArchive(imgID, Uncompressed, sf, out)
var layerData *TempArchive
// If the archive exists, use it
file, err := os.Open(layerArchivePath(srv.runtime.graph.imageRoot(imgID)))
if err != nil { if err != nil {
if os.IsNotExist(err) { return "", fmt.Errorf("Failed to generate layer archive: %s", err)
// If the archive does not exist, create one from the layer
layerData, err = srv.runtime.graph.TempLayerArchive(imgID, Xz, sf, out)
if err != nil {
return fmt.Errorf("Failed to generate layer archive: %s", err)
}
} else {
return err
}
} else {
defer file.Close()
st, err := file.Stat()
if err != nil {
return err
}
layerData = &TempArchive{
File: file,
Size: st.Size(),
}
} }
// Send the layer // Send the layer
if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf), ep, token); err != nil { if checksum, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf), ep, token, jsonRaw); err != nil {
return err return "", err
} else {
imgData.Checksum = checksum
} }
return nil
// Send the checksum
if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
return "", err
}
return imgData.Checksum, nil
} }
// FIXME: Allow to interupt current push when new push of same image is done. // FIXME: Allow to interupt current push when new push of same image is done.
@ -823,7 +760,7 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo
var token []string var token []string
out.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName)) out.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
if err := srv.pushImage(r, out, remoteName, img.ID, endpoint, token, sf); err != nil { if _, err := srv.pushImage(r, out, remoteName, img.ID, endpoint, token, sf); err != nil {
return err return err
} }
return nil return nil

View File

@ -91,6 +91,27 @@ func TestCreateRm(t *testing.T) {
} }
func TestCommit(t *testing.T) {
runtime := mkRuntime(t)
defer nuke(runtime)
srv := &Server{runtime: runtime}
config, _, _, err := ParseRun([]string{GetTestImage(runtime).ID, "/bin/cat"}, nil)
if err != nil {
t.Fatal(err)
}
id, err := srv.ContainerCreate(config)
if err != nil {
t.Fatal(err)
}
if _, err := srv.ContainerCommit(id, "testrepo", "testtag", "", "", config); err != nil {
t.Fatal(err)
}
}
func TestCreateStartRestartStopStartKillRm(t *testing.T) { func TestCreateStartRestartStopStartKillRm(t *testing.T) {
runtime := mkRuntime(t) runtime := mkRuntime(t)
defer nuke(runtime) defer nuke(runtime)

158
utils/tarsum.go Normal file
View File

@ -0,0 +1,158 @@
package utils
import (
"archive/tar"
"bytes"
"compress/gzip"
"crypto/sha256"
"encoding/hex"
"hash"
"io"
"sort"
"strconv"
)
type verboseHash struct {
hash.Hash
}
func (h verboseHash) Write(buf []byte) (int, error) {
Debugf("--->%s<---", buf)
return h.Hash.Write(buf)
}
type TarSum struct {
io.Reader
tarR *tar.Reader
tarW *tar.Writer
gz *gzip.Writer
bufTar *bytes.Buffer
bufGz *bytes.Buffer
h hash.Hash
h2 verboseHash
sums []string
finished bool
first bool
}
func (ts *TarSum) encodeHeader(h *tar.Header) error {
for _, elem := range [][2]string{
{"name", h.Name},
{"mode", strconv.Itoa(int(h.Mode))},
{"uid", strconv.Itoa(h.Uid)},
{"gid", strconv.Itoa(h.Gid)},
{"size", strconv.Itoa(int(h.Size))},
{"mtime", strconv.Itoa(int(h.ModTime.UTC().Unix()))},
{"typeflag", string([]byte{h.Typeflag})},
{"linkname", h.Linkname},
{"uname", h.Uname},
{"gname", h.Gname},
{"devmajor", strconv.Itoa(int(h.Devmajor))},
{"devminor", strconv.Itoa(int(h.Devminor))},
// {"atime", strconv.Itoa(int(h.AccessTime.UTC().Unix()))},
// {"ctime", strconv.Itoa(int(h.ChangeTime.UTC().Unix()))},
} {
// Debugf("-->%s<-- -->%s<--", elem[0], elem[1])
if _, err := ts.h.Write([]byte(elem[0] + elem[1])); err != nil {
return err
}
}
return nil
}
func (ts *TarSum) Read(buf []byte) (int, error) {
if ts.gz == nil {
ts.bufTar = bytes.NewBuffer([]byte{})
ts.bufGz = bytes.NewBuffer([]byte{})
ts.tarR = tar.NewReader(ts.Reader)
ts.tarW = tar.NewWriter(ts.bufTar)
ts.gz = gzip.NewWriter(ts.bufGz)
ts.h = sha256.New()
// ts.h = verboseHash{sha256.New()}
ts.h.Reset()
ts.first = true
}
if ts.finished {
return ts.bufGz.Read(buf)
}
buf2 := make([]byte, len(buf), cap(buf))
n, err := ts.tarR.Read(buf2)
if err != nil {
if err == io.EOF {
if _, err := ts.h.Write(buf2[:n]); err != nil {
return 0, err
}
if !ts.first {
ts.sums = append(ts.sums, hex.EncodeToString(ts.h.Sum(nil)))
ts.h.Reset()
} else {
ts.first = false
}
currentHeader, err := ts.tarR.Next()
if err != nil {
if err == io.EOF {
if err := ts.gz.Close(); err != nil {
return 0, err
}
ts.finished = true
return n, nil
}
return n, err
}
if err := ts.encodeHeader(currentHeader); err != nil {
return 0, err
}
if err := ts.tarW.WriteHeader(currentHeader); err != nil {
return 0, err
}
if _, err := ts.tarW.Write(buf2[:n]); err != nil {
return 0, err
}
ts.tarW.Flush()
if _, err := io.Copy(ts.gz, ts.bufTar); err != nil {
return 0, err
}
ts.gz.Flush()
return ts.bufGz.Read(buf)
}
return n, err
}
// Filling the hash buffer
if _, err = ts.h.Write(buf2[:n]); err != nil {
return 0, err
}
// Filling the tar writter
if _, err = ts.tarW.Write(buf2[:n]); err != nil {
return 0, err
}
ts.tarW.Flush()
// Filling the gz writter
if _, err = io.Copy(ts.gz, ts.bufTar); err != nil {
return 0, err
}
ts.gz.Flush()
return ts.bufGz.Read(buf)
}
func (ts *TarSum) Sum(extra []byte) string {
sort.Strings(ts.sums)
h := sha256.New()
if extra != nil {
h.Write(extra)
}
for _, sum := range ts.sums {
Debugf("-->%s<--", sum)
h.Write([]byte(sum))
}
checksum := "tarsum+sha256:" + hex.EncodeToString(h.Sum(nil))
Debugf("checksum processed: %s", checksum)
return checksum
}