From 271ba1804349217d8cc22cfdd412a1741dbbcfea Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Wed, 2 Apr 2014 20:05:00 -0700 Subject: [PATCH] beam/examples/beamsh: use beam.Router to simplify 'multiprint' and fix job passthrough Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- pkg/beam/examples/beamsh/beamsh.go | 2 +- pkg/beam/examples/beamsh/builtins.go | 34 ++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/beam/examples/beamsh/beamsh.go b/pkg/beam/examples/beamsh/beamsh.go index 353ef6812d..af20916e5c 100644 --- a/pkg/beam/examples/beamsh/beamsh.go +++ b/pkg/beam/examples/beamsh/beamsh.go @@ -153,7 +153,7 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { return fmt.Errorf("empty command") } Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) - job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Bytes()) + job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes()) if err != nil { return fmt.Errorf("%v\n", err) } diff --git a/pkg/beam/examples/beamsh/builtins.go b/pkg/beam/examples/beamsh/builtins.go index 2391ab6e00..da767f153d 100644 --- a/pkg/beam/examples/beamsh/builtins.go +++ b/pkg/beam/examples/beamsh/builtins.go @@ -226,24 +226,24 @@ func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out bea func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { var tasks sync.WaitGroup - for { - payload, a, err := in.Receive() - if err != nil { - return - } - if a != nil { - tasks.Add(1) - go func(payload []byte, attachment *os.File) { - defer tasks.Done() - msg := data.Message(string(payload)) - input := bufio.NewScanner(attachment) - for input.Scan() { - fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) - } - }(payload, a) - } + defer tasks.Wait() + r := beam.NewRouter(out) + multiprint := func(p []byte, a *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer a.Close() + msg := data.Message(string(p)) + input := bufio.NewScanner(a) + for input.Scan() { + fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) + } + }() + return nil } - tasks.Wait() + r.NewRoute().KeyIncludes("type", "job").Passthrough(out) + r.NewRoute().HasAttachment().Handler(multiprint).Tee(out) + beam.Copy(r, in) } func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {