2014-08-08 02:58:58 -04:00
package graph
import (
"fmt"
"io"
2014-10-01 21:26:06 -04:00
"io/ioutil"
2014-08-08 02:58:58 -04:00
"net"
"net/url"
2014-10-01 21:26:06 -04:00
"os"
2014-08-08 02:58:58 -04:00
"strings"
"time"
2014-10-24 18:11:48 -04:00
log "github.com/Sirupsen/logrus"
2014-08-08 02:58:58 -04:00
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
2014-12-23 16:40:06 -05:00
"github.com/docker/docker/pkg/tarsum"
2014-08-08 02:58:58 -04:00
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
func ( s * TagStore ) CmdPull ( job * engine . Job ) engine . Status {
if n := len ( job . Args ) ; n != 1 && n != 2 {
return job . Errorf ( "Usage: %s IMAGE [TAG]" , job . Name )
}
2014-10-03 18:24:14 -04:00
2014-08-08 02:58:58 -04:00
var (
localName = job . Args [ 0 ]
tag string
sf = utils . NewStreamFormatter ( job . GetenvBool ( "json" ) )
authConfig = & registry . AuthConfig { }
metaHeaders map [ string ] [ ] string
)
2014-10-03 18:24:14 -04:00
2014-10-06 21:54:52 -04:00
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo , err := registry . ResolveRepositoryInfo ( job , localName )
if err != nil {
return job . Error ( err )
}
2014-08-08 02:58:58 -04:00
if len ( job . Args ) > 1 {
tag = job . Args [ 1 ]
}
job . GetenvJson ( "authConfig" , authConfig )
job . GetenvJson ( "metaHeaders" , & metaHeaders )
2014-10-06 21:54:52 -04:00
c , err := s . poolAdd ( "pull" , repoInfo . LocalName + ":" + tag )
2014-08-08 02:58:58 -04:00
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
2014-10-06 21:54:52 -04:00
job . Stdout . Write ( sf . FormatStatus ( "" , "Repository %s already being pulled by another client. Waiting." , repoInfo . LocalName ) )
2014-08-08 02:58:58 -04:00
<- c
return engine . StatusOK
}
return job . Error ( err )
}
2014-10-06 21:54:52 -04:00
defer s . poolRemove ( "pull" , repoInfo . LocalName + ":" + tag )
2014-08-08 02:58:58 -04:00
2014-12-23 16:40:06 -05:00
log . Debugf ( "pulling image from host %q with remote name %q" , repoInfo . Index . Name , repoInfo . RemoteName )
2014-10-06 21:54:52 -04:00
endpoint , err := repoInfo . GetEndpoint ( )
2014-08-08 02:58:58 -04:00
if err != nil {
return job . Error ( err )
}
2014-08-07 10:43:06 -04:00
r , err := registry . NewSession ( authConfig , registry . HTTPRequestFactory ( metaHeaders ) , endpoint , true )
2014-08-08 02:58:58 -04:00
if err != nil {
return job . Error ( err )
}
2014-10-06 21:54:52 -04:00
logName := repoInfo . LocalName
2014-10-22 09:48:02 -04:00
if tag != "" {
logName += ":" + tag
}
2014-12-19 17:44:18 -05:00
if len ( repoInfo . Index . Mirrors ) == 0 && ( repoInfo . Index . Official || endpoint . Version == registry . APIVersion2 ) {
2014-10-07 14:00:17 -04:00
j := job . Eng . Job ( "trust_update_base" )
if err = j . Run ( ) ; err != nil {
2015-01-12 17:17:50 -05:00
log . Errorf ( "error updating trust base graph: %s" , err )
2014-10-07 14:00:17 -04:00
}
2015-01-13 18:48:49 -05:00
log . Debugf ( "pulling v2 repository with local name %q" , repoInfo . LocalName )
if err := s . pullV2Repository ( job . Eng , r , job . Stdout , repoInfo , tag , sf , job . GetenvBool ( "parallel" ) ) ; err == nil {
if err = job . Eng . Job ( "log" , "pull" , logName , "" ) . Run ( ) ; err != nil {
log . Errorf ( "Error logging event 'pull' for %s: %s" , logName , err )
2014-10-22 09:48:02 -04:00
}
2015-01-13 18:48:49 -05:00
return engine . StatusOK
} else if err != registry . ErrDoesNotExist {
log . Errorf ( "Error from V2 registry: %s" , err )
2014-10-07 14:00:17 -04:00
}
2014-12-23 16:40:06 -05:00
log . Debug ( "image does not exist on v2 registry, falling back to v1" )
2014-10-07 14:00:17 -04:00
}
2014-12-23 16:40:06 -05:00
log . Debugf ( "pulling v1 repository with local name %q" , repoInfo . LocalName )
2014-10-06 21:54:52 -04:00
if err = s . pullRepository ( r , job . Stdout , repoInfo , tag , sf , job . GetenvBool ( "parallel" ) ) ; err != nil {
2014-08-08 02:58:58 -04:00
return job . Error ( err )
}
2014-10-22 09:48:02 -04:00
if err = job . Eng . Job ( "log" , "pull" , logName , "" ) . Run ( ) ; err != nil {
log . Errorf ( "Error logging event 'pull' for %s: %s" , logName , err )
}
2014-08-08 02:58:58 -04:00
return engine . StatusOK
}
2014-10-06 21:54:52 -04:00
func ( s * TagStore ) pullRepository ( r * registry . Session , out io . Writer , repoInfo * registry . RepositoryInfo , askedTag string , sf * utils . StreamFormatter , parallel bool ) error {
out . Write ( sf . FormatStatus ( "" , "Pulling repository %s" , repoInfo . CanonicalName ) )
2014-08-08 02:58:58 -04:00
2014-10-06 21:54:52 -04:00
repoData , err := r . GetRepositoryData ( repoInfo . RemoteName )
2014-08-08 02:58:58 -04:00
if err != nil {
if strings . Contains ( err . Error ( ) , "HTTP code: 404" ) {
2014-10-06 21:54:52 -04:00
return fmt . Errorf ( "Error: image %s:%s not found" , repoInfo . RemoteName , askedTag )
2014-08-08 02:58:58 -04:00
}
2014-08-29 07:21:28 -04:00
// Unexpected HTTP error
return err
2014-08-08 02:58:58 -04:00
}
2014-07-24 16:37:44 -04:00
log . Debugf ( "Retrieving the tag list" )
2014-10-06 21:54:52 -04:00
tagsList , err := r . GetRemoteTags ( repoData . Endpoints , repoInfo . RemoteName , repoData . Tokens )
2014-08-08 02:58:58 -04:00
if err != nil {
2014-12-23 16:40:06 -05:00
log . Errorf ( "unable to get remote tags: %s" , err )
2014-08-08 02:58:58 -04:00
return err
}
for tag , id := range tagsList {
repoData . ImgList [ id ] = & registry . ImgData {
ID : id ,
Tag : tag ,
Checksum : "" ,
}
}
2014-07-24 16:37:44 -04:00
log . Debugf ( "Registering tags" )
2014-08-08 02:58:58 -04:00
// If no tag has been specified, pull them all
2014-09-23 18:58:05 -04:00
var imageId string
2014-08-08 02:58:58 -04:00
if askedTag == "" {
for tag , id := range tagsList {
repoData . ImgList [ id ] . Tag = tag
}
} else {
// Otherwise, check that the tag exists and use only that one
id , exists := tagsList [ askedTag ]
if ! exists {
2014-10-06 21:54:52 -04:00
return fmt . Errorf ( "Tag %s not found in repository %s" , askedTag , repoInfo . CanonicalName )
2014-08-08 02:58:58 -04:00
}
2014-09-23 18:58:05 -04:00
imageId = id
2014-08-08 02:58:58 -04:00
repoData . ImgList [ id ] . Tag = askedTag
}
errors := make ( chan error )
2014-09-23 18:53:43 -04:00
layers_downloaded := false
2014-08-08 02:58:58 -04:00
for _ , image := range repoData . ImgList {
downloadImage := func ( img * registry . ImgData ) {
if askedTag != "" && img . Tag != askedTag {
if parallel {
errors <- nil
}
return
}
if img . Tag == "" {
2014-07-24 16:37:44 -04:00
log . Debugf ( "Image (id: %s) present in this repository but untagged, skipping" , img . ID )
2014-08-08 02:58:58 -04:00
if parallel {
errors <- nil
}
return
}
// ensure no two downloads of the same image happen at the same time
if c , err := s . poolAdd ( "pull" , "img:" + img . ID ) ; err != nil {
if c != nil {
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Layer already being pulled by another client. Waiting." , nil ) )
<- c
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Download complete" , nil ) )
} else {
2014-07-24 16:37:44 -04:00
log . Debugf ( "Image (id: %s) pull is already running, skipping: %v" , img . ID , err )
2014-08-08 02:58:58 -04:00
}
if parallel {
errors <- nil
}
return
}
defer s . poolRemove ( "pull" , "img:" + img . ID )
2014-10-06 21:54:52 -04:00
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , fmt . Sprintf ( "Pulling image (%s) from %s" , img . Tag , repoInfo . CanonicalName ) , nil ) )
2014-08-08 02:58:58 -04:00
success := false
2014-09-23 18:53:43 -04:00
var lastErr , err error
var is_downloaded bool
2014-10-06 21:54:52 -04:00
for _ , ep := range repoInfo . Index . Mirrors {
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , fmt . Sprintf ( "Pulling image (%s) from %s, mirror: %s" , img . Tag , repoInfo . CanonicalName , ep ) , nil ) )
if is_downloaded , err = s . pullImage ( r , out , img . ID , ep , repoData . Tokens , sf ) ; err != nil {
// Don't report errors when pulling from mirrors.
log . Debugf ( "Error pulling image (%s) from %s, mirror: %s, %s" , img . Tag , repoInfo . CanonicalName , ep , err )
continue
2014-07-18 14:48:19 -04:00
}
2014-10-06 21:54:52 -04:00
layers_downloaded = layers_downloaded || is_downloaded
success = true
break
2014-07-18 14:48:19 -04:00
}
if ! success {
for _ , ep := range repoData . Endpoints {
2014-10-06 21:54:52 -04:00
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , fmt . Sprintf ( "Pulling image (%s) from %s, endpoint: %s" , img . Tag , repoInfo . CanonicalName , ep ) , nil ) )
2014-09-23 18:53:43 -04:00
if is_downloaded , err = s . pullImage ( r , out , img . ID , ep , repoData . Tokens , sf ) ; err != nil {
2014-07-18 14:48:19 -04:00
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
2014-10-06 21:54:52 -04:00
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , fmt . Sprintf ( "Error pulling image (%s) from %s, endpoint: %s, %s" , img . Tag , repoInfo . CanonicalName , ep , err ) , nil ) )
2014-07-18 14:48:19 -04:00
continue
}
2014-09-23 18:53:43 -04:00
layers_downloaded = layers_downloaded || is_downloaded
2014-07-18 14:48:19 -04:00
success = true
break
2014-08-08 02:58:58 -04:00
}
}
if ! success {
2014-10-06 21:54:52 -04:00
err := fmt . Errorf ( "Error pulling image (%s) from %s, %v" , img . Tag , repoInfo . CanonicalName , lastErr )
2014-08-08 02:58:58 -04:00
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , err . Error ( ) , nil ) )
if parallel {
errors <- err
return
}
}
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Download complete" , nil ) )
if parallel {
errors <- nil
}
}
if parallel {
go downloadImage ( image )
} else {
downloadImage ( image )
}
}
if parallel {
var lastError error
for i := 0 ; i < len ( repoData . ImgList ) ; i ++ {
if err := <- errors ; err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
}
}
for tag , id := range tagsList {
2014-09-23 18:58:05 -04:00
if askedTag != "" && id != imageId {
2014-08-08 02:58:58 -04:00
continue
}
2014-10-06 21:54:52 -04:00
if err := s . Set ( repoInfo . LocalName , tag , id , true ) ; err != nil {
2014-08-08 02:58:58 -04:00
return err
}
}
2014-10-06 21:54:52 -04:00
requestedTag := repoInfo . CanonicalName
2014-09-23 18:53:43 -04:00
if len ( askedTag ) > 0 {
2014-10-06 21:54:52 -04:00
requestedTag = repoInfo . CanonicalName + ":" + askedTag
2014-09-23 18:53:43 -04:00
}
WriteStatus ( requestedTag , out , sf , layers_downloaded )
2014-08-08 02:58:58 -04:00
return nil
}
2014-09-23 18:53:43 -04:00
func ( s * TagStore ) pullImage ( r * registry . Session , out io . Writer , imgID , endpoint string , token [ ] string , sf * utils . StreamFormatter ) ( bool , error ) {
2014-08-08 02:58:58 -04:00
history , err := r . GetRemoteHistory ( imgID , endpoint , token )
if err != nil {
2014-09-23 18:53:43 -04:00
return false , err
2014-08-08 02:58:58 -04:00
}
out . Write ( sf . FormatProgress ( utils . TruncateID ( imgID ) , "Pulling dependent layers" , nil ) )
// FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines
2014-09-23 18:53:43 -04:00
layers_downloaded := false
2014-08-08 02:58:58 -04:00
for i := len ( history ) - 1 ; i >= 0 ; i -- {
id := history [ i ]
// ensure no two downloads of the same layer happen at the same time
if c , err := s . poolAdd ( "pull" , "layer:" + id ) ; err != nil {
2014-07-24 16:37:44 -04:00
log . Debugf ( "Image (id: %s) pull is already running, skipping: %v" , id , err )
2014-08-08 02:58:58 -04:00
<- c
}
defer s . poolRemove ( "pull" , "layer:" + id )
if ! s . graph . Exists ( id ) {
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Pulling metadata" , nil ) )
var (
imgJSON [ ] byte
imgSize int
err error
img * image . Image
)
retries := 5
for j := 1 ; j <= retries ; j ++ {
imgJSON , imgSize , err = r . GetRemoteImageJSON ( id , endpoint , token )
if err != nil && j == retries {
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Error pulling dependent layers" , nil ) )
2014-09-23 18:53:43 -04:00
return layers_downloaded , err
2014-08-08 02:58:58 -04:00
} else if err != nil {
time . Sleep ( time . Duration ( j ) * 500 * time . Millisecond )
continue
}
img , err = image . NewImgJSON ( imgJSON )
2014-09-23 18:53:43 -04:00
layers_downloaded = true
2014-08-08 02:58:58 -04:00
if err != nil && j == retries {
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Error pulling dependent layers" , nil ) )
2014-09-23 18:53:43 -04:00
return layers_downloaded , fmt . Errorf ( "Failed to parse json: %s" , err )
2014-08-08 02:58:58 -04:00
} else if err != nil {
time . Sleep ( time . Duration ( j ) * 500 * time . Millisecond )
continue
} else {
break
}
}
for j := 1 ; j <= retries ; j ++ {
// Get the layer
status := "Pulling fs layer"
if j > 1 {
status = fmt . Sprintf ( "Pulling fs layer [retries: %d]" , j )
}
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , status , nil ) )
layer , err := r . GetRemoteImageLayer ( img . ID , endpoint , token , int64 ( imgSize ) )
if uerr , ok := err . ( * url . Error ) ; ok {
err = uerr . Err
}
if terr , ok := err . ( net . Error ) ; ok && terr . Timeout ( ) && j < retries {
time . Sleep ( time . Duration ( j ) * 500 * time . Millisecond )
continue
} else if err != nil {
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Error pulling dependent layers" , nil ) )
2014-09-23 18:53:43 -04:00
return layers_downloaded , err
2014-08-08 02:58:58 -04:00
}
2014-09-23 18:53:43 -04:00
layers_downloaded = true
2014-08-08 02:58:58 -04:00
defer layer . Close ( )
2014-10-27 14:00:29 -04:00
err = s . graph . Register ( img ,
2014-08-28 06:57:54 -04:00
utils . ProgressReader ( layer , imgSize , out , sf , false , utils . TruncateID ( id ) , "Downloading" ) )
2014-08-08 02:58:58 -04:00
if terr , ok := err . ( net . Error ) ; ok && terr . Timeout ( ) && j < retries {
time . Sleep ( time . Duration ( j ) * 500 * time . Millisecond )
continue
} else if err != nil {
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Error downloading dependent layers" , nil ) )
2014-09-23 18:53:43 -04:00
return layers_downloaded , err
2014-08-08 02:58:58 -04:00
} else {
break
}
}
}
out . Write ( sf . FormatProgress ( utils . TruncateID ( id ) , "Download complete" , nil ) )
2014-09-23 18:53:43 -04:00
}
return layers_downloaded , nil
}
2014-08-08 02:58:58 -04:00
2014-09-23 18:53:43 -04:00
func WriteStatus ( requestedTag string , out io . Writer , sf * utils . StreamFormatter , layers_downloaded bool ) {
if layers_downloaded {
out . Write ( sf . FormatStatus ( "" , "Status: Downloaded newer image for %s" , requestedTag ) )
} else {
out . Write ( sf . FormatStatus ( "" , "Status: Image is up to date for %s" , requestedTag ) )
2014-08-08 02:58:58 -04:00
}
}
2014-10-01 21:26:06 -04:00
// downloadInfo is used to pass information from download to extractor
// while pulling the layers of a V2 manifest (see pullV2Tag).
type downloadInfo struct {
// imgJSON is the raw V1-compatibility JSON of the layer, taken from the manifest history.
imgJSON [ ] byte
// img is the image parsed from that JSON.
img * image . Image
// tmpFile holds the downloaded blob until it is registered in the graph;
// it is only set when this pull downloaded the blob itself.
tmpFile * os . File
// length is the blob length reported by the registry.
length int64
// downloaded is true when the blob was fetched into tmpFile by this pull.
downloaded bool
// err delivers the result of the download when it runs in its own
// goroutine; it is nil for serial downloads and for layers that already exist.
err chan error
}
2015-01-13 18:48:49 -05:00
func ( s * TagStore ) pullV2Repository ( eng * engine . Engine , r * registry . Session , out io . Writer , repoInfo * registry . RepositoryInfo , tag string , sf * utils . StreamFormatter , parallel bool ) error {
2015-01-14 19:46:31 -05:00
endpoint , err := r . V2RegistryEndpoint ( repoInfo . Index )
if err != nil {
return fmt . Errorf ( "error getting registry endpoint: %s" , err )
}
auth , err := r . GetV2Authorization ( endpoint , repoInfo . RemoteName , true )
2015-01-13 18:48:49 -05:00
if err != nil {
return fmt . Errorf ( "error getting authorization: %s" , err )
}
2014-10-08 19:14:54 -04:00
var layersDownloaded bool
2014-10-01 21:26:06 -04:00
if tag == "" {
2014-10-06 21:54:52 -04:00
log . Debugf ( "Pulling tag list from V2 registry for %s" , repoInfo . CanonicalName )
2015-01-14 19:46:31 -05:00
tags , err := r . GetV2RemoteTags ( endpoint , repoInfo . RemoteName , auth )
2014-10-01 21:26:06 -04:00
if err != nil {
return err
}
for _ , t := range tags {
2015-01-14 19:46:31 -05:00
if downloaded , err := s . pullV2Tag ( eng , r , out , endpoint , repoInfo , t , sf , parallel , auth ) ; err != nil {
2014-10-01 21:26:06 -04:00
return err
2014-10-08 19:14:54 -04:00
} else if downloaded {
layersDownloaded = true
2014-10-01 21:26:06 -04:00
}
}
} else {
2015-01-14 19:46:31 -05:00
if downloaded , err := s . pullV2Tag ( eng , r , out , endpoint , repoInfo , tag , sf , parallel , auth ) ; err != nil {
2014-10-01 21:26:06 -04:00
return err
2014-10-08 19:14:54 -04:00
} else if downloaded {
layersDownloaded = true
2014-10-01 21:26:06 -04:00
}
}
2014-10-06 21:54:52 -04:00
requestedTag := repoInfo . CanonicalName
2014-10-08 19:14:54 -04:00
if len ( tag ) > 0 {
2014-10-06 21:54:52 -04:00
requestedTag = repoInfo . CanonicalName + ":" + tag
2014-10-08 19:14:54 -04:00
}
WriteStatus ( requestedTag , out , sf , layersDownloaded )
2014-10-01 21:26:06 -04:00
return nil
}
2015-01-14 19:46:31 -05:00
func ( s * TagStore ) pullV2Tag ( eng * engine . Engine , r * registry . Session , out io . Writer , endpoint * registry . Endpoint , repoInfo * registry . RepositoryInfo , tag string , sf * utils . StreamFormatter , parallel bool , auth * registry . RequestAuthorization ) ( bool , error ) {
2014-10-01 21:26:06 -04:00
log . Debugf ( "Pulling tag from V2 registry: %q" , tag )
2015-01-14 19:46:31 -05:00
manifestBytes , err := r . GetV2ImageManifest ( endpoint , repoInfo . RemoteName , tag , auth )
2014-10-01 21:26:06 -04:00
if err != nil {
2014-10-08 19:14:54 -04:00
return false , err
2014-10-01 21:26:06 -04:00
}
manifest , verified , err := s . verifyManifest ( eng , manifestBytes )
if err != nil {
2014-10-08 19:14:54 -04:00
return false , fmt . Errorf ( "error verifying manifest: %s" , err )
2014-10-01 21:26:06 -04:00
}
2015-01-02 14:13:11 -05:00
if err := checkValidManifest ( manifest ) ; err != nil {
return false , err
2014-10-01 21:26:06 -04:00
}
if verified {
2015-01-20 22:19:11 -05:00
log . Printf ( "Image manifest for %s:%s has been verified" , repoInfo . CanonicalName , tag )
2014-10-08 19:11:04 -04:00
} else {
2014-10-06 21:54:52 -04:00
out . Write ( sf . FormatStatus ( tag , "Pulling from %s" , repoInfo . CanonicalName ) )
2014-10-01 21:26:06 -04:00
}
2015-01-20 22:19:11 -05:00
2014-10-09 20:34:34 -04:00
downloads := make ( [ ] downloadInfo , len ( manifest . FSLayers ) )
2014-10-01 21:26:06 -04:00
2014-10-09 20:34:34 -04:00
for i := len ( manifest . FSLayers ) - 1 ; i >= 0 ; i -- {
2014-10-01 21:26:06 -04:00
var (
2014-10-09 20:34:34 -04:00
sumStr = manifest . FSLayers [ i ] . BlobSum
imgJSON = [ ] byte ( manifest . History [ i ] . V1Compatibility )
2014-10-01 21:26:06 -04:00
)
img , err := image . NewImgJSON ( imgJSON )
if err != nil {
2014-10-08 19:14:54 -04:00
return false , fmt . Errorf ( "failed to parse json: %s" , err )
2014-10-01 21:26:06 -04:00
}
downloads [ i ] . img = img
// Check if exists
if s . graph . Exists ( img . ID ) {
log . Debugf ( "Image already exists: %s" , img . ID )
continue
}
chunks := strings . SplitN ( sumStr , ":" , 2 )
if len ( chunks ) < 2 {
2014-10-08 19:14:54 -04:00
return false , fmt . Errorf ( "expected 2 parts in the sumStr, got %#v" , chunks )
2014-10-01 21:26:06 -04:00
}
sumType , checksum := chunks [ 0 ] , chunks [ 1 ]
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Pulling fs layer" , nil ) )
downloadFunc := func ( di * downloadInfo ) error {
2014-10-08 19:11:04 -04:00
log . Debugf ( "pulling blob %q to V1 img %s" , sumStr , img . ID )
2014-10-01 21:26:06 -04:00
if c , err := s . poolAdd ( "pull" , "img:" + img . ID ) ; err != nil {
if c != nil {
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Layer already being pulled by another client. Waiting." , nil ) )
<- c
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Download complete" , nil ) )
} else {
log . Debugf ( "Image (id: %s) pull is already running, skipping: %v" , img . ID , err )
}
} else {
2014-10-03 19:16:03 -04:00
defer s . poolRemove ( "pull" , "img:" + img . ID )
2014-10-01 21:26:06 -04:00
tmpFile , err := ioutil . TempFile ( "" , "GetV2ImageBlob" )
if err != nil {
return err
}
2015-01-14 19:46:31 -05:00
r , l , err := r . GetV2ImageBlobReader ( endpoint , repoInfo . RemoteName , sumType , checksum , auth )
2014-10-01 21:26:06 -04:00
if err != nil {
return err
}
defer r . Close ( )
2014-12-23 16:40:06 -05:00
// Wrap the reader with the appropriate TarSum reader.
tarSumReader , err := tarsum . NewTarSumForLabel ( r , true , sumType )
if err != nil {
return fmt . Errorf ( "unable to wrap image blob reader with TarSum: %s" , err )
}
io . Copy ( tmpFile , utils . ProgressReader ( ioutil . NopCloser ( tarSumReader ) , int ( l ) , out , sf , false , utils . TruncateID ( img . ID ) , "Downloading" ) )
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Verifying Checksum" , nil ) )
if finalChecksum := tarSumReader . Sum ( nil ) ; ! strings . EqualFold ( finalChecksum , sumStr ) {
return fmt . Errorf ( "image verification failed: checksum mismatch - expected %q but got %q" , sumStr , finalChecksum )
}
2014-10-01 21:26:06 -04:00
out . Write ( sf . FormatProgress ( utils . TruncateID ( img . ID ) , "Download complete" , nil ) )
log . Debugf ( "Downloaded %s to tempfile %s" , img . ID , tmpFile . Name ( ) )
di . tmpFile = tmpFile
di . length = l
di . downloaded = true
}
di . imgJSON = imgJSON
return nil
}
if parallel {
downloads [ i ] . err = make ( chan error )
go func ( di * downloadInfo ) {
di . err <- downloadFunc ( di )
} ( & downloads [ i ] )
} else {
err := downloadFunc ( & downloads [ i ] )
if err != nil {
2014-10-08 19:14:54 -04:00
return false , err
2014-10-01 21:26:06 -04:00
}
}
}
2014-10-08 19:14:54 -04:00
var layersDownloaded bool
2014-10-01 21:26:06 -04:00
for i := len ( downloads ) - 1 ; i >= 0 ; i -- {
d := & downloads [ i ]
if d . err != nil {
err := <- d . err
if err != nil {
2014-10-08 19:14:54 -04:00
return false , err
2014-10-01 21:26:06 -04:00
}
}
if d . downloaded {
// if tmpFile is empty assume download and extracted elsewhere
defer os . Remove ( d . tmpFile . Name ( ) )
defer d . tmpFile . Close ( )
d . tmpFile . Seek ( 0 , 0 )
if d . tmpFile != nil {
2014-10-27 14:00:29 -04:00
err = s . graph . Register ( d . img ,
2014-10-01 21:26:06 -04:00
utils . ProgressReader ( d . tmpFile , int ( d . length ) , out , sf , false , utils . TruncateID ( d . img . ID ) , "Extracting" ) )
if err != nil {
2014-10-08 19:14:54 -04:00
return false , err
2014-10-01 21:26:06 -04:00
}
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
}
out . Write ( sf . FormatProgress ( utils . TruncateID ( d . img . ID ) , "Pull complete" , nil ) )
2014-10-08 19:14:54 -04:00
layersDownloaded = true
2014-10-01 21:26:06 -04:00
} else {
out . Write ( sf . FormatProgress ( utils . TruncateID ( d . img . ID ) , "Already exists" , nil ) )
}
}
2015-01-21 21:07:10 -05:00
out . Write ( sf . FormatStatus ( repoInfo . CanonicalName + ":" + tag , "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security." ) )
2015-01-20 22:19:11 -05:00
2014-10-06 21:54:52 -04:00
if err = s . Set ( repoInfo . LocalName , tag , downloads [ 0 ] . img . ID , true ) ; err != nil {
2014-10-08 19:14:54 -04:00
return false , err
2014-10-01 21:26:06 -04:00
}
2014-10-08 19:14:54 -04:00
return layersDownloaded , nil
2014-10-01 21:26:06 -04:00
}