1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #23334 from WeiZhang555/cobra-stats

Migrate stats and events command to cobra
This commit is contained in:
Vincent Demeester 2016-06-08 23:50:46 +02:00
commit e884a515e9
9 changed files with 238 additions and 185 deletions

View file

@ -5,7 +5,6 @@ func (cli *DockerCli) Command(name string) func(...string) error {
return map[string]func(...string) error{
"commit": cli.CmdCommit,
"cp": cli.CmdCp,
"events": cli.CmdEvents,
"exec": cli.CmdExec,
"info": cli.CmdInfo,
"inspect": cli.CmdInspect,
@ -16,7 +15,6 @@ func (cli *DockerCli) Command(name string) func(...string) error {
"pull": cli.CmdPull,
"push": cli.CmdPush,
"save": cli.CmdSave,
"stats": cli.CmdStats,
"update": cli.CmdUpdate,
}[name]
}

View file

@ -1,4 +1,4 @@
package client
package container
import (
"fmt"
@ -11,26 +11,46 @@ import (
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
Cli "github.com/docker/docker/cli"
"github.com/docker/docker/api/client"
"github.com/docker/docker/api/client/system"
"github.com/docker/docker/cli"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
"github.com/spf13/cobra"
)
// CmdStats displays a live stream of resource usage statistics for one or more containers.
//
type statsOptions struct {
all bool
noStream bool
containers []string
}
// NewStatsCommand creats a new cobra.Command for `docker stats`
func NewStatsCommand(dockerCli *client.DockerCli) *cobra.Command {
var opts statsOptions
cmd := &cobra.Command{
Use: "stats [OPTIONS] [CONTAINER...]",
Short: "Display a live stream of container(s) resource usage statistics",
Args: cli.RequiresMinArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
opts.containers = args
return runStats(dockerCli, &opts)
},
}
flags := cmd.Flags()
flags.BoolVarP(&opts.all, "all", "a", false, "Show all containers (default shows just running)")
flags.BoolVar(&opts.noStream, "no-stream", false, "Disable streaming stats and only pull the first result")
return cmd
}
// runStats displays a live stream of resource usage statistics for one or more containers.
// This shows real-time information on CPU usage, memory usage, and network I/O.
//
// Usage: docker stats [OPTIONS] [CONTAINER...]
func (cli *DockerCli) CmdStats(args ...string) error {
cmd := Cli.Subcmd("stats", []string{"[CONTAINER...]"}, Cli.DockerCommands["stats"].Description, true)
all := cmd.Bool([]string{"a", "-all"}, false, "Show all containers (default shows just running)")
noStream := cmd.Bool([]string{"-no-stream"}, false, "Disable streaming stats and only pull the first result")
cmd.ParseFlags(args, true)
names := cmd.Args()
showAll := len(names) == 0
func runStats(dockerCli *client.DockerCli, opts *statsOptions) error {
showAll := len(opts.containers) == 0
closeChan := make(chan error)
ctx := context.Background()
@ -43,7 +63,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
options := types.EventsOptions{
Filters: f,
}
resBody, err := cli.client.Events(ctx, options)
resBody, err := dockerCli.Client().Events(ctx, options)
// Whether we successfully subscribed to events or not, we can now
// unblock the main goroutine.
close(started)
@ -53,7 +73,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
}
defer resBody.Close()
decodeEvents(resBody, func(event events.Message, err error) error {
system.DecodeEvents(resBody, func(event events.Message, err error) error {
if err != nil {
closeChan <- err
return nil
@ -71,9 +91,9 @@ func (cli *DockerCli) CmdStats(args ...string) error {
// containers (only used when calling `docker stats` without arguments).
getContainerList := func() {
options := types.ContainerListOptions{
All: *all,
All: opts.all,
}
cs, err := cli.client.ContainerList(ctx, options)
cs, err := dockerCli.Client().ContainerList(ctx, options)
if err != nil {
closeChan <- err
}
@ -81,7 +101,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
s := &containerStats{Name: container.ID[:12]}
if cStats.add(s) {
waitFirst.Add(1)
go s.Collect(ctx, cli.client, !*noStream, waitFirst)
go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst)
}
}
}
@ -92,13 +112,13 @@ func (cli *DockerCli) CmdStats(args ...string) error {
// retrieving the list of running containers to avoid a race where we
// would "miss" a creation.
started := make(chan struct{})
eh := eventHandler{handlers: make(map[string]func(events.Message))}
eh := system.InitEventHandler()
eh.Handle("create", func(e events.Message) {
if *all {
if opts.all {
s := &containerStats{Name: e.ID[:12]}
if cStats.add(s) {
waitFirst.Add(1)
go s.Collect(ctx, cli.client, !*noStream, waitFirst)
go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst)
}
}
})
@ -107,12 +127,12 @@ func (cli *DockerCli) CmdStats(args ...string) error {
s := &containerStats{Name: e.ID[:12]}
if cStats.add(s) {
waitFirst.Add(1)
go s.Collect(ctx, cli.client, !*noStream, waitFirst)
go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst)
}
})
eh.Handle("die", func(e events.Message) {
if !*all {
if !opts.all {
cStats.remove(e.ID[:12])
}
})
@ -129,11 +149,11 @@ func (cli *DockerCli) CmdStats(args ...string) error {
} else {
// Artificially send creation events for the containers we were asked to
// monitor (same code path than we use when monitoring all containers).
for _, name := range names {
for _, name := range opts.containers {
s := &containerStats{Name: name}
if cStats.add(s) {
waitFirst.Add(1)
go s.Collect(ctx, cli.client, !*noStream, waitFirst)
go s.Collect(ctx, dockerCli.Client(), !opts.noStream, waitFirst)
}
}
@ -161,11 +181,11 @@ func (cli *DockerCli) CmdStats(args ...string) error {
// before print to screen, make sure each container get at least one valid stat data
waitFirst.Wait()
w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
w := tabwriter.NewWriter(dockerCli.Out(), 20, 1, 3, ' ', 0)
printHeader := func() {
if !*noStream {
fmt.Fprint(cli.out, "\033[2J")
fmt.Fprint(cli.out, "\033[H")
if !opts.noStream {
fmt.Fprint(dockerCli.Out(), "\033[2J")
fmt.Fprint(dockerCli.Out(), "\033[H")
}
io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\tPIDS\n")
}
@ -174,13 +194,13 @@ func (cli *DockerCli) CmdStats(args ...string) error {
printHeader()
cStats.mu.Lock()
for _, s := range cStats.cs {
if err := s.Display(w); err != nil && !*noStream {
if err := s.Display(w); err != nil && !opts.noStream {
logrus.Debugf("stats: got error for %s: %v", s.Name, err)
}
}
cStats.mu.Unlock()
w.Flush()
if *noStream {
if opts.noStream {
break
}
select {

View file

@ -1,4 +1,4 @@
package client
package container
import (
"encoding/json"

View file

@ -1,4 +1,4 @@
package client
package container
import (
"bytes"

View file

@ -1,146 +0,0 @@
package client
import (
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
Cli "github.com/docker/docker/cli"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/jsonlog"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/engine-api/types"
eventtypes "github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
)
// CmdEvents prints a live stream of real time events from the server.
//
// Usage: docker events [OPTIONS]
func (cli *DockerCli) CmdEvents(args ...string) error {
cmd := Cli.Subcmd("events", nil, Cli.DockerCommands["events"].Description, true)
since := cmd.String([]string{"-since"}, "", "Show all events created since timestamp")
until := cmd.String([]string{"-until"}, "", "Stream events until this timestamp")
flFilter := opts.NewListOpts(nil)
cmd.Var(&flFilter, []string{"f", "-filter"}, "Filter output based on conditions provided")
cmd.Require(flag.Exact, 0)
cmd.ParseFlags(args, true)
eventFilterArgs := filters.NewArgs()
// Consolidate all filter flags, and sanity check them early.
// They'll get process in the daemon/server.
for _, f := range flFilter.GetAll() {
var err error
eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs)
if err != nil {
return err
}
}
options := types.EventsOptions{
Since: *since,
Until: *until,
Filters: eventFilterArgs,
}
responseBody, err := cli.client.Events(context.Background(), options)
if err != nil {
return err
}
defer responseBody.Close()
return streamEvents(responseBody, cli.out)
}
// streamEvents decodes prints the incoming events in the provided output.
func streamEvents(input io.Reader, output io.Writer) error {
return decodeEvents(input, func(event eventtypes.Message, err error) error {
if err != nil {
return err
}
printOutput(event, output)
return nil
})
}
type eventProcessor func(event eventtypes.Message, err error) error
func decodeEvents(input io.Reader, ep eventProcessor) error {
dec := json.NewDecoder(input)
for {
var event eventtypes.Message
err := dec.Decode(&event)
if err != nil && err == io.EOF {
break
}
if procErr := ep(event, err); procErr != nil {
return procErr
}
}
return nil
}
// printOutput prints all types of event information.
// Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any.
func printOutput(event eventtypes.Message, output io.Writer) {
if event.TimeNano != 0 {
fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed))
} else if event.Time != 0 {
fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed))
}
fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID)
if len(event.Actor.Attributes) > 0 {
var attrs []string
var keys []string
for k := range event.Actor.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := event.Actor.Attributes[k]
attrs = append(attrs, fmt.Sprintf("%s=%s", k, v))
}
fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", "))
}
fmt.Fprint(output, "\n")
}
type eventHandler struct {
handlers map[string]func(eventtypes.Message)
mu sync.Mutex
}
func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
w.mu.Lock()
w.handlers[action] = h
w.mu.Unlock()
}
// Watch ranges over the passed in event chan and processes the events based on the
// handlers created for a given action.
// To stop watching, close the event chan.
func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
for e := range c {
w.mu.Lock()
h, exists := w.handlers[e.Action]
w.mu.Unlock()
if !exists {
continue
}
logrus.Debugf("event handler: received event: %v", e)
go h(e)
}
}

115
api/client/system/events.go Normal file
View file

@ -0,0 +1,115 @@
package system
import (
"fmt"
"io"
"sort"
"strings"
"time"
"golang.org/x/net/context"
"github.com/docker/docker/api/client"
"github.com/docker/docker/cli"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/engine-api/types"
eventtypes "github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
"github.com/spf13/cobra"
)
type eventsOptions struct {
since string
until string
filter []string
}
// NewEventsCommand creats a new cobra.Command for `docker events`
func NewEventsCommand(dockerCli *client.DockerCli) *cobra.Command {
var opts eventsOptions
cmd := &cobra.Command{
Use: "events [OPTIONS]",
Short: "Get real time events from the server",
Args: cli.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return runEvents(dockerCli, &opts)
},
}
flags := cmd.Flags()
flags.StringVar(&opts.since, "since", "", "Show all events created since timestamp")
flags.StringVar(&opts.until, "until", "", "Stream events until this timestamp")
flags.StringSliceVarP(&opts.filter, "filter", "f", []string{}, "Filter output based on conditions provided")
return cmd
}
func runEvents(dockerCli *client.DockerCli, opts *eventsOptions) error {
eventFilterArgs := filters.NewArgs()
// Consolidate all filter flags, and sanity check them early.
// They'll get process in the daemon/server.
for _, f := range opts.filter {
var err error
eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs)
if err != nil {
return err
}
}
options := types.EventsOptions{
Since: opts.since,
Until: opts.until,
Filters: eventFilterArgs,
}
responseBody, err := dockerCli.Client().Events(context.Background(), options)
if err != nil {
return err
}
defer responseBody.Close()
return streamEvents(responseBody, dockerCli.Out())
}
// streamEvents decodes prints the incoming events in the provided output.
func streamEvents(input io.Reader, output io.Writer) error {
return DecodeEvents(input, func(event eventtypes.Message, err error) error {
if err != nil {
return err
}
printOutput(event, output)
return nil
})
}
type eventProcessor func(event eventtypes.Message, err error) error
// printOutput prints all types of event information.
// Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any.
func printOutput(event eventtypes.Message, output io.Writer) {
if event.TimeNano != 0 {
fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(jsonlog.RFC3339NanoFixed))
} else if event.Time != 0 {
fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(jsonlog.RFC3339NanoFixed))
}
fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID)
if len(event.Actor.Attributes) > 0 {
var attrs []string
var keys []string
for k := range event.Actor.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := event.Actor.Attributes[k]
attrs = append(attrs, fmt.Sprintf("%s=%s", k, v))
}
fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", "))
}
fmt.Fprint(output, "\n")
}

