2015-02-12 13:23:22 -05:00
package graph
import (
2015-08-05 20:47:37 -04:00
"errors"
2015-02-12 13:23:22 -05:00
"fmt"
"io"
"io/ioutil"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
2015-07-20 13:57:15 -04:00
"github.com/docker/docker/image"
2015-02-12 13:23:22 -05:00
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/trust"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
2015-07-21 18:54:26 -04:00
"golang.org/x/net/context"
2015-02-12 13:23:22 -05:00
)
type v2Puller struct {
* TagStore
2015-04-07 22:29:29 -04:00
endpoint registry . APIEndpoint
config * ImagePullConfig
sf * streamformatter . StreamFormatter
repoInfo * registry . RepositoryInfo
repo distribution . Repository
sessionID string
2015-02-12 13:23:22 -05:00
}
func ( p * v2Puller ) Pull ( tag string ) ( fallback bool , err error ) {
// TODO(tiborvass): was ReceiveTimeout
p . repo , err = NewV2Repository ( p . repoInfo , p . endpoint , p . config . MetaHeaders , p . config . AuthConfig )
if err != nil {
logrus . Debugf ( "Error getting v2 registry: %v" , err )
return true , err
}
2015-04-07 22:29:29 -04:00
p . sessionID = stringid . GenerateRandomID ( )
2015-02-12 13:23:22 -05:00
if err := p . pullV2Repository ( tag ) ; err != nil {
if registry . ContinueOnError ( err ) {
logrus . Debugf ( "Error trying v2 registry: %v" , err )
return true , err
}
return false , err
}
return false , nil
}
func ( p * v2Puller ) pullV2Repository ( tag string ) ( err error ) {
var tags [ ] string
taggedName := p . repoInfo . LocalName
if len ( tag ) > 0 {
tags = [ ] string { tag }
taggedName = utils . ImageReference ( p . repoInfo . LocalName , tag )
} else {
var err error
2015-07-21 18:54:26 -04:00
manSvc , err := p . repo . Manifests ( context . Background ( ) )
if err != nil {
return err
}
tags , err = manSvc . Tags ( )
2015-02-12 13:23:22 -05:00
if err != nil {
return err
}
}
2015-08-11 13:12:47 -04:00
broadcaster , found := p . poolAdd ( "pull" , taggedName )
2015-08-11 12:44:50 -04:00
if found {
2015-08-25 17:17:42 -04:00
// Another pull of the same repository is already taking place; just wait for it to finish
2015-08-11 13:12:47 -04:00
broadcaster . Add ( p . config . OutStream )
broadcaster . Wait ( )
2015-08-25 17:17:42 -04:00
return nil
2015-02-12 13:23:22 -05:00
}
defer p . poolRemove ( "pull" , taggedName )
2015-08-11 13:12:47 -04:00
broadcaster . Add ( p . config . OutStream )
2015-02-12 13:23:22 -05:00
var layersDownloaded bool
for _ , tag := range tags {
// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
2015-08-11 13:12:47 -04:00
pulledNew , err := p . pullV2Tag ( broadcaster , tag , taggedName )
2015-02-12 13:23:22 -05:00
if err != nil {
return err
}
layersDownloaded = layersDownloaded || pulledNew
}
2015-08-11 13:12:47 -04:00
writeStatus ( taggedName , broadcaster , p . sf , layersDownloaded )
2015-02-12 13:23:22 -05:00
return nil
}
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
2015-08-01 02:27:19 -04:00
img * image . Image
tmpFile * os . File
digest digest . Digest
layer distribution . ReadSeekCloser
size int64
err chan error
2015-08-05 20:47:37 -04:00
out io . Writer // Download progress is written here.
2015-02-12 13:23:22 -05:00
}
// errVerification is an error value signalling that verification failed.
type errVerification struct{}

