
Refactor checksum

Guillaume J. Charmes 2013-07-17 12:13:22 -07:00
parent 950d0312dc
commit 8ca7b0646e
6 changed files with 226 additions and 247 deletions

View file

@@ -16,9 +16,9 @@ import (
 const CONFIGFILE = ".dockercfg"
 
 // Only used for user auth + account creation
-const INDEXSERVER = "https://index.docker.io/v1/"
-//const INDEXSERVER = "http://indexstaging-docker.dotcloud.com/"
+//const INDEXSERVER = "https://index.docker.io/v1/"
+const INDEXSERVER = "https://indexstaging-docker.dotcloud.com/v1/"
 
 var (
 	ErrConfigFileMissing = errors.New("The Auth config file is missing")

View file

@@ -1,9 +1,7 @@
 package docker
 
 import (
-	"encoding/json"
 	"fmt"
-	"github.com/dotcloud/docker/registry"
 	"github.com/dotcloud/docker/utils"
 	"io"
 	"io/ioutil"
@@ -11,17 +9,13 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
-	"sync"
 	"time"
 )
 
 // A Graph is a store for versioned filesystem images and the relationship between them.
 type Graph struct {
 	Root    string
 	idIndex *utils.TruncIndex
-	checksumLock map[string]*sync.Mutex
-	lockSumFile  *sync.Mutex
-	lockSumMap   *sync.Mutex
 }
 
 // NewGraph instantiates a new graph at the given root path in the filesystem.
@@ -36,11 +30,8 @@ func NewGraph(root string) (*Graph, error) {
 		return nil, err
 	}
 	graph := &Graph{
 		Root:    abspath,
 		idIndex: utils.NewTruncIndex(),
-		checksumLock: make(map[string]*sync.Mutex),
-		lockSumFile:  &sync.Mutex{},
-		lockSumMap:   &sync.Mutex{},
 	}
 	if err := graph.restore(); err != nil {
 		return nil, err
@@ -99,11 +90,6 @@ func (graph *Graph) Get(name string) (*Image, error) {
 			return nil, err
 		}
 	}
-	graph.lockSumMap.Lock()
-	defer graph.lockSumMap.Unlock()
-	if _, exists := graph.checksumLock[img.ID]; !exists {
-		graph.checksumLock[img.ID] = &sync.Mutex{}
-	}
 	return img, nil
 }
@@ -126,7 +112,6 @@ func (graph *Graph) Create(layerData Archive, container *Container, comment, aut
 	if err := graph.Register(layerData, layerData != nil, img); err != nil {
 		return nil, err
 	}
-	go img.Checksum()
 	return img, nil
 }
@@ -154,7 +139,6 @@ func (graph *Graph) Register(layerData Archive, store bool, img *Image) error {
 	}
 	img.graph = graph
 	graph.idIndex.Add(img.ID)
-	graph.checksumLock[img.ID] = &sync.Mutex{}
 	return nil
 }
@@ -311,40 +295,3 @@ func (graph *Graph) Heads() (map[string]*Image, error) {
 func (graph *Graph) imageRoot(id string) string {
 	return path.Join(graph.Root, id)
 }
-
-func (graph *Graph) getStoredChecksums() (map[string]string, error) {
-	checksums := make(map[string]string)
-	// FIXME: Store the checksum in memory
-	if checksumDict, err := ioutil.ReadFile(path.Join(graph.Root, "checksums")); err == nil {
-		if err := json.Unmarshal(checksumDict, &checksums); err != nil {
-			return nil, err
-		}
-	}
-	return checksums, nil
-}
-
-func (graph *Graph) storeChecksums(checksums map[string]string) error {
-	checksumJSON, err := json.Marshal(checksums)
-	if err != nil {
-		return err
-	}
-	if err := ioutil.WriteFile(path.Join(graph.Root, "checksums"), checksumJSON, 0600); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (graph *Graph) UpdateChecksums(newChecksums map[string]*registry.ImgData) error {
-	graph.lockSumFile.Lock()
-	defer graph.lockSumFile.Unlock()
-	localChecksums, err := graph.getStoredChecksums()
-	if err != nil {
-		return err
-	}
-	for id, elem := range newChecksums {
-		localChecksums[id] = elem.Checksum
-	}
-	return graph.storeChecksums(localChecksums)
-}

View file

@@ -2,7 +2,6 @@ package docker
 
 import (
 	"crypto/rand"
-	"crypto/sha256"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -72,26 +71,6 @@ func StoreImage(img *Image, layerData Archive, root string, store bool) error {
 		return err
 	}
 
-	if store {
-		layerArchive := layerArchivePath(root)
-		file, err := os.OpenFile(layerArchive, os.O_WRONLY|os.O_CREATE, 0600)
-		if err != nil {
-			return err
-		}
-		// FIXME: Retrieve the image layer size from here?
-		if _, err := io.Copy(file, layerData); err != nil {
-			return err
-		}
-		// FIXME: Don't close/open, read/write instead of Copy
-		file.Close()
-		file, err = os.Open(layerArchive)
-		if err != nil {
-			return err
-		}
-		defer file.Close()
-		layerData = file
-	}
-
 	// If layerData is not nil, unpack it into the new layer
 	if layerData != nil {
 		start := time.Now()
@@ -128,10 +107,6 @@ func layerPath(root string) string {
 	return path.Join(root, "layer")
 }
 
-func layerArchivePath(root string) string {
-	return path.Join(root, "layer.tar.xz")
-}
-
 func jsonPath(root string) string {
 	return path.Join(root, "json")
 }
@@ -308,80 +283,6 @@ func (img *Image) layer() (string, error) {
 	return layerPath(root), nil
 }
 
-func (img *Image) Checksum() (string, error) {
-	img.graph.checksumLock[img.ID].Lock()
-	defer img.graph.checksumLock[img.ID].Unlock()
-
-	root, err := img.root()
-	if err != nil {
-		return "", err
-	}
-
-	checksums, err := img.graph.getStoredChecksums()
-	if err != nil {
-		return "", err
-	}
-	if checksum, ok := checksums[img.ID]; ok {
-		return checksum, nil
-	}
-
-	layer, err := img.layer()
-	if err != nil {
-		return "", err
-	}
-	jsonData, err := ioutil.ReadFile(jsonPath(root))
-	if err != nil {
-		return "", err
-	}
-
-	var layerData io.Reader
-	if file, err := os.Open(layerArchivePath(root)); err != nil {
-		if os.IsNotExist(err) {
-			layerData, err = Tar(layer, Xz)
-			if err != nil {
-				return "", err
-			}
-		} else {
-			return "", err
-		}
-	} else {
-		defer file.Close()
-		layerData = file
-	}
-
-	h := sha256.New()
-	if _, err := h.Write(jsonData); err != nil {
-		return "", err
-	}
-	if _, err := h.Write([]byte("\n")); err != nil {
-		return "", err
-	}
-	if _, err := io.Copy(h, layerData); err != nil {
-		return "", err
-	}
-	hash := "sha256:" + hex.EncodeToString(h.Sum(nil))
-
-	// Reload the json file to make sure not to overwrite faster sums
-	img.graph.lockSumFile.Lock()
-	defer img.graph.lockSumFile.Unlock()
-	checksums, err = img.graph.getStoredChecksums()
-	if err != nil {
-		return "", err
-	}
-	checksums[img.ID] = hash
-
-	// Dump the checksums to disc
-	if err := img.graph.storeChecksums(checksums); err != nil {
-		return hash, err
-	}
-	return hash, nil
-}
-
 func (img *Image) getParentsSize(size int64) int64 {
 	parentImage, err := img.GetParent()
 	if err != nil || parentImage == nil {

View file

@@ -330,16 +330,52 @@ func (r *Registry) GetRepositoryData(indexEp, remote string) (*RepositoryData, e
 	}, nil
 }
 
+func (r *Registry) PushImageChecksumRegistry(imgData *ImgData, registry string, token []string) error {
+	utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/checksum")
+
+	req, err := http.NewRequest("PUT", registry+"images/"+imgData.ID+"/checksum", nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
+	req.Header.Set("X-Docker-Checksum", imgData.Checksum)
+
+	res, err := doWithCookies(r.client, req)
+	if err != nil {
+		return fmt.Errorf("Failed to upload metadata: %s", err)
+	}
+	defer res.Body.Close()
+	if len(res.Cookies()) > 0 {
+		r.client.Jar.SetCookies(req.URL, res.Cookies())
+	}
+	if res.StatusCode != 200 {
+		errBody, err := ioutil.ReadAll(res.Body)
+		if err != nil {
+			return fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
+		}
+		var jsonBody map[string]string
+		if err := json.Unmarshal(errBody, &jsonBody); err != nil {
+			errBody = []byte(err.Error())
+		} else if jsonBody["error"] == "Image already exists" {
+			return ErrAlreadyExists
+		}
+		return fmt.Errorf("HTTP code %d while uploading metadata: %s", res.StatusCode, errBody)
+	}
+	return nil
+}
+
 // Push a local image to the registry
 func (r *Registry) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, registry string, token []string) error {
+	// FIXME: try json with UTF8
+	utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/json")
+
 	req, err := http.NewRequest("PUT", registry+"images/"+imgData.ID+"/json", bytes.NewReader(jsonRaw))
 	if err != nil {
 		return err
 	}
 	req.Header.Add("Content-type", "application/json")
 	req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
-	req.Header.Set("X-Docker-Checksum", imgData.Checksum)
 	r.setUserAgent(req)
 
 	utils.Debugf("Setting checksum for %s: %s", imgData.ID, imgData.Checksum)
@@ -364,10 +400,14 @@ func (r *Registry) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, regis
 	return nil
 }
 
-func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, token []string) error {
-	req, err := http.NewRequest("PUT", registry+"images/"+imgID+"/layer", layer)
+func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, token []string) (checksum string, err error) {
+	utils.Debugf("[registry] Calling PUT %s", registry+"images/"+imgID+"/layer")
+
+	tarsumLayer := &utils.TarSum{Reader: layer}
+
+	req, err := http.NewRequest("PUT", registry+"images/"+imgID+"/layer", tarsumLayer)
 	if err != nil {
-		return err
+		return "", err
 	}
 	req.ContentLength = -1
 	req.TransferEncoding = []string{"chunked"}
@@ -375,18 +415,18 @@ func (r *Registry) PushImageLayerRegistry(imgID string, layer io.Reader, registr
 	r.setUserAgent(req)
 
 	res, err := doWithCookies(r.client, req)
 	if err != nil {
-		return fmt.Errorf("Failed to upload layer: %s", err)
+		return "", fmt.Errorf("Failed to upload layer: %s", err)
 	}
 	defer res.Body.Close()
 
 	if res.StatusCode != 200 {
 		errBody, err := ioutil.ReadAll(res.Body)
 		if err != nil {
-			return fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
+			return "", fmt.Errorf("HTTP code %d while uploading metadata and error when trying to parse response body: %s", res.StatusCode, err)
 		}
-		return fmt.Errorf("Received HTTP code %d while uploading layer: %s", res.StatusCode, errBody)
+		return "", fmt.Errorf("Received HTTP code %d while uploading layer: %s", res.StatusCode, errBody)
 	}
-	return nil
+	return tarsumLayer.Sum(), nil
 }
 
 func (r *Registry) opaqueRequest(method, urlStr string, body io.Reader) (*http.Request, error) {

View file

@@ -455,12 +455,6 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
 		return err
 	}
 
-	utils.Debugf("Updating checksums")
-	// Reload the json file to make sure not to overwrite faster sums
-	if err := srv.runtime.graph.UpdateChecksums(repoData.ImgList); err != nil {
-		return err
-	}
-
 	utils.Debugf("Retrieving the tag list")
 	tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
 	if err != nil {
@@ -598,41 +592,6 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
 	return nil
 }
 
-// Retrieve the checksum of an image
-// Priority:
-// - Check on the stored checksums
-// - Check if the archive exists, if it does not, ask the registry
-// - If the archive does exists, process the checksum from it
-// - If the archive does not exists and not found on registry, process checksum from layer
-func (srv *Server) getChecksum(imageID string) (string, error) {
-	// FIXME: Use in-memory map instead of reading the file each time
-	if sums, err := srv.runtime.graph.getStoredChecksums(); err != nil {
-		return "", err
-	} else if checksum, exists := sums[imageID]; exists {
-		return checksum, nil
-	}
-
-	img, err := srv.runtime.graph.Get(imageID)
-	if err != nil {
-		return "", err
-	}
-
-	if _, err := os.Stat(layerArchivePath(srv.runtime.graph.imageRoot(imageID))); err != nil {
-		if os.IsNotExist(err) {
-			// TODO: Ask the registry for the checksum
-			//       As the archive is not there, it is supposed to come from a pull.
-		} else {
-			return "", err
-		}
-	}
-
-	checksum, err := img.Checksum()
-	if err != nil {
-		return "", err
-	}
-	return checksum, nil
-}
-
 // Retrieve the all the images to be uploaded in the correct order
 // Note: we can't use a map as it is not ordered
 func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgData, error) {
@@ -649,14 +608,10 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat
 			return nil
 		}
 		imageSet[img.ID] = struct{}{}
-		checksum, err := srv.getChecksum(img.ID)
-		if err != nil {
-			return err
-		}
 		imgList = append([]*registry.ImgData{{
 			ID:  img.ID,
-			Checksum: checksum,
 			Tag: tag,
 		}}, imgList...)
 		return nil
 	})
@@ -666,7 +621,7 @@ func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName
 func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, indexEp string, sf *utils.StreamFormatter) error {
 	out = utils.NewWriteFlusher(out)
-	out.Write(sf.FormatStatus("Processing checksums"))
+
 	imgList, err := srv.getImageList(localRepo)
 	if err != nil {
 		return err
@@ -716,14 +671,8 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
 	}
 
 	out.Write(sf.FormatStatus("Pushing %s", imgID))
 
-	// Make sure we have the image's checksum
-	checksum, err := srv.getChecksum(imgID)
-	if err != nil {
-		return err
-	}
 	imgData := &registry.ImgData{
-		ID:       imgID,
-		Checksum: checksum,
+		ID: imgID,
 	}
 
 	// Send the json
@@ -735,36 +684,23 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
 		return err
 	}
 
-	// Retrieve the tarball to be sent
-	var layerData *TempArchive
-	// If the archive exists, use it
-	file, err := os.Open(layerArchivePath(srv.runtime.graph.imageRoot(imgID)))
+	layerData, err := srv.runtime.graph.TempLayerArchive(imgID, Uncompressed, sf, out)
 	if err != nil {
-		if os.IsNotExist(err) {
-			// If the archive does not exist, create one from the layer
-			layerData, err = srv.runtime.graph.TempLayerArchive(imgID, Xz, sf, out)
-			if err != nil {
-				return fmt.Errorf("Failed to generate layer archive: %s", err)
-			}
-		} else {
-			return err
-		}
-	} else {
-		defer file.Close()
-		st, err := file.Stat()
-		if err != nil {
-			return err
-		}
-		layerData = &TempArchive{
-			File: file,
-			Size: st.Size(),
-		}
+		return fmt.Errorf("Failed to generate layer archive: %s", err)
 	}
 
 	// Send the layer
-	if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)"), sf), ep, token); err != nil {
+	if checksum, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)"), sf), ep, token); err != nil {
 		return err
+	} else {
+		imgData.Checksum = checksum
 	}
+
+	// Send the checksum
+	if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
+		return err
+	}
+
 	return nil
 }
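
Note: taken together, the server-side push now derives the layer checksum from the upload stream itself instead of reading a pre-computed sum from disk, and sends it to the registry as a separate step. Below is a minimal sketch of the resulting call order, using only the registry methods shown in this diff; the helper name pushWithTarsum, the package name, and the ep/token values are hypothetical, and progress reporting is left out.

package example

import (
	"io"

	"github.com/dotcloud/docker/registry"
)

// pushWithTarsum sketches the push sequence introduced by this commit.
func pushWithTarsum(r *registry.Registry, imgID string, jsonRaw []byte, layer io.Reader, ep string, token []string) error {
	imgData := &registry.ImgData{ID: imgID}

	// 1. Push the image JSON; no checksum is attached at this point.
	if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
		return err
	}

	// 2. Stream the layer; the registry client wraps it in utils.TarSum
	//    and returns the tarsum once the body has been fully sent.
	checksum, err := r.PushImageLayerRegistry(imgData.ID, layer, ep, token)
	if err != nil {
		return err
	}
	imgData.Checksum = checksum

	// 3. Publish the checksum with a separate PUT on .../checksum.
	return r.PushImageChecksumRegistry(imgData, ep, token)
}

The ordering matters: the tarsum only becomes available after the layer body has been streamed, which is why the checksum goes out in its own PUT after the layer upload rather than as a header on the JSON push.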

utils/tarsum.go (new file, 155 additions)
View file

@@ -0,0 +1,155 @@
package utils

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"crypto/sha256"
	"encoding/hex"
	"hash"
	"io"
	"sort"
	"strconv"
)

type verboseHash struct {
	hash.Hash
}

func (h verboseHash) Write(buf []byte) (int, error) {
	Debugf("--->%s<---", buf)
	return h.Hash.Write(buf)
}

type TarSum struct {
	io.Reader
	tarR     *tar.Reader
	tarW     *tar.Writer
	gz       *gzip.Writer
	bufTar   *bytes.Buffer
	bufGz    *bytes.Buffer
	h        hash.Hash
	h2       verboseHash
	sums     []string
	finished bool
	first    bool
}

func (ts *TarSum) encodeHeader(h *tar.Header) error {
	for _, elem := range [][2]string{
		{"name", h.Name},
		{"mode", strconv.Itoa(int(h.Mode))},
		{"uid", strconv.Itoa(h.Uid)},
		{"gid", strconv.Itoa(h.Gid)},
		{"size", strconv.Itoa(int(h.Size))},
		{"mtime", strconv.Itoa(int(h.ModTime.UTC().Unix()))},
		{"typeflag", string([]byte{h.Typeflag})},
		{"linkname", h.Linkname},
		{"uname", h.Uname},
		{"gname", h.Gname},
		{"devmajor", strconv.Itoa(int(h.Devmajor))},
		{"devminor", strconv.Itoa(int(h.Devminor))},
		// {"atime", strconv.Itoa(int(h.AccessTime.UTC().Unix()))},
		// {"ctime", strconv.Itoa(int(h.ChangeTime.UTC().Unix()))},
	} {
		// Debugf("-->%s<-- -->%s<--", elem[0], elem[1])
		if _, err := ts.h.Write([]byte(elem[0] + elem[1])); err != nil {
			return err
		}
	}
	return nil
}

func (ts *TarSum) Read(buf []byte) (int, error) {
	if ts.gz == nil {
		ts.bufTar = bytes.NewBuffer([]byte{})
		ts.bufGz = bytes.NewBuffer([]byte{})
		ts.tarR = tar.NewReader(ts.Reader)
		ts.tarW = tar.NewWriter(ts.bufTar)
		ts.gz = gzip.NewWriter(ts.bufGz)
		ts.h = sha256.New()
		// ts.h = verboseHash{sha256.New()}
		ts.h.Reset()
		ts.first = true
	}

	if ts.finished {
		return ts.bufGz.Read(buf)
	}
	buf2 := make([]byte, len(buf), cap(buf))

	n, err := ts.tarR.Read(buf2)
	if err != nil {
		if err == io.EOF {
			if _, err := ts.h.Write(buf2[:n]); err != nil {
				return 0, err
			}
			if !ts.first {
				ts.sums = append(ts.sums, hex.EncodeToString(ts.h.Sum(nil)))
				ts.h.Reset()
			} else {
				ts.first = false
			}

			currentHeader, err := ts.tarR.Next()
			if err != nil {
				if err == io.EOF {
					if err := ts.gz.Close(); err != nil {
						return 0, err
					}
					ts.finished = true
					return n, nil
				}
				return n, err
			}
			if err := ts.encodeHeader(currentHeader); err != nil {
				return 0, err
			}
			if err := ts.tarW.WriteHeader(currentHeader); err != nil {
				return 0, err
			}
			if _, err := ts.tarW.Write(buf2[:n]); err != nil {
				return 0, err
			}
			ts.tarW.Flush()
			if _, err := io.Copy(ts.gz, ts.bufTar); err != nil {
				return 0, err
			}
			ts.gz.Flush()

			return ts.bufGz.Read(buf)
		}
		return n, err
	}

	// Filling the hash buffer
	if _, err = ts.h.Write(buf2[:n]); err != nil {
		return 0, err
	}

	// Filling the tar writter
	if _, err = ts.tarW.Write(buf2[:n]); err != nil {
		return 0, err
	}
	ts.tarW.Flush()

	// Filling the gz writter
	if _, err = io.Copy(ts.gz, ts.bufTar); err != nil {
		return 0, err
	}
	ts.gz.Flush()

	return ts.bufGz.Read(buf)
}

func (ts *TarSum) Sum() string {
	sort.Strings(ts.sums)
	h := sha256.New()
	for _, sum := range ts.sums {
		Debugf("-->%s<--", sum)
		h.Write([]byte(sum))
	}
	checksum := "tarsum+sha256:" + hex.EncodeToString(ts.h.Sum(nil))
	Debugf("checksum processed: %s", checksum)
	return checksum
}
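
For reference, TarSum is used as a pass-through reader: wrap the tar stream, let whatever consumes the stream drain it, then call Sum() for the aggregate checksum. That is exactly how PushImageLayerRegistry above uses it as the HTTP request body. A minimal stand-alone sketch, assuming a hypothetical layer.tar on disk:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"

	"github.com/dotcloud/docker/utils"
)

func main() {
	// "layer.tar" is a hypothetical input; any tar stream works.
	f, err := os.Open("layer.tar")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	ts := &utils.TarSum{Reader: f}

	// The per-file sums are accumulated while the stream is read, so the
	// stream must be fully consumed before asking for the checksum.
	if _, err := io.Copy(ioutil.Discard, ts); err != nil {
		panic(err)
	}
	fmt.Println(ts.Sum()) // e.g. tarsum+sha256:<hex digest>
}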