From 1ded1f26e154e283ab26f347971d4d4a51edc94f Mon Sep 17 00:00:00 2001 From: runshenzhu Date: Mon, 27 Jun 2016 18:08:56 -0700 Subject: [PATCH] add health check in docker cluster Signed-off-by: runshenzhu --- daemon/cluster/executor/backend.go | 5 + daemon/cluster/executor/container/adapter.go | 39 ++++++- .../cluster/executor/container/container.go | 10 ++ .../cluster/executor/container/controller.go | 64 ++++++++++- daemon/cluster/executor/container/errors.go | 3 + .../cluster/executor/container/health_test.go | 102 ++++++++++++++++++ 6 files changed, 217 insertions(+), 6 deletions(-) create mode 100644 daemon/cluster/executor/container/health_test.go diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index 7a168850d3..3a4ff0fc1b 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -2,10 +2,13 @@ package executor import ( "io" + "time" clustertypes "github.com/docker/docker/daemon/cluster/provider" "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/events" + "github.com/docker/engine-api/types/filters" "github.com/docker/engine-api/types/network" "github.com/docker/libnetwork/cluster" networktypes "github.com/docker/libnetwork/types" @@ -33,4 +36,6 @@ type Backend interface { SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error SetClusterProvider(provider cluster.Provider) IsSwarmCompatible() error + SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{}) + UnsubscribeFromEvents(listener chan interface{}) } diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go index 6261079a71..38ff63afc2 100644 --- a/daemon/cluster/executor/container/adapter.go +++ b/daemon/cluster/executor/container/adapter.go @@ -7,11 +7,13 @@ import ( "io" "strings" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/api/server/httputils" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/versions" "github.com/docker/libnetwork" "github.com/docker/swarmkit/api" @@ -168,9 +170,40 @@ func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, er // events issues a call to the events API and returns a channel with all // events. The stream of events can be shutdown by cancelling the context. -// -// A chan struct{} is returned that will be closed if the event processing -// fails and needs to be restarted. +func (c *containerAdapter) events(ctx context.Context) <-chan events.Message { + log.G(ctx).Debugf("waiting on events") + buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter()) + eventsq := make(chan events.Message, len(buffer)) + + for _, event := range buffer { + eventsq <- event + } + + go func() { + defer c.backend.UnsubscribeFromEvents(l) + + for { + select { + case ev := <-l: + jev, ok := ev.(events.Message) + if !ok { + log.G(ctx).Warnf("unexpected event message: %q", ev) + continue + } + select { + case eventsq <- jev: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return eventsq +} + func (c *containerAdapter) wait(ctx context.Context) error { return c.backend.ContainerWaitWithContext(ctx, c.container.name()) } diff --git a/daemon/cluster/executor/container/container.go b/daemon/cluster/executor/container/container.go index ece2ec3743..265908a0a9 100644 --- a/daemon/cluster/executor/container/container.go +++ b/daemon/cluster/executor/container/container.go @@ -13,6 +13,8 @@ import ( "github.com/docker/docker/reference" "github.com/docker/engine-api/types" enginecontainer "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/events" + "github.com/docker/engine-api/types/filters" "github.com/docker/engine-api/types/network" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" @@ -420,3 +422,11 @@ func (c *containerConfig) networkCreateRequest(name string) (clustertypes.Networ return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil } + +func (c containerConfig) eventFilter() filters.Args { + filter := filters.NewArgs() + filter.Add("type", events.ContainerEventType) + filter.Add("name", c.name()) + filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID)) + return filter +} diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go index 8b9556296f..a51e5c52e5 100644 --- a/daemon/cluster/executor/container/controller.go +++ b/daemon/cluster/executor/container/controller.go @@ -6,6 +6,7 @@ import ( executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" @@ -153,20 +154,39 @@ func (r *controller) Wait(pctx context.Context) error { ctx, cancel := context.WithCancel(pctx) defer cancel() + healthErr := make(chan error, 1) + go func() { + ectx, cancel := context.WithCancel(ctx) // cancel event context on first event + defer cancel() + if err := r.checkHealth(ectx); err == ErrContainerUnhealthy { + healthErr <- ErrContainerUnhealthy + if err := r.Shutdown(ectx); err != nil { + log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy") + } + } + }() + err := r.adapter.wait(ctx) if ctx.Err() != nil { return ctx.Err() } + if err != nil { ee := &exitError{} - if err.Error() != "" { - ee.cause = err - } if ec, ok := err.(exec.ExitCoder); ok { ee.code = ec.ExitCode() } + select { + case e := <-healthErr: + ee.cause = e + default: + if err.Error() != "" { + ee.cause = err + } + } return ee } + return nil } @@ -250,6 +270,21 @@ func (r *controller) Close() error { return nil } +func (r *controller) matchevent(event events.Message) bool { + if event.Type != events.ContainerEventType { + return false + } + + // TODO(stevvooe): Filter based on ID matching, in addition to name. + + // Make sure the events are for this container. + if event.Actor.Attributes["name"] != r.adapter.container.name() { + return false + } + + return true +} + func (r *controller) checkClosed() error { select { case <-r.closed: @@ -289,3 +324,26 @@ func (e *exitError) ExitCode() int { func (e *exitError) Cause() error { return e.cause } + +// checkHealth blocks until unhealthy container is detected or ctx exits +func (r *controller) checkHealth(ctx context.Context) error { + eventq := r.adapter.events(ctx) + + for { + select { + case <-ctx.Done(): + return nil + case <-r.closed: + return nil + case event := <-eventq: + if !r.matchevent(event) { + continue + } + + switch event.Action { + case "health_status: unhealthy": + return ErrContainerUnhealthy + } + } + } +} diff --git a/daemon/cluster/executor/container/errors.go b/daemon/cluster/executor/container/errors.go index 6c03d36071..63e1233566 100644 --- a/daemon/cluster/executor/container/errors.go +++ b/daemon/cluster/executor/container/errors.go @@ -9,4 +9,7 @@ var ( // ErrContainerDestroyed returned when a container is prematurely destroyed // during a wait call. ErrContainerDestroyed = fmt.Errorf("dockerexec: container destroyed") + + // ErrContainerUnhealthy returned if controller detects the health check failure + ErrContainerUnhealthy = fmt.Errorf("dockerexec: unhealthy container") ) diff --git a/daemon/cluster/executor/container/health_test.go b/daemon/cluster/executor/container/health_test.go new file mode 100644 index 0000000000..472624b54d --- /dev/null +++ b/daemon/cluster/executor/container/health_test.go @@ -0,0 +1,102 @@ +// +build !windows + +package container + +import ( + "testing" + "time" + + "github.com/docker/docker/container" + "github.com/docker/docker/daemon" + "github.com/docker/docker/daemon/events" + containertypes "github.com/docker/engine-api/types/container" + "github.com/docker/swarmkit/api" + "golang.org/x/net/context" +) + +func TestHealthStates(t *testing.T) { + + // set up environment: events, task, container .... + e := events.New() + _, l, _ := e.Subscribe() + defer e.Evict(l) + + task := &api.Task{ + ID: "id", + ServiceID: "sid", + Spec: api.TaskSpec{ + Runtime: &api.TaskSpec_Container{ + Container: &api.ContainerSpec{ + Image: "image_name", + Labels: map[string]string{ + "com.docker.swarm.task.id": "id", + }, + }, + }, + }, + Annotations: api.Annotations{Name: "name"}, + } + + c := &container.Container{ + CommonContainer: container.CommonContainer{ + ID: "id", + Name: "name", + Config: &containertypes.Config{ + Image: "image_name", + Labels: map[string]string{ + "com.docker.swarm.task.id": "id", + }, + }, + }, + } + + daemon := &daemon.Daemon{ + EventsService: e, + } + + controller, err := newController(daemon, task) + if err != nil { + t.Fatalf("create controller fail %v", err) + } + + errChan := make(chan error, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // fire checkHealth + go func() { + err := controller.checkHealth(ctx) + select { + case errChan <- err: + case <-ctx.Done(): + } + }() + + // send an event and expect to get expectedErr + // if expectedErr is nil, shouldn't get any error + logAndExpect := func(msg string, expectedErr error) { + daemon.LogContainerEvent(c, msg) + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + + select { + case err := <-errChan: + if err != expectedErr { + t.Fatalf("expect error %v, but get %v", expectedErr, err) + } + case <-timer.C: + if expectedErr != nil { + t.Fatalf("time limit exceeded, didn't get expected error") + } + } + } + + // events that are ignored by checkHealth + logAndExpect("health_status: running", nil) + logAndExpect("health_status: healthy", nil) + logAndExpect("die", nil) + + // unhealthy event will be caught by checkHealth + logAndExpect("health_status: unhealthy", ErrContainerUnhealthy) +}