1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #3800 from vieux/cleanup_jobs

Cleanup jobs
This commit is contained in:
unclejack 2014-01-31 06:55:12 -08:00
commit 35ef0d37c2
7 changed files with 318 additions and 462 deletions

85
api.go
View file

@ -89,18 +89,10 @@ func httpError(w http.ResponseWriter, err error) {
}
}
func writeJSON(w http.ResponseWriter, code int, v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
func writeJSON(w http.ResponseWriter, code int, v engine.Env) error {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
w.Write(b)
return nil
return v.Encode(w)
}
func getBoolParam(value string) (bool, error) {
@ -352,25 +344,28 @@ func postCommit(srv *Server, version float64, w http.ResponseWriter, r *http.Req
if err := parseForm(r); err != nil {
return err
}
config := &Config{}
if err := json.NewDecoder(r.Body).Decode(config); err != nil && err != io.EOF {
var (
config engine.Env
env engine.Env
job = srv.Eng.Job("commit", r.Form.Get("container"))
)
if err := config.Import(r.Body); err != nil {
utils.Errorf("%s", err)
}
job := srv.Eng.Job("commit", r.Form.Get("container"))
job.Setenv("repo", r.Form.Get("repo"))
job.Setenv("tag", r.Form.Get("tag"))
job.Setenv("author", r.Form.Get("author"))
job.Setenv("comment", r.Form.Get("comment"))
job.SetenvJson("config", config)
job.SetenvSubEnv("config", &config)
var id string
job.Stdout.AddString(&id)
if err := job.Run(); err != nil {
return err
}
return writeJSON(w, http.StatusCreated, &APIID{id})
env.Set("Id", id)
return writeJSON(w, http.StatusCreated, env)
}
// Creates an image from Pull or from Import
@ -555,15 +550,19 @@ func postContainersCreate(srv *Server, version float64, w http.ResponseWriter, r
if err := parseForm(r); err != nil {
return nil
}
out := &APIRun{}
job := srv.Eng.Job("create", r.Form.Get("name"))
var (
out engine.Env
job = srv.Eng.Job("create", r.Form.Get("name"))
outWarnings []string
outId string
warnings = bytes.NewBuffer(nil)
)
if err := job.DecodeEnv(r.Body); err != nil {
return err
}
// Read container ID from the first line of stdout
job.Stdout.AddString(&out.ID)
job.Stdout.AddString(&outId)
// Read warnings from stderr
warnings := &bytes.Buffer{}
job.Stderr.Add(warnings)
if err := job.Run(); err != nil {
return err
@ -571,8 +570,10 @@ func postContainersCreate(srv *Server, version float64, w http.ResponseWriter, r
// Parse warnings from stderr
scanner := bufio.NewScanner(warnings)
for scanner.Scan() {
out.Warnings = append(out.Warnings, scanner.Text())
outWarnings = append(outWarnings, scanner.Text())
}
out.Set("Id", outId)
out.SetList("Warnings", outWarnings)
return writeJSON(w, http.StatusCreated, out)
}
@ -664,18 +665,22 @@ func postContainersWait(srv *Server, version float64, w http.ResponseWriter, r *
if vars == nil {
return fmt.Errorf("Missing parameter")
}
job := srv.Eng.Job("wait", vars["name"])
var statusStr string
job.Stdout.AddString(&statusStr)
var (
env engine.Env
status string
job = srv.Eng.Job("wait", vars["name"])
)
job.Stdout.AddString(&status)
if err := job.Run(); err != nil {
return err
}
// Parse a 16-bit encoded integer to map typical unix exit status.
status, err := strconv.ParseInt(statusStr, 10, 16)
_, err := strconv.ParseInt(status, 10, 16)
if err != nil {
return err
}
return writeJSON(w, http.StatusOK, &APIWait{StatusCode: int(status)})
env.Set("StatusCode", status)
return writeJSON(w, http.StatusOK, env)
}
func postContainersResize(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -699,18 +704,14 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
return fmt.Errorf("Missing parameter")
}
// TODO: replace the buffer by job.AddEnv()
var (
job = srv.Eng.Job("inspect", vars["name"], "container")
buffer = bytes.NewBuffer(nil)
c Container
c, err = job.Stdout.AddEnv()
)
job.Stdout.Add(buffer)
if err := job.Run(); err != nil {
if err != nil {
return err
}
if err := json.Unmarshal(buffer.Bytes(), &c); err != nil {
if err = job.Run(); err != nil {
return err
}
@ -737,7 +738,7 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
if !c.Config.Tty && version >= 1.6 {
if c.GetSubEnv("Config") != nil && !c.GetSubEnv("Config").GetBool("Tty") && version >= 1.6 {
errStream = utils.NewStdWriter(outStream, utils.Stderr)
outStream = utils.NewStdWriter(outStream, utils.Stdout)
} else {
@ -874,24 +875,24 @@ func postContainersCopy(srv *Server, version float64, w http.ResponseWriter, r *
return fmt.Errorf("Missing parameter")
}
copyData := &APICopy{}
contentType := r.Header.Get("Content-Type")
if contentType == "application/json" {
if err := json.NewDecoder(r.Body).Decode(copyData); err != nil {
var copyData engine.Env
if contentType := r.Header.Get("Content-Type"); contentType == "application/json" {
if err := copyData.Decode(r.Body); err != nil {
return err
}
} else {
return fmt.Errorf("Content-Type not supported: %s", contentType)
}
if copyData.Resource == "" {
if copyData.Get("Resource") == "" {
return fmt.Errorf("Path cannot be empty")
}
if copyData.Resource[0] == '/' {
copyData.Resource = copyData.Resource[1:]
if copyData.Get("Resource")[0] == '/' {
copyData.Set("Resource", copyData.Get("Resource")[1:])
}
job := srv.Eng.Job("container_copy", vars["name"], copyData.Resource)
job := srv.Eng.Job("container_copy", vars["name"], copyData.Get("Resource"))
job.Stdout.Add(w)
if err := job.Run(); err != nil {
utils.Errorf("%s", err.Error())

View file

@ -1,43 +0,0 @@
package docker
type (
APITop struct {
Titles []string
Processes [][]string
}
APIRmi struct {
Deleted string `json:",omitempty"`
Untagged string `json:",omitempty"`
}
APIID struct {
ID string `json:"Id"`
}
APIRun struct {
ID string `json:"Id"`
Warnings []string `json:",omitempty"`
}
APIPort struct {
PrivatePort int64
PublicPort int64
Type string
IP string
}
APIWait struct {
StatusCode int
}
APIImageConfig struct {
ID string `json:"Id"`
*Config
}
APICopy struct {
Resource string
HostPath string
}
)

View file

@ -755,18 +755,21 @@ func (cli *DockerCli) CmdTop(args ...string) error {
val.Set("ps_args", strings.Join(cmd.Args()[1:], " "))
}
body, _, err := readBody(cli.call("GET", "/containers/"+cmd.Arg(0)+"/top?"+val.Encode(), nil, false))
stream, _, err := cli.call("GET", "/containers/"+cmd.Arg(0)+"/top?"+val.Encode(), nil, false)
if err != nil {
return err
}
procs := APITop{}
err = json.Unmarshal(body, &procs)
if err != nil {
var procs engine.Env
if err := procs.Decode(stream); err != nil {
return err
}
w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
fmt.Fprintln(w, strings.Join(procs.Titles, "\t"))
for _, proc := range procs.Processes {
fmt.Fprintln(w, strings.Join(procs.GetList("Titles"), "\t"))
processes := [][]string{}
if err := procs.GetJson("Processes", &processes); err != nil {
return err
}
for _, proc := range processes {
fmt.Fprintln(w, strings.Join(proc, "\t"))
}
w.Flush()
@ -1451,25 +1454,25 @@ func (cli *DockerCli) CmdCommit(args ...string) error {
v.Set("tag", tag)
v.Set("comment", *flComment)
v.Set("author", *flAuthor)
var config *Config
var (
config *Config
env engine.Env
)
if *flConfig != "" {
config = &Config{}
if err := json.Unmarshal([]byte(*flConfig), config); err != nil {
return err
}
}
body, _, err := readBody(cli.call("POST", "/commit?"+v.Encode(), config, false))
stream, _, err := cli.call("POST", "/commit?"+v.Encode(), config, false)
if err != nil {
return err
}
apiID := &APIID{}
err = json.Unmarshal(body, apiID)
if err != nil {
if err := env.Decode(stream); err != nil {
return err
}
fmt.Fprintf(cli.out, "%s\n", apiID.ID)
fmt.Fprintf(cli.out, "%s\n", env.Get("ID"))
return nil
}
@ -1989,7 +1992,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
}
//create the container
body, statusCode, err := readBody(cli.call("POST", "/containers/create?"+containerValues.Encode(), config, false))
stream, statusCode, err := cli.call("POST", "/containers/create?"+containerValues.Encode(), config, false)
//if image not found try to pull it
if statusCode == 404 {
_, tag := utils.ParseRepositoryTag(config.Image)
@ -2026,30 +2029,30 @@ func (cli *DockerCli) CmdRun(args ...string) error {
if err = cli.stream("POST", "/images/create?"+v.Encode(), nil, cli.err, map[string][]string{"X-Registry-Auth": registryAuthHeader}); err != nil {
return err
}
if body, _, err = readBody(cli.call("POST", "/containers/create?"+containerValues.Encode(), config, false)); err != nil {
if stream, _, err = cli.call("POST", "/containers/create?"+containerValues.Encode(), config, false); err != nil {
return err
}
} else if err != nil {
return err
}
var runResult APIRun
if err := json.Unmarshal(body, &runResult); err != nil {
var runResult engine.Env
if err := runResult.Decode(stream); err != nil {
return err
}
for _, warning := range runResult.Warnings {
for _, warning := range runResult.GetList("Warnings") {
fmt.Fprintf(cli.err, "WARNING: %s\n", warning)
}
if len(hostConfig.ContainerIDFile) > 0 {
if _, err = containerIDFile.Write([]byte(runResult.ID)); err != nil {
if _, err = containerIDFile.Write([]byte(runResult.Get("Id"))); err != nil {
return fmt.Errorf("failed to write the container ID to the file: %s", err)
}
}
if sigProxy {
sigc := cli.forwardAllSignals(runResult.ID)
sigc := cli.forwardAllSignals(runResult.Get("Id"))
defer utils.StopCatch(sigc)
}
@ -2063,7 +2066,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
waitDisplayId = make(chan struct{})
go func() {
defer close(waitDisplayId)
fmt.Fprintf(cli.out, "%s\n", runResult.ID)
fmt.Fprintf(cli.out, "%s\n", runResult.Get("Id"))
}()
}
@ -2105,7 +2108,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
}
errCh = utils.Go(func() error {
return cli.hijack("POST", "/containers/"+runResult.ID+"/attach?"+v.Encode(), config.Tty, in, out, stderr, hijacked)
return cli.hijack("POST", "/containers/"+runResult.Get("Id")+"/attach?"+v.Encode(), config.Tty, in, out, stderr, hijacked)
})
} else {
close(hijacked)
@ -2127,12 +2130,12 @@ func (cli *DockerCli) CmdRun(args ...string) error {
}
//start the container
if _, _, err = readBody(cli.call("POST", "/containers/"+runResult.ID+"/start", hostConfig, false)); err != nil {
if _, _, err = readBody(cli.call("POST", "/containers/"+runResult.Get("Id")+"/start", hostConfig, false)); err != nil {
return err
}
if (config.AttachStdin || config.AttachStdout || config.AttachStderr) && config.Tty && cli.isTerminal {
if err := cli.monitorTtySize(runResult.ID); err != nil {
if err := cli.monitorTtySize(runResult.Get("Id")); err != nil {
utils.Errorf("Error monitoring TTY size: %s\n", err)
}
}
@ -2157,26 +2160,26 @@ func (cli *DockerCli) CmdRun(args ...string) error {
if autoRemove {
// Autoremove: wait for the container to finish, retrieve
// the exit code and remove the container
if _, _, err := readBody(cli.call("POST", "/containers/"+runResult.ID+"/wait", nil, false)); err != nil {
if _, _, err := readBody(cli.call("POST", "/containers/"+runResult.Get("Id")+"/wait", nil, false)); err != nil {
return err
}
if _, status, err = getExitCode(cli, runResult.ID); err != nil {
if _, status, err = getExitCode(cli, runResult.Get("Id")); err != nil {
return err
}
if _, _, err := readBody(cli.call("DELETE", "/containers/"+runResult.ID+"?v=1", nil, false)); err != nil {
if _, _, err := readBody(cli.call("DELETE", "/containers/"+runResult.Get("Id")+"?v=1", nil, false)); err != nil {
return err
}
} else {
if !config.Tty {
// In non-tty mode, we can't dettach, so we know we need to wait.
if status, err = waitForExit(cli, runResult.ID); err != nil {
if status, err = waitForExit(cli, runResult.Get("Id")); err != nil {
return err
}
} else {
// In TTY mode, there is a race. If the process dies too slowly, the state can be update after the getExitCode call
// and result in a wrong exit code.
// No Autoremove: Simply retrieve the exit code
if _, status, err = getExitCode(cli, runResult.ID); err != nil {
if _, status, err = getExitCode(cli, runResult.Get("Id")); err != nil {
return err
}
}
@ -2198,15 +2201,15 @@ func (cli *DockerCli) CmdCp(args ...string) error {
return nil
}
var copyData APICopy
var copyData engine.Env
info := strings.Split(cmd.Arg(0), ":")
if len(info) != 2 {
return fmt.Errorf("Error: Path not specified")
}
copyData.Resource = info[1]
copyData.HostPath = cmd.Arg(1)
copyData.Set("Resource", info[1])
copyData.Set("HostPath", cmd.Arg(1))
stream, statusCode, err := cli.call("POST", "/containers/"+info[0]+"/copy", copyData, false)
if stream != nil {
@ -2217,7 +2220,7 @@ func (cli *DockerCli) CmdCp(args ...string) error {
}
if statusCode == 200 {
if err := archive.Untar(stream, copyData.HostPath, nil); err != nil {
if err := archive.Untar(stream, copyData.Get("HostPath"), nil); err != nil {
return err
}
}
@ -2260,13 +2263,21 @@ func (cli *DockerCli) CmdLoad(args ...string) error {
}
func (cli *DockerCli) call(method, path string, data interface{}, passAuthInfo bool) (io.ReadCloser, int, error) {
var params io.Reader
params := bytes.NewBuffer(nil)
if data != nil {
if env, ok := data.(engine.Env); ok {
if err := env.Encode(params); err != nil {
return nil, -1, err
}
} else {
buf, err := json.Marshal(data)
if err != nil {
return nil, -1, err
}
params = bytes.NewBuffer(buf)
if _, err := params.Write(buf); err != nil {
return nil, -1, err
}
}
}
// fixme: refactor client to support redirect
re := regexp.MustCompile("/+")
@ -2569,16 +2580,16 @@ func (cli *DockerCli) LoadConfigFile() (err error) {
}
func waitForExit(cli *DockerCli, containerId string) (int, error) {
body, _, err := readBody(cli.call("POST", "/containers/"+containerId+"/wait", nil, false))
stream, _, err := cli.call("POST", "/containers/"+containerId+"/wait", nil, false)
if err != nil {
return -1, err
}
var out APIWait
if err := json.Unmarshal(body, &out); err != nil {
var out engine.Env
if err := out.Decode(stream); err != nil {
return -1, err
}
return out.StatusCode, nil
return out.GetInt("StatusCode"), nil
}
// getExitCode perform an inspect on the container. It returns

View file

@ -86,6 +86,28 @@ func (env *Env) GetList(key string) []string {
return l
}
func (env *Env) GetSubEnv(key string) *Env {
sval := env.Get(key)
if sval == "" {
return nil
}
buf := bytes.NewBufferString(sval)
var sub Env
if err := sub.Decode(buf); err != nil {
return nil
}
return &sub
}
func (env *Env) SetSubEnv(key string, sub *Env) error {
var buf bytes.Buffer
if err := sub.Encode(&buf); err != nil {
return err
}
env.Set(key, string(buf.Bytes()))
return nil
}
func (env *Env) GetJson(key string, iface interface{}) error {
sval := env.Get(key)
if sval == "" {

View file

@ -118,6 +118,14 @@ func (job *Job) SetenvBool(key string, value bool) {
job.env.SetBool(key, value)
}
func (job *Job) GetenvSubEnv(key string) *Env {
return job.env.GetSubEnv(key)
}
func (job *Job) SetenvSubEnv(key string, value *Env) error {
return job.env.SetSubEnv(key, value)
}
func (job *Job) GetenvInt64(key string) int64 {
return job.env.GetInt64(key)
}
@ -188,10 +196,12 @@ func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {
return fmt.Fprintf(job.Stdout, format, args...)
}
func (job *Job) Errorf(format string, args ...interface{}) (n int, err error) {
return fmt.Fprintf(job.Stderr, format, args...)
func (job *Job) Errorf(format string, args ...interface{}) Status {
fmt.Fprintf(job.Stderr, format, args...)
return StatusErr
}
func (job *Job) Error(err error) (int, error) {
return fmt.Fprintf(job.Stderr, "%s", err)
func (job *Job) Error(err error) Status {
fmt.Fprintf(job.Stderr, "%s", err)
return StatusErr
}

View file

@ -485,26 +485,29 @@ func TestGetContainersTop(t *testing.T) {
t.Fatal(err)
}
assertHttpNotError(r, t)
procs := docker.APITop{}
if err := json.Unmarshal(r.Body.Bytes(), &procs); err != nil {
var procs engine.Env
if err := procs.Decode(r.Body); err != nil {
t.Fatal(err)
}
if len(procs.Titles) != 11 {
t.Fatalf("Expected 11 titles, found %d.", len(procs.Titles))
if len(procs.GetList("Titles")) != 11 {
t.Fatalf("Expected 11 titles, found %d.", len(procs.GetList("Titles")))
}
if procs.Titles[0] != "USER" || procs.Titles[10] != "COMMAND" {
t.Fatalf("Expected Titles[0] to be USER and Titles[10] to be COMMAND, found %s and %s.", procs.Titles[0], procs.Titles[10])
if procs.GetList("Titles")[0] != "USER" || procs.GetList("Titles")[10] != "COMMAND" {
t.Fatalf("Expected Titles[0] to be USER and Titles[10] to be COMMAND, found %s and %s.", procs.GetList("Titles")[0], procs.GetList("Titles")[10])
}
if len(procs.Processes) != 2 {
t.Fatalf("Expected 2 processes, found %d.", len(procs.Processes))
processes := [][]string{}
if err := procs.GetJson("Processes", &processes); err != nil {
t.Fatal(err)
}
if procs.Processes[0][10] != "/bin/sh -c cat" {
t.Fatalf("Expected `/bin/sh -c cat`, found %s.", procs.Processes[0][10])
if len(processes) != 2 {
t.Fatalf("Expected 2 processes, found %d.", len(processes))
}
if procs.Processes[1][10] != "/bin/sh -c cat" {
t.Fatalf("Expected `/bin/sh -c cat`, found %s.", procs.Processes[1][10])
if processes[0][10] != "/bin/sh -c cat" {
t.Fatalf("Expected `/bin/sh -c cat`, found %s.", processes[0][10])
}
if processes[1][10] != "/bin/sh -c cat" {
t.Fatalf("Expected `/bin/sh -c cat`, found %s.", processes[1][10])
}
}
@ -570,11 +573,11 @@ func TestPostCommit(t *testing.T) {
t.Fatalf("%d Created expected, received %d\n", http.StatusCreated, r.Code)
}
apiID := &docker.APIID{}
if err := json.Unmarshal(r.Body.Bytes(), apiID); err != nil {
var env engine.Env
if err := env.Decode(r.Body); err != nil {
t.Fatal(err)
}
if _, err := srv.ImageInspect(apiID.ID); err != nil {
if _, err := srv.ImageInspect(env.Get("Id")); err != nil {
t.Fatalf("The image has not been committed")
}
}
@ -607,11 +610,11 @@ func TestPostContainersCreate(t *testing.T) {
t.Fatalf("%d Created expected, received %d\n", http.StatusCreated, r.Code)
}
apiRun := &docker.APIRun{}
if err := json.Unmarshal(r.Body.Bytes(), apiRun); err != nil {
var apiRun engine.Env
if err := apiRun.Decode(r.Body); err != nil {
t.Fatal(err)
}
containerID := apiRun.ID
containerID := apiRun.Get("Id")
containerAssertExists(eng, containerID, t)
containerRun(eng, containerID, t)
@ -863,12 +866,12 @@ func TestPostContainersWait(t *testing.T) {
t.Fatal(err)
}
assertHttpNotError(r, t)
apiWait := &docker.APIWait{}
if err := json.Unmarshal(r.Body.Bytes(), apiWait); err != nil {
var apiWait engine.Env
if err := apiWait.Decode(r.Body); err != nil {
t.Fatal(err)
}
if apiWait.StatusCode != 0 {
t.Fatalf("Non zero exit code for sleep: %d\n", apiWait.StatusCode)
if apiWait.GetInt("StatusCode") != 0 {
t.Fatalf("Non zero exit code for sleep: %d\n", apiWait.GetInt("StatusCode"))
}
})
@ -1160,12 +1163,12 @@ func TestDeleteImages(t *testing.T) {
t.Fatalf("%d OK expected, received %d\n", http.StatusOK, r.Code)
}
var outs []docker.APIRmi
if err := json.Unmarshal(r2.Body.Bytes(), &outs); err != nil {
outs := engine.NewTable("Created", 0)
if _, err := outs.ReadListFrom(r2.Body.Bytes()); err != nil {
t.Fatal(err)
}
if len(outs) != 1 {
t.Fatalf("Expected %d event (untagged), got %d", 1, len(outs))
if len(outs.Data) != 1 {
t.Fatalf("Expected %d event (untagged), got %d", 1, len(outs.Data))
}
images = getImages(eng, t, false, "")
@ -1190,14 +1193,17 @@ func TestPostContainersCopy(t *testing.T) {
containerRun(eng, containerID, t)
r := httptest.NewRecorder()
copyData := docker.APICopy{HostPath: ".", Resource: "/test.txt"}
jsonData, err := json.Marshal(copyData)
if err != nil {
var copyData engine.Env
copyData.Set("Resource", "/test.txt")
copyData.Set("HostPath", ".")
jsonData := bytes.NewBuffer(nil)
if err := copyData.Encode(jsonData); err != nil {
t.Fatal(err)
}
req, err := http.NewRequest("POST", "/containers/"+containerID+"/copy", bytes.NewReader(jsonData))
req, err := http.NewRequest("POST", "/containers/"+containerID+"/copy", jsonData)
if err != nil {
t.Fatal(err)
}

449
server.go

File diff suppressed because it is too large Load diff