mirror of
synced 2022-11-09 12:21:53 -05:00

This introduces a superficial change to the Beam API: * `beam.SendPipe` is renamed to the more accurate `beam.SendRPipe` * `beam.SendWPipe` is introduced as a mirror to `SendRPipe` There is no other change in the beam API. Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
542 lines
14 KiB
542 lines
14 KiB
package main
import (
var rootPlugins = []string{
var (
flX bool
flPing bool
introspect beam.ReceiveSender = beam.Devnull()
func main() {
fd3 := os.NewFile(3, "beam-introspect")
if introsp, err := beam.FileConn(fd3); err == nil {
introspect = introsp
Logf("introspection enabled\n")
} else {
Logf("introspection disabled\n")
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
if flag.NArg() == 0 {
if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin)
for {
fmt.Printf("[%d] beamsh> ", os.Getpid())
if !input.Scan() {
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
if err := executeRootScript(cmd); err != nil {
if err := input.Err(); err == io.EOF {
} else if err != nil {
} else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
if err := executeRootScript(script); err != nil {
} else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath)
if err != nil {
script, err := dockerscript.Parse(f)
if err != nil {
Fatal("parse error: %v\n", err)
if err := executeRootScript(script); err != nil {
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
for _, plugin := range rootPlugins {
pluginCmd := &dockerscript.Command{
Args: []string{plugin},
if rootCmd == nil {
rootCmd = pluginCmd
} else {
lastCmd.Children = []*dockerscript.Command{pluginCmd}
lastCmd = pluginCmd
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
handlers, err := Handlers(introspect)
if err != nil {
return err
defer handlers.Close()
var tasks sync.WaitGroup
defer func() {
Debugf("Waiting for introspection...\n")
Debugf("DONE Waiting for introspection\n")
if introspect != nil {
go func() {
Debugf("starting introspection\n")
defer Debugf("done with introspection\n")
defer tasks.Done()
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
Debugf("XXX starting reading introspection messages\n")
r := beam.NewRouter(handlers)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
return handlers.Send(p, a)
n, err := beam.Copy(r, introspect)
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
if err := executeScript(handlers, script); err != nil {
return err
return nil
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
defer background.Wait()
for _, cmd := range script {
if cmd.Background {
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
}(out, cmd)
} else {
if err := executeCommand(out, cmd); err != nil {
return err
return nil
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
if err != nil {
return fmt.Errorf("%v\n", err)
var tasks sync.WaitGroup
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
Debugf("[handlers] done closewrite() on endpoint\n")
r := beam.NewRouter(sink)
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
conn, err := beam.FileConn(attachment)
if err != nil {
return err
// attachment.Close()
go func() {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
Debugf("[handlers] '%s' done closewrite\n", payload)
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
handler := GetHandler(cmd[0])
if handler == nil {
stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
if err != nil {
defer stdout.Close()
stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
if err != nil {
defer stderr.Close()
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
return nil
beam.Copy(r, priv)
Debugf("[handlers] waiting for all tasks\n")
Debugf("[handlers] all tasks returned\n")
return pub, nil
func GetHandler(name string) Handler {
if name == "logger" {
return CmdLogger
} else if name == "render" {
return CmdRender
} else if name == "devnull" {
return CmdDevnull
} else if name == "prompt" {
return CmdPrompt
} else if name == "stdio" {
return CmdStdio
} else if name == "echo" {
return CmdEcho
} else if name == "pass" {
return CmdPass
} else if name == "in" {
return CmdIn
} else if name == "exec" {
return CmdExec
} else if name == "trace" {
return CmdTrace
} else if name == "emit" {
return CmdEmit
} else if name == "print" {
return CmdPrint
} else if name == "multiprint" {
return CmdMultiprint
} else if name == "listen" {
return CmdListen
} else if name == "beamsend" {
return CmdBeamsend
} else if name == "beamreceive" {
return CmdBeamreceive
} else if name == "connect" {
return CmdConnect
} else if name == "openfile" {
return CmdOpenfile
} else if name == "spawn" {
return CmdSpawn
} else if name == "chdir" {
return CmdChdir
return nil
func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface {
File() (*os.File, error)
}); !ok {
return nil, fmt.Errorf("no file descriptor available")
} else {
f, err = connWithFile.File()
if err != nil {
return nil, err
return f, err
type Msg struct {
payload []byte
attachment *os.File
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg = msg + "\n"
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
return fmt.Printf(msg, args...)
func Debugf(msg string, args ...interface{}) {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
func Fatalf(msg string, args ...interface{}) {
Logf(msg, args...)
func Fatal(args ...interface{}) {
Fatalf("%v", args[0])
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
lines = append(lines, line)
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
connections <- conn
return connections, nil
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
Logf("new connection\n")
connections <- conn
return connections, nil
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
return err
conn, ok := <-connections
if !ok {
Logf("Sending %s\n", msgDesc(payload, attachment))
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
if attachment == nil {
var iotasks sync.WaitGroup
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
}(payload, attachment, conn)
return nil
func msgDesc(payload []byte, attachment *os.File) string {
return beam.MsgDesc(payload, attachment)
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func() error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
if err == io.EOF {
return nil
} else {
return err
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
return err
pub, priv, err := beam.SocketPair()
if err != nil {
return err
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := dst.Send([]byte(header), priv); err != nil {
return err
return nil
if err != nil {
Logf("Error reading from connection: %v\n", err)
return nil
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
go oneCopy(a, b)
go oneCopy(b, a)