diff --git a/api/client/commands.go b/api/client/commands.go index 0c0f22fd63..4e8259914f 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -5,7 +5,6 @@ func (cli *DockerCli) Command(name string) func(...string) error { return map[string]func(...string) error{ "commit": cli.CmdCommit, "cp": cli.CmdCp, - "events": cli.CmdEvents, "exec": cli.CmdExec, "info": cli.CmdInfo, "inspect": cli.CmdInspect, @@ -16,7 +15,6 @@ func (cli *DockerCli) Command(name string) func(...string) error { "pull": cli.CmdPull, "push": cli.CmdPush, "save": cli.CmdSave, - "stats": cli.CmdStats, "update": cli.CmdUpdate, }[name] } diff --git a/api/client/stats.go b/api/client/container/stats.go similarity index 67% rename from api/client/stats.go rename to api/client/container/stats.go index 39b0361ea4..46861514cd 100644 --- a/api/client/stats.go +++ b/api/client/container/stats.go @@ -1,4 +1,4 @@ -package client +package container import ( "fmt" @@ -11,26 +11,46 @@ import ( "golang.org/x/net/context" "github.com/Sirupsen/logrus" - Cli "github.com/docker/docker/cli" + "github.com/docker/docker/api/client" + "github.com/docker/docker/api/client/system" + "github.com/docker/docker/cli" "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" + "github.com/spf13/cobra" ) -// CmdStats displays a live stream of resource usage statistics for one or more containers. -// +type statsOptions struct { + all bool + noStream bool + + containers []string +} + +// NewStatsCommand creats a new cobra.Command for `docker stats` +func NewStatsCommand(dockerCli *client.DockerCli) *cobra.Command { + var opts statsOptions + + cmd := &cobra.Command{ + Use: "stats [OPTIONS] [CONTAINER...]", + Short: "Display a live stream of container(s) resource usage statistics", + Args: cli.RequiresMinArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + opts.containers = args + return runStats(dockerCli, &opts) + }, + } + + flags := cmd.Flags() + flags.BoolVarP(&opts.all, "all", "a", false, "Show all containers (default shows just running)") + flags.BoolVar(&opts.noStream, "no-stream", false, "Disable streaming stats and only pull the first result") + return cmd +} + +// runStats displays a live stream of resource usage statistics for one or more containers. // This shows real-time information on CPU usage, memory usage, and network I/O. -// -// Usage: docker stats [OPTIONS] [CONTAINER...] -func (cli *DockerCli) CmdStats(args ...string) error { - cmd := Cli.Subcmd("stats", []string{"[CONTAINER...]"}, Cli.DockerCommands["stats"].Description, true) - all := cmd.Bool([]string{"a", "-all"}, false, "Show all containers (default shows just running)") - noStream := cmd.Bool([]string{"-no-stream"}, false, "Disable streaming stats and only pull the first result") - - cmd.ParseFlags(args, true) - - names := cmd.Args() - showAll := len(names) == 0 +func runStats(dockerCli *client.DockerCli, opts *statsOptions) error { + showAll := len(opts.containers) == 0 closeChan := make(chan error) ctx := context.Background() @@ -43,7 +63,7 @@ func (cli *DockerCli) CmdStats(args ...string) error { options := types.EventsOptions{ Filters: f, } - resBody, err := cli.client.Events(ctx, options) + resBody, err := dockerCli.Client().Events(ctx, options) // Whether we successfully subscribed to events or not, we can now // unblock the main goroutine. close(started) @@ -53,7 +73,7 @@ func (cli *DockerCli) CmdStats(args ...string) error { } defer resBody.Close() - decodeEvents(resBody, func(event events.Message, err error) error { + system.DecodeEvents(resBody, func(event events.Message, err error) error { if err != nil { closeChan <- err return nil @@ -71,9 +91,9 @@ func (cli *DockerCli) CmdStats(args ...string) error { // containers (only used when calling `docker stats` without arguments). getContainerList := func() { options := types.ContainerListOptions{ - All: *all, + All: opts.all, } - cs, err := cli.client.ContainerList(ctx, options) + cs, err := dockerCli.Client().ContainerList(ctx, options) if err != nil { closeChan <- err } @@ -81,7 +101,7 @@ func (cli *DockerCli) CmdStats(args ...string) error { s := &containerStats{Name: container.ID[:12]} if cStats.add(s) { waitFirst.Add(1) - go s.Collect(ctx, cli.client, !*noStream, waitFirst) + go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst) } } } @@ -92,13 +112,13 @@ func (cli *DockerCli) CmdStats(args ...string) error { // retrieving the list of running containers to avoid a race where we // would "miss" a creation. started := make(chan struct{}) - eh := eventHandler{handlers: make(map[string]func(events.Message))} + eh := system.InitEventHandler() eh.Handle("create", func(e events.Message) { - if *all { + if opts.all { s := &containerStats{Name: e.ID[:12]} if cStats.add(s) { waitFirst.Add(1) - go s.Collect(ctx, cli.client, !*noStream, waitFirst) + go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst) } } }) @@ -107,12 +127,12 @@ func (cli *DockerCli) CmdStats(args ...string) error { s := &containerStats{Name: e.ID[:12]} if cStats.add(s) { waitFirst.Add(1) - go s.Collect(ctx, cli.client, !*noStream, waitFirst) + go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst) } }) eh.Handle("die", func(e events.Message) { - if !*all { + if !opts.all { cStats.remove(e.ID[:12]) } }) @@ -129,11 +149,11 @@ func (cli *DockerCli) CmdStats(args ...string) error { } else { // Artificially send creation events for the containers we were asked to // monitor (same code path than we use when monitoring all containers). - for _, name := range names { + for _, name := range opts.containers { s := &containerStats{Name: name} if cStats.add(s) { waitFirst.Add(1) - go s.Collect(ctx, cli.client, !*noStream, waitFirst) + go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst) } } @@ -161,11 +181,11 @@ func (cli *DockerCli) CmdStats(args ...string) error { // before print to screen, make sure each container get at least one valid stat data waitFirst.Wait() - w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) + w := tabwriter.NewWriter(dockerCli.Out(), 20, 1, 3, ' ', 0) printHeader := func() { - if !*noStream { - fmt.Fprint(cli.out, "\033[2J") - fmt.Fprint(cli.out, "\033[H") + if !opts.noStream { + fmt.Fprint(dockerCli.Out(), "\033[2J") + fmt.Fprint(dockerCli.Out(), "\033[H") } io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\tPIDS\n") } @@ -174,13 +194,13 @@ func (cli *DockerCli) CmdStats(args ...string) error { printHeader() cStats.mu.Lock() for _, s := range cStats.cs { - if err := s.Display(w); err != nil && !*noStream { + if err := s.Display(w); err != nil && !opts.noStream { logrus.Debugf("stats: got error for %s: %v", s.Name, err) } } cStats.mu.Unlock() w.Flush() - if *noStream { + if opts.noStream { break } select { diff --git a/api/client/stats_helpers.go b/api/client/container/stats_helpers.go similarity index 99% rename from api/client/stats_helpers.go rename to api/client/container/stats_helpers.go index 21f676b808..6083669644 100644 --- a/api/client/stats_helpers.go +++ b/api/client/container/stats_helpers.go @@ -1,4 +1,4 @@ -package client +package container import ( "encoding/json" diff --git a/api/client/stats_unit_test.go b/api/client/container/stats_unit_test.go similarity index 98% rename from api/client/stats_unit_test.go rename to api/client/container/stats_unit_test.go index 9040674240..83f24bb295 100644 --- a/api/client/stats_unit_test.go +++ b/api/client/container/stats_unit_test.go @@ -1,4 +1,4 @@ -package client +package container import ( "bytes" diff --git a/api/client/events.go b/api/client/events.go deleted file mode 100644 index d2408c192e..0000000000 --- a/api/client/events.go +++ /dev/null @@ -1,146 +0,0 @@ -package client - -import ( - "encoding/json" - "fmt" - "io" - "sort" - "strings" - "sync" - "time" - - "golang.org/x/net/context" - - "github.com/Sirupsen/logrus" - Cli "github.com/docker/docker/cli" - "github.com/docker/docker/opts" - "github.com/docker/docker/pkg/jsonlog" - flag "github.com/docker/docker/pkg/mflag" - "github.com/docker/engine-api/types" - eventtypes "github.com/docker/engine-api/types/events" - "github.com/docker/engine-api/types/filters" -) - -// CmdEvents prints a live stream of real time events from the server. -// -// Usage: docker events [OPTIONS] -func (cli *DockerCli) CmdEvents(args ...string) error { - cmd := Cli.Subcmd("events", nil, Cli.DockerCommands["events"].Description, true) - since := cmd.String([]string{"-since"}, "", "Show all events created since timestamp") - until := cmd.String([]string{"-until"}, "", "Stream events until this timestamp") - flFilter := opts.NewListOpts(nil) - cmd.Var(&flFilter, []string{"f", "-filter"}, "Filter output based on conditions provided") - cmd.Require(flag.Exact, 0) - - cmd.ParseFlags(args, true) - - eventFilterArgs := filters.NewArgs() - - // Consolidate all filter flags, and sanity check them early. - // They'll get process in the daemon/server. - for _, f := range flFilter.GetAll() { - var err error - eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs) - if err != nil { - return err - } - } - - options := types.EventsOptions{ - Since: *since, - Until: *until, - Filters: eventFilterArgs, - } - - responseBody, err := cli.client.Events(context.Background(), options) - if err != nil { - return err - } - defer responseBody.Close() - - return streamEvents(responseBody, cli.out) -} - -// streamEvents decodes prints the incoming events in the provided output. -func streamEvents(input io.Reader, output io.Writer) error { - return decodeEvents(input, func(event eventtypes.Message, err error) error { - if err != nil { - return err - } - printOutput(event, output) - return nil - }) -} - -type eventProcessor func(event eventtypes.Message, err error) error - -func decodeEvents(input io.Reader, ep eventProcessor) error { - dec := json.NewDecoder(input) - for { - var event eventtypes.Message - err := dec.Decode(&event) - if err != nil && err == io.EOF { - break - } - - if procErr := ep(event, err); procErr != nil { - return procErr - } - } - return nil -} - -// printOutput prints all types of event information. -// Each output includes the event type, actor id, name and action. -// Actor attributes are printed at the end if the actor has any. -func printOutput(event eventtypes.Message, output io.Writer) { - if event.TimeNano != 0 { - fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed)) - } else if event.Time != 0 { - fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed)) - } - - fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID) - - if len(event.Actor.Attributes) > 0 { - var attrs []string - var keys []string - for k := range event.Actor.Attributes { - keys = append(keys, k) - } - sort.Strings(keys) - for _, k := range keys { - v := event.Actor.Attributes[k] - attrs = append(attrs, fmt.Sprintf("%s=%s", k, v)) - } - fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", ")) - } - fmt.Fprint(output, "\n") -} - -type eventHandler struct { - handlers map[string]func(eventtypes.Message) - mu sync.Mutex -} - -func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) { - w.mu.Lock() - w.handlers[action] = h - w.mu.Unlock() -} - -// Watch ranges over the passed in event chan and processes the events based on the -// handlers created for a given action. -// To stop watching, close the event chan. -func (w *eventHandler) Watch(c <-chan eventtypes.Message) { - for e := range c { - w.mu.Lock() - h, exists := w.handlers[e.Action] - w.mu.Unlock() - if !exists { - continue - } - logrus.Debugf("event handler: received event: %v", e) - go h(e) - } -} diff --git a/api/client/system/events.go b/api/client/system/events.go new file mode 100644 index 0000000000..6faf11ee80 --- /dev/null +++ b/api/client/system/events.go @@ -0,0 +1,115 @@ +package system + +import ( + "fmt" + "io" + "sort" + "strings" + "time" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/client" + "github.com/docker/docker/cli" + "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/engine-api/types" + eventtypes "github.com/docker/engine-api/types/events" + "github.com/docker/engine-api/types/filters" + "github.com/spf13/cobra" +) + +type eventsOptions struct { + since string + until string + filter []string +} + +// NewEventsCommand creats a new cobra.Command for `docker events` +func NewEventsCommand(dockerCli *client.DockerCli) *cobra.Command { + var opts eventsOptions + + cmd := &cobra.Command{ + Use: "events [OPTIONS]", + Short: "Get real time events from the server", + Args: cli.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return runEvents(dockerCli, &opts) + }, + } + + flags := cmd.Flags() + flags.StringVar(&opts.since, "since", "", "Show all events created since timestamp") + flags.StringVar(&opts.until, "until", "", "Stream events until this timestamp") + flags.StringSliceVarP(&opts.filter, "filter", "f", []string{}, "Filter output based on conditions provided") + + return cmd +} + +func runEvents(dockerCli *client.DockerCli, opts *eventsOptions) error { + eventFilterArgs := filters.NewArgs() + + // Consolidate all filter flags, and sanity check them early. + // They'll get process in the daemon/server. + for _, f := range opts.filter { + var err error + eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs) + if err != nil { + return err + } + } + + options := types.EventsOptions{ + Since: opts.since, + Until: opts.until, + Filters: eventFilterArgs, + } + + responseBody, err := dockerCli.Client().Events(context.Background(), options) + if err != nil { + return err + } + defer responseBody.Close() + + return streamEvents(responseBody, dockerCli.Out()) +} + +// streamEvents decodes prints the incoming events in the provided output. +func streamEvents(input io.Reader, output io.Writer) error { + return DecodeEvents(input, func(event eventtypes.Message, err error) error { + if err != nil { + return err + } + printOutput(event, output) + return nil + }) +} + +type eventProcessor func(event eventtypes.Message, err error) error + +// printOutput prints all types of event information. +// Each output includes the event type, actor id, name and action. +// Actor attributes are printed at the end if the actor has any. +func printOutput(event eventtypes.Message, output io.Writer) { + if event.TimeNano != 0 { + fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed)) + } else if event.Time != 0 { + fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed)) + } + + fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID) + + if len(event.Actor.Attributes) > 0 { + var attrs []string + var keys []string + for k := range event.Actor.Attributes { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := event.Actor.Attributes[k] + attrs = append(attrs, fmt.Sprintf("%s=%s", k, v)) + } + fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", ")) + } + fmt.Fprint(output, "\n") +} diff --git a/api/client/system/events_utils.go b/api/client/system/events_utils.go new file mode 100644 index 0000000000..685225daa6 --- /dev/null +++ b/api/client/system/events_utils.go @@ -0,0 +1,66 @@ +package system + +import ( + "encoding/json" + "io" + "sync" + + "github.com/Sirupsen/logrus" + eventtypes "github.com/docker/engine-api/types/events" +) + +// EventHandler is abstract interface for user to customize +// own handle functions of each type of events +type EventHandler interface { + Handle(action string, h func(eventtypes.Message)) + Watch(c <-chan eventtypes.Message) +} + +// InitEventHandler initializes and returns an EventHandler +func InitEventHandler() EventHandler { + return &eventHandler{handlers: make(map[string]func(eventtypes.Message))} +} + +type eventHandler struct { + handlers map[string]func(eventtypes.Message) + mu sync.Mutex +} + +func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) { + w.mu.Lock() + w.handlers[action] = h + w.mu.Unlock() +} + +// Watch ranges over the passed in event chan and processes the events based on the +// handlers created for a given action. +// To stop watching, close the event chan. +func (w *eventHandler) Watch(c <-chan eventtypes.Message) { + for e := range c { + w.mu.Lock() + h, exists := w.handlers[e.Action] + w.mu.Unlock() + if !exists { + continue + } + logrus.Debugf("event handler: received event: %v", e) + go h(e) + } +} + +// DecodeEvents decodes event from input stream +func DecodeEvents(input io.Reader, ep eventProcessor) error { + dec := json.NewDecoder(input) + for { + var event eventtypes.Message + err := dec.Decode(&event) + if err != nil && err == io.EOF { + break + } + + if procErr := ep(event, err); procErr != nil { + return procErr + } + } + return nil +} diff --git a/cli/cobraadaptor/adaptor.go b/cli/cobraadaptor/adaptor.go index ad9750dddb..11b821cc44 100644 --- a/cli/cobraadaptor/adaptor.go +++ b/cli/cobraadaptor/adaptor.go @@ -48,6 +48,7 @@ func NewCobraAdaptor(clientFlags *cliflags.ClientFlags) CobraAdaptor { container.NewRmCommand(dockerCli), container.NewRunCommand(dockerCli), container.NewStartCommand(dockerCli), + container.NewStatsCommand(dockerCli), container.NewStopCommand(dockerCli), container.NewTopCommand(dockerCli), container.NewUnpauseCommand(dockerCli), @@ -60,6 +61,7 @@ func NewCobraAdaptor(clientFlags *cliflags.ClientFlags) CobraAdaptor { image.NewImportCommand(dockerCli), image.NewTagCommand(dockerCli), network.NewNetworkCommand(dockerCli), + system.NewEventsCommand(dockerCli), system.NewVersionCommand(dockerCli), volume.NewVolumeCommand(dockerCli), ) diff --git a/cli/usage.go b/cli/usage.go index ada9718ee1..e9debf3be7 100644 --- a/cli/usage.go +++ b/cli/usage.go @@ -10,7 +10,6 @@ type Command struct { var DockerCommandUsage = []Command{ {"commit", "Create a new image from a container's changes"}, {"cp", "Copy files/folders between a container and the local filesystem"}, - {"events", "Get real time events from the server"}, {"exec", "Run a command in a running container"}, {"info", "Display system-wide information"}, {"inspect", "Return low-level information on a container or image"}, @@ -21,7 +20,6 @@ var DockerCommandUsage = []Command{ {"pull", "Pull an image or a repository from a registry"}, {"push", "Push an image or a repository to a registry"}, {"save", "Save one or more images to a tar archive"}, - {"stats", "Display a live stream of container(s) resource usage statistics"}, {"update", "Update configuration of one or more containers"}, }