Merge pull request #12490 from LK4D4/carry_12396

remove job from pull and import
This commit is contained in:
Brian Goff 2015-04-17 15:26:30 -04:00
commit 055c6dbaef
7 changed files with 89 additions and 71 deletions

View File

@ -668,7 +668,6 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
image = r.Form.Get("fromImage") image = r.Form.Get("fromImage")
repo = r.Form.Get("repo") repo = r.Form.Get("repo")
tag = r.Form.Get("tag") tag = r.Form.Get("tag")
job *engine.Job
) )
authEncoded := r.Header.Get("X-Registry-Auth") authEncoded := r.Header.Get("X-Registry-Auth")
authConfig := &registry.AuthConfig{} authConfig := &registry.AuthConfig{}
@ -680,6 +679,9 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
authConfig = &registry.AuthConfig{} authConfig = &registry.AuthConfig{}
} }
} }
d := getDaemon(eng)
if image != "" { //pull if image != "" { //pull
if tag == "" { if tag == "" {
image, tag = parsers.ParseRepositoryTag(image) image, tag = parsers.ParseRepositoryTag(image)
@ -690,31 +692,45 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
metaHeaders[k] = v metaHeaders[k] = v
} }
} }
job = eng.Job("pull", image, tag)
job.SetenvBool("parallel", version.GreaterThan("1.3")) imagePullConfig := &graph.ImagePullConfig{
job.SetenvJson("metaHeaders", metaHeaders) Parallel: version.GreaterThan("1.3"),
job.SetenvJson("authConfig", authConfig) MetaHeaders: metaHeaders,
AuthConfig: authConfig,
OutStream: utils.NewWriteFlusher(w),
}
if version.GreaterThan("1.0") {
imagePullConfig.Json = true
w.Header().Set("Content-Type", "application/json")
} else {
imagePullConfig.Json = false
}
if err := d.Repositories().Pull(image, tag, imagePullConfig, eng); err != nil {
return err
}
} else { //import } else { //import
if tag == "" { if tag == "" {
repo, tag = parsers.ParseRepositoryTag(repo) repo, tag = parsers.ParseRepositoryTag(repo)
} }
job = eng.Job("import", r.Form.Get("fromSrc"), repo, tag)
job.Stdin.Add(r.Body)
job.SetenvList("changes", r.Form["changes"])
}
if version.GreaterThan("1.0") { src := r.Form.Get("fromSrc")
job.SetenvBool("json", true) imageImportConfig := &graph.ImageImportConfig{
streamJSON(job, w, true) Changes: r.Form["changes"],
} else { InConfig: r.Body,
job.Stdout.Add(utils.NewWriteFlusher(w)) OutStream: utils.NewWriteFlusher(w),
} }
if err := job.Run(); err != nil { if version.GreaterThan("1.0") {
if !job.Stdout.Used() { imageImportConfig.Json = true
w.Header().Set("Content-Type", "application/json")
} else {
imageImportConfig.Json = false
}
if err := d.Repositories().Import(src, repo, tag, imageImportConfig, eng); err != nil {
return err return err
} }
sf := streamformatter.NewStreamFormatter(version.GreaterThan("1.0"))
w.Write(sf.FormatError(err))
} }
return nil return nil

View File

