1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Parallel migration and optimizations

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2015-11-29 19:55:22 -08:00
parent 0641429ad8
commit a8f88ef403
9 changed files with 343 additions and 154 deletions

View file

@ -796,9 +796,11 @@ func NewDaemon(config *Config, registryService *registry.Service) (daemon *Daemo
return nil, fmt.Errorf("Couldn't restore custom images: %s", err) return nil, fmt.Errorf("Couldn't restore custom images: %s", err)
} }
migrationStart := time.Now()
if err := v1.Migrate(config.Root, graphDriver, d.layerStore, d.imageStore, referenceStore, distributionMetadataStore); err != nil { if err := v1.Migrate(config.Root, graphDriver, d.layerStore, d.imageStore, referenceStore, distributionMetadataStore); err != nil {
return nil, err return nil, err
} }
logrus.Infof("Graph migration to content-addressability took %.2f seconds", time.Since(migrationStart).Seconds())
// Discovery is only enabled when the daemon is launched with an address to advertise. When // Discovery is only enabled when the daemon is launched with an address to advertise. When
// initialized, the daemon is registered and we can store the discovery backend as its read-only // initialized, the daemon is registered and we can store the discovery backend as its read-only

View file

@ -97,16 +97,20 @@ func (fm *fileMetadataTransaction) SetCacheID(cacheID string) error {
return ioutil.WriteFile(filepath.Join(fm.root, "cache-id"), []byte(cacheID), 0644) return ioutil.WriteFile(filepath.Join(fm.root, "cache-id"), []byte(cacheID), 0644)
} }
func (fm *fileMetadataTransaction) TarSplitWriter() (io.WriteCloser, error) { func (fm *fileMetadataTransaction) TarSplitWriter(compressInput bool) (io.WriteCloser, error) {
f, err := os.OpenFile(filepath.Join(fm.root, "tar-split.json.gz"), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(filepath.Join(fm.root, "tar-split.json.gz"), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var wc io.WriteCloser
if compressInput {
wc = gzip.NewWriter(f)
} else {
wc = f
}
fz := gzip.NewWriter(f) return ioutils.NewWriteCloserWrapper(wc, func() error {
wc.Close()
return ioutils.NewWriteCloserWrapper(fz, func() error {
fz.Close()
return f.Close() return f.Close()
}), nil }), nil
} }

View file

@ -183,7 +183,7 @@ type MetadataTransaction interface {
SetParent(parent ChainID) error SetParent(parent ChainID) error
SetDiffID(DiffID) error SetDiffID(DiffID) error
SetCacheID(string) error SetCacheID(string) error
TarSplitWriter() (io.WriteCloser, error) TarSplitWriter(compressInput bool) (io.WriteCloser, error)
Commit(ChainID) error Commit(ChainID) error
Cancel() error Cancel() error

View file

@ -196,7 +196,7 @@ func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent stri
digester := digest.Canonical.New() digester := digest.Canonical.New()
tr := io.TeeReader(ts, digester.Hash()) tr := io.TeeReader(ts, digester.Hash())
tsw, err := tx.TarSplitWriter() tsw, err := tx.TarSplitWriter(true)
if err != nil { if err != nil {
return err return err
} }
@ -572,7 +572,7 @@ func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc Mou
return initID, nil return initID, nil
} }
func (ls *layerStore) assembleTar(graphID string, metadata io.ReadCloser, size *int64) (io.ReadCloser, error) { func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
type diffPathDriver interface { type diffPathDriver interface {
DiffPath(string) (string, func() error, error) DiffPath(string) (string, func() error, error)
} }
@ -582,34 +582,20 @@ func (ls *layerStore) assembleTar(graphID string, metadata io.ReadCloser, size *
diffDriver = &naiveDiffPathDriver{ls.driver} diffDriver = &naiveDiffPathDriver{ls.driver}
} }
defer metadata.Close()
// get our relative path to the container // get our relative path to the container
fsPath, releasePath, err := diffDriver.DiffPath(graphID) fsPath, releasePath, err := diffDriver.DiffPath(graphID)
if err != nil { if err != nil {
metadata.Close() return err
return nil, err
} }
defer releasePath()
pR, pW := io.Pipe() metaUnpacker := storage.NewJSONUnpacker(metadata)
// this will need to be in a goroutine, as we are returning the stream of a upackerCounter := &unpackSizeCounter{metaUnpacker, size}
// tar archive, but can not close the metadata reader early (when this fileGetter := storage.NewPathFileGetter(fsPath)
// function returns)... logrus.Debugf("Assembling tar data for %s from %s", graphID, fsPath)
go func() { return asm.WriteOutputTarStream(fileGetter, upackerCounter, w)
defer releasePath()
defer metadata.Close()
metaUnpacker := storage.NewJSONUnpacker(metadata)
upackerCounter := &unpackSizeCounter{metaUnpacker, size}
fileGetter := storage.NewPathFileGetter(fsPath)
logrus.Debugf("Assembling tar data for %s from %s", graphID, fsPath)
ots := asm.NewOutputTarStream(fileGetter, upackerCounter)
defer ots.Close()
if _, err := io.Copy(pW, ots); err != nil {
pW.CloseWithError(err)
return
}
pW.Close()
}()
return pR, nil
} }
func (ls *layerStore) Cleanup() error { func (ls *layerStore) Cleanup() error {

View file

@ -9,7 +9,6 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/docker/pkg/ioutils"
"github.com/vbatts/tar-split/tar/asm" "github.com/vbatts/tar-split/tar/asm"
"github.com/vbatts/tar-split/tar/storage" "github.com/vbatts/tar-split/tar/storage"
) )
@ -76,79 +75,75 @@ func (ls *layerStore) CreateRWLayerByGraphID(name string, graphID string, parent
return nil return nil
} }
func (ls *layerStore) migrateLayer(tx MetadataTransaction, tarDataFile string, layer *roLayer) error { func (ls *layerStore) ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID DiffID, size int64, err error) {
var ar io.Reader defer func() {
var tdf *os.File
var err error
if tarDataFile != "" {
tdf, err = os.Open(tarDataFile)
if err != nil { if err != nil {
if !os.IsNotExist(err) { logrus.Debugf("could not get checksum for %q with tar-split: %q", id, err)
return err diffID, size, err = ls.checksumForGraphIDNoTarsplit(id, parent, newTarDataPath)
}
tdf = nil
}
defer tdf.Close()
}
if tdf != nil {
tsw, err := tx.TarSplitWriter()
if err != nil {
return err
} }
}()
defer tsw.Close() if oldTarDataPath == "" {
err = errors.New("no tar-split file")
uncompressed, err := gzip.NewReader(tdf) return
if err != nil {
return err
}
defer uncompressed.Close()
tr := io.TeeReader(uncompressed, tsw)
trc := ioutils.NewReadCloserWrapper(tr, uncompressed.Close)
ar, err = ls.assembleTar(layer.cacheID, trc, &layer.size)
if err != nil {
return err
}
} else {
var graphParent string
if layer.parent != nil {
graphParent = layer.parent.cacheID
}
archiver, err := ls.driver.Diff(layer.cacheID, graphParent)
if err != nil {
return err
}
defer archiver.Close()
tsw, err := tx.TarSplitWriter()
if err != nil {
return err
}
metaPacker := storage.NewJSONPacker(tsw)
packerCounter := &packSizeCounter{metaPacker, &layer.size}
defer tsw.Close()
ar, err = asm.NewInputTarStream(archiver, packerCounter, nil)
if err != nil {
return err
}
} }
digester := digest.Canonical.New() tarDataFile, err := os.Open(oldTarDataPath)
_, err = io.Copy(digester.Hash(), ar)
if err != nil { if err != nil {
return err return
}
defer tarDataFile.Close()
uncompressed, err := gzip.NewReader(tarDataFile)
if err != nil {
return
} }
layer.diffID = DiffID(digester.Digest()) dgst := digest.Canonical.New()
err = ls.assembleTarTo(id, uncompressed, &size, dgst.Hash())
if err != nil {
return
}
return nil diffID = DiffID(dgst.Digest())
err = os.RemoveAll(newTarDataPath)
if err != nil {
return
}
err = os.Link(oldTarDataPath, newTarDataPath)
return
} }
func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, tarDataFile string) (Layer, error) { func (ls *layerStore) checksumForGraphIDNoTarsplit(id, parent, newTarDataPath string) (diffID DiffID, size int64, err error) {
rawarchive, err := ls.driver.Diff(id, parent)
if err != nil {
return
}
defer rawarchive.Close()
f, err := os.Create(newTarDataPath)
if err != nil {
return
}
defer f.Close()
mfz := gzip.NewWriter(f)
metaPacker := storage.NewJSONPacker(mfz)
packerCounter := &packSizeCounter{metaPacker, &size}
archive, err := asm.NewInputTarStream(rawarchive, packerCounter, nil)
if err != nil {
return
}
dgst, err := digest.FromReader(archive)
if err != nil {
return
}
diffID = DiffID(dgst)
return
}
func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, diffID DiffID, tarDataFile string, size int64) (Layer, error) {
// err is used to hold the error which will always trigger // err is used to hold the error which will always trigger
// cleanup of creates sources but may not be an error returned // cleanup of creates sources but may not be an error returned
// to the caller (already exists). // to the caller (already exists).
@ -177,6 +172,18 @@ func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, tarDataF
referenceCount: 1, referenceCount: 1,
layerStore: ls, layerStore: ls,
references: map[Layer]struct{}{}, references: map[Layer]struct{}{},
diffID: diffID,
size: size,
chainID: createChainIDFromParent(parent, diffID),
}
ls.layerL.Lock()
defer ls.layerL.Unlock()
if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
// Set error for cleanup, but do not return
err = errors.New("layer already exists")
return existingLayer.getReference(), nil
} }
tx, err := ls.store.StartTransaction() tx, err := ls.store.StartTransaction()
@ -193,25 +200,25 @@ func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, tarDataF
} }
}() }()
if err = ls.migrateLayer(tx, tarDataFile, layer); err != nil { tsw, err := tx.TarSplitWriter(false)
if err != nil {
return nil, err
}
defer tsw.Close()
tdf, err := os.Open(tarDataFile)
if err != nil {
return nil, err
}
defer tdf.Close()
_, err = io.Copy(tsw, tdf)
if err != nil {
return nil, err return nil, err
} }
layer.chainID = createChainIDFromParent(parent, layer.diffID)
if err = storeLayer(tx, layer); err != nil { if err = storeLayer(tx, layer); err != nil {
return nil, err return nil, err
} }
ls.layerL.Lock()
defer ls.layerL.Unlock()
if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
// Set error for cleanup, but do not return
err = errors.New("layer already exists")
return existingLayer.getReference(), nil
}
if err = tx.Commit(layer.chainID); err != nil { if err = tx.Commit(layer.chainID); err != nil {
return nil, err return nil, err
} }

