diff --git a/api/server/router/swarm/backend.go b/api/server/router/swarm/backend.go index 19954eee85..c65d705c5c 100644 --- a/api/server/router/swarm/backend.go +++ b/api/server/router/swarm/backend.go @@ -2,7 +2,9 @@ package swarm import ( basictypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" types "github.com/docker/docker/api/types/swarm" + "golang.org/x/net/context" ) // Backend abstracts an swarm commands manager. @@ -19,6 +21,7 @@ type Backend interface { CreateService(types.ServiceSpec, string) (string, error) UpdateService(string, uint64, types.ServiceSpec, string, string) error RemoveService(string) error + ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error GetNodes(basictypes.NodeListOptions) ([]types.Node, error) GetNode(string) (types.Node, error) UpdateNode(string, uint64, types.NodeSpec) error diff --git a/api/server/router/swarm/cluster.go b/api/server/router/swarm/cluster.go index 8e9b58d110..e31c6ad075 100644 --- a/api/server/router/swarm/cluster.go +++ b/api/server/router/swarm/cluster.go @@ -1,6 +1,9 @@ package swarm -import "github.com/docker/docker/api/server/router" +import ( + "github.com/docker/docker/api/server/router" + "github.com/docker/docker/daemon" +) // buildRouter is a router to talk with the build controller type swarmRouter struct { @@ -9,11 +12,14 @@ type swarmRouter struct { } // NewRouter initializes a new build router -func NewRouter(b Backend) router.Router { +func NewRouter(d *daemon.Daemon, b Backend) router.Router { r := &swarmRouter{ backend: b, } r.initRoutes() + if d.HasExperimental() { + r.addExperimentalRoutes() + } return r } @@ -22,6 +28,12 @@ func (sr *swarmRouter) Routes() []router.Route { return sr.routes } +func (sr *swarmRouter) addExperimentalRoutes() { + sr.routes = append(sr.routes, + router.Cancellable(router.NewGetRoute("/services/{id}/logs", sr.getServiceLogs)), + ) +} + func (sr *swarmRouter) initRoutes() { sr.routes = []router.Route{ router.NewPostRoute("/swarm/init", sr.initCluster), @@ -32,20 +44,20 @@ func (sr *swarmRouter) initRoutes() { router.NewPostRoute("/swarm/update", sr.updateCluster), router.NewPostRoute("/swarm/unlock", sr.unlockCluster), router.NewGetRoute("/services", sr.getServices), - router.NewGetRoute("/services/{id:.*}", sr.getService), + router.NewGetRoute("/services/{id}", sr.getService), router.NewPostRoute("/services/create", sr.createService), - router.NewPostRoute("/services/{id:.*}/update", sr.updateService), - router.NewDeleteRoute("/services/{id:.*}", sr.removeService), + router.NewPostRoute("/services/{id}/update", sr.updateService), + router.NewDeleteRoute("/services/{id}", sr.removeService), router.NewGetRoute("/nodes", sr.getNodes), - router.NewGetRoute("/nodes/{id:.*}", sr.getNode), - router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode), - router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode), + router.NewGetRoute("/nodes/{id}", sr.getNode), + router.NewDeleteRoute("/nodes/{id}", sr.removeNode), + router.NewPostRoute("/nodes/{id}/update", sr.updateNode), router.NewGetRoute("/tasks", sr.getTasks), - router.NewGetRoute("/tasks/{id:.*}", sr.getTask), + router.NewGetRoute("/tasks/{id}", sr.getTask), router.NewGetRoute("/secrets", sr.getSecrets), router.NewPostRoute("/secrets", sr.createSecret), - router.NewDeleteRoute("/secrets/{id:.*}", sr.removeSecret), - router.NewGetRoute("/secrets/{id:.*}", sr.getSecret), - router.NewPostRoute("/secrets/{id:.*}/update", sr.updateSecret), + router.NewDeleteRoute("/secrets/{id}", sr.removeSecret), + router.NewGetRoute("/secrets/{id}", sr.getSecret), + router.NewPostRoute("/secrets/{id}/update", sr.updateSecret), } } diff --git a/api/server/router/swarm/cluster_routes.go b/api/server/router/swarm/cluster_routes.go index 2c380b4b56..3ccbe2ba8b 100644 --- a/api/server/router/swarm/cluster_routes.go +++ b/api/server/router/swarm/cluster_routes.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/errors" "github.com/docker/docker/api/server/httputils" basictypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/filters" types "github.com/docker/docker/api/types/swarm" "golang.org/x/net/context" @@ -208,6 +209,59 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter, return nil } +func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + + // Args are validated before the stream starts because when it starts we're + // sending HTTP 200 by writing an empty chunk of data to tell the client that + // daemon is going to stream. By sending this initial HTTP 200 we can't report + // any error after the stream starts (i.e. container not found, wrong parameters) + // with the appropriate status code. + stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr") + if !(stdout || stderr) { + return fmt.Errorf("Bad parameters: you must choose at least one stream") + } + + serviceName := vars["id"] + logsConfig := &backend.ContainerLogsConfig{ + ContainerLogsOptions: basictypes.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), + }, + OutStream: w, + } + + if !logsConfig.Follow { + return fmt.Errorf("Bad parameters: Only follow mode is currently supported") + } + + if logsConfig.Details { + return fmt.Errorf("Bad parameters: details is not currently supported") + } + + chStarted := make(chan struct{}) + if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil { + select { + case <-chStarted: + // The client may be expecting all of the data we're sending to + // be multiplexed, so send it through OutStream, which will + // have been set up to handle that if needed. + fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err) + default: + return err + } + } + + return nil +} + func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := httputils.ParseForm(r); err != nil { return err diff --git a/client/interface.go b/client/interface.go index f24c9a51f6..883e8801fa 100644 --- a/client/interface.go +++ b/client/interface.go @@ -111,6 +111,7 @@ type ServiceAPIClient interface { ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) ServiceRemove(ctx context.Context, serviceID string) error ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) error + ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) } diff --git a/client/service_logs.go b/client/service_logs.go new file mode 100644 index 0000000000..24384e3ec0 --- /dev/null +++ b/client/service_logs.go @@ -0,0 +1,52 @@ +package client + +import ( + "io" + "net/url" + "time" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/types" + timetypes "github.com/docker/docker/api/types/time" +) + +// ServiceLogs returns the logs generated by a service in an io.ReadCloser. +// It's up to the caller to close the stream. +func (cli *Client) ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + query := url.Values{} + if options.ShowStdout { + query.Set("stdout", "1") + } + + if options.ShowStderr { + query.Set("stderr", "1") + } + + if options.Since != "" { + ts, err := timetypes.GetTimestamp(options.Since, time.Now()) + if err != nil { + return nil, err + } + query.Set("since", ts) + } + + if options.Timestamps { + query.Set("timestamps", "1") + } + + if options.Details { + query.Set("details", "1") + } + + if options.Follow { + query.Set("follow", "1") + } + query.Set("tail", options.Tail) + + resp, err := cli.get(ctx, "/services/"+serviceID+"/logs", query, nil) + if err != nil { + return nil, err + } + return resp.body, nil +} diff --git a/client/service_logs_test.go b/client/service_logs_test.go new file mode 100644 index 0000000000..a6d002ba75 --- /dev/null +++ b/client/service_logs_test.go @@ -0,0 +1,133 @@ +package client + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/docker/docker/api/types" + + "golang.org/x/net/context" +) + +func TestServiceLogsError(t *testing.T) { + client := &Client{ + client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), + } + _, err := client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{}) + if err == nil || err.Error() != "Error response from daemon: Server error" { + t.Fatalf("expected a Server Error, got %v", err) + } + _, err = client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{ + Since: "2006-01-02TZ", + }) + if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) { + t.Fatalf("expected a 'parsing time' error, got %v", err) + } +} + +func TestServiceLogs(t *testing.T) { + expectedURL := "/services/service_id/logs" + cases := []struct { + options types.ContainerLogsOptions + expectedQueryParams map[string]string + }{ + { + expectedQueryParams: map[string]string{ + "tail": "", + }, + }, + { + options: types.ContainerLogsOptions{ + Tail: "any", + }, + expectedQueryParams: map[string]string{ + "tail": "any", + }, + }, + { + options: types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Timestamps: true, + Details: true, + Follow: true, + }, + expectedQueryParams: map[string]string{ + "tail": "", + "stdout": "1", + "stderr": "1", + "timestamps": "1", + "details": "1", + "follow": "1", + }, + }, + { + options: types.ContainerLogsOptions{ + // An complete invalid date, timestamp or go duration will be + // passed as is + Since: "invalid but valid", + }, + expectedQueryParams: map[string]string{ + "tail": "", + "since": "invalid but valid", + }, + }, + } + for _, logCase := range cases { + client := &Client{ + client: newMockClient(func(r *http.Request) (*http.Response, error) { + if !strings.HasPrefix(r.URL.Path, expectedURL) { + return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, r.URL) + } + // Check query parameters + query := r.URL.Query() + for key, expected := range logCase.expectedQueryParams { + actual := query.Get(key) + if actual != expected { + return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) + } + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))), + }, nil + }), + } + body, err := client.ServiceLogs(context.Background(), "service_id", logCase.options) + if err != nil { + t.Fatal(err) + } + defer body.Close() + content, err := ioutil.ReadAll(body) + if err != nil { + t.Fatal(err) + } + if string(content) != "response" { + t.Fatalf("expected response to contain 'response', got %s", string(content)) + } + } +} + +func ExampleClient_ServiceLogs_withTimeout() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, _ := NewEnvClient() + reader, err := client.ServiceLogs(ctx, "service_id", types.ContainerLogsOptions{}) + if err != nil { + log.Fatal(err) + } + + _, err = io.Copy(os.Stdout, reader) + if err != nil && err != io.EOF { + log.Fatal(err) + } +} diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index 38ad14b475..ec1c348905 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -456,7 +456,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) { systemrouter.NewRouter(d, c), volume.NewRouter(d), build.NewRouter(dockerfile.NewBuildManager(d)), - swarmrouter.NewRouter(c), + swarmrouter.NewRouter(d, c), }...) if d.NetworkControllerEnabled() { diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 528cc38fa3..74e9d7e4bb 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net" "os" @@ -16,20 +17,24 @@ import ( "github.com/Sirupsen/logrus" apierrors "github.com/docker/docker/api/errors" apitypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/docker/daemon/cluster/executor/container" + "github.com/docker/docker/daemon/logger" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/signal" + "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/reference" "github.com/docker/docker/runconfig" swarmapi "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/encryption" swarmnode "github.com/docker/swarmkit/node" + "github.com/docker/swarmkit/protobuf/ptypes" "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" @@ -45,6 +50,7 @@ const defaultAddr = "0.0.0.0:2377" const ( initialReconnectDelay = 100 * time.Millisecond maxReconnectDelay = 30 * time.Second + contextPrefix = "com.docker.swarm" ) // ErrNoSwarm is returned on leaving a cluster that was never initialized @@ -120,6 +126,7 @@ type node struct { ready bool conn *grpc.ClientConn client swarmapi.ControlClient + logs swarmapi.LogsClient reconnectDelay time.Duration config nodeStartConfig } @@ -371,8 +378,10 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) { if node.conn != conn { if conn == nil { node.client = nil + node.logs = nil } else { node.client = swarmapi.NewControlClient(conn) + node.logs = swarmapi.NewLogsClient(conn) } } node.conn = conn @@ -1205,6 +1214,88 @@ func (c *Cluster) RemoveService(input string) error { return nil } +// ServiceLogs collects service logs and writes them back to `config.OutStream` +func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error { + c.RLock() + if !c.isActiveManager() { + c.RUnlock() + return c.errNoManager() + } + + service, err := getService(ctx, c.client, input) + if err != nil { + c.RUnlock() + return err + } + + stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{ + Selector: &swarmapi.LogSelector{ + ServiceIDs: []string{service.ID}, + }, + Options: &swarmapi.LogSubscriptionOptions{ + Follow: true, + }, + }) + if err != nil { + c.RUnlock() + return err + } + + wf := ioutils.NewWriteFlusher(config.OutStream) + defer wf.Close() + close(started) + wf.Flush() + + outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout) + errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr) + + // Release the lock before starting the stream. + c.RUnlock() + for { + // Check the context before doing anything. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + subscribeMsg, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + for _, msg := range subscribeMsg.Messages { + data := []byte{} + + if config.Timestamps { + ts, err := ptypes.Timestamp(msg.Timestamp) + if err != nil { + return err + } + data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...) + } + + data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ", + contextPrefix, msg.Context.NodeID, + contextPrefix, msg.Context.ServiceID, + contextPrefix, msg.Context.TaskID, + ))...) + + data = append(data, msg.Data...) + + switch msg.Stream { + case swarmapi.LogStreamStdout: + outStream.Write(data) + case swarmapi.LogStreamStderr: + errStream.Write(data) + } + } + } +} + // GetNodes returns a list of all nodes known to a cluster. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) { c.RLock()