Merge pull request #5713 from shykes/pr_out_engine_ensure_all_pipes_are_properly_closed_by_receiver_and_sender

This commit is contained in:
Solomon Hykes 2014-05-09 16:16:56 -07:00
commit 9dc66f8822
3 changed files with 45 additions and 7 deletions

View File

@ -72,6 +72,9 @@ func (job *Job) Run() error {
if err := job.Stderr.Close(); err != nil {
return err
}
if err := job.Stdin.Close(); err != nil {
return err
}
if job.status != 0 {
return fmt.Errorf("%s", errorMessage)
}

View File

@ -36,20 +36,27 @@ func (s *Sender) Handle(job *Job) Status {
r := beam.NewRouter(nil)
r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
tasks.Add(1)
io.Copy(job.Stdout, stdout)
tasks.Done()
go func() {
io.Copy(job.Stdout, stdout)
stdout.Close()
tasks.Done()
}()
return nil
})
r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
tasks.Add(1)
io.Copy(job.Stderr, stderr)
tasks.Done()
go func() {
io.Copy(job.Stderr, stderr)
stderr.Close()
tasks.Done()
}()
return nil
})
r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error {
tasks.Add(1)
io.Copy(stdin, job.Stdin)
tasks.Done()
go func() {
io.Copy(stdin, job.Stdin)
stdin.Close()
}()
return nil
})
var status int

View File

@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"io"
"strings"
"testing"
"time"
@ -54,6 +55,33 @@ func TestHelloWorld(t *testing.T) {
}
}
func TestStdin(t *testing.T) {
testRemote(t,
func(eng *Engine) {
job := eng.Job("mirror")
job.Stdin.Add(strings.NewReader("hello world!\n"))
out := &bytes.Buffer{}
job.Stdout.Add(out)
if err := job.Run(); err != nil {
t.Fatal(err)
}
if out.String() != "hello world!\n" {
t.Fatalf("%#v", out.String())
}
},
func(eng *Engine) {
eng.Register("mirror", func(job *Job) Status {
if _, err := io.Copy(job.Stdout, job.Stdin); err != nil {
t.Fatal(err)
}
return StatusOK
})
},
)
}
// Helpers
func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {