1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

beam/examples/beamsh: commands are messages.

Commands in the pipeline should either implement or pass-through command messages.

This amounts to a proof-of-concept implementation of the "pipeline"
design of Docker plugins.

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
Solomon Hykes 2014-03-31 12:04:39 -07:00
parent 2f4b8b7e8d
commit ed62ca5b2f

View file

@ -106,7 +106,15 @@ func executeRootScript(script []*dockerscript.Command) error {
lastCmd.Children = script lastCmd.Children = script
script = []*dockerscript.Command{rootCmd} script = []*dockerscript.Command{rootCmd}
} }
return executeScript(nil, script) handlers, err := Handlers()
if err != nil {
return err
}
defer handlers.Close()
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
} }
func executeScript(out beam.Sender, script []*dockerscript.Command) error { func executeScript(out beam.Sender, script []*dockerscript.Command) error {
@ -147,61 +155,92 @@ func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if len(cmd.Args) == 0 { if len(cmd.Args) == 0 {
return fmt.Errorf("empty command") return fmt.Errorf("empty command")
} }
handler := GetHandler(cmd.Args[0]) Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
if handler == nil { job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Bytes())
return fmt.Errorf("no such command: %s", cmd.Args[0])
}
inPub, inPriv, err := beam.USocketPair()
if err != nil { if err != nil {
return err return fmt.Errorf("%v\n", err)
} }
// Don't close inPub here. We close it to signify the end of input once
// all children are completed (guaranteeing that no more input will be sent
// by children).
// Otherwise we get a deadlock.
defer inPriv.Close()
outPub, outPriv, err := beam.USocketPair()
if err != nil {
return err
}
defer outPub.Close()
// don't close outPriv here. It must be closed after the handler is called,
// but before the copy tasks associated with it completes.
// Otherwise we get a deadlock.
var tasks sync.WaitGroup var tasks sync.WaitGroup
tasks.Add(2) tasks.Add(1)
go func() { Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
handler(cmd.Args, inPriv, outPriv)
// FIXME: do we need to outPriv.sync before closing it?
Debugf("[%s] handler returned, closing output\n", strings.Join(cmd.Args, " "))
outPriv.Close()
tasks.Done()
}()
go func() { go func() {
if out != nil { if out != nil {
Debugf("[%s] copy start...\n", strings.Join(cmd.Args, " ")) Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, outPub) n, err := beam.Copy(out, job)
if err != nil { if err != nil {
Fatal(err) Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
} }
Debugf("[%s] copied %d messages\n", strings.Join(cmd.Args, " "), n) Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
Debugf("[%s] copy done\n", strings.Join(cmd.Args, " "))
} }
tasks.Done() tasks.Done()
}() }()
// depth-first execution of children commands // depth-first execution of children commands
// executeScript() blocks until all commands are completed // executeScript() blocks until all commands are completed
executeScript(inPub, cmd.Children) Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
inPub.Close() executeScript(job, cmd.Children)
Debugf("[%s] waiting for handler and output copy to complete...\n", strings.Join(cmd.Args, " ")) Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait() tasks.Wait()
Debugf("[%s] handler and output copy complete!\n", strings.Join(cmd.Args, " ")) Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil return nil
} }
type Handler func([]string, beam.Receiver, beam.Sender) type Handler func([]string, beam.Receiver, beam.Sender)
func Handlers() (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
for {
Debugf("[handlers] waiting for next job...\n")
payload, conn, err := beam.ReceiveConn(priv)
Debugf("[handlers] ReceiveConn() returned %v\n", err)
if err != nil {
return
}
tasks.Add(1)
go func(payload []byte, conn *beam.UnixConn) {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}(payload, conn)
}
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler { func GetHandler(name string) Handler {
if name == "log" { if name == "log" {
return func(args []string, in beam.Receiver, out beam.Sender) { return func(args []string, in beam.Receiver, out beam.Sender) {
@ -324,6 +363,16 @@ func GetHandler(name string) Handler {
} }
} else if name == "stdio" { } else if name == "stdio" {
return func(args []string, in beam.Receiver, out beam.Sender) { return func(args []string, in beam.Receiver, out beam.Sender) {
stdout, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendPipe(out, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", args...).Bytes())
if err != nil {
return
}
defer stderr.Close()
var tasks sync.WaitGroup var tasks sync.WaitGroup
defer tasks.Wait() defer tasks.Wait()
for { for {
@ -331,26 +380,39 @@ func GetHandler(name string) Handler {
if err != nil { if err != nil {
return return
} }
tasks.Add(1) cmd := data.Message(payload).Get("cmd")
go func(payload []byte, attachment *os.File) { if attachment != nil && len(cmd) > 0 && cmd[0] == "log" {
defer tasks.Done() w, err := beam.SendPipe(out, payload)
if attachment == nil { if err != nil {
attachment.Close()
fmt.Fprintf(stderr, "sendpipe: %v\n", err)
return return
} }
defer attachment.Close() tasks.Add(1)
cmd := data.Message(payload).Get("cmd") go func(payload []byte, attachment *os.File, sink *os.File) {
if cmd == nil || len(cmd) == 0 { defer tasks.Done()
defer attachment.Close()
defer sink.Close()
cmd := data.Message(payload).Get("cmd")
if cmd == nil || len(cmd) == 0 {
return
}
if cmd[0] != "log" {
return
}
var output io.Writer
if len(cmd) == 1 || cmd[1] == "stdout" {
output = os.Stdout
} else if cmd[1] == "stderr" {
output = os.Stderr
}
io.Copy(io.MultiWriter(output, sink), attachment)
}(payload, attachment, w)
} else {
if err := out.Send(payload, attachment); err != nil {
return return
} }
if cmd[0] != "log" { }
return
}
if len(cmd) == 1 || cmd[1] == "stdout" {
io.Copy(os.Stdout, attachment)
} else if cmd[1] == "stderr" {
io.Copy(os.Stderr, attachment)
}
}(payload, attachment)
} }
} }
} else if name == "echo" { } else if name == "echo" {