// Error implements the error interface.
func (e errVerification) Error() string {
	return "verification failed"
}
func ( p * v2Puller ) download ( di * downloadInfo ) {
logrus . Debugf ( "pulling blob %q to %s" , di . digest , di . img . ID )
2015-08-05 20:47:37 -04:00
out := di . out
2015-02-12 13:23:22 -05:00
2015-08-11 13:12:47 -04:00
broadcaster , found := p . poolAdd ( "pull" , "img:" + di . img . ID )
2015-08-11 12:44:50 -04:00
if found {
2015-08-11 13:12:47 -04:00
broadcaster . Add ( out )
broadcaster . Wait ( )
2015-08-25 17:17:42 -04:00
out . Write ( p . sf . FormatProgress ( stringid . TruncateID ( di . img . ID ) , "Download complete" , nil ) )
2015-02-12 13:23:22 -05:00
di . err <- nil
return
}
2015-08-11 13:12:47 -04:00
broadcaster . Add ( out )
2015-02-12 13:23:22 -05:00
defer p . poolRemove ( "pull" , "img:" + di . img . ID )
tmpFile , err := ioutil . TempFile ( "" , "GetImageBlob" )
if err != nil {
di . err <- err
return
}
2015-08-12 23:23:56 -04:00
di . tmpFile = tmpFile
2015-02-12 13:23:22 -05:00
2015-08-21 13:28:35 -04:00
blobs := p . repo . Blobs ( context . Background ( ) )
2015-02-12 13:23:22 -05:00
2015-08-21 13:28:35 -04:00
desc , err := blobs . Stat ( context . Background ( ) , di . digest )
2015-02-12 13:23:22 -05:00
if err != nil {
logrus . Debugf ( "Error statting layer: %v" , err )
di . err <- err
return
}
2015-07-21 18:54:26 -04:00
di . size = desc . Size
2015-02-12 13:23:22 -05:00
2015-08-21 13:28:35 -04:00
layerDownload , err := blobs . Open ( context . Background ( ) , di . digest )
2015-02-12 13:23:22 -05:00
if err != nil {
logrus . Debugf ( "Error fetching layer: %v" , err )
di . err <- err
return
}
defer layerDownload . Close ( )
verifier , err := digest . NewDigestVerifier ( di . digest )
if err != nil {
di . err <- err
return
}
reader := progressreader . New ( progressreader . Config {
In : ioutil . NopCloser ( io . TeeReader ( layerDownload , verifier ) ) ,
2015-08-11 13:12:47 -04:00
Out : broadcaster ,
2015-02-12 13:23:22 -05:00
Formatter : p . sf ,
2015-07-23 17:19:58 -04:00
Size : di . size ,
2015-02-12 13:23:22 -05:00
NewLines : false ,
ID : stringid . TruncateID ( di . img . ID ) ,
Action : "Downloading" ,
} )
io . Copy ( tmpFile , reader )
2015-08-11 13:12:47 -04:00
broadcaster . Write ( p . sf . FormatProgress ( stringid . TruncateID ( di . img . ID ) , "Verifying Checksum" , nil ) )
2015-02-12 13:23:22 -05:00
2015-08-01 02:27:19 -04:00
if ! verifier . Verified ( ) {
err = fmt . Errorf ( "filesystem layer verification failed for digest %s" , di . digest )
logrus . Error ( err )
di . err <- err
return
2015-02-12 13:23:22 -05:00
}
2015-08-11 13:12:47 -04:00
broadcaster . Write ( p . sf . FormatProgress ( stringid . TruncateID ( di . img . ID ) , "Download complete" , nil ) )
2015-02-12 13:23:22 -05:00
logrus . Debugf ( "Downloaded %s to tempfile %s" , di . img . ID , tmpFile . Name ( ) )
di . layer = layerDownload
di . err <- nil
}
2015-08-25 17:17:42 -04:00
func ( p * v2Puller ) pullV2Tag ( out io . Writer , tag , taggedName string ) ( verified bool , err error ) {
2015-02-12 13:23:22 -05:00
logrus . Debugf ( "Pulling tag from V2 registry: %q" , tag )
2015-07-21 18:54:26 -04:00
manSvc , err := p . repo . Manifests ( context . Background ( ) )
if err != nil {
return false , err
}
manifest , err := manSvc . GetByTag ( tag )
2015-02-12 13:23:22 -05:00
if err != nil {
return false , err
}
2015-08-05 20:47:37 -04:00
verified , err = p . validateManifest ( manifest , tag )
2015-02-12 13:23:22 -05:00
if err != nil {
return false , err
}
if verified {
logrus . Printf ( "Image manifest for %s has been verified" , taggedName )
}
2015-08-05 20:47:37 -04:00
// By using a pipeWriter for each of the downloads to write their progress
// to, we can avoid an issue where this function returns an error but
// leaves behind running download goroutines. By splitting the writer
// with a pipe, we can close the pipe if there is any error, consequently
// causing each download to cancel due to an error writing to this pipe.
pipeReader , pipeWriter := io . Pipe ( )
go func ( ) {
if _ , err := io . Copy ( out , pipeReader ) ; err != nil {
logrus . Errorf ( "error copying from layer download progress reader: %s" , err )
2015-08-17 17:11:40 -04:00
if err := pipeReader . CloseWithError ( err ) ; err != nil {
logrus . Errorf ( "error closing the progress reader: %s" , err )
}
2015-08-05 20:47:37 -04:00
}
} ( )
defer func ( ) {
if err != nil {
// All operations on the pipe are synchronous. This call will wait
// until all current readers/writers are done using the pipe then
// set the error. All successive reads/writes will return with this
// error.
pipeWriter . CloseWithError ( errors . New ( "download canceled" ) )
2015-08-31 17:24:05 -04:00
} else {
// If no error then just close the pipe.
pipeWriter . Close ( )
2015-08-05 20:47:37 -04:00
}
} ( )
2015-02-12 13:23:22 -05:00
out . Write ( p . sf . FormatStatus ( tag , "Pulling from %s" , p . repo . Name ( ) ) )
2015-08-12 23:23:56 -04:00
var downloads [ ] * downloadInfo
2015-04-07 22:29:29 -04:00
2015-08-12 23:23:56 -04:00
var layerIDs [ ] string
2015-04-07 22:29:29 -04:00
defer func ( ) {
p . graph . Release ( p . sessionID , layerIDs ... )
} ( )
2015-02-12 13:23:22 -05:00
for i := len ( manifest . FSLayers ) - 1 ; i >= 0 ; i -- {
2015-07-20 13:57:15 -04:00
img , err := image . NewImgJSON ( [ ] byte ( manifest . History [ i ] . V1Compatibility ) )
2015-02-12 13:23:22 -05:00
if err != nil {
logrus . Debugf ( "error getting image v1 json: %v" , err )
return false , err
}
2015-04-07 22:29:29 -04:00
p . graph . Retain ( p . sessionID , img . ID )
layerIDs = append ( layerIDs , img . ID )
2015-02-12 13:23:22 -05:00
// Check if exists
if p . graph . Exists ( img . ID ) {
logrus . Debugf ( "Image already exists: %s" , img . ID )
2015-08-12 23:23:56 -04:00
out . Write ( p . sf . FormatProgress ( stringid . TruncateID ( img . ID ) , "Already exists" , nil ) )
2015-02-12 13:23:22 -05:00
continue
}
out . Write ( p . sf . FormatProgress ( stringid . TruncateID ( img . ID ) , "Pulling fs layer" , nil ) )
2015-08-12 23:23:56 -04:00
d := & downloadInfo {
img : img ,
digest : manifest . FSLayers [ i ] . BlobSum ,
// TODO: seems like this chan buffer solved hanging problem in go1.5,
// this can indicate some deeper problem that somehow we never take
// error from channel in loop below
err : make ( chan error , 1 ) ,
out : pipeWriter ,
}
downloads = append ( downloads , d )
go p . download ( d )
2015-02-12 13:23:22 -05:00
}
2015-08-12 23:23:56 -04:00
// run clean for all downloads to prevent leftovers
for _ , d := range downloads {
defer func ( d * downloadInfo ) {
2015-02-12 13:23:22 -05:00
if d . tmpFile != nil {
2015-08-12 23:23:56 -04:00
d . tmpFile . Close ( )
if err := os . RemoveAll ( d . tmpFile . Name ( ) ) ; err != nil {
logrus . Errorf ( "Failed to remove temp file: %s" , d . tmpFile . Name ( ) )
2015-02-12 13:23:22 -05:00
}
2015-08-12 23:23:56 -04:00
}
} ( d )
}
2015-02-12 13:23:22 -05:00
2015-08-12 23:23:56 -04:00
var tagUpdated bool
for _ , d := range downloads {
if err := <- d . err ; err != nil {
return false , err
}
if d . layer == nil {
continue
}
// if tmpFile is empty assume download and extracted elsewhere
d . tmpFile . Seek ( 0 , 0 )
reader := progressreader . New ( progressreader . Config {
In : d . tmpFile ,
Out : out ,
Formatter : p . sf ,
Size : d . size ,
NewLines : false ,
ID : stringid . TruncateID ( d . img . ID ) ,
Action : "Extracting" ,
} )
err = p . graph . Register ( d . img , reader )
if err != nil {
return false , err
}
2015-02-12 13:23:22 -05:00
2015-08-12 23:23:56 -04:00
if err := p . graph . SetDigest ( d . img . ID , d . digest ) ; err != nil {
return false , err
2015-02-12 13:23:22 -05:00
}
2015-08-12 23:23:56 -04:00
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
out . Write ( p . sf . FormatProgress ( stringid . TruncateID ( d . img . ID ) , "Pull complete" , nil ) )
tagUpdated = true
2015-02-12 13:23:22 -05:00
}
2015-07-15 16:42:45 -04:00
manifestDigest , _ , err := digestFromManifest ( manifest , p . repoInfo . LocalName )
2015-02-12 13:23:22 -05:00
if err != nil {
return false , err
}
// Check for new tag if no layers downloaded
if ! tagUpdated {
repo , err := p . Get ( p . repoInfo . LocalName )
if err != nil {
return false , err
}
if repo != nil {
if _ , exists := repo [ tag ] ; ! exists {
tagUpdated = true
}
} else {
tagUpdated = true
}
}
if verified && tagUpdated {
out . Write ( p . sf . FormatStatus ( p . repo . Name ( ) + ":" + tag , "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security." ) )
}
2015-08-12 23:23:56 -04:00
firstID := layerIDs [ len ( layerIDs ) - 1 ]
2015-02-12 13:23:22 -05:00
if utils . DigestReference ( tag ) {
// TODO(stevvooe): Ideally, we should always set the digest so we can
// use the digest whether we pull by it or not. Unfortunately, the tag
// store treats the digest as a separate tag, meaning there may be an
// untagged digest image that would seem to be dangling by a user.
2015-08-12 23:23:56 -04:00
if err = p . SetDigest ( p . repoInfo . LocalName , tag , firstID ) ; err != nil {
2015-02-12 13:23:22 -05:00
return false , err
}
} else {
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
2015-08-12 23:23:56 -04:00
if err = p . Tag ( p . repoInfo . LocalName , tag , firstID , true ) ; err != nil {
2015-02-12 13:23:22 -05:00
return false , err
}
}
if manifestDigest != "" {
out . Write ( p . sf . FormatStatus ( "" , "Digest: %s" , manifestDigest ) )
}
return tagUpdated , nil
}
// verifyTrustedKeys checks the keys provided against the trust store,
// ensuring that the provided keys are trusted for the namespace. The keys
// provided from this method must come from the signatures provided as part of
// the manifest JWS package, obtained from unpackSignedManifest or libtrust.
//
// namespace is normalized to start with "/"; an empty namespace is rejected
// with an error. A key that the trust service reports as not verified
// (trust.NotVerifiedError) is logged and does not abort the check; any other
// trust service error, or a key that cannot be marshalled, is returned.
func (p *v2Puller) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
	// Guard the namespace[0] access below, which would otherwise panic.
	if namespace == "" {
		return false, errors.New("cannot verify keys for an empty namespace")
	}
	if namespace[0] != '/' {
		namespace = "/" + namespace
	}

	for _, key := range keys {
		b, err := key.MarshalJSON()
		if err != nil {
			return false, fmt.Errorf("error marshalling public key: %s", err)
		}
		// Check key has read/write permission (0x03)
		v, err := p.trustService.CheckKey(namespace, b, 0x03)
		if err != nil {
			vErr, ok := err.(trust.NotVerifiedError)
			if !ok {
				return false, fmt.Errorf("error running key check: %s", err)
			}
			logrus.Debugf("Key check result: %v", vErr)
		}
		// NOTE(review): the result of the last key checked wins, so an
		// earlier verified key is overridden by a later unverified one —
		// confirm this is the intended policy.
		verified = v
	}
	if verified {
		logrus.Debug("Key check result: verified")
	}
	return
}
func ( p * v2Puller ) validateManifest ( m * manifest . SignedManifest , tag string ) ( verified bool , err error ) {
2015-08-01 02:27:19 -04:00
// If pull by digest, then verify the manifest digest. NOTE: It is
// important to do this first, before any other content validation. If the
// digest cannot be verified, don't even bother with those other things.
if manifestDigest , err := digest . ParseDigest ( tag ) ; err == nil {
verifier , err := digest . NewDigestVerifier ( manifestDigest )
if err != nil {
return false , err
}
payload , err := m . Payload ( )
if err != nil {
return false , err
}
if _ , err := verifier . Write ( payload ) ; err != nil {
return false , err
}
if ! verifier . Verified ( ) {
err := fmt . Errorf ( "image verification failed for digest %s" , manifestDigest )
logrus . Error ( err )
return false , err
}
}
2015-02-12 13:23:22 -05:00
// TODO(tiborvass): what's the usecase for having manifest == nil and err == nil ? Shouldn't be the error be "DoesNotExist" ?
if m == nil {
return false , fmt . Errorf ( "image manifest does not exist for tag %q" , tag )
}
if m . SchemaVersion != 1 {
return false , fmt . Errorf ( "unsupported schema version %d for tag %q" , m . SchemaVersion , tag )
}
if len ( m . FSLayers ) != len ( m . History ) {
return false , fmt . Errorf ( "length of history not equal to number of layers for tag %q" , tag )
}
if len ( m . FSLayers ) == 0 {
return false , fmt . Errorf ( "no FSLayers in manifest for tag %q" , tag )
}
keys , err := manifest . Verify ( m )
if err != nil {
return false , fmt . Errorf ( "error verifying manifest for tag %q: %v" , tag , err )
}
verified , err = p . verifyTrustedKeys ( m . Name , keys )
if err != nil {
return false , fmt . Errorf ( "error verifying manifest keys: %v" , err )
}
return verified , nil
}