diff --git a/engine/remote.go b/engine/remote.go deleted file mode 100644 index 974ca02137..0000000000 --- a/engine/remote.go +++ /dev/null @@ -1,138 +0,0 @@ -package engine - -import ( - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/beam/data" - "io" - "os" - "strconv" - "sync" -) - -type Sender struct { - beam.Sender -} - -func NewSender(s beam.Sender) *Sender { - return &Sender{s} -} - -func (s *Sender) Install(eng *Engine) error { - // FIXME: this doesn't exist yet. - eng.RegisterCatchall(s.Handle) - return nil -} - -func (s *Sender) Handle(job *Job) Status { - cmd := append([]string{job.Name}, job.Args...) - env := data.Encode(job.Env().MultiMap()) - msg := data.Empty().Set("cmd", cmd...).Set("env", env) - peer, err := beam.SendConn(s, msg.Bytes()) - if err != nil { - return job.Errorf("beamsend: %v", err) - } - defer peer.Close() - var tasks sync.WaitGroup - defer tasks.Wait() - r := beam.NewRouter(nil) - r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error { - tasks.Add(1) - go func() { - io.Copy(job.Stdout, stdout) - stdout.Close() - tasks.Done() - }() - return nil - }) - r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error { - tasks.Add(1) - go func() { - io.Copy(job.Stderr, stderr) - stderr.Close() - tasks.Done() - }() - return nil - }) - r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error { - go func() { - io.Copy(stdin, job.Stdin) - stdin.Close() - }() - return nil - }) - var status int - r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error { - cmd := data.Message(p).Get("cmd") - if len(cmd) != 2 { - return fmt.Errorf("usage: %s <0-127>", cmd[0]) - } - s, err := strconv.ParseUint(cmd[1], 10, 8) - if err != nil { - return fmt.Errorf("usage: %s <0-127>", cmd[0]) - } - status = int(s) - return nil - - }) - if _, err := beam.Copy(r, peer); err != nil { - return job.Errorf("%v", err) - } - return Status(status) -} - -type Receiver struct { - *Engine - peer beam.Receiver -} - -func NewReceiver(peer beam.Receiver) *Receiver { - return &Receiver{Engine: New(), peer: peer} -} - -func (rcv *Receiver) Run() error { - r := beam.NewRouter(nil) - r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error { - // Use the attachment as a beam return channel - peer, err := beam.FileConn(f) - if err != nil { - f.Close() - return err - } - f.Close() - defer peer.Close() - msg := data.Message(p) - cmd := msg.Get("cmd") - job := rcv.Engine.Job(cmd[0], cmd[1:]...) - // Decode env - env, err := data.Decode(msg.GetOne("env")) - if err != nil { - return fmt.Errorf("error decoding 'env': %v", err) - } - job.Env().InitMultiMap(env) - stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes()) - if err != nil { - return err - } - job.Stdout.Add(stdout) - stderr, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes()) - if err != nil { - return err - } - job.Stderr.Add(stderr) - stdin, err := beam.SendWPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes()) - if err != nil { - return err - } - job.Stdin.Add(stdin) - // ignore error because we pass the raw status - job.Run() - err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil) - if err != nil { - return err - } - return nil - }) - _, err := beam.Copy(r, rcv.peer) - return err -} diff --git a/engine/remote_test.go b/engine/remote_test.go deleted file mode 100644 index 1563660c97..0000000000 --- a/engine/remote_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package engine - -import ( - "bufio" - "bytes" - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/testutils" - "io" - "strings" - "testing" - "time" -) - -func TestHelloWorld(t *testing.T) { - for i := 0; i < 10; i++ { - testRemote(t, - - // Sender side - func(eng *Engine) { - job := eng.Job("echo", "hello", "world") - out := &bytes.Buffer{} - job.Stdout.Add(out) - job.Run() - if job.status != StatusOK { - t.Fatalf("#%v", job.StatusCode()) - } - lines := bufio.NewScanner(out) - var i int - for lines.Scan() { - if lines.Text() != "hello world" { - t.Fatalf("%#v", lines.Text()) - } - i++ - } - if i != 1000 { - t.Fatalf("%#v", i) - } - }, - - // Receiver side - func(eng *Engine) { - eng.Register("echo", func(job *Job) Status { - // Simulate more output with a delay in the middle - for i := 0; i < 500; i++ { - fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " ")) - } - time.Sleep(5 * time.Millisecond) - for i := 0; i < 500; i++ { - fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " ")) - } - return StatusOK - }) - }, - ) - } -} - -func TestStdin(t *testing.T) { - testRemote(t, - - func(eng *Engine) { - job := eng.Job("mirror") - job.Stdin.Add(strings.NewReader("hello world!\n")) - out := &bytes.Buffer{} - job.Stdout.Add(out) - if err := job.Run(); err != nil { - t.Fatal(err) - } - if out.String() != "hello world!\n" { - t.Fatalf("%#v", out.String()) - } - }, - - func(eng *Engine) { - eng.Register("mirror", func(job *Job) Status { - if _, err := io.Copy(job.Stdout, job.Stdin); err != nil { - t.Fatal(err) - } - return StatusOK - }) - }, - ) -} - -func TestEnv(t *testing.T) { - var ( - foo string - answer int - shadok_words []string - ) - testRemote(t, - - func(eng *Engine) { - job := eng.Job("sendenv") - job.Env().Set("foo", "bar") - job.Env().SetInt("answer", 42) - job.Env().SetList("shadok_words", []string{"ga", "bu", "zo", "meu"}) - if err := job.Run(); err != nil { - t.Fatal(err) - } - }, - - func(eng *Engine) { - eng.Register("sendenv", func(job *Job) Status { - foo = job.Env().Get("foo") - answer = job.Env().GetInt("answer") - shadok_words = job.Env().GetList("shadok_words") - return StatusOK - }) - }, - ) - // Check for results here rather than inside the job handler, - // otherwise the tests may incorrectly pass if the handler is not - // called. - if foo != "bar" { - t.Fatalf("%#v", foo) - } - if answer != 42 { - t.Fatalf("%#v", answer) - } - if strings.Join(shadok_words, ", ") != "ga, bu, zo, meu" { - t.Fatalf("%#v", shadok_words) - } -} - -// Helpers - -func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) { - sndConn, rcvConn, err := beam.USocketPair() - if err != nil { - t.Fatal(err) - } - defer sndConn.Close() - defer rcvConn.Close() - sender := NewSender(sndConn) - receiver := NewReceiver(rcvConn) - - // Setup the sender side - eng := New() - sender.Install(eng) - - // Setup the receiver side - receiverSide(receiver.Engine) - go receiver.Run() - - testutils.Timeout(t, func() { - senderSide(eng) - }) -} diff --git a/engine/rengine/main.go b/engine/rengine/main.go deleted file mode 100644 index b4fa01d39c..0000000000 --- a/engine/rengine/main.go +++ /dev/null @@ -1,43 +0,0 @@ -package main - -import ( - "fmt" - "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/pkg/beam" - "net" - "os" -) - -func main() { - eng := engine.New() - - c, err := net.Dial("unix", "beam.sock") - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - return - } - defer c.Close() - f, err := c.(*net.UnixConn).File() - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - return - } - - child, err := beam.FileConn(f) - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - return - } - defer child.Close() - - sender := engine.NewSender(child) - sender.Install(eng) - - cmd := eng.Job(os.Args[1], os.Args[2:]...) - cmd.Stdout.Add(os.Stdout) - cmd.Stderr.Add(os.Stderr) - if err := cmd.Run(); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - os.Exit(1) - } -} diff --git a/engine/spawn/spawn.go b/engine/spawn/spawn.go deleted file mode 100644 index 6680845bc1..0000000000 --- a/engine/spawn/spawn.go +++ /dev/null @@ -1,119 +0,0 @@ -package spawn - -import ( - "fmt" - "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/utils" - "os" - "os/exec" -) - -var initCalled bool - -// Init checks if the current process has been created by Spawn. -// -// If no, it returns nil and the original program can continue -// unmodified. -// -// If no, it hijacks the process to run as a child worker controlled -// by its parent over a beam connection, with f exposed as a remote -// service. In this case Init never returns. -// -// The hijacking process takes place as follows: -// - Open file descriptor 3 as a beam endpoint. If this fails, -// terminate the current process. -// - Start a new engine. -// - Call f.Install on the engine. Any handlers registered -// will be available for remote invocation by the parent. -// - Listen for beam messages from the parent and pass them to -// the handlers. -// - When the beam endpoint is closed by the parent, terminate -// the current process. -// -// NOTE: Init must be called at the beginning of the same program -// calling Spawn. This is because Spawn approximates a "fork" by -// re-executing the current binary - where it expects spawn.Init -// to intercept the control flow and execute the worker code. -func Init(f engine.Installer) error { - initCalled = true - if os.Getenv("ENGINESPAWN") != "1" { - return nil - } - fmt.Printf("[%d child]\n", os.Getpid()) - // Hijack the process - childErr := func() error { - fd3 := os.NewFile(3, "beam-introspect") - introsp, err := beam.FileConn(fd3) - if err != nil { - return fmt.Errorf("beam introspection error: %v", err) - } - fd3.Close() - defer introsp.Close() - eng := engine.NewReceiver(introsp) - if err := f.Install(eng.Engine); err != nil { - return err - } - if err := eng.Run(); err != nil { - return err - } - return nil - }() - if childErr != nil { - os.Exit(1) - } - os.Exit(0) - return nil // Never reached -} - -// Spawn starts a new Engine in a child process and returns -// a proxy Engine through which it can be controlled. -// -// The commands available on the child engine are determined -// by an earlier call to Init. It is important that Init be -// called at the very beginning of the current program - this -// allows it to be called as a re-execution hook in the child -// process. -// -// Long story short, if you want to expose `myservice` in a child -// process, do this: -// -// func main() { -// spawn.Init(myservice) -// [..] -// child, err := spawn.Spawn() -// [..] -// child.Job("dosomething").Run() -// } -func Spawn() (*engine.Engine, error) { - if !initCalled { - return nil, fmt.Errorf("spawn.Init must be called at the top of the main() function") - } - cmd := exec.Command(utils.SelfPath()) - cmd.Env = append(cmd.Env, "ENGINESPAWN=1") - local, remote, err := beam.SocketPair() - if err != nil { - return nil, err - } - child, err := beam.FileConn(local) - if err != nil { - local.Close() - remote.Close() - return nil, err - } - local.Close() - cmd.ExtraFiles = append(cmd.ExtraFiles, remote) - // FIXME: the beam/engine glue has no way to inform the caller - // of the child's termination. The next call will simply return - // an error. - if err := cmd.Start(); err != nil { - child.Close() - return nil, err - } - eng := engine.New() - if err := engine.NewSender(child).Install(eng); err != nil { - child.Close() - return nil, err - } - return eng, nil -} diff --git a/engine/spawn/subengine/main.go b/engine/spawn/subengine/main.go deleted file mode 100644 index 3be7520a67..0000000000 --- a/engine/spawn/subengine/main.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "fmt" - "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/engine/spawn" - "log" - "os" - "os/exec" - "strings" -) - -func main() { - fmt.Printf("[%d] MAIN\n", os.Getpid()) - spawn.Init(&Worker{}) - fmt.Printf("[%d parent] spawning\n", os.Getpid()) - eng, err := spawn.Spawn() - if err != nil { - log.Fatal(err) - } - fmt.Printf("[parent] spawned\n") - job := eng.Job(os.Args[1], os.Args[2:]...) - job.Stdout.Add(os.Stdout) - job.Stderr.Add(os.Stderr) - job.Run() - // FIXME: use the job's status code - os.Exit(0) -} - -type Worker struct { -} - -func (w *Worker) Install(eng *engine.Engine) error { - eng.Register("exec", w.Exec) - eng.Register("cd", w.Cd) - eng.Register("echo", w.Echo) - return nil -} - -func (w *Worker) Exec(job *engine.Job) engine.Status { - fmt.Printf("--> %v\n", job.Args) - cmd := exec.Command(job.Args[0], job.Args[1:]...) - cmd.Stdout = job.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return job.Errorf("%v\n", err) - } - return engine.StatusOK -} - -func (w *Worker) Cd(job *engine.Job) engine.Status { - if err := os.Chdir(job.Args[0]); err != nil { - return job.Errorf("%v\n", err) - } - return engine.StatusOK -} - -func (w *Worker) Echo(job *engine.Job) engine.Status { - fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " ")) - return engine.StatusOK -} diff --git a/pkg/beam/MAINTAINERS b/pkg/beam/MAINTAINERS deleted file mode 100644 index aee10c8421..0000000000 --- a/pkg/beam/MAINTAINERS +++ /dev/null @@ -1 +0,0 @@ -Solomon Hykes (@shykes) diff --git a/pkg/beam/beam.go b/pkg/beam/beam.go deleted file mode 100644 index 2e8895a153..0000000000 --- a/pkg/beam/beam.go +++ /dev/null @@ -1,166 +0,0 @@ -package beam - -import ( - "fmt" - "io" - "os" -) - -type Sender interface { - Send([]byte, *os.File) error -} - -type Receiver interface { - Receive() ([]byte, *os.File, error) -} - -type ReceiveCloser interface { - Receiver - Close() error -} - -type SendCloser interface { - Sender - Close() error -} - -type ReceiveSender interface { - Receiver - Sender -} - -const ( - R = iota - W -) - -func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) { - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - var ( - remote *os.File - local *os.File - ) - if mode == R { - remote = r - local = w - } else if mode == W { - remote = w - local = r - } - if err := dst.Send(data, remote); err != nil { - local.Close() - remote.Close() - return nil, err - } - return local, nil - -} - -// SendRPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendRPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, R) -} - -// SendWPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendWPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, W) -} - -func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { - local, remote, err := SocketPair() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - local.Close() - remote.Close() - } - }() - conn, err = FileConn(local) - if err != nil { - return nil, err - } - local.Close() - if err := dst.Send(data, remote); err != nil { - return nil, err - } - return conn, nil -} - -func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { - for { - data, f, err := src.Receive() - if err != nil { - return nil, nil, err - } - if f == nil { - // Skip empty attachments - continue - } - conn, err := FileConn(f) - if err != nil { - // Skip beam attachments which are not connections - // (for example might be a regular file, directory etc) - continue - } - return data, conn, nil - } - panic("impossibru!") - return nil, nil, nil -} - -func Copy(dst Sender, src Receiver) (int, error) { - var n int - for { - payload, attachment, err := src.Receive() - if err == io.EOF { - return n, nil - } else if err != nil { - return n, err - } - if err := dst.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return n, err - } - n++ - } - panic("impossibru!") - return n, nil -} - -// MsgDesc returns a human readable description of a beam message, usually -// for debugging purposes. -func MsgDesc(payload []byte, attachment *os.File) string { - var filedesc string = "" - if attachment != nil { - filedesc = fmt.Sprintf("%d", attachment.Fd()) - } - return fmt.Sprintf("'%s'[%s]", payload, filedesc) -} - -type devnull struct{} - -func Devnull() ReceiveSender { - return devnull{} -} - -func (d devnull) Send(p []byte, a *os.File) error { - if a != nil { - a.Close() - } - return nil -} - -func (d devnull) Receive() ([]byte, *os.File, error) { - return nil, nil, io.EOF -} diff --git a/pkg/beam/beam_test.go b/pkg/beam/beam_test.go deleted file mode 100644 index 2822861a37..0000000000 --- a/pkg/beam/beam_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package beam - -import ( - "github.com/dotcloud/docker/pkg/beam/data" - "testing" -) - -func TestSendConn(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) - if err != nil { - t.Fatal(err) - } - if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { - t.Fatal(err) - } - conn.CloseWrite() - }() - payload, conn, err := ReceiveConn(b) - if err != nil { - t.Fatal(err) - } - if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { - t.Fatalf("%v != %v\n", val, "connection") - } - msg, _, err := conn.Receive() - if err != nil { - t.Fatal(err) - } - if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { - t.Fatalf("%v != %v\n", val, "bar") - } -} diff --git a/pkg/beam/data/data.go b/pkg/beam/data/data.go deleted file mode 100644 index e205fe43f4..0000000000 --- a/pkg/beam/data/data.go +++ /dev/null @@ -1,115 +0,0 @@ -package data - -import ( - "fmt" - "strconv" - "strings" -) - -func Encode(obj map[string][]string) string { - var msg string - msg += encodeHeader(0) - for k, values := range obj { - msg += encodeNamedList(k, values) - } - return msg -} - -func encodeHeader(msgtype int) string { - return fmt.Sprintf("%03.3d;", msgtype) -} - -func encodeString(s string) string { - return fmt.Sprintf("%d:%s,", len(s), s) -} - -var EncodeString = encodeString -var DecodeString = decodeString - -func encodeList(l []string) string { - values := make([]string, 0, len(l)) - for _, s := range l { - values = append(values, encodeString(s)) - } - return encodeString(strings.Join(values, "")) -} - -func encodeNamedList(name string, l []string) string { - return encodeString(name) + encodeList(l) -} - -func Decode(msg string) (map[string][]string, error) { - msgtype, skip, err := decodeHeader(msg) - if err != nil { - return nil, err - } - if msgtype != 0 { - // FIXME: use special error type so the caller can easily ignore - return nil, fmt.Errorf("unknown message type: %d", msgtype) - } - msg = msg[skip:] - obj := make(map[string][]string) - for len(msg) > 0 { - k, skip, err := decodeString(msg) - if err != nil { - return nil, err - } - msg = msg[skip:] - values, skip, err := decodeList(msg) - if err != nil { - return nil, err - } - msg = msg[skip:] - obj[k] = values - } - return obj, nil -} - -func decodeList(msg string) ([]string, int, error) { - blob, skip, err := decodeString(msg) - if err != nil { - return nil, 0, err - } - var l []string - for len(blob) > 0 { - v, skipv, err := decodeString(blob) - if err != nil { - return nil, 0, err - } - l = append(l, v) - blob = blob[skipv:] - } - return l, skip, nil -} - -func decodeString(msg string) (string, int, error) { - parts := strings.SplitN(msg, ":", 2) - if len(parts) != 2 { - return "", 0, fmt.Errorf("invalid format: no column") - } - var length int - if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil { - return "", 0, err - } else { - length = int(l) - } - if len(parts[1]) < length+1 { - return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1) - } - payload := parts[1][:length+1] - if payload[length] != ',' { - return "", 0, fmt.Errorf("message is not comma-terminated") - } - return payload[:length], len(parts[0]) + 1 + length + 1, nil -} - -func decodeHeader(msg string) (int, int, error) { - if len(msg) < 4 { - return 0, 0, fmt.Errorf("message too small") - } - msgtype, err := strconv.ParseInt(msg[:3], 10, 32) - if err != nil { - return 0, 0, err - } - return int(msgtype), 4, nil -} diff --git a/pkg/beam/data/data_test.go b/pkg/beam/data/data_test.go deleted file mode 100644 index 9059922b3b..0000000000 --- a/pkg/beam/data/data_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package data - -import ( - "strings" - "testing" -) - -func TestEncodeHelloWorld(t *testing.T) { - input := "hello world!" - output := encodeString(input) - expectedOutput := "12:hello world!," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyString(t *testing.T) { - input := "" - output := encodeString(input) - expectedOutput := "0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyList(t *testing.T) { - input := []string{} - output := encodeList(input) - expectedOutput := "0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyMap(t *testing.T) { - input := make(map[string][]string) - output := Encode(input) - expectedOutput := "000;" - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncode1Key1Value(t *testing.T) { - input := make(map[string][]string) - input["hello"] = []string{"world"} - output := Encode(input) - expectedOutput := "000;5:hello,8:5:world,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncode1Key2Value(t *testing.T) { - input := make(map[string][]string) - input["hello"] = []string{"beautiful", "world"} - output := Encode(input) - expectedOutput := "000;5:hello,20:9:beautiful,5:world,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyValue(t *testing.T) { - input := make(map[string][]string) - input["foo"] = []string{} - output := Encode(input) - expectedOutput := "000;3:foo,0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeBinaryKey(t *testing.T) { - input := make(map[string][]string) - input["foo\x00bar\x7f"] = []string{} - output := Encode(input) - expectedOutput := "000;8:foo\x00bar\x7f,0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeBinaryValue(t *testing.T) { - input := make(map[string][]string) - input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"} - output := Encode(input) - expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestDecodeString(t *testing.T) { - validEncodedStrings := []struct { - input string - output string - skip int - }{ - {"3:foo,", "foo", 6}, - {"5:hello,", "hello", 8}, - {"5:hello,5:world,", "hello", 8}, - } - for _, sample := range validEncodedStrings { - output, skip, err := decodeString(sample.input) - if err != nil { - t.Fatalf("error decoding '%v': %v", sample.input, err) - } - if skip != sample.skip { - t.Fatalf("invalid skip: %v!=%v", skip, sample.skip) - } - if output != sample.output { - t.Fatalf("invalid output: %v!=%v", output, sample.output) - } - } -} - -func TestDecode1Key1Value(t *testing.T) { - input := "000;3:foo,6:3:bar,," - output, err := Decode(input) - if err != nil { - t.Fatal(err) - } - if v, exists := output["foo"]; !exists { - t.Fatalf("wrong output: %v\n", output) - } else if len(v) != 1 || strings.Join(v, "") != "bar" { - t.Fatalf("wrong output: %v\n", output) - } -} diff --git a/pkg/beam/data/message.go b/pkg/beam/data/message.go deleted file mode 100644 index 0ebe90295a..0000000000 --- a/pkg/beam/data/message.go +++ /dev/null @@ -1,103 +0,0 @@ -package data - -import ( - "fmt" - "strings" -) - -type Message string - -func Empty() Message { - return Message(Encode(nil)) -} - -func Parse(args []string) Message { - data := make(map[string][]string) - for _, word := range args { - if strings.Contains(word, "=") { - kv := strings.SplitN(word, "=", 2) - key := kv[0] - var val string - if len(kv) == 2 { - val = kv[1] - } - data[key] = []string{val} - } - } - return Message(Encode(data)) -} - -func (m Message) Add(k, v string) Message { - data, err := Decode(string(m)) - if err != nil { - return m - } - if values, exists := data[k]; exists { - data[k] = append(values, v) - } else { - data[k] = []string{v} - } - return Message(Encode(data)) -} - -func (m Message) Set(k string, v ...string) Message { - data, err := Decode(string(m)) - if err != nil { - panic(err) - return m - } - data[k] = v - return Message(Encode(data)) -} - -func (m Message) Del(k string) Message { - data, err := Decode(string(m)) - if err != nil { - panic(err) - return m - } - delete(data, k) - return Message(Encode(data)) -} - -func (m Message) Get(k string) []string { - data, err := Decode(string(m)) - if err != nil { - return nil - } - v, exists := data[k] - if !exists { - return nil - } - return v -} - -// GetOne returns the last value added at the key k, -// or an empty string if there is no value. -func (m Message) GetOne(k string) string { - var v string - if vals := m.Get(k); len(vals) > 0 { - v = vals[len(vals)-1] - } - return v -} - -func (m Message) Pretty() string { - data, err := Decode(string(m)) - if err != nil { - return "" - } - entries := make([]string, 0, len(data)) - for k, values := range data { - entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ","))) - } - return strings.Join(entries, " ") -} - -func (m Message) String() string { - return string(m) -} - -func (m Message) Bytes() []byte { - return []byte(m) -} diff --git a/pkg/beam/data/message_test.go b/pkg/beam/data/message_test.go deleted file mode 100644 index 7224f33d11..0000000000 --- a/pkg/beam/data/message_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package data - -import ( - "testing" -) - -func TestEmptyMessage(t *testing.T) { - m := Empty() - if m.String() != Encode(nil) { - t.Fatalf("%v != %v", m.String(), Encode(nil)) - } -} - -func TestSetMessage(t *testing.T) { - m := Empty().Set("foo", "bar") - output := m.String() - expectedOutput := "000;3:foo,6:3:bar,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } - decodedOutput, err := Decode(output) - if err != nil { - t.Fatal(err) - } - if len(decodedOutput) != 1 { - t.Fatalf("wrong output data: %#v\n", decodedOutput) - } -} - -func TestSetMessageTwice(t *testing.T) { - m := Empty().Set("foo", "bar").Set("ga", "bu") - output := m.String() - expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } - decodedOutput, err := Decode(output) - if err != nil { - t.Fatal(err) - } - if len(decodedOutput) != 2 { - t.Fatalf("wrong output data: %#v\n", decodedOutput) - } -} - -func TestSetDelMessage(t *testing.T) { - m := Empty().Set("foo", "bar").Del("foo") - output := m.String() - expectedOutput := Encode(nil) - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestGetOne(t *testing.T) { - m := Empty().Set("shadok words", "ga", "bu", "zo", "meu") - val := m.GetOne("shadok words") - if val != "meu" { - t.Fatalf("%#v", val) - } -} diff --git a/pkg/beam/data/netstring.txt b/pkg/beam/data/netstring.txt deleted file mode 100644 index 17560929b6..0000000000 --- a/pkg/beam/data/netstring.txt +++ /dev/null @@ -1,92 +0,0 @@ -## -## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt -## - -Netstrings -D. J. Bernstein, djb@pobox.com -19970201 - - -1. Introduction - - A netstring is a self-delimiting encoding of a string. Netstrings are - very easy to generate and to parse. Any string may be encoded as a - netstring; there are no restrictions on length or on allowed bytes. - Another virtue of a netstring is that it declares the string size up - front. Thus an application can check in advance whether it has enough - space to store the entire string. - - Netstrings may be used as a basic building block for reliable network - protocols. Most high-level protocols, in effect, transmit a sequence - of strings; those strings may be encoded as netstrings and then - concatenated into a sequence of characters, which in turn may be - transmitted over a reliable stream protocol such as TCP. - - Note that netstrings can be used recursively. The result of encoding - a sequence of strings is a single string. A series of those encoded - strings may in turn be encoded into a single string. And so on. - - In this document, a string of 8-bit bytes may be written in two - different forms: as a series of hexadecimal numbers between angle - brackets, or as a sequence of ASCII characters between double quotes. - For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of - length 12; it is the same as the string "hello world!". - - Although this document restricts attention to strings of 8-bit bytes, - netstrings could be used with any 6-bit-or-larger character set. - - -2. Definition - - Any string of 8-bit bytes may be encoded as [len]":"[string]",". - Here [string] is the string and [len] is a nonempty sequence of ASCII - digits giving the length of [string] in decimal. The ASCII digits are - <30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros - at the front of [len] are prohibited: [len] begins with <30> exactly - when [string] is empty. - - For example, the string "hello world!" is encoded as <31 32 3a 68 - 65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The - empty string is encoded as "0:,". - - [len]":"[string]"," is called a netstring. [string] is called the - interpretation of the netstring. - - -3. Sample code - - The following C code starts with a buffer buf of length len and - prints it as a netstring. - - if (printf("%lu:",len) < 0) barf(); - if (fwrite(buf,1,len,stdout) < len) barf(); - if (putchar(',') < 0) barf(); - - The following C code reads a netstring and decodes it into a - dynamically allocated buffer buf of length len. - - if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */ - if (getchar() != ':') barf(); - buf = malloc(len + 1); /* malloc(0) is not portable */ - if (!buf) barf(); - if (fread(buf,1,len,stdin) < len) barf(); - if (getchar() != ',') barf(); - - Both of these code fragments assume that the local character set is - ASCII, and that the relevant stdio streams are in binary mode. - - -4. Security considerations - - The famous Finger security hole may be blamed on Finger's use of the - CRLF encoding. In that encoding, each string is simply terminated by - CRLF. This encoding has several problems. Most importantly, it does - not declare the string size in advance. This means that a correct - CRLF parser must be prepared to ask for more and more memory as it is - reading the string. In the case of Finger, a lazy implementor found - this to be too much trouble; instead he simply declared a fixed-size - buffer and used C's gets() function. The rest is history. - - In contrast, as the above sample code shows, it is very easy to - handle netstrings without risking buffer overflow. Thus widespread - use of netstrings may improve network security. diff --git a/pkg/beam/examples/beamsh/beamsh b/pkg/beam/examples/beamsh/beamsh deleted file mode 100755 index 9bfe78ef4a..0000000000 Binary files a/pkg/beam/examples/beamsh/beamsh and /dev/null differ diff --git a/pkg/beam/examples/beamsh/beamsh.go b/pkg/beam/examples/beamsh/beamsh.go deleted file mode 100644 index 808f038c68..0000000000 --- a/pkg/beam/examples/beamsh/beamsh.go +++ /dev/null @@ -1,542 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/beam/data" - "github.com/dotcloud/docker/pkg/dockerscript" - "github.com/dotcloud/docker/pkg/term" - "io" - "net" - "net/url" - "os" - "path" - "strings" - "sync" -) - -var rootPlugins = []string{ - "stdio", -} - -var ( - flX bool - flPing bool - introspect beam.ReceiveSender = beam.Devnull() -) - -func main() { - fd3 := os.NewFile(3, "beam-introspect") - if introsp, err := beam.FileConn(fd3); err == nil { - introspect = introsp - Logf("introspection enabled\n") - } else { - Logf("introspection disabled\n") - } - fd3.Close() - flag.BoolVar(&flX, "x", false, "print commands as they are being executed") - flag.Parse() - if flag.NArg() == 0 { - if term.IsTerminal(0) { - // No arguments, stdin is terminal --> interactive mode - input := bufio.NewScanner(os.Stdin) - for { - fmt.Printf("[%d] beamsh> ", os.Getpid()) - if !input.Scan() { - break - } - line := input.Text() - if len(line) != 0 { - cmd, err := dockerscript.Parse(strings.NewReader(line)) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - continue - } - if err := executeRootScript(cmd); err != nil { - Fatal(err) - } - } - if err := input.Err(); err == io.EOF { - break - } else if err != nil { - Fatal(err) - } - } - } else { - // No arguments, stdin not terminal --> batch mode - script, err := dockerscript.Parse(os.Stdin) - if err != nil { - Fatal("parse error: %v\n", err) - } - if err := executeRootScript(script); err != nil { - Fatal(err) - } - } - } else { - // 1+ arguments: parse them as script files - for _, scriptpath := range flag.Args() { - f, err := os.Open(scriptpath) - if err != nil { - Fatal(err) - } - script, err := dockerscript.Parse(f) - if err != nil { - Fatal("parse error: %v\n", err) - } - if err := executeRootScript(script); err != nil { - Fatal(err) - } - } - } -} - -func executeRootScript(script []*dockerscript.Command) error { - if len(rootPlugins) > 0 { - // If there are root plugins, wrap the script inside them - var ( - rootCmd *dockerscript.Command - lastCmd *dockerscript.Command - ) - for _, plugin := range rootPlugins { - pluginCmd := &dockerscript.Command{ - Args: []string{plugin}, - } - if rootCmd == nil { - rootCmd = pluginCmd - } else { - lastCmd.Children = []*dockerscript.Command{pluginCmd} - } - lastCmd = pluginCmd - } - lastCmd.Children = script - script = []*dockerscript.Command{rootCmd} - } - handlers, err := Handlers(introspect) - if err != nil { - return err - } - defer handlers.Close() - var tasks sync.WaitGroup - defer func() { - Debugf("Waiting for introspection...\n") - tasks.Wait() - Debugf("DONE Waiting for introspection\n") - }() - if introspect != nil { - tasks.Add(1) - go func() { - Debugf("starting introspection\n") - defer Debugf("done with introspection\n") - defer tasks.Done() - introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil) - Debugf("XXX starting reading introspection messages\n") - r := beam.NewRouter(handlers) - r.NewRoute().All().Handler(func(p []byte, a *os.File) error { - Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a)) - return handlers.Send(p, a) - }) - n, err := beam.Copy(r, introspect) - Debugf("XXX done reading %d introspection messages: %v\n", n, err) - }() - } - if err := executeScript(handlers, script); err != nil { - return err - } - return nil -} - -func executeScript(out beam.Sender, script []*dockerscript.Command) error { - Debugf("executeScript(%s)\n", scriptString(script)) - defer Debugf("executeScript(%s) DONE\n", scriptString(script)) - var background sync.WaitGroup - defer background.Wait() - for _, cmd := range script { - if cmd.Background { - background.Add(1) - go func(out beam.Sender, cmd *dockerscript.Command) { - executeCommand(out, cmd) - background.Done() - }(out, cmd) - } else { - if err := executeCommand(out, cmd); err != nil { - return err - } - } - } - return nil -} - -// 1) Find a handler for the command (if no handler, fail) -// 2) Attach new in & out pair to the handler -// 3) [in the background] Copy handler output to our own output -// 4) [in the background] Run the handler -// 5) Recursively executeScript() all children commands and wait for them to complete -// 6) Wait for handler to return and (shortly afterwards) output copy to complete -// 7) Profit -func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { - if flX { - fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1)) - } - Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) - defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) - if len(cmd.Args) == 0 { - return fmt.Errorf("empty command") - } - Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) - job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes()) - if err != nil { - return fmt.Errorf("%v\n", err) - } - var tasks sync.WaitGroup - tasks.Add(1) - Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) - go func() { - if out != nil { - Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) - n, err := beam.Copy(out, job) - if err != nil { - Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err) - } - Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n) - } - tasks.Done() - }() - // depth-first execution of children commands - // executeScript() blocks until all commands are completed - Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) - executeScript(job, cmd.Children) - Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) - job.CloseWrite() - Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " ")) - Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " ")) - tasks.Wait() - Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " ")) - return nil -} - -type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender) - -func Handlers(sink beam.Sender) (*beam.UnixConn, error) { - var tasks sync.WaitGroup - pub, priv, err := beam.USocketPair() - if err != nil { - return nil, err - } - go func() { - defer func() { - Debugf("[handlers] closewrite() on endpoint\n") - // FIXME: this is not yet necessary but will be once - // there is synchronization over standard beam messages - priv.CloseWrite() - Debugf("[handlers] done closewrite() on endpoint\n") - }() - r := beam.NewRouter(sink) - r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error { - conn, err := beam.FileConn(attachment) - if err != nil { - attachment.Close() - return err - } - // attachment.Close() - tasks.Add(1) - go func() { - defer tasks.Done() - defer func() { - Debugf("[handlers] '%s' closewrite\n", payload) - conn.CloseWrite() - Debugf("[handlers] '%s' done closewrite\n", payload) - }() - cmd := data.Message(payload).Get("cmd") - Debugf("[handlers] received %s\n", strings.Join(cmd, " ")) - if len(cmd) == 0 { - return - } - handler := GetHandler(cmd[0]) - if handler == nil { - return - } - stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes()) - if err != nil { - return - } - defer stdout.Close() - stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes()) - if err != nil { - return - } - defer stderr.Close() - Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) - handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn)) - Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) - }() - return nil - }) - beam.Copy(r, priv) - Debugf("[handlers] waiting for all tasks\n") - tasks.Wait() - Debugf("[handlers] all tasks returned\n") - }() - return pub, nil -} - -func GetHandler(name string) Handler { - if name == "logger" { - return CmdLogger - } else if name == "render" { - return CmdRender - } else if name == "devnull" { - return CmdDevnull - } else if name == "prompt" { - return CmdPrompt - } else if name == "stdio" { - return CmdStdio - } else if name == "echo" { - return CmdEcho - } else if name == "pass" { - return CmdPass - } else if name == "in" { - return CmdIn - } else if name == "exec" { - return CmdExec - } else if name == "trace" { - return CmdTrace - } else if name == "emit" { - return CmdEmit - } else if name == "print" { - return CmdPrint - } else if name == "multiprint" { - return CmdMultiprint - } else if name == "listen" { - return CmdListen - } else if name == "beamsend" { - return CmdBeamsend - } else if name == "beamreceive" { - return CmdBeamreceive - } else if name == "connect" { - return CmdConnect - } else if name == "openfile" { - return CmdOpenfile - } else if name == "spawn" { - return CmdSpawn - } else if name == "chdir" { - return CmdChdir - } - return nil -} - -// VARIOUS HELPER FUNCTIONS: - -func connToFile(conn net.Conn) (f *os.File, err error) { - if connWithFile, ok := conn.(interface { - File() (*os.File, error) - }); !ok { - return nil, fmt.Errorf("no file descriptor available") - } else { - f, err = connWithFile.File() - if err != nil { - return nil, err - } - } - return f, err -} - -type Msg struct { - payload []byte - attachment *os.File -} - -func Logf(msg string, args ...interface{}) (int, error) { - if len(msg) == 0 || msg[len(msg)-1] != '\n' { - msg = msg + "\n" - } - msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg) - return fmt.Printf(msg, args...) -} - -func Debugf(msg string, args ...interface{}) { - if os.Getenv("BEAMDEBUG") != "" { - Logf(msg, args...) - } -} - -func Fatalf(msg string, args ...interface{}) { - Logf(msg, args...) - os.Exit(1) -} - -func Fatal(args ...interface{}) { - Fatalf("%v", args[0]) -} - -func scriptString(script []*dockerscript.Command) string { - lines := make([]string, 0, len(script)) - for _, cmd := range script { - line := strings.Join(cmd.Args, " ") - if len(cmd.Children) > 0 { - line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) - } else { - line += " {}" - } - lines = append(lines, line) - } - return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) -} - -func dialer(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - return - } - connections <- conn - } - }() - return connections, nil -} - -func listener(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := l.Accept() - if err != nil { - return - } - Logf("new connection\n") - connections <- conn - } - }() - return connections, nil -} - -func SendToConn(connections chan net.Conn, src beam.Receiver) error { - var tasks sync.WaitGroup - defer tasks.Wait() - for { - payload, attachment, err := src.Receive() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - conn, ok := <-connections - if !ok { - break - } - Logf("Sending %s\n", msgDesc(payload, attachment)) - tasks.Add(1) - go func(payload []byte, attachment *os.File, conn net.Conn) { - defer tasks.Done() - if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { - return - } - if attachment == nil { - conn.Close() - return - } - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying the connection to [%d]\n", attachment.Fd()) - io.Copy(attachment, conn) - attachment.Close() - Debugf("done copying the connection to [%d]\n", attachment.Fd()) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying [%d] to the connection\n", attachment.Fd()) - io.Copy(conn, attachment) - conn.Close() - Debugf("done copying [%d] to the connection\n", attachment.Fd()) - }(attachment, conn) - iotasks.Wait() - }(payload, attachment, conn) - } - return nil -} - -func msgDesc(payload []byte, attachment *os.File) string { - return beam.MsgDesc(payload, attachment) -} - -func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error { - for conn := range connections { - err := func() error { - Logf("parsing message from network...\n") - defer Logf("done parsing message from network\n") - buf := make([]byte, 4098) - n, err := conn.Read(buf) - if n == 0 { - conn.Close() - if err == io.EOF { - return nil - } else { - return err - } - } - Logf("decoding message from '%s'\n", buf[:n]) - header, skip, err := data.DecodeString(string(buf[:n])) - if err != nil { - conn.Close() - return err - } - pub, priv, err := beam.SocketPair() - if err != nil { - return err - } - Logf("decoded message: %s\n", data.Message(header).Pretty()) - go func(skipped []byte, conn net.Conn, f *os.File) { - // this closes both conn and f - if len(skipped) > 0 { - if _, err := f.Write(skipped); err != nil { - Logf("ERROR: %v\n", err) - f.Close() - conn.Close() - return - } - } - bicopy(conn, f) - }(buf[skip:n], conn, pub) - if err := dst.Send([]byte(header), priv); err != nil { - return err - } - return nil - }() - if err != nil { - Logf("Error reading from connection: %v\n", err) - } - } - return nil -} - -func bicopy(a, b io.ReadWriteCloser) { - var iotasks sync.WaitGroup - oneCopy := func(dst io.WriteCloser, src io.Reader) { - defer iotasks.Done() - io.Copy(dst, src) - dst.Close() - } - iotasks.Add(2) - go oneCopy(a, b) - go oneCopy(b, a) - iotasks.Wait() -} diff --git a/pkg/beam/examples/beamsh/builtins.go b/pkg/beam/examples/beamsh/builtins.go deleted file mode 100644 index 3242237cc1..0000000000 --- a/pkg/beam/examples/beamsh/builtins.go +++ /dev/null @@ -1,441 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/beam/data" - "github.com/dotcloud/docker/pkg/term" - "github.com/dotcloud/docker/utils" - "io" - "net" - "net/url" - "os" - "os/exec" - "path" - "strings" - "sync" - "text/template" -) - -func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if err := os.MkdirAll("logs", 0700); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - var tasks sync.WaitGroup - defer tasks.Wait() - var n int = 1 - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { - tasks.Add(1) - go func(n int) { - defer tasks.Done() - defer attachment.Close() - var streamname string - if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { - streamname = "stdout" - } else { - streamname = cmd[1] - } - if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { - streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) - } - logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - defer logfile.Close() - io.Copy(logfile, attachment) - logfile.Sync() - }(n) - n++ - return nil - }).Tee(out) - if _, err := beam.Copy(r, in); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - txt := args[1] - if !strings.HasSuffix(txt, "\n") { - txt += "\n" - } - t := template.Must(template.New("render").Parse(txt)) - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - msg, err := data.Decode(string(payload)) - if err != nil { - fmt.Fprintf(stderr, "decode error: %v\n") - } - if err := t.Execute(stdout, msg); err != nil { - fmt.Fprintf(stderr, "rendering error: %v\n", err) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - if err := out.Send(payload, attachment); err != nil { - return - } - } -} - -func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - _, attachment, err := in.Receive() - if err != nil { - return - } - if attachment != nil { - attachment.Close() - } - } -} - -func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) - return - } - if !term.IsTerminal(0) { - fmt.Fprintf(stderr, "can't prompt: no tty available...\n") - return - } - fmt.Printf("%s: ", strings.Join(args[1:], " ")) - oldState, _ := term.SaveState(0) - term.DisableEcho(0, oldState) - line, _, err := bufio.NewReader(os.Stdin).ReadLine() - if err != nil { - fmt.Fprintln(stderr, err.Error()) - return - } - val := string(line) - fmt.Printf("\n") - term.RestoreTerminal(0, oldState) - out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) -} - -func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - defer tasks.Wait() - - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { - tasks.Add(1) - go func() { - defer tasks.Done() - defer attachment.Close() - io.Copy(os.Stdout, attachment) - attachment.Close() - }() - return nil - }).Tee(out) - - if _, err := beam.Copy(r, in); err != nil { - Fatal(err) - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - fmt.Fprintln(stdout, strings.Join(args[1:], " ")) -} - -func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - if err := out.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return - } - } -} - -func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - c := exec.Command(utils.SelfPath()) - r, w, err := os.Pipe() - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - c.Stdin = r - c.Stdout = stdout - c.Stderr = stderr - go func() { - fmt.Fprintf(w, strings.Join(args[1:], " ")) - w.Sync() - w.Close() - }() - if err := c.Run(); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - os.Chdir(args[1]) - GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) -} - -func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - cmd := exec.Command(args[1], args[2:]...) - cmd.Stdout = stdout - cmd.Stderr = stderr - //cmd.Stdin = os.Stdin - local, remote, err := beam.SocketPair() - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - child, err := beam.FileConn(local) - if err != nil { - local.Close() - remote.Close() - fmt.Fprintf(stderr, "%v\n", err) - return - } - local.Close() - cmd.ExtraFiles = append(cmd.ExtraFiles, remote) - - var tasks sync.WaitGroup - tasks.Add(1) - go func() { - defer Debugf("done copying to child\n") - defer tasks.Done() - defer child.CloseWrite() - beam.Copy(child, in) - }() - - tasks.Add(1) - go func() { - defer Debugf("done copying from child %d\n") - defer tasks.Done() - r := beam.NewRouter(out) - r.NewRoute().All().Handler(func(p []byte, a *os.File) error { - return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a) - }) - beam.Copy(r, child) - }() - execErr := cmd.Run() - // We can close both ends of the socket without worrying about data stuck in the buffer, - // because unix socket writes are fully synchronous. - child.Close() - tasks.Wait() - var status string - if execErr != nil { - status = execErr.Error() - } else { - status = "ok" - } - out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) -} - -func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - r := beam.NewRouter(out) - r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error { - var sfd string = "nil" - if attachment != nil { - sfd = fmt.Sprintf("%d", attachment.Fd()) - } - fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd) - out.Send(payload, attachment) - return nil - }) - beam.Copy(r, in) -} - -func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - out.Send(data.Parse(args[1:]).Bytes(), nil) -} - -func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, a, err := in.Receive() - if err != nil { - return - } - // Skip commands - if a != nil && data.Message(payload).Get("cmd") == nil { - dup, err := beam.SendRPipe(out, payload) - if err != nil { - a.Close() - return - } - io.Copy(io.MultiWriter(os.Stdout, dup), a) - dup.Close() - } else { - if err := out.Send(payload, a); err != nil { - return - } - } - } -} - -func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - defer tasks.Wait() - r := beam.NewRouter(out) - multiprint := func(p []byte, a *os.File) error { - tasks.Add(1) - go func() { - defer tasks.Done() - defer a.Close() - msg := data.Message(string(p)) - input := bufio.NewScanner(a) - for input.Scan() { - fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) - } - }() - return nil - } - r.NewRoute().KeyIncludes("type", "job").Passthrough(out) - r.NewRoute().HasAttachment().Handler(multiprint).Tee(out) - beam.Copy(r, in) -} - -func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - for { - conn, err := l.Accept() - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - f, err := connToFile(conn) - if err != nil { - conn.Close() - continue - } - out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) - } -} - -func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = dialer - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - SendToConn(connections, in) -} - -func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = listener - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - ReceiveFromConn(connections, out) -} - -func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - var tasks sync.WaitGroup - for { - _, attachment, err := in.Receive() - if err != nil { - break - } - if attachment == nil { - continue - } - Logf("connecting to %s/%s\n", u.Scheme, u.Host) - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil) - return - } - out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) - tasks.Add(1) - go func(attachment *os.File, conn net.Conn) { - defer tasks.Done() - // even when successful, conn.File() returns a duplicate, - // so we must close the original - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(attachment, conn) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(conn, attachment) - }(attachment, conn) - iotasks.Wait() - conn.Close() - attachment.Close() - }(attachment, conn) - } - tasks.Wait() -} - -func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for _, name := range args { - f, err := os.Open(name) - if err != nil { - continue - } - if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { - f.Close() - } - } -} - -func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - os.Chdir(args[1]) -} diff --git a/pkg/beam/examples/beamsh/scripts/bug0.ds b/pkg/beam/examples/beamsh/scripts/bug0.ds deleted file mode 100755 index 89b75230be..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug0.ds +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env beamsh - -exec ls -l diff --git a/pkg/beam/examples/beamsh/scripts/bug1.ds b/pkg/beam/examples/beamsh/scripts/bug1.ds deleted file mode 100755 index 2d8a9e2ed9..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug1.ds +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - exec ls -l -} diff --git a/pkg/beam/examples/beamsh/scripts/bug2.ds b/pkg/beam/examples/beamsh/scripts/bug2.ds deleted file mode 100755 index 08f0431f68..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug2.ds +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - stdio { - exec ls -l - } -} diff --git a/pkg/beam/examples/beamsh/scripts/bug3.ds b/pkg/beam/examples/beamsh/scripts/bug3.ds deleted file mode 100755 index 7bb8694d49..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug3.ds +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env beamsh -x - -trace outer { - # stdio fails - stdio { - trace inner { - exec ls -l - } - } -} diff --git a/pkg/beam/examples/beamsh/scripts/bug4.ds b/pkg/beam/examples/beamsh/scripts/bug4.ds deleted file mode 100755 index b7beedbae2..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug4.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - trace { - stdio { - exec ls -l - } - } -} diff --git a/pkg/beam/examples/beamsh/scripts/bug5.ds b/pkg/beam/examples/beamsh/scripts/bug5.ds deleted file mode 100755 index 9f9a85515d..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug5.ds +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - # exec fails - exec ls -l -} diff --git a/pkg/beam/examples/beamsh/scripts/bug6.ds b/pkg/beam/examples/beamsh/scripts/bug6.ds deleted file mode 100755 index 90281401cd..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug6.ds +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - trace { - echo hello - } -} diff --git a/pkg/beam/examples/beamsh/scripts/bug7.ds b/pkg/beam/examples/beamsh/scripts/bug7.ds deleted file mode 100755 index b6e7bd9201..0000000000 --- a/pkg/beam/examples/beamsh/scripts/bug7.ds +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - # exec fails - echo hello world -} diff --git a/pkg/beam/examples/beamsh/scripts/demo1.ds b/pkg/beam/examples/beamsh/scripts/demo1.ds deleted file mode 100755 index 20a3359f3a..0000000000 --- a/pkg/beam/examples/beamsh/scripts/demo1.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -devnull { - multiprint { - exec tail -f /var/log/system.log & - exec ls -l - exec ls ksdhfkjdshf jksdfhkjsdhf - } -} diff --git a/pkg/beam/examples/beamsh/scripts/helloworld.ds b/pkg/beam/examples/beamsh/scripts/helloworld.ds deleted file mode 100755 index 32e59b062e..0000000000 --- a/pkg/beam/examples/beamsh/scripts/helloworld.ds +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env beamsh - -print { - trace { - emit msg=hello - emit msg=world - } -} diff --git a/pkg/beam/examples/beamsh/scripts/logdemo.ds b/pkg/beam/examples/beamsh/scripts/logdemo.ds deleted file mode 100755 index 8b729a966f..0000000000 --- a/pkg/beam/examples/beamsh/scripts/logdemo.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - log { - exec ls -l - exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf - echo hello world - } -} diff --git a/pkg/beam/examples/beamsh/scripts/miniserver.ds b/pkg/beam/examples/beamsh/scripts/miniserver.ds deleted file mode 100755 index 9707477ee0..0000000000 --- a/pkg/beam/examples/beamsh/scripts/miniserver.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -multiprint { - trace { - listen tcp://localhost:7676 & - listen tcp://localhost:8787 & - } -} - diff --git a/pkg/beam/router.go b/pkg/beam/router.go deleted file mode 100644 index 15910e95b1..0000000000 --- a/pkg/beam/router.go +++ /dev/null @@ -1,184 +0,0 @@ -package beam - -import ( - "fmt" - "github.com/dotcloud/docker/pkg/beam/data" - "io" - "os" -) - -type Router struct { - routes []*Route - sink Sender -} - -func NewRouter(sink Sender) *Router { - return &Router{sink: sink} -} - -func (r *Router) Send(payload []byte, attachment *os.File) (err error) { - //fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment)) - defer func() { - //fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err) - }() - for _, route := range r.routes { - if route.Match(payload, attachment) { - return route.Handle(payload, attachment) - } - } - if r.sink != nil { - // fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink) - return r.sink.Send(payload, attachment) - } - //fmt.Printf("[Router.Send] no match. return error.\n") - return fmt.Errorf("no matching route") -} - -func (r *Router) NewRoute() *Route { - route := &Route{} - r.routes = append(r.routes, route) - return route -} - -type Route struct { - rules []func([]byte, *os.File) bool - handler func([]byte, *os.File) error -} - -func (route *Route) Match(payload []byte, attachment *os.File) bool { - for _, rule := range route.rules { - if !rule(payload, attachment) { - return false - } - } - return true -} - -func (route *Route) Handle(payload []byte, attachment *os.File) error { - if route.handler == nil { - return nil - } - return route.handler(payload, attachment) -} - -func (r *Route) HasAttachment() *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return attachment != nil - }) - return r -} - -func (route *Route) Tee(dst Sender) *Route { - inner := route.handler - route.handler = func(payload []byte, attachment *os.File) error { - if inner == nil { - return nil - } - if attachment == nil { - return inner(payload, attachment) - } - // Setup the tee - w, err := SendRPipe(dst, payload) - if err != nil { - return err - } - teeR, teeW, err := os.Pipe() - if err != nil { - w.Close() - return err - } - go func() { - io.Copy(io.MultiWriter(teeW, w), attachment) - attachment.Close() - w.Close() - teeW.Close() - }() - return inner(payload, teeR) - } - return route -} - -func (r *Route) Filter(f func([]byte, *os.File) bool) *Route { - r.rules = append(r.rules, f) - return r -} - -func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - values := data.Message(payload).Get(k) - if values == nil { - return false - } - if len(values) < len(beginning) { - return false - } - for i, v := range beginning { - if v != values[i] { - return false - } - } - return true - }) - return r -} - -func (r *Route) KeyEquals(k string, full ...string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - values := data.Message(payload).Get(k) - if len(values) != len(full) { - return false - } - for i, v := range full { - if v != values[i] { - return false - } - } - return true - }) - return r -} - -func (r *Route) KeyIncludes(k, v string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - for _, val := range data.Message(payload).Get(k) { - if val == v { - return true - } - } - return false - }) - return r -} - -func (r *Route) NoKey(k string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return len(data.Message(payload).Get(k)) == 0 - }) - return r -} - -func (r *Route) KeyExists(k string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return data.Message(payload).Get(k) != nil - }) - return r -} - -func (r *Route) Passthrough(dst Sender) *Route { - r.handler = func(payload []byte, attachment *os.File) error { - return dst.Send(payload, attachment) - } - return r -} - -func (r *Route) All() *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return true - }) - return r -} - -func (r *Route) Handler(h func([]byte, *os.File) error) *Route { - r.handler = h - return r -} diff --git a/pkg/beam/router_test.go b/pkg/beam/router_test.go deleted file mode 100644 index f7f7bf1d2d..0000000000 --- a/pkg/beam/router_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package beam - -import ( - "fmt" - "io/ioutil" - "os" - "sync" - "testing" -) - -type msg struct { - payload []byte - attachment *os.File -} - -func (m msg) String() string { - return MsgDesc(m.payload, m.attachment) -} - -type mockReceiver []msg - -func (r *mockReceiver) Send(p []byte, a *os.File) error { - (*r) = append((*r), msg{p, a}) - return nil -} - -func TestSendNoSinkNoRoute(t *testing.T) { - r := NewRouter(nil) - if err := r.Send([]byte("hello"), nil); err == nil { - t.Fatalf("error expected") - } - a, b, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - if err := r.Send([]byte("foo bar baz"), a); err == nil { - t.Fatalf("error expected") - } -} - -func TestSendSinkNoRoute(t *testing.T) { - var sink mockReceiver - r := NewRouter(&sink) - if err := r.Send([]byte("hello"), nil); err != nil { - t.Fatal(err) - } - a, b, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - if err := r.Send([]byte("world"), a); err != nil { - t.Fatal(err) - } - if len(sink) != 2 { - t.Fatalf("%#v\n", sink) - } - if string(sink[0].payload) != "hello" { - t.Fatalf("%#v\n", sink) - } - if sink[0].attachment != nil { - t.Fatalf("%#v\n", sink) - } - if string(sink[1].payload) != "world" { - t.Fatalf("%#v\n", sink) - } - if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 { - t.Fatalf("%v\n", sink) - } - var tasks sync.WaitGroup - tasks.Add(2) - go func() { - defer tasks.Done() - fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd()) - data, err := ioutil.ReadAll(sink[1].attachment) - if err != nil { - t.Fatal(err) - } - if string(data) != "foo bar\n" { - t.Fatalf("%v\n", string(data)) - } - }() - go func() { - defer tasks.Done() - fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd()) - if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil { - t.Fatal(err) - } - b.Close() - }() - tasks.Wait() -} diff --git a/pkg/beam/service.go b/pkg/beam/service.go deleted file mode 100644 index 8e117059cb..0000000000 --- a/pkg/beam/service.go +++ /dev/null @@ -1,85 +0,0 @@ -package beam - -import ( - "net" -) - -// Listen is a convenience interface for applications to create service endpoints -// which can be easily used with existing networking code. -// -// Listen registers a new service endpoint on the beam connection `conn`, using the -// service name `name`. It returns a listener which can be used in the usual -// way. Calling Accept() on the listener will block until a new connection is available -// on the service endpoint. The endpoint is then returned as a regular net.Conn and -// can be used as a regular network connection. -// -// Note that if the underlying file descriptor received in attachment is nil or does -// not point to a connection, that message will be skipped. -// -func Listen(conn Sender, name string) (net.Listener, error) { - endpoint, err := SendConn(conn, []byte(name)) - if err != nil { - return nil, err - } - return &listener{ - name: name, - endpoint: endpoint, - }, nil -} - -func Connect(ctx *UnixConn, name string) (net.Conn, error) { - l, err := Listen(ctx, name) - if err != nil { - return nil, err - } - conn, err := l.Accept() - if err != nil { - return nil, err - } - return conn, nil -} - -type listener struct { - name string - endpoint ReceiveCloser -} - -func (l *listener) Accept() (net.Conn, error) { - for { - _, f, err := l.endpoint.Receive() - if err != nil { - return nil, err - } - if f == nil { - // Skip empty attachments - continue - } - conn, err := net.FileConn(f) - if err != nil { - // Skip beam attachments which are not connections - // (for example might be a regular file, directory etc) - continue - } - return conn, nil - } - panic("impossibru!") - return nil, nil -} - -func (l *listener) Close() error { - return l.endpoint.Close() -} - -func (l *listener) Addr() net.Addr { - return addr(l.name) -} - -type addr string - -func (a addr) Network() string { - return "beam" -} - -func (a addr) String() string { - return string(a) -} diff --git a/pkg/beam/unix.go b/pkg/beam/unix.go deleted file mode 100644 index b2d0d94150..0000000000 --- a/pkg/beam/unix.go +++ /dev/null @@ -1,317 +0,0 @@ -package beam - -import ( - "bufio" - "fmt" - "net" - "os" - "syscall" -) - -func debugCheckpoint(msg string, args ...interface{}) { - if os.Getenv("DEBUG") == "" { - return - } - os.Stdout.Sync() - tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700) - fmt.Fprintf(tty, msg, args...) - bufio.NewScanner(tty).Scan() - tty.Close() -} - -type UnixConn struct { - *net.UnixConn - fds []*os.File -} - -// Framing: -// In order to handle framing in Send/Recieve, as these give frame -// boundaries we use a very simple 4 bytes header. It is a big endiand -// uint32 where the high bit is set if the message includes a file -// descriptor. The rest of the uint32 is the length of the next frame. -// We need the bit in order to be able to assign recieved fds to -// the right message, as multiple messages may be coalesced into -// a single recieve operation. -func makeHeader(data []byte, fds []int) ([]byte, error) { - header := make([]byte, 4) - - length := uint32(len(data)) - - if length > 0x7fffffff { - return nil, fmt.Errorf("Data to large") - } - - if len(fds) != 0 { - length = length | 0x80000000 - } - header[0] = byte((length >> 24) & 0xff) - header[1] = byte((length >> 16) & 0xff) - header[2] = byte((length >> 8) & 0xff) - header[3] = byte((length >> 0) & 0xff) - - return header, nil -} - -func parseHeader(header []byte) (uint32, bool) { - length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) - hasFd := length&0x80000000 != 0 - length = length & ^uint32(0x80000000) - - return length, hasFd -} - -func FileConn(f *os.File) (*UnixConn, error) { - conn, err := net.FileConn(f) - if err != nil { - return nil, err - } - uconn, ok := conn.(*net.UnixConn) - if !ok { - conn.Close() - return nil, fmt.Errorf("%d: not a unix connection", f.Fd()) - } - return &UnixConn{UnixConn: uconn}, nil - -} - -// Send sends a new message on conn with data and f as payload and -// attachment, respectively. -// On success, f is closed -func (conn *UnixConn) Send(data []byte, f *os.File) error { - { - var fd int = -1 - if f != nil { - fd = int(f.Fd()) - } - debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd) - } - var fds []int - if f != nil { - fds = append(fds, int(f.Fd())) - } - if err := conn.sendUnix(data, fds...); err != nil { - return err - } - - if f != nil { - f.Close() - } - return nil -} - -// Receive waits for a new message on conn, and receives its payload -// and attachment, or an error if any. -// -// If more than 1 file descriptor is sent in the message, they are all -// closed except for the first, which is the attachment. -// It is legal for a message to have no attachment or an empty payload. -func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) { - defer func() { - var fd int = -1 - if rf != nil { - fd = int(rf.Fd()) - } - debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd) - }() - - // Read header - header := make([]byte, 4) - nRead := uint32(0) - - for nRead < 4 { - n, err := conn.receiveUnix(header[nRead:]) - if err != nil { - return nil, nil, err - } - nRead = nRead + uint32(n) - } - - length, hasFd := parseHeader(header) - - if hasFd { - if len(conn.fds) == 0 { - return nil, nil, fmt.Errorf("No expected file descriptor in message") - } - - rf = conn.fds[0] - conn.fds = conn.fds[1:] - } - - rdata = make([]byte, length) - - nRead = 0 - for nRead < length { - n, err := conn.receiveUnix(rdata[nRead:]) - if err != nil { - return nil, nil, err - } - nRead = nRead + uint32(n) - } - - return -} - -func (conn *UnixConn) receiveUnix(buf []byte) (int, error) { - oob := make([]byte, syscall.CmsgSpace(4)) - bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob) - if err != nil { - return 0, err - } - fd := extractFd(oob[:oobn]) - if fd != -1 { - f := os.NewFile(uintptr(fd), "") - conn.fds = append(conn.fds, f) - } - - return bufn, nil -} - -func (conn *UnixConn) sendUnix(data []byte, fds ...int) error { - header, err := makeHeader(data, fds) - if err != nil { - return err - } - - // There is a bug in conn.WriteMsgUnix where it doesn't correctly return - // the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645) - // So, we can't rely on the return value from it. However, we must use it to - // send the fds. In order to handle this we only write one byte using WriteMsgUnix - // (when we have to), as that can only ever block or fully suceed. We then write - // the rest with conn.Write() - // The reader side should not rely on this though, as hopefully this gets fixed - // in go later. - written := 0 - if len(fds) != 0 { - oob := syscall.UnixRights(fds...) - wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil) - if err != nil { - return err - } - written = written + wrote - } - - for written < len(header) { - wrote, err := conn.Write(header[written:]) - if err != nil { - return err - } - written = written + wrote - } - - written = 0 - for written < len(data) { - wrote, err := conn.Write(data[written:]) - if err != nil { - return err - } - written = written + wrote - } - - return nil -} - -func extractFd(oob []byte) int { - // Grab forklock to make sure no forks accidentally inherit the new - // fds before they are made CLOEXEC - // There is a slight race condition between ReadMsgUnix returns and - // when we grap the lock, so this is not perfect. Unfortunately - // There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any - // way to implement non-blocking i/o in go, so this is hard to fix. - syscall.ForkLock.Lock() - defer syscall.ForkLock.Unlock() - scms, err := syscall.ParseSocketControlMessage(oob) - if err != nil { - return -1 - } - - foundFd := -1 - for _, scm := range scms { - fds, err := syscall.ParseUnixRights(&scm) - if err != nil { - continue - } - - for _, fd := range fds { - if foundFd == -1 { - syscall.CloseOnExec(fd) - foundFd = fd - } else { - syscall.Close(fd) - } - } - } - - return foundFd -} - -func socketpair() ([2]int, error) { - return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) -} - -// SocketPair is a convenience wrapper around the socketpair(2) syscall. -// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors -// not bound to the underlying filesystem. -// Messages sent on one end are received on the other, and vice-versa. -// It is the caller's responsibility to close both ends. -func SocketPair() (a *os.File, b *os.File, err error) { - defer func() { - var ( - fdA int = -1 - fdB int = -1 - ) - if a != nil { - fdA = int(a.Fd()) - } - if b != nil { - fdB = int(b.Fd()) - } - debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB) - }() - pair, err := socketpair() - if err != nil { - return nil, nil, err - } - return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil -} - -func USocketPair() (*UnixConn, *UnixConn, error) { - debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ") - defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ") - a, b, err := SocketPair() - if err != nil { - return nil, nil, err - } - defer a.Close() - defer b.Close() - uA, err := FileConn(a) - if err != nil { - return nil, nil, err - } - uB, err := FileConn(b) - if err != nil { - uA.Close() - return nil, nil, err - } - return uA, uB, nil -} - -// FdConn wraps a file descriptor in a standard *net.UnixConn object, or -// returns an error if the file descriptor does not point to a unix socket. -// This creates a duplicate file descriptor. It's the caller's responsibility -// to close both. -func FdConn(fd int) (n *net.UnixConn, err error) { - { - debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd) - } - f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd)) - conn, err := net.FileConn(f) - if err != nil { - return nil, err - } - uconn, ok := conn.(*net.UnixConn) - if !ok { - conn.Close() - return nil, fmt.Errorf("%d: not a unix connection", fd) - } - return uconn, nil -} diff --git a/pkg/beam/unix_test.go b/pkg/beam/unix_test.go deleted file mode 100644 index 976f089c23..0000000000 --- a/pkg/beam/unix_test.go +++ /dev/null @@ -1,237 +0,0 @@ -package beam - -import ( - "fmt" - "io/ioutil" - "testing" -) - -func TestSocketPair(t *testing.T) { - a, b, err := SocketPair() - if err != nil { - t.Fatal(err) - } - go func() { - a.Write([]byte("hello world!")) - fmt.Printf("done writing. closing\n") - a.Close() - fmt.Printf("done closing\n") - }() - data, err := ioutil.ReadAll(b) - if err != nil { - t.Fatal(err) - } - fmt.Printf("--> %s\n", data) - fmt.Printf("still open: %v\n", a.Fd()) -} - -func TestUSocketPair(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - - data := "hello world!" - go func() { - a.Write([]byte(data)) - a.Close() - }() - res := make([]byte, 1024) - size, err := b.Read(res) - if err != nil { - t.Fatal(err) - } - if size != len(data) { - t.Fatal("Unexpected size") - } - if string(res[0:size]) != data { - t.Fatal("Unexpected data") - } -} - -func TestSendUnixSocket(t *testing.T) { - a1, a2, err := USocketPair() - if err != nil { - t.Fatal(err) - } - // defer a1.Close() - // defer a2.Close() - b1, b2, err := USocketPair() - if err != nil { - t.Fatal(err) - } - // defer b1.Close() - // defer b2.Close() - glueA, glueB, err := SocketPair() - if err != nil { - t.Fatal(err) - } - // defer glueA.Close() - // defer glueB.Close() - go func() { - err := b2.Send([]byte("a"), glueB) - if err != nil { - t.Fatal(err) - } - }() - go func() { - err := a2.Send([]byte("b"), glueA) - if err != nil { - t.Fatal(err) - } - }() - connAhdr, connA, err := a1.Receive() - if err != nil { - t.Fatal(err) - } - if string(connAhdr) != "b" { - t.Fatalf("unexpected: %s", connAhdr) - } - connBhdr, connB, err := b1.Receive() - if err != nil { - t.Fatal(err) - } - if string(connBhdr) != "a" { - t.Fatalf("unexpected: %s", connBhdr) - } - fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd()) - go func() { - fmt.Printf("sending message on %v\n", connA.Fd()) - connA.Write([]byte("hello world")) - connA.Sync() - fmt.Printf("closing %v\n", connA.Fd()) - connA.Close() - }() - data, err := ioutil.ReadAll(connB) - if err != nil { - t.Fatal(err) - } - fmt.Printf("---> %s\n", data) - -} - -// Ensure we get proper segmenting of messages -func TestSendSegmenting(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - - extrafd1, extrafd2, err := SocketPair() - if err != nil { - t.Fatal(err) - } - extrafd2.Close() - - go func() { - a.Send([]byte("message 1"), nil) - a.Send([]byte("message 2"), extrafd1) - a.Send([]byte("message 3"), nil) - }() - - msg1, file1, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg1) != "message 1" { - t.Fatal("unexpected msg1:", string(msg1)) - } - if file1 != nil { - t.Fatal("unexpectedly got file1") - } - - msg2, file2, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg2) != "message 2" { - t.Fatal("unexpected msg2:", string(msg2)) - } - if file2 == nil { - t.Fatal("didn't get file2") - } - file2.Close() - - msg3, file3, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg3) != "message 3" { - t.Fatal("unexpected msg3:", string(msg3)) - } - if file3 != nil { - t.Fatal("unexpectedly got file3") - } - -} - -// Test sending a zero byte message -func TestSendEmpty(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - a.Send([]byte{}, nil) - }() - - msg, file, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if len(msg) != 0 { - t.Fatalf("unexpected non-empty message: %v", msg) - } - if file != nil { - t.Fatal("unexpectedly got file") - } - -} - -func makeLarge(size int) []byte { - res := make([]byte, size) - for i := range res { - res[i] = byte(i % 255) - } - return res -} - -func verifyLarge(data []byte, size int) bool { - if len(data) != size { - return false - } - for i := range data { - if data[i] != byte(i%255) { - return false - } - } - return true -} - -// Test sending a large message -func TestSendLarge(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - a.Send(makeLarge(100000), nil) - }() - - msg, file, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if !verifyLarge(msg, 100000) { - t.Fatalf("unexpected message (size %d)", len(msg)) - } - if file != nil { - t.Fatal("unexpectedly got file") - } -}