api: Service Logs support

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2016-10-26 01:17:31 -07:00
parent 0ec6865713
commit 819d0159bb
8 changed files with 359 additions and 13 deletions

View File

@ -2,7 +2,9 @@ package swarm
import (
basictypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
types "github.com/docker/docker/api/types/swarm"
"golang.org/x/net/context"
)
// Backend abstracts an swarm commands manager.
@ -19,6 +21,7 @@ type Backend interface {
CreateService(types.ServiceSpec, string) (string, error)
UpdateService(string, uint64, types.ServiceSpec, string, string) error
RemoveService(string) error
ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
GetNode(string) (types.Node, error)
UpdateNode(string, uint64, types.NodeSpec) error

View File

@ -1,6 +1,9 @@
package swarm
import "github.com/docker/docker/api/server/router"
import (
"github.com/docker/docker/api/server/router"
"github.com/docker/docker/daemon"
)
// buildRouter is a router to talk with the build controller
type swarmRouter struct {
@ -9,11 +12,14 @@ type swarmRouter struct {
}
// NewRouter initializes a new build router
func NewRouter(b Backend) router.Router {
func NewRouter(d *daemon.Daemon, b Backend) router.Router {
r := &swarmRouter{
backend: b,
}
r.initRoutes()
if d.HasExperimental() {
r.addExperimentalRoutes()
}
return r
}
@ -22,6 +28,12 @@ func (sr *swarmRouter) Routes() []router.Route {
return sr.routes
}
func (sr *swarmRouter) addExperimentalRoutes() {
sr.routes = append(sr.routes,
router.Cancellable(router.NewGetRoute("/services/{id}/logs", sr.getServiceLogs)),
)
}
func (sr *swarmRouter) initRoutes() {
sr.routes = []router.Route{
router.NewPostRoute("/swarm/init", sr.initCluster),
@ -32,20 +44,20 @@ func (sr *swarmRouter) initRoutes() {
router.NewPostRoute("/swarm/update", sr.updateCluster),
router.NewPostRoute("/swarm/unlock", sr.unlockCluster),
router.NewGetRoute("/services", sr.getServices),
router.NewGetRoute("/services/{id:.*}", sr.getService),
router.NewGetRoute("/services/{id}", sr.getService),
router.NewPostRoute("/services/create", sr.createService),
router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
router.NewPostRoute("/services/{id}/update", sr.updateService),
router.NewDeleteRoute("/services/{id}", sr.removeService),
router.NewGetRoute("/nodes", sr.getNodes),
router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
router.NewGetRoute("/nodes/{id}", sr.getNode),
router.NewDeleteRoute("/nodes/{id}", sr.removeNode),
router.NewPostRoute("/nodes/{id}/update", sr.updateNode),
router.NewGetRoute("/tasks", sr.getTasks),
router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
router.NewGetRoute("/tasks/{id}", sr.getTask),
router.NewGetRoute("/secrets", sr.getSecrets),
router.NewPostRoute("/secrets", sr.createSecret),
router.NewDeleteRoute("/secrets/{id:.*}", sr.removeSecret),
router.NewGetRoute("/secrets/{id:.*}", sr.getSecret),
router.NewPostRoute("/secrets/{id:.*}/update", sr.updateSecret),
router.NewDeleteRoute("/secrets/{id}", sr.removeSecret),
router.NewGetRoute("/secrets/{id}", sr.getSecret),
router.NewPostRoute("/secrets/{id}/update", sr.updateSecret),
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/docker/docker/api/errors"
"github.com/docker/docker/api/server/httputils"
basictypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/filters"
types "github.com/docker/docker/api/types/swarm"
"golang.org/x/net/context"
@ -208,6 +209,59 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter,
return nil
}
func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil {
return err
}
// Args are validated before the stream starts because when it starts we're
// sending HTTP 200 by writing an empty chunk of data to tell the client that
// daemon is going to stream. By sending this initial HTTP 200 we can't report
// any error after the stream starts (i.e. container not found, wrong parameters)
// with the appropriate status code.
stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
if !(stdout || stderr) {
return fmt.Errorf("Bad parameters: you must choose at least one stream")
}
serviceName := vars["id"]
logsConfig := &backend.ContainerLogsConfig{
ContainerLogsOptions: basictypes.ContainerLogsOptions{
Follow: httputils.BoolValue(r, "follow"),
Timestamps: httputils.BoolValue(r, "timestamps"),
Since: r.Form.Get("since"),
Tail: r.Form.Get("tail"),
ShowStdout: stdout,
ShowStderr: stderr,
Details: httputils.BoolValue(r, "details"),
},
OutStream: w,
}
if !logsConfig.Follow {
return fmt.Errorf("Bad parameters: Only follow mode is currently supported")
}
if logsConfig.Details {
return fmt.Errorf("Bad parameters: details is not currently supported")
}
chStarted := make(chan struct{})
if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil {
select {
case <-chStarted:
// The client may be expecting all of the data we're sending to
// be multiplexed, so send it through OutStream, which will
// have been set up to handle that if needed.
fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err)
default:
return err
}
}
return nil
}
func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil {
return err

View File

@ -111,6 +111,7 @@ type ServiceAPIClient interface {
ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
ServiceRemove(ctx context.Context, serviceID string) error
ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) error
ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error)
TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
}

52
client/service_logs.go Normal file
View File

@ -0,0 +1,52 @@
package client
import (
"io"
"net/url"
"time"
"golang.org/x/net/context"
"github.com/docker/docker/api/types"
timetypes "github.com/docker/docker/api/types/time"
)
// ServiceLogs returns the logs generated by a service in an io.ReadCloser.
// It's up to the caller to close the stream.
func (cli *Client) ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
query := url.Values{}
if options.ShowStdout {
query.Set("stdout", "1")
}
if options.ShowStderr {
query.Set("stderr", "1")
}
if options.Since != "" {
ts, err := timetypes.GetTimestamp(options.Since, time.Now())
if err != nil {
return nil, err
}
query.Set("since", ts)
}
if options.Timestamps {
query.Set("timestamps", "1")
}
if options.Details {
query.Set("details", "1")
}
if options.Follow {
query.Set("follow", "1")
}
query.Set("tail", options.Tail)
resp, err := cli.get(ctx, "/services/"+serviceID+"/logs", query, nil)
if err != nil {
return nil, err
}
return resp.body, nil
}

133
client/service_logs_test.go Normal file
View File

@ -0,0 +1,133 @@
package client
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"testing"
"time"
"github.com/docker/docker/api/types"
"golang.org/x/net/context"
)
func TestServiceLogsError(t *testing.T) {
client := &Client{
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
}
_, err := client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{})
if err == nil || err.Error() != "Error response from daemon: Server error" {
t.Fatalf("expected a Server Error, got %v", err)
}
_, err = client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{
Since: "2006-01-02TZ",
})
if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) {
t.Fatalf("expected a 'parsing time' error, got %v", err)
}
}
func TestServiceLogs(t *testing.T) {
expectedURL := "/services/service_id/logs"
cases := []struct {
options types.ContainerLogsOptions
expectedQueryParams map[string]string
}{
{
expectedQueryParams: map[string]string{
"tail": "",
},
},
{
options: types.ContainerLogsOptions{
Tail: "any",
},
expectedQueryParams: map[string]string{
"tail": "any",
},
},
{
options: types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: true,
Details: true,
Follow: true,
},
expectedQueryParams: map[string]string{
"tail": "",
"stdout": "1",
"stderr": "1",
"timestamps": "1",
"details": "1",
"follow": "1",
},
},
{
options: types.ContainerLogsOptions{
// An complete invalid date, timestamp or go duration will be
// passed as is
Since: "invalid but valid",
},
expectedQueryParams: map[string]string{
"tail": "",
"since": "invalid but valid",
},
},
}
for _, logCase := range cases {
client := &Client{
client: newMockClient(func(r *http.Request) (*http.Response, error) {
if !strings.HasPrefix(r.URL.Path, expectedURL) {
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, r.URL)
}
// Check query parameters
query := r.URL.Query()
for key, expected := range logCase.expectedQueryParams {
actual := query.Get(key)
if actual != expected {
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
}
}
return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))),
}, nil
}),
}
body, err := client.ServiceLogs(context.Background(), "service_id", logCase.options)
if err != nil {
t.Fatal(err)
}
defer body.Close()
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
if string(content) != "response" {
t.Fatalf("expected response to contain 'response', got %s", string(content))
}
}
}
func ExampleClient_ServiceLogs_withTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, _ := NewEnvClient()
reader, err := client.ServiceLogs(ctx, "service_id", types.ContainerLogsOptions{})
if err != nil {
log.Fatal(err)
}
_, err = io.Copy(os.Stdout, reader)
if err != nil && err != io.EOF {
log.Fatal(err)
}
}

