mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
beam: Router can route beam messages with a convenient set of rules and handlers
Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
This commit is contained in:
parent
40b4f86eab
commit
8f5435e80c
4 changed files with 246 additions and 6 deletions
|
@ -1,6 +1,7 @@
|
||||||
package beam
|
package beam
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
@ -100,3 +101,13 @@ func Copy(dst Sender, src Receiver) (int, error) {
|
||||||
panic("impossibru!")
|
panic("impossibru!")
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MsgDesc returns a human readable description of a beam message, usually
|
||||||
|
// for debugging purposes.
|
||||||
|
func MsgDesc(payload []byte, attachment *os.File) string {
|
||||||
|
var filedesc string = "<nil>"
|
||||||
|
if attachment != nil {
|
||||||
|
filedesc = fmt.Sprintf("%d", attachment.Fd())
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
|
||||||
|
}
|
||||||
|
|
|
@ -851,13 +851,9 @@ func SendToConn(connections chan net.Conn, src beam.Receiver) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func msgDesc(payload []byte, attachment *os.File) string {
|
|
||||||
var filedesc string = "<nil>"
|
|
||||||
if attachment != nil {
|
|
||||||
filedesc = fmt.Sprintf("%d", attachment.Fd())
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
|
|
||||||
|
|
||||||
|
func msgDesc(payload []byte, attachment *os.File) string {
|
||||||
|
return beam.MsgDesc(payload, attachment)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
|
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
|
||||||
|
|
137
pkg/beam/router.go
Normal file
137
pkg/beam/router.go
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
package beam
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"github.com/dotcloud/docker/pkg/beam/data"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Router struct {
|
||||||
|
routes []*Route
|
||||||
|
sink Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRouter(sink Sender) *Router {
|
||||||
|
return &Router{sink:sink}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
|
||||||
|
//fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment))
|
||||||
|
defer func() {
|
||||||
|
//fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err)
|
||||||
|
}()
|
||||||
|
for _, route := range r.routes {
|
||||||
|
if route.Match(payload, attachment) {
|
||||||
|
return route.Handle(payload, attachment)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.sink != nil {
|
||||||
|
//fmt.Printf("[Router.Send] no match. sending to sink\n")
|
||||||
|
return r.sink.Send(payload, attachment)
|
||||||
|
}
|
||||||
|
//fmt.Printf("[Router.Send] no match. return error.\n")
|
||||||
|
return fmt.Errorf("no matching route")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Router) NewRoute() *Route {
|
||||||
|
route := &Route{}
|
||||||
|
r.routes = append(r.routes, route)
|
||||||
|
return route
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
type Route struct {
|
||||||
|
rules []func([]byte, *os.File) bool
|
||||||
|
handler func([]byte, *os.File) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (route *Route) Match(payload []byte, attachment *os.File) bool {
|
||||||
|
for _, rule := range route.rules {
|
||||||
|
if !rule(payload, attachment) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (route *Route) Handle(payload []byte, attachment *os.File) error {
|
||||||
|
if route.handler == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return route.handler(payload, attachment)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Route) HasAttachment() *Route {
|
||||||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
|
return attachment != nil
|
||||||
|
})
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (route *Route) Tee(dst Sender) *Route {
|
||||||
|
inner := route.handler
|
||||||
|
route.handler = func(payload []byte, attachment *os.File) error {
|
||||||
|
if inner == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if attachment == nil {
|
||||||
|
return inner(payload, attachment)
|
||||||
|
}
|
||||||
|
// Setup the tee
|
||||||
|
w, err := SendPipe(dst, payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
teeR, teeW, err := os.Pipe()
|
||||||
|
if err != nil {
|
||||||
|
w.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
io.Copy(io.MultiWriter(teeW, w), attachment)
|
||||||
|
attachment.Close()
|
||||||
|
w.Close()
|
||||||
|
teeW.Close()
|
||||||
|
}()
|
||||||
|
return inner(payload, teeR)
|
||||||
|
}
|
||||||
|
return route
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
||||||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
|
values := data.Message(payload).Get(k)
|
||||||
|
if len(values) < len(beginning) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i, v := range beginning {
|
||||||
|
if v != values[i] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Route) Passthrough(dst Sender) *Route {
|
||||||
|
r.handler = func(payload []byte, attachment *os.File) error {
|
||||||
|
return dst.Send(payload, attachment)
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Route) All() *Route {
|
||||||
|
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
|
||||||
|
r.handler = h
|
||||||
|
return r
|
||||||
|
}
|
96
pkg/beam/router_test.go
Normal file
96
pkg/beam/router_test.go
Normal file
|
@ -0,0 +1,96 @@
|
||||||
|
package beam
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"testing"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type msg struct {
|
||||||
|
payload []byte
|
||||||
|
attachment *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m msg) String() string {
|
||||||
|
return MsgDesc(m.payload, m.attachment)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
type mockReceiver []msg
|
||||||
|
|
||||||
|
func (r *mockReceiver) Send(p []byte, a *os.File) error {
|
||||||
|
(*r) = append((*r), msg{p, a})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendNoSinkNoRoute(t *testing.T) {
|
||||||
|
r := NewRouter(nil)
|
||||||
|
if err := r.Send([]byte("hello"), nil); err == nil {
|
||||||
|
t.Fatalf("error expected")
|
||||||
|
}
|
||||||
|
a, b, err := os.Pipe()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer a.Close()
|
||||||
|
defer b.Close()
|
||||||
|
if err := r.Send([]byte("foo bar baz"), a); err == nil {
|
||||||
|
t.Fatalf("error expected")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendSinkNoRoute(t *testing.T) {
|
||||||
|
var sink mockReceiver
|
||||||
|
r := NewRouter(&sink)
|
||||||
|
if err := r.Send([]byte("hello"), nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
a, b, err := os.Pipe()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer a.Close()
|
||||||
|
defer b.Close()
|
||||||
|
if err := r.Send([]byte("world"), a); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(sink) != 2 {
|
||||||
|
t.Fatalf("%#v\n", sink)
|
||||||
|
}
|
||||||
|
if string(sink[0].payload) != "hello" {
|
||||||
|
t.Fatalf("%#v\n", sink)
|
||||||
|
}
|
||||||
|
if sink[0].attachment != nil {
|
||||||
|
t.Fatalf("%#v\n", sink)
|
||||||
|
}
|
||||||
|
if string(sink[1].payload) != "world" {
|
||||||
|
t.Fatalf("%#v\n", sink)
|
||||||
|
}
|
||||||
|
if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 {
|
||||||
|
t.Fatalf("%v\n", sink)
|
||||||
|
}
|
||||||
|
var tasks sync.WaitGroup
|
||||||
|
tasks.Add(2)
|
||||||
|
go func() {
|
||||||
|
defer tasks.Done()
|
||||||
|
fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd())
|
||||||
|
data, err := ioutil.ReadAll(sink[1].attachment)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if string(data) != "foo bar\n" {
|
||||||
|
t.Fatalf("%v\n", string(data))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer tasks.Done()
|
||||||
|
fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd())
|
||||||
|
if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
b.Close()
|
||||||
|
}()
|
||||||
|
tasks.Wait()
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue