package beam import ( "fmt" "io/ioutil" "testing" "os" "sync" ) type msg struct { payload []byte attachment *os.File } func (m msg) String() string { return MsgDesc(m.payload, m.attachment) } type mockReceiver []msg func (r *mockReceiver) Send(p []byte, a *os.File) error { (*r) = append((*r), msg{p, a}) return nil } func TestSendNoSinkNoRoute(t *testing.T) { r := NewRouter(nil) if err := r.Send([]byte("hello"), nil); err == nil { t.Fatalf("error expected") } a, b, err := os.Pipe() if err != nil { t.Fatal(err) } defer a.Close() defer b.Close() if err := r.Send([]byte("foo bar baz"), a); err == nil { t.Fatalf("error expected") } } func TestSendSinkNoRoute(t *testing.T) { var sink mockReceiver r := NewRouter(&sink) if err := r.Send([]byte("hello"), nil); err != nil { t.Fatal(err) } a, b, err := os.Pipe() if err != nil { t.Fatal(err) } defer a.Close() defer b.Close() if err := r.Send([]byte("world"), a); err != nil { t.Fatal(err) } if len(sink) != 2 { t.Fatalf("%#v\n", sink) } if string(sink[0].payload) != "hello" { t.Fatalf("%#v\n", sink) } if sink[0].attachment != nil { t.Fatalf("%#v\n", sink) } if string(sink[1].payload) != "world" { t.Fatalf("%#v\n", sink) } if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 { t.Fatalf("%v\n", sink) } var tasks sync.WaitGroup tasks.Add(2) go func() { defer tasks.Done() fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd()) data, err := ioutil.ReadAll(sink[1].attachment) if err != nil { t.Fatal(err) } if string(data) != "foo bar\n" { t.Fatalf("%v\n", string(data)) } }() go func() { defer tasks.Done() fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd()) if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil { t.Fatal(err) } b.Close() }() tasks.Wait() }