package events

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"testing"
	"time"

	"github.com/docker/docker/engine"
	"github.com/docker/docker/utils"
)

// TestEventsPublish subscribes two listeners, logs a single event from a
// separate goroutine, and checks that each listener receives a message with
// the logged status, ID and source within one second, and that exactly one
// event has been recorded.
func TestEventsPublish(t *testing.T) {
	e := New()
	l1 := make(chan *utils.JSONMessage)
	l2 := make(chan *utils.JSONMessage)
	e.subscribe(l1)
	e.subscribe(l2)
	count := e.subscribersCount()
	if count != 2 {
		t.Fatalf("Must be 2 subscribers, got %d", count)
	}

	// The listener channels are unbuffered, so log from another goroutine
	// while this one receives.
	go e.log("test", "cont", "image")

	// Listeners are drained in subscription order (l1, then l2).
	for _, l := range []chan *utils.JSONMessage{l1, l2} {
		select {
		case msg := <-l:
			if len(e.events) != 1 {
				t.Fatalf("Must be only one event, got %d", len(e.events))
			}
			if msg.Status != "test" {
				t.Fatalf("Status should be test, got %s", msg.Status)
			}
			if msg.ID != "cont" {
				t.Fatalf("ID should be cont, got %s", msg.ID)
			}
			if msg.From != "image" {
				t.Fatalf("From should be image, got %s", msg.From)
			}
		case <-time.After(1 * time.Second):
			t.Fatal("Timeout waiting for broadcasted message")
		}
	}
}

// TestEventsPublishTimeout subscribes a listener that is never read from and
// checks that logging an event still returns within one second.
func TestEventsPublishTimeout(t *testing.T) {
	e := New()
	l := make(chan *utils.JSONMessage)
	e.subscribe(l)

	c := make(chan struct{})
	go func() {
		e.log("test", "cont", "image")
		close(c)
	}()

	select {
	case <-c:
	case <-time.After(time.Second):
		t.Fatal("Timeout publishing message")
	}
}

// TestLogEvents runs more "log" jobs than eventsLimit, checks that only
// eventsLimit events are retained, and then checks that the "events" job
// streams exactly those events as JSON, starting with the first one that was
// not dropped and ending with the last one logged.
func TestLogEvents(t *testing.T) {
	// overflow is how many events are logged beyond eventsLimit.
	const overflow = 16

	e := New()
	eng := engine.New()
	if err := e.Install(eng); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < eventsLimit+overflow; i++ {
		action := fmt.Sprintf("action_%d", i)
		id := fmt.Sprintf("cont_%d", i)
		from := fmt.Sprintf("image_%d", i)
		job := eng.Job("log", action, id, from)
		if err := job.Run(); err != nil {
			t.Fatal(err)
		}
	}
	// NOTE(review): presumably gives the logged events time to be stored
	// before inspecting e.events; confirm against the "log" job handler.
	time.Sleep(50 * time.Millisecond)
	if len(e.events) != eventsLimit {
		t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
	}

	job := eng.Job("events")
	job.SetenvInt64("since", 1)
	job.SetenvInt64("until", time.Now().Unix())
	buf := bytes.NewBuffer(nil)
	job.Stdout.Add(buf)
	if err := job.Run(); err != nil {
		t.Fatal(err)
	}

	// Decode the captured output as a stream of JSON messages until EOF.
	buf = bytes.NewBuffer(buf.Bytes())
	dec := json.NewDecoder(buf)
	var msgs []utils.JSONMessage
	for {
		var jm utils.JSONMessage
		if err := dec.Decode(&jm); err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		msgs = append(msgs, jm)
	}
	if len(msgs) != eventsLimit {
		t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
	}

	// The first `overflow` events are expected to have been dropped, so the
	// retained range is action_<overflow> .. action_<eventsLimit+overflow-1>.
	wantFirst := fmt.Sprintf("action_%d", overflow)
	wantLast := fmt.Sprintf("action_%d", eventsLimit+overflow-1)

	first := msgs[0]
	if first.Status != wantFirst {
		t.Fatalf("First action is %s, must be %s", first.Status, wantFirst)
	}
	last := msgs[len(msgs)-1]
	if last.Status != wantLast {
		t.Fatalf("Last action is %s, must be %s", last.Status, wantLast)
	}
}

// TestEventsCountJob subscribes two listeners and checks that the
// "subscribers_count" job reports a "count" of 2 in its output environment.
func TestEventsCountJob(t *testing.T) {
	e := New()
	eng := engine.New()
	if err := e.Install(eng); err != nil {
		t.Fatal(err)
	}

	l1 := make(chan *utils.JSONMessage)
	l2 := make(chan *utils.JSONMessage)
	e.subscribe(l1)
	e.subscribe(l2)

	job := eng.Job("subscribers_count")
	env, err := job.Stdout.AddEnv()
	if err != nil {
		t.Fatal(err)
	}
	if err := job.Run(); err != nil {
		t.Fatal(err)
	}
	count := env.GetInt("count")
	if count != 2 {
		t.Fatalf("There must be 2 subscribers, got %d", count)
	}
}