2015-11-18 17:18:44 -05:00
package distribution
import (
"fmt"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/image"
"github.com/docker/docker/image/v1"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
2015-11-13 19:59:01 -05:00
"github.com/docker/docker/pkg/progress"
2015-11-18 17:18:44 -05:00
"github.com/docker/docker/pkg/stringid"
2015-12-04 16:55:15 -05:00
"github.com/docker/docker/reference"
2015-11-18 17:18:44 -05:00
"github.com/docker/docker/registry"
2015-11-13 19:59:01 -05:00
"golang.org/x/net/context"
2015-11-18 17:18:44 -05:00
)
type v1Pusher struct {
2015-11-13 19:59:01 -05:00
ctx context . Context
2015-11-18 17:18:44 -05:00
v1IDService * metadata . V1IDService
endpoint registry . APIEndpoint
ref reference . Named
repoInfo * registry . RepositoryInfo
config * ImagePushConfig
session * registry . Session
}
2015-12-04 16:42:33 -05:00
func ( p * v1Pusher ) Push ( ctx context . Context ) error {
2015-11-18 17:18:44 -05:00
tlsConfig , err := p . config . RegistryService . TLSConfig ( p . repoInfo . Index . Name )
if err != nil {
2015-12-04 16:42:33 -05:00
return err
2015-11-18 17:18:44 -05:00
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport . NewTransport (
// TODO(tiborvass): was NoTimeout
registry . NewTransport ( tlsConfig ) ,
registry . DockerHeaders ( p . config . MetaHeaders ) ... ,
)
client := registry . HTTPClient ( tr )
v1Endpoint , err := p . endpoint . ToV1Endpoint ( p . config . MetaHeaders )
if err != nil {
logrus . Debugf ( "Could not get v1 endpoint: %v" , err )
2015-12-04 16:42:33 -05:00
return fallbackError { err : err }
2015-11-18 17:18:44 -05:00
}
p . session , err = registry . NewSession ( client , p . config . AuthConfig , v1Endpoint )
if err != nil {
// TODO(dmcgowan): Check if should fallback
2015-12-04 16:42:33 -05:00
return fallbackError { err : err }
2015-11-18 17:18:44 -05:00
}
2015-11-13 19:59:01 -05:00
if err := p . pushRepository ( ctx ) ; err != nil {
2015-11-18 17:18:44 -05:00
// TODO(dmcgowan): Check if should fallback
2015-12-04 16:42:33 -05:00
return err
2015-11-18 17:18:44 -05:00
}
2015-12-04 16:42:33 -05:00
return nil
2015-11-18 17:18:44 -05:00
}
// v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
// image being pushed to a v1 registry.
type v1Image interface {
Config ( ) [ ] byte
Layer ( ) layer . Layer
V1ID ( ) string
}
type v1ImageCommon struct {
layer layer . Layer
config [ ] byte
v1ID string
}
func ( common * v1ImageCommon ) Config ( ) [ ] byte {
return common . config
}
func ( common * v1ImageCommon ) V1ID ( ) string {
return common . v1ID
}
func ( common * v1ImageCommon ) Layer ( ) layer . Layer {
return common . layer
}
// v1TopImage defines a runnable (top layer) image being pushed to a v1
// registry.
type v1TopImage struct {
v1ImageCommon
imageID image . ID
}
func newV1TopImage ( imageID image . ID , img * image . Image , l layer . Layer , parent * v1DependencyImage ) ( * v1TopImage , error ) {
v1ID := digest . Digest ( imageID ) . Hex ( )
parentV1ID := ""
if parent != nil {
parentV1ID = parent . V1ID ( )
}
config , err := v1 . MakeV1ConfigFromConfig ( img , v1ID , parentV1ID , false )
if err != nil {
return nil , err
}
return & v1TopImage {
v1ImageCommon : v1ImageCommon {
v1ID : v1ID ,
config : config ,
layer : l ,
} ,
imageID : imageID ,
} , nil
}
// v1DependencyImage defines a dependency layer being pushed to a v1 registry.
type v1DependencyImage struct {
v1ImageCommon
}
func newV1DependencyImage ( l layer . Layer , parent * v1DependencyImage ) ( * v1DependencyImage , error ) {
v1ID := digest . Digest ( l . ChainID ( ) ) . Hex ( )
config := ""
if parent != nil {
config = fmt . Sprintf ( ` { "id":"%s","parent":"%s"} ` , v1ID , parent . V1ID ( ) )
} else {
config = fmt . Sprintf ( ` { "id":"%s"} ` , v1ID )
}
return & v1DependencyImage {
v1ImageCommon : v1ImageCommon {
v1ID : v1ID ,
config : [ ] byte ( config ) ,
layer : l ,
} ,
} , nil
}
// Retrieve the all the images to be uploaded in the correct order
func ( p * v1Pusher ) getImageList ( ) ( imageList [ ] v1Image , tagsByImage map [ image . ID ] [ ] string , referencedLayers [ ] layer . Layer , err error ) {
tagsByImage = make ( map [ image . ID ] [ ] string )
// Ignore digest references
2015-12-04 16:55:15 -05:00
if _ , isCanonical := p . ref . ( reference . Canonical ) ; isCanonical {
2015-11-18 17:18:44 -05:00
return
}
2015-12-04 16:55:15 -05:00
tagged , isTagged := p . ref . ( reference . NamedTagged )
2015-11-18 17:18:44 -05:00
if isTagged {
// Push a specific tag
var imgID image . ID
2015-12-04 16:55:15 -05:00
imgID , err = p . config . ReferenceStore . Get ( p . ref )
2015-11-18 17:18:44 -05:00
if err != nil {
return
}
imageList , err = p . imageListForTag ( imgID , nil , & referencedLayers )
if err != nil {
return
}
tagsByImage [ imgID ] = [ ] string { tagged . Tag ( ) }
return
}
imagesSeen := make ( map [ image . ID ] struct { } )
dependenciesSeen := make ( map [ layer . ChainID ] * v1DependencyImage )
2015-12-04 16:55:15 -05:00
associations := p . config . ReferenceStore . ReferencesByName ( p . ref )
2015-11-18 17:18:44 -05:00
for _ , association := range associations {
2015-12-04 16:55:15 -05:00
if tagged , isTagged = association . Ref . ( reference . NamedTagged ) ; ! isTagged {
2015-11-18 17:18:44 -05:00
// Ignore digest references.
continue
}
tagsByImage [ association . ImageID ] = append ( tagsByImage [ association . ImageID ] , tagged . Tag ( ) )
if _ , present := imagesSeen [ association . ImageID ] ; present {
// Skip generating image list for already-seen image
continue
}
imagesSeen [ association . ImageID ] = struct { } { }
imageListForThisTag , err := p . imageListForTag ( association . ImageID , dependenciesSeen , & referencedLayers )
if err != nil {
return nil , nil , nil , err
}
// append to main image list
imageList = append ( imageList , imageListForThisTag ... )
}
if len ( imageList ) == 0 {
return nil , nil , nil , fmt . Errorf ( "No images found for the requested repository / tag" )
}
logrus . Debugf ( "Image list: %v" , imageList )
logrus . Debugf ( "Tags by image: %v" , tagsByImage )
return
}
func ( p * v1Pusher ) imageListForTag ( imgID image . ID , dependenciesSeen map [ layer . ChainID ] * v1DependencyImage , referencedLayers * [ ] layer . Layer ) ( imageListForThisTag [ ] v1Image , err error ) {
img , err := p . config . ImageStore . Get ( imgID )
if err != nil {
return nil , err
}
topLayerID := img . RootFS . ChainID ( )
var l layer . Layer
if topLayerID == "" {
l = layer . EmptyLayer
} else {
l , err = p . config . LayerStore . Get ( topLayerID )
* referencedLayers = append ( * referencedLayers , l )
if err != nil {
return nil , fmt . Errorf ( "failed to get top layer from image: %v" , err )
}
}
dependencyImages , parent , err := generateDependencyImages ( l . Parent ( ) , dependenciesSeen )
if err != nil {
return nil , err
}
topImage , err := newV1TopImage ( imgID , img , l , parent )
if err != nil {
return nil , err
}
imageListForThisTag = append ( dependencyImages , topImage )
return
}
func generateDependencyImages ( l layer . Layer , dependenciesSeen map [ layer . ChainID ] * v1DependencyImage ) ( imageListForThisTag [ ] v1Image , parent * v1DependencyImage , err error ) {
if l == nil {
return nil , nil , nil
}
imageListForThisTag , parent , err = generateDependencyImages ( l . Parent ( ) , dependenciesSeen )
if dependenciesSeen != nil {
if dependencyImage , present := dependenciesSeen [ l . ChainID ( ) ] ; present {
// This layer is already on the list, we can ignore it
// and all its parents.
return imageListForThisTag , dependencyImage , nil
}
}
dependencyImage , err := newV1DependencyImage ( l , parent )
if err != nil {
return nil , nil , err
}
imageListForThisTag = append ( imageListForThisTag , dependencyImage )
if dependenciesSeen != nil {
dependenciesSeen [ l . ChainID ( ) ] = dependencyImage
}
return imageListForThisTag , dependencyImage , nil
}
// createImageIndex returns an index of an image's layer IDs and tags.
func createImageIndex ( images [ ] v1Image , tags map [ image . ID ] [ ] string ) [ ] * registry . ImgData {
var imageIndex [ ] * registry . ImgData
for _ , img := range images {
v1ID := img . V1ID ( )
if topImage , isTopImage := img . ( * v1TopImage ) ; isTopImage {
if tags , hasTags := tags [ topImage . imageID ] ; hasTags {
// If an image has tags you must add an entry in the image index
// for each tag
for _ , tag := range tags {
imageIndex = append ( imageIndex , & registry . ImgData {
ID : v1ID ,
Tag : tag ,
} )
}
continue
}
}
// If the image does not have a tag it still needs to be sent to the
// registry with an empty tag so that it is associated with the repository
imageIndex = append ( imageIndex , & registry . ImgData {
ID : v1ID ,
Tag : "" ,
} )
}
return imageIndex
}
// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
// and if it is absent then it sends the image id to the channel to be pushed.
func ( p * v1Pusher ) lookupImageOnEndpoint ( wg * sync . WaitGroup , endpoint string , images chan v1Image , imagesToPush chan string ) {
defer wg . Done ( )
for image := range images {
v1ID := image . V1ID ( )
2015-12-07 14:05:48 -05:00
truncID := stringid . TruncateID ( image . Layer ( ) . DiffID ( ) . String ( ) )
2015-11-18 17:18:44 -05:00
if err := p . session . LookupRemoteImage ( v1ID , endpoint ) ; err != nil {
logrus . Errorf ( "Error in LookupRemoteImage: %s" , err )
imagesToPush <- v1ID
2015-12-07 14:05:48 -05:00
progress . Update ( p . config . ProgressOutput , truncID , "Waiting" )
2015-11-18 17:18:44 -05:00
} else {
2015-12-07 14:05:48 -05:00
progress . Update ( p . config . ProgressOutput , truncID , "Already exists" )
2015-11-18 17:18:44 -05:00
}
}
}
2015-11-13 19:59:01 -05:00
func ( p * v1Pusher ) pushImageToEndpoint ( ctx context . Context , endpoint string , imageList [ ] v1Image , tags map [ image . ID ] [ ] string , repo * registry . RepositoryData ) error {
2015-11-18 17:18:44 -05:00
workerCount := len ( imageList )
// start a maximum of 5 workers to check if images exist on the specified endpoint.
if workerCount > 5 {
workerCount = 5
}
var (
wg = & sync . WaitGroup { }
imageData = make ( chan v1Image , workerCount * 2 )
imagesToPush = make ( chan string , workerCount * 2 )
pushes = make ( chan map [ string ] struct { } , 1 )
)
for i := 0 ; i < workerCount ; i ++ {
wg . Add ( 1 )
go p . lookupImageOnEndpoint ( wg , endpoint , imageData , imagesToPush )
}
// start a go routine that consumes the images to push
go func ( ) {
shouldPush := make ( map [ string ] struct { } )
for id := range imagesToPush {
shouldPush [ id ] = struct { } { }
}
pushes <- shouldPush
} ( )
for _ , v1Image := range imageList {
imageData <- v1Image
}
// close the channel to notify the workers that there will be no more images to check.
close ( imageData )
wg . Wait ( )
close ( imagesToPush )
// wait for all the images that require pushes to be collected into a consumable map.
shouldPush := <- pushes
// finish by pushing any images and tags to the endpoint. The order that the images are pushed
// is very important that is why we are still iterating over the ordered list of imageIDs.
for _ , img := range imageList {
v1ID := img . V1ID ( )
if _ , push := shouldPush [ v1ID ] ; push {
2015-11-13 19:59:01 -05:00
if _ , err := p . pushImage ( ctx , img , endpoint ) ; err != nil {
2015-11-18 17:18:44 -05:00
// FIXME: Continue on error?
return err
}
}
if topImage , isTopImage := img . ( * v1TopImage ) ; isTopImage {
for _ , tag := range tags [ topImage . imageID ] {
2015-12-11 14:00:13 -05:00
progress . Messagef ( p . config . ProgressOutput , "" , "Pushing tag for rev [%s] on {%s}" , stringid . TruncateID ( v1ID ) , endpoint + "repositories/" + p . repoInfo . RemoteName ( ) + "/tags/" + tag )
if err := p . session . PushRegistryTag ( p . repoInfo , v1ID , tag , endpoint ) ; err != nil {
2015-11-18 17:18:44 -05:00
return err
}
}
}
}
return nil
}
// pushRepository pushes layers that do not already exist on the registry.
2015-11-13 19:59:01 -05:00
func ( p * v1Pusher ) pushRepository ( ctx context . Context ) error {
2015-11-18 17:18:44 -05:00
imgList , tags , referencedLayers , err := p . getImageList ( )
defer func ( ) {
for _ , l := range referencedLayers {
p . config . LayerStore . Release ( l )
}
} ( )
if err != nil {
return err
}
imageIndex := createImageIndex ( imgList , tags )
for _ , data := range imageIndex {
logrus . Debugf ( "Pushing ID: %s with Tag: %s" , data . ID , data . Tag )
}
// Register all the images in a repository with the registry
// If an image is not in this list it will not be associated with the repository
2015-12-11 14:00:13 -05:00
repoData , err := p . session . PushImageJSONIndex ( p . repoInfo , imageIndex , false , nil )
2015-11-18 17:18:44 -05:00
if err != nil {
return err
}
// push the repository to each of the endpoints only if it does not exist.
for _ , endpoint := range repoData . Endpoints {
2015-11-13 19:59:01 -05:00
if err := p . pushImageToEndpoint ( ctx , endpoint , imgList , tags , repoData ) ; err != nil {
2015-11-18 17:18:44 -05:00
return err
}
}
2015-12-11 14:00:13 -05:00
_ , err = p . session . PushImageJSONIndex ( p . repoInfo , imageIndex , true , repoData . Endpoints )
2015-11-18 17:18:44 -05:00
return err
}
2015-11-13 19:59:01 -05:00
func ( p * v1Pusher ) pushImage ( ctx context . Context , v1Image v1Image , ep string ) ( checksum string , err error ) {
2015-12-07 14:05:48 -05:00
l := v1Image . Layer ( )
2015-11-18 17:18:44 -05:00
v1ID := v1Image . V1ID ( )
2015-12-07 14:05:48 -05:00
truncID := stringid . TruncateID ( l . DiffID ( ) . String ( ) )
2015-11-18 17:18:44 -05:00
jsonRaw := v1Image . Config ( )
2015-12-07 14:05:48 -05:00
progress . Update ( p . config . ProgressOutput , truncID , "Pushing" )
2015-11-18 17:18:44 -05:00
// General rule is to use ID for graph accesses and compatibilityID for
// calls to session.registry()
imgData := & registry . ImgData {
ID : v1ID ,
}
// Send the json
if err := p . session . PushImageJSONRegistry ( imgData , jsonRaw , ep ) ; err != nil {
if err == registry . ErrAlreadyExists {
2015-12-07 14:05:48 -05:00
progress . Update ( p . config . ProgressOutput , truncID , "Image already pushed, skipping" )
2015-11-18 17:18:44 -05:00
return "" , nil
}
return "" , err
}
arch , err := l . TarStream ( )
if err != nil {
return "" , err
}
2015-11-25 19:39:54 -05:00
defer arch . Close ( )
2015-11-18 17:18:44 -05:00
// don't care if this fails; best effort
2015-12-07 13:59:59 -05:00
size , _ := l . DiffSize ( )
2015-11-18 17:18:44 -05:00
// Send the layer
logrus . Debugf ( "rendered layer for %s of [%d] size" , v1ID , size )
2015-12-07 14:05:48 -05:00
reader := progress . NewProgressReader ( ioutils . NewCancelReadCloser ( ctx , arch ) , p . config . ProgressOutput , size , truncID , "Pushing" )
2015-11-13 19:59:01 -05:00
defer reader . Close ( )
2015-11-18 17:18:44 -05:00
checksum , checksumPayload , err := p . session . PushImageLayerRegistry ( v1ID , reader , ep , jsonRaw )
if err != nil {
return "" , err
}
imgData . Checksum = checksum
imgData . ChecksumPayload = checksumPayload
// Send the checksum
if err := p . session . PushImageChecksumRegistry ( imgData , ep ) ; err != nil {
return "" , err
}
2015-11-13 19:59:01 -05:00
if err := p . v1IDService . Set ( v1ID , p . repoInfo . Index . Name , l . DiffID ( ) ) ; err != nil {
2015-11-18 17:18:44 -05:00
logrus . Warnf ( "Could not set v1 ID mapping: %v" , err )
}
2015-12-07 14:05:48 -05:00
progress . Update ( p . config . ProgressOutput , truncID , "Image successfully pushed" )
2015-11-18 17:18:44 -05:00
return imgData . Checksum , nil
}