1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/api/client/events.go
Brian Goff df95474885 Fixes issue with stats on start event
In situations where a client is called like `docker stats` with no
arguments or flags, if a container which was already created but not
started yet is then subsequently started it will not be added to the
stats list as expected.

Also splits some of the stats helpers to a separate file from the stats
CLI which is already quite long.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-02-29 16:34:42 -05:00

147 lines
3.8 KiB
Go

package client
import (
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
Cli "github.com/docker/docker/cli"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/jsonlog"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/engine-api/types"
eventtypes "github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
)
// CmdEvents prints a live stream of real time events from the server.
//
// Usage: docker events [OPTIONS]
func (cli *DockerCli) CmdEvents(args ...string) error {
cmd := Cli.Subcmd("events", nil, Cli.DockerCommands["events"].Description, true)
since := cmd.String([]string{"-since"}, "", "Show all events created since timestamp")
until := cmd.String([]string{"-until"}, "", "Stream events until this timestamp")
flFilter := opts.NewListOpts(nil)
cmd.Var(&flFilter, []string{"f", "-filter"}, "Filter output based on conditions provided")
cmd.Require(flag.Exact, 0)
cmd.ParseFlags(args, true)
eventFilterArgs := filters.NewArgs()
// Consolidate all filter flags, and sanity check them early.
// They'll get process in the daemon/server.
for _, f := range flFilter.GetAll() {
var err error
eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs)
if err != nil {
return err
}
}
options := types.EventsOptions{
Since: *since,
Until: *until,
Filters: eventFilterArgs,
}
responseBody, err := cli.client.Events(context.Background(), options)
if err != nil {
return err
}
defer responseBody.Close()
return streamEvents(responseBody, cli.out)
}
// streamEvents decodes prints the incoming events in the provided output.
func streamEvents(input io.Reader, output io.Writer) error {
return decodeEvents(input, func(event eventtypes.Message, err error) error {
if err != nil {
return err
}
printOutput(event, output)
return nil
})
}
type eventProcessor func(event eventtypes.Message, err error) error
func decodeEvents(input io.Reader, ep eventProcessor) error {
dec := json.NewDecoder(input)
for {
var event eventtypes.Message
err := dec.Decode(&event)
if err != nil && err == io.EOF {
break
}
if procErr := ep(event, err); procErr != nil {
return procErr
}
}
return nil
}
// printOutput prints all types of event information.
// Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any.
func printOutput(event eventtypes.Message, output io.Writer) {
if event.TimeNano != 0 {
fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed))
} else if event.Time != 0 {
fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed))
}
fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID)
if len(event.Actor.Attributes) > 0 {
var attrs []string
var keys []string
for k := range event.Actor.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := event.Actor.Attributes[k]
attrs = append(attrs, fmt.Sprintf("%s=%s", k, v))
}
fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", "))
}
fmt.Fprint(output, "\n")
}
type eventHandler struct {
handlers map[string]func(eventtypes.Message)
mu sync.Mutex
closed bool
}
func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
w.mu.Lock()
w.handlers[action] = h
w.mu.Unlock()
}
// Watch ranges over the passed in event chan and processes the events based on the
// handlers created for a given action.
// To stop watching, close the event chan.
func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
for e := range c {
w.mu.Lock()
h, exists := w.handlers[e.Action]
w.mu.Unlock()
if !exists {
continue
}
logrus.Debugf("event handler: received event: %v", e)
go h(e)
}
}