View file

@ -94,7 +94,13 @@ func TestLayerMigration(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
layer1a, err := ls.(*layerStore).RegisterByGraphID(graphID1, "", tf1) newTarDataPath := filepath.Join(td, ".migration-tardata")
diffID, size, err := ls.(*layerStore).ChecksumForGraphID(graphID1, "", tf1, newTarDataPath)
if err != nil {
t.Fatal(err)
}
layer1a, err := ls.(*layerStore).RegisterByGraphID(graphID1, "", diffID, newTarDataPath, size)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -105,7 +111,6 @@ func TestLayerMigration(t *testing.T) {
} }
assertReferences(t, layer1a, layer1b) assertReferences(t, layer1a, layer1b)
// Attempt register, should be same // Attempt register, should be same
layer2a, err := ls.Register(bytes.NewReader(tar2), layer1a.ChainID()) layer2a, err := ls.Register(bytes.NewReader(tar2), layer1a.ChainID())
if err != nil { if err != nil {
@ -124,12 +129,15 @@ func TestLayerMigration(t *testing.T) {
if err := writeTarSplitFile(tf2, tar2); err != nil { if err := writeTarSplitFile(tf2, tar2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
diffID, size, err = ls.(*layerStore).ChecksumForGraphID(graphID2, graphID1, tf2, newTarDataPath)
layer2b, err := ls.(*layerStore).RegisterByGraphID(graphID2, layer1a.ChainID(), tf2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
layer2b, err := ls.(*layerStore).RegisterByGraphID(graphID2, layer1a.ChainID(), diffID, tf2, size)
if err != nil {
t.Fatal(err)
}
assertReferences(t, layer2a, layer2b) assertReferences(t, layer2a, layer2b)
if metadata, err := ls.Release(layer2a); err != nil { if metadata, err := ls.Release(layer2a); err != nil {
@ -210,7 +218,13 @@ func TestLayerMigrationNoTarsplit(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
layer1a, err := ls.(*layerStore).RegisterByGraphID(graphID1, "", "") newTarDataPath := filepath.Join(td, ".migration-tardata")
diffID, size, err := ls.(*layerStore).ChecksumForGraphID(graphID1, "", "", newTarDataPath)
if err != nil {
t.Fatal(err)
}
layer1a, err := ls.(*layerStore).RegisterByGraphID(graphID1, "", diffID, newTarDataPath, size)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -228,11 +242,15 @@ func TestLayerMigrationNoTarsplit(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
layer2b, err := ls.(*layerStore).RegisterByGraphID(graphID2, layer1a.ChainID(), "") diffID, size, err = ls.(*layerStore).ChecksumForGraphID(graphID2, graphID1, "", newTarDataPath)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
layer2b, err := ls.(*layerStore).RegisterByGraphID(graphID2, layer1a.ChainID(), diffID, newTarDataPath, size)
if err != nil {
t.Fatal(err)
}
assertReferences(t, layer2a, layer2b) assertReferences(t, layer2a, layer2b)
if metadata, err := ls.Release(layer2a); err != nil { if metadata, err := ls.Release(layer2a); err != nil {

View file

@ -20,7 +20,16 @@ func (rl *roLayer) TarStream() (io.ReadCloser, error) {
return nil, err return nil, err
} }
return rl.layerStore.assembleTar(rl.cacheID, r, nil) pr, pw := io.Pipe()
go func() {
err := rl.layerStore.assembleTarTo(rl.cacheID, r, nil, pw)
if err != nil {
pw.CloseWithError(err)
} else {
pw.Close()
}
}()
return pr, nil
} }
func (rl *roLayer) ChainID() ChainID { func (rl *roLayer) ChainID() ChainID {

View file

@ -6,6 +6,10 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strconv"
"sync"
"time"
"encoding/json" "encoding/json"
@ -19,7 +23,7 @@ import (
) )
type graphIDRegistrar interface { type graphIDRegistrar interface {
RegisterByGraphID(string, layer.ChainID, string) (layer.Layer, error) RegisterByGraphID(string, layer.ChainID, layer.DiffID, string, int64) (layer.Layer, error)
Release(layer.Layer) ([]layer.Metadata, error) Release(layer.Layer) ([]layer.Metadata, error)
} }
@ -27,11 +31,18 @@ type graphIDMounter interface {
CreateRWLayerByGraphID(string, string, layer.ChainID) error CreateRWLayerByGraphID(string, string, layer.ChainID) error
} }
type checksumCalculator interface {
ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID layer.DiffID, size int64, err error)
}
const ( const (
graphDirName = "graph" graphDirName = "graph"
tarDataFileName = "tar-data.json.gz" tarDataFileName = "tar-data.json.gz"
migrationFileName = ".migration-v1-images.json" migrationFileName = ".migration-v1-images.json"
migrationTagsFileName = ".migration-v1-tags" migrationTagsFileName = ".migration-v1-tags"
migrationDiffIDFileName = ".migration-diffid"
migrationSizeFileName = ".migration-size"
migrationTarDataFileName = ".migration-tardata"
containersDirName = "containers" containersDirName = "containers"
configFileNameLegacy = "config.json" configFileNameLegacy = "config.json"
configFileName = "config.v2.json" configFileName = "config.v2.json"
@ -45,7 +56,19 @@ var (
// Migrate takes an old graph directory and transforms the metadata into the // Migrate takes an old graph directory and transforms the metadata into the
// new format. // new format.
func Migrate(root, driverName string, ls layer.Store, is image.Store, rs reference.Store, ms metadata.Store) error { func Migrate(root, driverName string, ls layer.Store, is image.Store, rs reference.Store, ms metadata.Store) error {
mappings := make(map[string]image.ID) graphDir := filepath.Join(root, graphDirName)
if _, err := os.Lstat(graphDir); os.IsNotExist(err) {
return nil
}
mappings, err := restoreMappings(root)
if err != nil {
return err
}
if cc, ok := ls.(checksumCalculator); ok {
CalculateLayerChecksums(root, cc, mappings)
}
if registrar, ok := ls.(graphIDRegistrar); !ok { if registrar, ok := ls.(graphIDRegistrar); !ok {
return errUnsupported return errUnsupported
@ -53,6 +76,11 @@ func Migrate(root, driverName string, ls layer.Store, is image.Store, rs referen
return err return err
} }
err = saveMappings(root, mappings)
if err != nil {
return err
}
if mounter, ok := ls.(graphIDMounter); !ok { if mounter, ok := ls.(graphIDMounter); !ok {
return errUnsupported return errUnsupported
} else if err := migrateContainers(root, mounter, is, mappings); err != nil { } else if err := migrateContainers(root, mounter, is, mappings); err != nil {
@ -66,28 +94,115 @@ func Migrate(root, driverName string, ls layer.Store, is image.Store, rs referen
return nil return nil
} }
func migrateImages(root string, ls graphIDRegistrar, is image.Store, ms metadata.Store, mappings map[string]image.ID) error { // CalculateLayerChecksums walks an old graph directory and calculates checksums
// for each layer. These checksums are later used for migration.
func CalculateLayerChecksums(root string, ls checksumCalculator, mappings map[string]image.ID) {
graphDir := filepath.Join(root, graphDirName) graphDir := filepath.Join(root, graphDirName)
if _, err := os.Lstat(graphDir); err != nil { // spawn some extra workers also for maximum performance because the process is bounded by both cpu and io
if os.IsNotExist(err) { workers := runtime.NumCPU() * 3
return nil workQueue := make(chan string, workers)
wg := sync.WaitGroup{}
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
for id := range workQueue {
start := time.Now()
if err := calculateLayerChecksum(graphDir, id, ls); err != nil {
logrus.Errorf("could not calculate checksum for %q, %q", id, err)
}
elapsed := time.Since(start)
logrus.Debugf("layer %s took %.2f seconds", id, elapsed.Seconds())
}
wg.Done()
}()
}
dir, err := ioutil.ReadDir(graphDir)
if err != nil {
logrus.Errorf("could not read directory %q", graphDir)
return
}
for _, v := range dir {
v1ID := v.Name()
if err := imagev1.ValidateID(v1ID); err != nil {
continue
} }
if _, ok := mappings[v1ID]; ok { // support old migrations without helper files
continue
}
workQueue <- v1ID
}
close(workQueue)
wg.Wait()
}
func calculateLayerChecksum(graphDir, id string, ls checksumCalculator) error {
diffIDFile := filepath.Join(graphDir, id, migrationDiffIDFileName)
if _, err := os.Lstat(diffIDFile); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err return err
} }
parent, err := getParent(filepath.Join(graphDir, id))
if err != nil {
return err
}
diffID, size, err := ls.ChecksumForGraphID(id, parent, filepath.Join(graphDir, id, tarDataFileName), filepath.Join(graphDir, id, migrationTarDataFileName))
if err != nil {
return err
}
if err := ioutil.WriteFile(filepath.Join(graphDir, id, migrationSizeFileName), []byte(strconv.Itoa(int(size))), 0600); err != nil {
return err
}
if err := ioutil.WriteFile(filepath.Join(graphDir, id, migrationDiffIDFileName), []byte(diffID), 0600); err != nil {
return err
}
logrus.Infof("calculated checksum for layer %s: %s", id, diffID)
return nil
}
func restoreMappings(root string) (map[string]image.ID, error) {
mappings := make(map[string]image.ID)
mfile := filepath.Join(root, migrationFileName) mfile := filepath.Join(root, migrationFileName)
f, err := os.Open(mfile) f, err := os.Open(mfile)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return nil, err
} else if err == nil { } else if err == nil {
err := json.NewDecoder(f).Decode(&mappings) err := json.NewDecoder(f).Decode(&mappings)
if err != nil { if err != nil {
f.Close() f.Close()
return err return nil, err
} }
f.Close() f.Close()
} }
return mappings, nil
}
func saveMappings(root string, mappings map[string]image.ID) error {
mfile := filepath.Join(root, migrationFileName)
f, err := os.OpenFile(mfile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(mappings); err != nil {
return err
}
return nil
}
func migrateImages(root string, ls graphIDRegistrar, is image.Store, ms metadata.Store, mappings map[string]image.ID) error {
graphDir := filepath.Join(root, graphDirName)
dir, err := ioutil.ReadDir(graphDir) dir, err := ioutil.ReadDir(graphDir)
if err != nil { if err != nil {
return err return err
@ -105,15 +220,6 @@ func migrateImages(root string, ls graphIDRegistrar, is image.Store, ms metadata
} }
} }
f, err = os.OpenFile(mfile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(mappings); err != nil {
return err
}
return nil return nil
} }
@ -251,6 +357,30 @@ func migrateRefs(root, driverName string, rs refAdder, mappings map[string]image
return nil return nil
} }
func getParent(confDir string) (string, error) {
jsonFile := filepath.Join(confDir, "json")
imageJSON, err := ioutil.ReadFile(jsonFile)
if err != nil {
return "", err
}
var parent struct {
Parent string
ParentID digest.Digest `json:"parent_id"`
}
if err := json.Unmarshal(imageJSON, &parent); err != nil {
return "", err
}
if parent.Parent == "" && parent.ParentID != "" { // v1.9
parent.Parent = parent.ParentID.Hex()
}
// compatibilityID for parent
parentCompatibilityID, err := ioutil.ReadFile(filepath.Join(confDir, "parent"))
if err == nil && len(parentCompatibilityID) > 0 {
parent.Parent = string(parentCompatibilityID)
}
return parent.Parent, nil
}
func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metadata.Store, mappings map[string]image.ID) (err error) { func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metadata.Store, mappings map[string]image.ID) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
@ -258,36 +388,20 @@ func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metad
} }
}() }()
jsonFile := filepath.Join(root, graphDirName, id, "json") parent, err := getParent(filepath.Join(root, graphDirName, id))
imageJSON, err := ioutil.ReadFile(jsonFile)
if err != nil { if err != nil {
return err return err
} }
var parent struct {
Parent string
ParentID digest.Digest `json:"parent_id"`
}
if err := json.Unmarshal(imageJSON, &parent); err != nil {
return err
}
if parent.Parent == "" && parent.ParentID != "" { // v1.9
parent.Parent = parent.ParentID.Hex()
}
// compatibilityID for parent
parentCompatibilityID, err := ioutil.ReadFile(filepath.Join(root, graphDirName, id, "parent"))
if err == nil && len(parentCompatibilityID) > 0 {
parent.Parent = string(parentCompatibilityID)
}
var parentID image.ID var parentID image.ID
if parent.Parent != "" { if parent != "" {
var exists bool var exists bool
if parentID, exists = mappings[parent.Parent]; !exists { if parentID, exists = mappings[parent]; !exists {
if err := migrateImage(parent.Parent, root, ls, is, ms, mappings); err != nil { if err := migrateImage(parent, root, ls, is, ms, mappings); err != nil {
// todo: fail or allow broken chains? // todo: fail or allow broken chains?
return err return err
} }
parentID = mappings[parent.Parent] parentID = mappings[parent]
} }
} }
@ -304,12 +418,32 @@ func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metad
history = parentImg.History history = parentImg.History
} }
layer, err := ls.RegisterByGraphID(id, rootFS.ChainID(), filepath.Join(filepath.Join(root, graphDirName, id, tarDataFileName))) diffID, err := ioutil.ReadFile(filepath.Join(root, graphDirName, id, migrationDiffIDFileName))
if err != nil {
return err
}
sizeStr, err := ioutil.ReadFile(filepath.Join(root, graphDirName, id, migrationSizeFileName))
if err != nil {
return err
}
size, err := strconv.ParseInt(string(sizeStr), 10, 64)
if err != nil {
return err
}
layer, err := ls.RegisterByGraphID(id, rootFS.ChainID(), layer.DiffID(diffID), filepath.Join(root, graphDirName, id, migrationTarDataFileName), size)
if err != nil { if err != nil {
return err return err
} }
logrus.Infof("migrated layer %s to %s", id, layer.DiffID()) logrus.Infof("migrated layer %s to %s", id, layer.DiffID())
jsonFile := filepath.Join(root, graphDirName, id, "json")
imageJSON, err := ioutil.ReadFile(jsonFile)
if err != nil {
return err
}
h, err := imagev1.HistoryFromConfig(imageJSON, false) h, err := imagev1.HistoryFromConfig(imageJSON, false)
if err != nil { if err != nil {
return err return err

View file

@ -234,12 +234,30 @@ func TestMigrateUnsupported(t *testing.T) {
} }
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
err = os.MkdirAll(filepath.Join(tmpdir, "graph"), 0700)
if err != nil {
t.Fatal(err)
}
err = Migrate(tmpdir, "generic", nil, nil, nil, nil) err = Migrate(tmpdir, "generic", nil, nil, nil, nil)
if err != errUnsupported { if err != errUnsupported {
t.Fatalf("expected unsupported error, got %q", err) t.Fatalf("expected unsupported error, got %q", err)
} }
} }
func TestMigrateEmptyDir(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "migrate-empty")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
err = Migrate(tmpdir, "generic", nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
}
func addImage(dest, jsonConfig, parent, checksum string) (string, error) { func addImage(dest, jsonConfig, parent, checksum string) (string, error) {
var config struct{ ID string } var config struct{ ID string }
if err := json.Unmarshal([]byte(jsonConfig), &config); err != nil { if err := json.Unmarshal([]byte(jsonConfig), &config); err != nil {
@ -257,6 +275,17 @@ func addImage(dest, jsonConfig, parent, checksum string) (string, error) {
if err := ioutil.WriteFile(filepath.Join(contDir, "json"), []byte(jsonConfig), 0600); err != nil { if err := ioutil.WriteFile(filepath.Join(contDir, "json"), []byte(jsonConfig), 0600); err != nil {
return "", err return "", err
} }
if checksum != "" {
if err := ioutil.WriteFile(filepath.Join(contDir, "checksum"), []byte(checksum), 0600); err != nil {
return "", err
}
}
if err := ioutil.WriteFile(filepath.Join(contDir, ".migration-diffid"), []byte(layer.EmptyLayer.DiffID()), 0600); err != nil {
return "", err
}
if err := ioutil.WriteFile(filepath.Join(contDir, ".migration-size"), []byte("0"), 0600); err != nil {
return "", err
}
if parent != "" { if parent != "" {
if err := ioutil.WriteFile(filepath.Join(contDir, "parent"), []byte(parent), 0600); err != nil { if err := ioutil.WriteFile(filepath.Join(contDir, "parent"), []byte(parent), 0600); err != nil {
return "", err return "", err
@ -305,7 +334,7 @@ type mockRegistrar struct {
count int count int
} }
func (r *mockRegistrar) RegisterByGraphID(graphID string, parent layer.ChainID, tarDataFile string) (layer.Layer, error) { func (r *mockRegistrar) RegisterByGraphID(graphID string, parent layer.ChainID, diffID layer.DiffID, tarDataFile string, size int64) (layer.Layer, error) {
r.count++ r.count++
l := &mockLayer{} l := &mockLayer{}
if parent != "" { if parent != "" {
@ -316,7 +345,7 @@ func (r *mockRegistrar) RegisterByGraphID(graphID string, parent layer.ChainID,
l.parent = p l.parent = p
l.diffIDs = append(l.diffIDs, p.diffIDs...) l.diffIDs = append(l.diffIDs, p.diffIDs...)
} }
l.diffIDs = append(l.diffIDs, layer.EmptyLayer.DiffID()) l.diffIDs = append(l.diffIDs, diffID)
if r.layers == nil { if r.layers == nil {
r.layers = make(map[layer.ChainID]*mockLayer) r.layers = make(map[layer.ChainID]*mockLayer)
} }