1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #3725 from vieux/pull_import_job

Pull import job
This commit is contained in:
Victor Vieux 2014-01-24 14:47:40 -08:00
commit 4a96c329c0
8 changed files with 109 additions and 49 deletions

38
api.go
View file

@ -413,11 +413,11 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
return err
}
src := r.Form.Get("fromSrc")
image := r.Form.Get("fromImage")
tag := r.Form.Get("tag")
repo := r.Form.Get("repo")
var (
image = r.Form.Get("fromImage")
tag = r.Form.Get("tag")
job *engine.Job
)
authEncoded := r.Header.Get("X-Registry-Auth")
authConfig := &auth.AuthConfig{}
if authEncoded != "" {
@ -431,7 +431,6 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
if version > 1.0 {
w.Header().Set("Content-Type", "application/json")
}
sf := utils.NewStreamFormatter(version > 1.0)
if image != "" { //pull
metaHeaders := map[string][]string{}
for k, v := range r.Header {
@ -439,22 +438,25 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
metaHeaders[k] = v
}
}
if err := srv.ImagePull(image, tag, w, sf, authConfig, metaHeaders, version > 1.3); err != nil {
if sf.Used() {
w.Write(sf.FormatError(err))
return nil
}
return err
}
job = srv.Eng.Job("pull", r.Form.Get("fromImage"), tag)
job.SetenvBool("parallel", version > 1.3)
job.SetenvJson("metaHeaders", metaHeaders)
job.SetenvJson("authConfig", authConfig)
} else { //import
if err := srv.ImageImport(src, repo, tag, r.Body, w, sf); err != nil {
if sf.Used() {
w.Write(sf.FormatError(err))
return nil
}
job = srv.Eng.Job("import", r.Form.Get("fromSrc"), r.Form.Get("repo"), tag)
job.Stdin.Add(r.Body)
}
job.SetenvBool("json", version > 1.0)
job.Stdout.Add(utils.NewWriteFlusher(w))
if err := job.Run(); err != nil {
if !job.Stdout.Used() {
return err
}
sf := utils.NewStreamFormatter(version > 1.0)
w.Write(sf.FormatError(err))
}
return nil
}

View file

