From dfdc03b061d5bd5a7557f077b500304d4da26d2e Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Sun, 4 May 2014 00:21:53 +0000 Subject: [PATCH] Engine: fix a timeout bug in Sender/Receiver Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/remote.go | 2 ++ engine/remote_test.go | 73 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/engine/remote.go b/engine/remote.go index e0c3c18e0d..fbb9951065 100644 --- a/engine/remote.go +++ b/engine/remote.go @@ -90,6 +90,8 @@ func (rcv *Receiver) Run() error { f.Close() return err } + f.Close() + defer peer.Close() cmd := data.Message(p).Get("cmd") job := rcv.Engine.Job(cmd[0], cmd[1:]...) stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes()) diff --git a/engine/remote_test.go b/engine/remote_test.go index 54092ec934..a56120941f 100644 --- a/engine/remote_test.go +++ b/engine/remote_test.go @@ -1,3 +1,74 @@ package engine -import () +import ( + "bytes" + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "strings" + "testing" + "time" +) + +func TestHelloWorld(t *testing.T) { + testRemote(t, + + // Sender side + func(eng *Engine) { + job := eng.Job("echo", "hello", "world") + out := &bytes.Buffer{} + job.Stdout.Add(out) + job.Run() + if job.status != StatusOK { + t.Fatalf("#%v", job.StatusCode()) + } + if out.String() != "hello world\n" { + t.Fatalf("%#v", out.String()) + } + }, + + // Receiver side + func(eng *Engine) { + eng.Register("echo", func(job *Job) Status { + fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " ")) + return StatusOK + }) + }, + ) +} + +func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) { + sndConn, rcvConn, err := beam.USocketPair() + if err != nil { + t.Fatal(err) + } + defer sndConn.Close() + defer rcvConn.Close() + sender := NewSender(sndConn) + receiver := NewReceiver(rcvConn) + + // Setup the sender side + eng := New() + sender.Install(eng) + + // Setup the receiver side + receiverSide(receiver.Engine) + go receiver.Run() + + timeout(t, func() { + senderSide(eng) + }) +} + +func timeout(t *testing.T, f func()) { + onTimeout := time.After(100 * time.Millisecond) + onDone := make(chan bool) + go func() { + f() + close(onDone) + }() + select { + case <-onTimeout: + t.Fatalf("timeout") + case <-onDone: + } +}