View file

@ -0,0 +1,66 @@
package system
import (
"encoding/json"
"io"
"sync"
"github.com/Sirupsen/logrus"
eventtypes "github.com/docker/engine-api/types/events"
)
// EventHandler is abstract interface for user to customize
// own handle functions of each type of events
type EventHandler interface {
Handle(action string, h func(eventtypes.Message))
Watch(c <-chan eventtypes.Message)
}
// InitEventHandler initializes and returns an EventHandler
func InitEventHandler() EventHandler {
return &eventHandler{handlers: make(map[string]func(eventtypes.Message))}
}
type eventHandler struct {
handlers map[string]func(eventtypes.Message)
mu sync.Mutex
}
func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
w.mu.Lock()
w.handlers[action] = h
w.mu.Unlock()
}
// Watch ranges over the passed in event chan and processes the events based on the
// handlers created for a given action.
// To stop watching, close the event chan.
func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
for e := range c {
w.mu.Lock()
h, exists := w.handlers[e.Action]
w.mu.Unlock()
if !exists {
continue
}
logrus.Debugf("event handler: received event: %v", e)
go h(e)
}
}
// DecodeEvents decodes event from input stream
func DecodeEvents(input io.Reader, ep eventProcessor) error {
dec := json.NewDecoder(input)
for {
var event eventtypes.Message
err := dec.Decode(&event)
if err != nil && err == io.EOF {
break
}
if procErr := ep(event, err); procErr != nil {
return procErr
}
}
return nil
}