@ -22,6 +22,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/builder/parser" "github.com/docker/docker/builder/parser"
"github.com/docker/docker/daemon" "github.com/docker/docker/daemon"
"github.com/docker/docker/graph"
imagepkg "github.com/docker/docker/image" imagepkg "github.com/docker/docker/image"
"github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive" "github.com/docker/docker/pkg/chrootarchive"
@ -434,7 +435,7 @@ func (b *Builder) pullImage(name string) (*imagepkg.Image, error) {
if tag == "" { if tag == "" {
tag = "latest" tag = "latest"
} }
job := b.Engine.Job("pull", remote, tag)
pullRegistryAuth := b.AuthConfig pullRegistryAuth := b.AuthConfig
if len(b.AuthConfigFile.Configs) > 0 { if len(b.AuthConfigFile.Configs) > 0 {
// The request came with a full auth config file, we prefer to use that // The request came with a full auth config file, we prefer to use that
@ -445,13 +446,18 @@ func (b *Builder) pullImage(name string) (*imagepkg.Image, error) {
resolvedAuth := b.AuthConfigFile.ResolveAuthConfig(repoInfo.Index) resolvedAuth := b.AuthConfigFile.ResolveAuthConfig(repoInfo.Index)
pullRegistryAuth = &resolvedAuth pullRegistryAuth = &resolvedAuth
} }
job.SetenvBool("json", b.StreamFormatter.Json())
job.SetenvBool("parallel", true) imagePullConfig := &graph.ImagePullConfig{
job.SetenvJson("authConfig", pullRegistryAuth) Parallel: true,
job.Stdout.Add(ioutils.NopWriteCloser(b.OutOld)) AuthConfig: pullRegistryAuth,
if err := job.Run(); err != nil { OutStream: ioutils.NopWriteCloser(b.OutOld),
Json: b.StreamFormatter.Json(),
}
if err := b.Daemon.Repositories().Pull(remote, tag, imagePullConfig, b.Engine); err != nil {
return nil, err return nil, err
} }
image, err := b.Daemon.Repositories().LookupImage(name) image, err := b.Daemon.Repositories().LookupImage(name)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -3,7 +3,7 @@ package graph
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "io"
"net/http" "net/http"
"net/url" "net/url"
@ -16,26 +16,25 @@ import (
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
func (s *TagStore) CmdImport(job *engine.Job) error { type ImageImportConfig struct {
if n := len(job.Args); n != 2 && n != 3 { Changes []string
return fmt.Errorf("Usage: %s SRC REPO [TAG]", job.Name) InConfig io.ReadCloser
} Json bool
OutStream io.Writer
//OutStream WriteFlusher
}
func (s *TagStore) Import(src string, repo string, tag string, imageImportConfig *ImageImportConfig, eng *engine.Engine) error {
var ( var (
src = job.Args[0] sf = streamformatter.NewStreamFormatter(imageImportConfig.Json)
repo = job.Args[1]
tag string
sf = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
archive archive.ArchiveReader archive archive.ArchiveReader
resp *http.Response resp *http.Response
stdoutBuffer = bytes.NewBuffer(nil) stdoutBuffer = bytes.NewBuffer(nil)
newConfig runconfig.Config newConfig runconfig.Config
) )
if len(job.Args) > 2 {
tag = job.Args[2]
}
if src == "-" { if src == "-" {
archive = job.Stdin archive = imageImportConfig.InConfig
} else { } else {
u, err := url.Parse(src) u, err := url.Parse(src)
if err != nil { if err != nil {
@ -46,14 +45,14 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
u.Host = src u.Host = src
u.Path = "" u.Path = ""
} }
job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u)) imageImportConfig.OutStream.Write(sf.FormatStatus("", "Downloading from %s", u))
resp, err = httputils.Download(u.String()) resp, err = httputils.Download(u.String())
if err != nil { if err != nil {
return err return err
} }
progressReader := progressreader.New(progressreader.Config{ progressReader := progressreader.New(progressreader.Config{
In: resp.Body, In: resp.Body,
Out: job.Stdout, Out: imageImportConfig.OutStream,
Formatter: sf, Formatter: sf,
Size: int(resp.ContentLength), Size: int(resp.ContentLength),
NewLines: true, NewLines: true,
@ -64,11 +63,11 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
archive = progressReader archive = progressReader
} }
buildConfigJob := job.Eng.Job("build_config") buildConfigJob := eng.Job("build_config")
buildConfigJob.Stdout.Add(stdoutBuffer) buildConfigJob.Stdout.Add(stdoutBuffer)
buildConfigJob.Setenv("changes", job.Getenv("changes")) buildConfigJob.SetenvList("changes", imageImportConfig.Changes)
// FIXME this should be remove when we remove deprecated config param // FIXME this should be remove when we remove deprecated config param
buildConfigJob.Setenv("config", job.Getenv("config")) //buildConfigJob.Setenv("config", job.Getenv("config"))
if err := buildConfigJob.Run(); err != nil { if err := buildConfigJob.Run(); err != nil {
return err return err
@ -87,7 +86,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
return err return err
} }
} }
job.Stdout.Write(sf.FormatStatus("", img.ID)) imageImportConfig.OutStream.Write(sf.FormatStatus("", img.ID))
logID := img.ID logID := img.ID
if tag != "" { if tag != "" {
logID = utils.ImageReference(logID, tag) logID = utils.ImageReference(logID, tag)

View File

@ -21,37 +21,30 @@ import (
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
func (s *TagStore) CmdPull(job *engine.Job) error { type ImagePullConfig struct {
if n := len(job.Args); n != 1 && n != 2 { Parallel bool
return fmt.Errorf("Usage: %s IMAGE [TAG|DIGEST]", job.Name) MetaHeaders map[string][]string
} AuthConfig *registry.AuthConfig
Json bool
OutStream io.Writer
}
func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConfig, eng *engine.Engine) error {
var ( var (
localName = job.Args[0] sf = streamformatter.NewStreamFormatter(imagePullConfig.Json)
tag string
sf = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
authConfig = &registry.AuthConfig{}
metaHeaders map[string][]string
) )
// Resolve the Repository name from fqn to RepositoryInfo // Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := s.registryService.ResolveRepository(localName) repoInfo, err := s.registryService.ResolveRepository(image)
if err != nil { if err != nil {
return err return err
} }
if len(job.Args) > 1 {
tag = job.Args[1]
}
job.GetenvJson("authConfig", authConfig)
job.GetenvJson("metaHeaders", &metaHeaders)
c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag)) c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag))
if err != nil { if err != nil {
if c != nil { if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish // Another pull of the same repository is already taking place; just wait for it to finish
job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName)) imagePullConfig.OutStream.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
<-c <-c
return nil return nil
} }
@ -65,7 +58,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
return err return err
} }
r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true) r, err := registry.NewSession(imagePullConfig.AuthConfig, registry.HTTPRequestFactory(imagePullConfig.MetaHeaders), endpoint, true)
if err != nil { if err != nil {
return err return err
} }
@ -77,14 +70,14 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) { if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
if repoInfo.Official { if repoInfo.Official {
j := job.Eng.Job("trust_update_base") j := eng.Job("trust_update_base")
if err = j.Run(); err != nil { if err = j.Run(); err != nil {
logrus.Errorf("error updating trust base graph: %s", err) logrus.Errorf("error updating trust base graph: %s", err)
} }
} }
logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName) logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil { if err := s.pullV2Repository(eng, r, imagePullConfig.OutStream, repoInfo, tag, sf, imagePullConfig.Parallel); err == nil {
s.eventsService.Log("pull", logName, "") s.eventsService.Log("pull", logName, "")
return nil return nil
} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable { } else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
@ -95,7 +88,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
} }
logrus.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName) logrus.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName)
if err = s.pullRepository(r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err != nil { if err = s.pullRepository(r, imagePullConfig.OutStream, repoInfo, tag, sf, imagePullConfig.Parallel); err != nil {
return err return err
} }

View File

@ -19,8 +19,6 @@ func (s *TagStore) Install(eng *engine.Engine) error {
"image_export": s.CmdImageExport, "image_export": s.CmdImageExport,
"viz": s.CmdViz, "viz": s.CmdViz,
"load": s.CmdLoad, "load": s.CmdLoad,
"import": s.CmdImport,
"pull": s.CmdPull,
"push": s.CmdPush, "push": s.CmdPush,
} { } {
if err := eng.Register(name, handler); err != nil { if err := eng.Register(name, handler); err != nil {

View File

@ -143,6 +143,7 @@ func runCommandPipelineWithOutput(cmds ...*exec.Cmd) (output string, exitCode in
if i > 0 { if i > 0 {
prevCmd := cmds[i-1] prevCmd := cmds[i-1]
cmd.Stdin, err = prevCmd.StdoutPipe() cmd.Stdin, err = prevCmd.StdoutPipe()
if err != nil { if err != nil {
return "", 0, fmt.Errorf("cannot set stdout pipe for %s: %v", cmd.Path, err) return "", 0, fmt.Errorf("cannot set stdout pipe for %s: %v", cmd.Path, err)
} }

View File

@ -28,6 +28,7 @@ import (
"github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/reexec" "github.com/docker/docker/pkg/reexec"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig" "github.com/docker/docker/runconfig"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
@ -133,9 +134,13 @@ func setupBaseImage() {
// If the unit test is not found, try to download it. // If the unit test is not found, try to download it.
if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID { if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID {
// Retrieve the Image // Retrieve the Image
job = eng.Job("pull", unitTestImageName) imagePullConfig := &graph.ImagePullConfig{
job.Stdout.Add(ioutils.NopWriteCloser(os.Stdout)) Parallel: true,
if err := job.Run(); err != nil { OutStream: ioutils.NopWriteCloser(os.Stdout),
AuthConfig: &registry.AuthConfig{},
}
d := getDaemon(eng)
if err := d.Repositories().Pull(unitTestImageName, "", imagePullConfig, eng); err != nil {
logrus.Fatalf("Unable to pull the test image: %s", err) logrus.Fatalf("Unable to pull the test image: %s", err)
} }
} }