View File

@ -456,7 +456,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
systemrouter.NewRouter(d, c),
volume.NewRouter(d),
build.NewRouter(dockerfile.NewBuildManager(d)),
swarmrouter.NewRouter(c),
swarmrouter.NewRouter(d, c),
}...)
if d.NetworkControllerEnabled() {

View File

@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
@ -16,20 +17,24 @@ import (
"github.com/Sirupsen/logrus"
apierrors "github.com/docker/docker/api/errors"
apitypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
types "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/daemon/cluster/convert"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/docker/daemon/cluster/executor/container"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/reference"
"github.com/docker/docker/runconfig"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/encryption"
swarmnode "github.com/docker/swarmkit/node"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -45,6 +50,7 @@ const defaultAddr = "0.0.0.0:2377"
const (
initialReconnectDelay = 100 * time.Millisecond
maxReconnectDelay = 30 * time.Second
contextPrefix = "com.docker.swarm"
)
// ErrNoSwarm is returned on leaving a cluster that was never initialized
@ -120,6 +126,7 @@ type node struct {
ready bool
conn *grpc.ClientConn
client swarmapi.ControlClient
logs swarmapi.LogsClient
reconnectDelay time.Duration
config nodeStartConfig
}
@ -371,8 +378,10 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
if node.conn != conn {
if conn == nil {
node.client = nil
node.logs = nil
} else {
node.client = swarmapi.NewControlClient(conn)
node.logs = swarmapi.NewLogsClient(conn)
}
}
node.conn = conn
@ -1205,6 +1214,88 @@ func (c *Cluster) RemoveService(input string) error {
return nil
}
// ServiceLogs collects service logs and writes them back to `config.OutStream`
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
c.RLock()
if !c.isActiveManager() {
c.RUnlock()
return c.errNoManager()
}
service, err := getService(ctx, c.client, input)
if err != nil {
c.RUnlock()
return err
}
stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
Selector: &swarmapi.LogSelector{
ServiceIDs: []string{service.ID},
},
Options: &swarmapi.LogSubscriptionOptions{
Follow: true,
},
})
if err != nil {
c.RUnlock()
return err
}
wf := ioutils.NewWriteFlusher(config.OutStream)
defer wf.Close()
close(started)
wf.Flush()
outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
// Release the lock before starting the stream.
c.RUnlock()
for {
// Check the context before doing anything.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
subscribeMsg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
for _, msg := range subscribeMsg.Messages {
data := []byte{}
if config.Timestamps {
ts, err := ptypes.Timestamp(msg.Timestamp)
if err != nil {
return err
}
data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
}
data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
contextPrefix, msg.Context.NodeID,
contextPrefix, msg.Context.ServiceID,
contextPrefix, msg.Context.TaskID,
))...)
data = append(data, msg.Data...)
switch msg.Stream {
case swarmapi.LogStreamStdout:
outStream.Write(data)
case swarmapi.LogStreamStderr:
errStream.Write(data)
}
}
}
}
// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
c.RLock()