View file

@ -48,6 +48,7 @@ func NewCobraAdaptor(clientFlags *cliflags.ClientFlags) CobraAdaptor {
container.NewRmCommand(dockerCli),
container.NewRunCommand(dockerCli),
container.NewStartCommand(dockerCli),
container.NewStatsCommand(dockerCli),
container.NewStopCommand(dockerCli),
container.NewTopCommand(dockerCli),
container.NewUnpauseCommand(dockerCli),
@ -60,6 +61,7 @@ func NewCobraAdaptor(clientFlags *cliflags.ClientFlags) CobraAdaptor {
image.NewImportCommand(dockerCli),
image.NewTagCommand(dockerCli),
network.NewNetworkCommand(dockerCli),
system.NewEventsCommand(dockerCli),
system.NewVersionCommand(dockerCli),
volume.NewVolumeCommand(dockerCli),
)

View file

@ -10,7 +10,6 @@ type Command struct {
var DockerCommandUsage = []Command{
{"commit", "Create a new image from a container's changes"},
{"cp", "Copy files/folders between a container and the local filesystem"},
{"events", "Get real time events from the server"},
{"exec", "Run a command in a running container"},
{"info", "Display system-wide information"},
{"inspect", "Return low-level information on a container or image"},
@ -21,7 +20,6 @@ var DockerCommandUsage = []Command{
{"pull", "Pull an image or a repository from a registry"},
{"push", "Push an image or a repository to a registry"},
{"save", "Save one or more images to a tar archive"},
{"stats", "Display a live stream of container(s) resource usage statistics"},
{"update", "Update configuration of one or more containers"},
}