@ -84,7 +84,12 @@ func (b *buildFile) CmdFrom(name string) error {
resolvedAuth := b.configFile.ResolveAuthConfig(endpoint)
pullRegistryAuth = &resolvedAuth
}
if err := b.srv.ImagePull(remote, tag, b.outOld, b.sf, pullRegistryAuth, nil, true); err != nil {
job := b.srv.Eng.Job("pull", remote, tag)
job.SetenvBool("json", b.sf.Json())
job.SetenvBool("parallel", true)
job.SetenvJson("authConfig", pullRegistryAuth)
job.Stdout.Add(b.outOld)
if err := job.Run(); err != nil {
return err
}
image, err = b.runtime.repositories.LookupImage(name)

View file

@ -137,6 +137,9 @@ func (eng *Engine) Job(name string, args ...string) *Job {
}
func (eng *Engine) Logf(format string, args ...interface{}) (n int, err error) {
prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n"))
return fmt.Fprintf(eng.Stderr, prefixedFormat, args...)
if os.Getenv("TEST") == "" {
prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n"))
return fmt.Fprintf(eng.Stderr, prefixedFormat, args...)
}
return 0, nil
}

View file

@ -3,6 +3,7 @@ package engine
import (
"fmt"
"io"
"os"
"strings"
"time"
)
@ -176,8 +177,11 @@ func (job *Job) Environ() map[string]string {
}
func (job *Job) Logf(format string, args ...interface{}) (n int, err error) {
prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n"))
return fmt.Fprintf(job.Stderr, prefixedFormat, args...)
if os.Getenv("TEST") == "" {
prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n"))
return fmt.Fprintf(job.Stderr, prefixedFormat, args...)
}
return 0, nil
}
func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {

View file

@ -137,7 +137,9 @@ func setupBaseImage() {
// If the unit test is not found, try to download it.
if img, err := srv.ImageInspect(unitTestImageName); err != nil || img.ID != unitTestImageID {
// Retrieve the Image
if err := srv.ImagePull(unitTestImageName, "", os.Stdout, utils.NewStreamFormatter(false), nil, nil, true); err != nil {
job = eng.Job("pull", unitTestImageName)
job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
if err := job.Run(); err != nil {
log.Fatalf("Unable to pull the test image: %s", err)
}
}

View file

@ -2,8 +2,6 @@ package docker
import (
"github.com/dotcloud/docker"
"github.com/dotcloud/docker/utils"
"io/ioutil"
"testing"
"time"
)
@ -53,5 +51,8 @@ func generateImage(name string, srv *docker.Server) error {
if err != nil {
return err
}
return srv.ImageImport("-", "repo", name, archive, ioutil.Discard, utils.NewStreamFormatter(true))
job := srv.Eng.Job("import", "-", "repo", name)
job.Stdin.Add(archive)
job.SetenvBool("json", true)
return job.Run()
}

View file

@ -97,6 +97,8 @@ func jobInitApi(job *engine.Job) engine.Status {
"top": srv.ContainerTop,
"load": srv.ImageLoad,
"build": srv.Build,
"pull": srv.ImagePull,
"import": srv.ImageImport,
} {
if err := job.Eng.Register(name, handler); err != nil {
job.Error(err)
@ -1312,30 +1314,49 @@ func (srv *Server) poolRemove(kind, key string) error {
return nil
}
func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string, parallel bool) error {
out = utils.NewWriteFlusher(out)
func (srv *Server) ImagePull(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 && n != 2 {
job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
return engine.StatusErr
}
var (
localName = job.Args[0]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
authConfig = &auth.AuthConfig{}
metaHeaders map[string][]string
)
if len(job.Args) > 1 {
tag = job.Args[1]
}
job.GetenvJson("authConfig", authConfig)
job.GetenvJson("metaHeaders", metaHeaders)
c, err := srv.poolAdd("pull", localName+":"+tag)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
<-c
return nil
return engine.StatusOK
}
return err
job.Error(err)
return engine.StatusErr
}
defer srv.poolRemove("pull", localName+":"+tag)
// Resolve the Repository name from fqn to endpoint + name
endpoint, remoteName, err := registry.ResolveRepositoryName(localName)
if err != nil {
return err
job.Error(err)
return engine.StatusErr
}
r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
if err != nil {
return err
job.Error(err)
return engine.StatusErr
}
if endpoint == auth.IndexServerAddress() {
@ -1343,11 +1364,12 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
localName = remoteName
}
if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, parallel); err != nil {
return err
if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
job.Error(err)
return engine.StatusErr
}
return nil
return engine.StatusOK
}
// Retrieve the all the images to be uploaded in the correct order
@ -1551,43 +1573,60 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo
return nil
}
func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Writer, sf *utils.StreamFormatter) error {
var archive io.Reader
var resp *http.Response
func (srv *Server) ImageImport(job *engine.Job) engine.Status {
if n := len(job.Args); n != 2 && n != 3 {
job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
return engine.StatusErr
}
var (
src = job.Args[0]
repo = job.Args[1]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
archive io.Reader
resp *http.Response
)
if len(job.Args) > 2 {
tag = job.Args[2]
}
if src == "-" {
archive = in
archive = job.Stdin
} else {
u, err := url.Parse(src)
if err != nil {
return err
job.Error(err)
return engine.StatusErr
}
if u.Scheme == "" {
u.Scheme = "http"
u.Host = src
u.Path = ""
}
out.Write(sf.FormatStatus("", "Downloading from %s", u))
job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
// Download with curl (pretty progress bar)
// If curl is not available, fallback to http.Get()
resp, err = utils.Download(u.String())
if err != nil {
return err
job.Error(err)
return engine.StatusErr
}
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
}
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
if err != nil {
return err
job.Error(err)
return engine.StatusErr
}
// Optionally register the image at REPO/TAG
if repo != "" {
if err := srv.runtime.repositories.Set(repo, tag, img.ID, true); err != nil {
return err
job.Error(err)
return engine.StatusErr
}
}
out.Write(sf.FormatStatus("", img.ID))
return nil
job.Stdout.Write(sf.FormatStatus("", img.ID))
return engine.StatusOK
}
func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {

View file

@ -82,3 +82,7 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgr
func (sf *StreamFormatter) Used() bool {
return sf.used
}
func (sf *StreamFormatter) Json() bool {
return sf.json
}