mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
0c7b930952
daemon/cluster/convert/service.go:96:34: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/convert/service.go:169:44: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/convert/service.go:470:30: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/convert/container.go:224:23: empty-lines: extra empty line at the start of a block (revive) daemon/cluster/convert/network.go:109:14: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/convert/service.go:537:27: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/services.go:247:19: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/services.go:252:41: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/services.go:256:12: empty-lines: extra empty line at the end of a block (revive) daemon/cluster/services.go:289:80: empty-lines: extra empty line at the start of a block (revive) daemon/cluster/executor/container/health_test.go:18:37: empty-lines: extra empty line at the start of a block (revive) daemon/cluster/executor/container/adapter.go:437:68: empty-lines: extra empty line at the end of a block (revive) Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
658 lines
22 KiB
Go
658 lines
22 KiB
Go
package cluster // import "github.com/docker/docker/daemon/cluster"
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/docker/distribution/reference"
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/backend"
|
|
"github.com/docker/docker/api/types/registry"
|
|
"github.com/docker/docker/api/types/swarm"
|
|
timetypes "github.com/docker/docker/api/types/time"
|
|
"github.com/docker/docker/daemon/cluster/convert"
|
|
"github.com/docker/docker/errdefs"
|
|
runconfigopts "github.com/docker/docker/runconfig/opts"
|
|
gogotypes "github.com/gogo/protobuf/types"
|
|
swarmapi "github.com/moby/swarmkit/v2/api"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// GetServices returns all services of a managed swarm cluster.
|
|
func (c *Cluster) GetServices(options types.ServiceListOptions) ([]swarm.Service, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
state := c.currentNodeState()
|
|
if !state.IsActiveManager() {
|
|
return nil, c.errNoManager(state)
|
|
}
|
|
|
|
// We move the accepted filter check here as "mode" filter
|
|
// is processed in the daemon, not in SwarmKit. So it might
|
|
// be good to have accepted file check in the same file as
|
|
// the filter processing (in the for loop below).
|
|
accepted := map[string]bool{
|
|
"name": true,
|
|
"id": true,
|
|
"label": true,
|
|
"mode": true,
|
|
"runtime": true,
|
|
}
|
|
if err := options.Filters.Validate(accepted); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(options.Filters.Get("runtime")) == 0 {
|
|
// Default to using the container runtime filter
|
|
options.Filters.Add("runtime", string(swarm.RuntimeContainer))
|
|
}
|
|
|
|
filters := &swarmapi.ListServicesRequest_Filters{
|
|
NamePrefixes: options.Filters.Get("name"),
|
|
IDPrefixes: options.Filters.Get("id"),
|
|
Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
|
|
Runtimes: options.Filters.Get("runtime"),
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
r, err := state.controlClient.ListServices(
|
|
ctx,
|
|
&swarmapi.ListServicesRequest{Filters: filters},
|
|
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
services := make([]swarm.Service, 0, len(r.Services))
|
|
|
|
// if the user requests the service statuses, we'll store the IDs needed
|
|
// in this slice
|
|
var serviceIDs []string
|
|
if options.Status {
|
|
serviceIDs = make([]string, 0, len(r.Services))
|
|
}
|
|
for _, service := range r.Services {
|
|
if options.Filters.Contains("mode") {
|
|
var mode string
|
|
switch service.Spec.GetMode().(type) {
|
|
case *swarmapi.ServiceSpec_Global:
|
|
mode = "global"
|
|
case *swarmapi.ServiceSpec_Replicated:
|
|
mode = "replicated"
|
|
case *swarmapi.ServiceSpec_ReplicatedJob:
|
|
mode = "replicated-job"
|
|
case *swarmapi.ServiceSpec_GlobalJob:
|
|
mode = "global-job"
|
|
}
|
|
|
|
if !options.Filters.ExactMatch("mode", mode) {
|
|
continue
|
|
}
|
|
}
|
|
if options.Status {
|
|
serviceIDs = append(serviceIDs, service.ID)
|
|
}
|
|
svcs, err := convert.ServiceFromGRPC(*service)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
services = append(services, svcs)
|
|
}
|
|
|
|
if options.Status {
|
|
// Listing service statuses is a separate call because, while it is the
|
|
// most common UI operation, it is still just a UI operation, and it
|
|
// would be improper to include this data in swarm's Service object.
|
|
// We pay the cost with some complexity here, but this is still way
|
|
// more efficient than marshalling and unmarshalling all the JSON
|
|
// needed to list tasks and get this data otherwise client-side
|
|
resp, err := state.controlClient.ListServiceStatuses(
|
|
ctx,
|
|
&swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
|
|
grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// we'll need to match up statuses in the response with the services in
|
|
// the list operation. if we did this by operating on two lists, the
|
|
// result would be quadratic. instead, make a mapping of service IDs to
|
|
// service statuses so that this is roughly linear. additionally,
|
|
// convert the status response to an engine api service status here.
|
|
serviceMap := map[string]*swarm.ServiceStatus{}
|
|
for _, status := range resp.Statuses {
|
|
serviceMap[status.ServiceID] = &swarm.ServiceStatus{
|
|
RunningTasks: status.RunningTasks,
|
|
DesiredTasks: status.DesiredTasks,
|
|
CompletedTasks: status.CompletedTasks,
|
|
}
|
|
}
|
|
|
|
// because this is a list of values and not pointers, make sure we
|
|
// actually alter the value when iterating.
|
|
for i, service := range services {
|
|
// the return value of the ListServiceStatuses operation is
|
|
// guaranteed to contain a value in the response for every argument
|
|
// in the request, so we can safely do this assignment. and even if
|
|
// it wasn't, and the service ID was for some reason absent from
|
|
// this map, the resulting value of service.Status would just be
|
|
// nil -- the same thing it was before
|
|
service.ServiceStatus = serviceMap[service.ID]
|
|
services[i] = service
|
|
}
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// GetService returns a service based on an ID or name.
|
|
func (c *Cluster) GetService(input string, insertDefaults bool) (swarm.Service, error) {
|
|
var service *swarmapi.Service
|
|
if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
s, err := getService(ctx, state.controlClient, input, insertDefaults)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
service = s
|
|
return nil
|
|
}); err != nil {
|
|
return swarm.Service{}, err
|
|
}
|
|
svc, err := convert.ServiceFromGRPC(*service)
|
|
if err != nil {
|
|
return swarm.Service{}, err
|
|
}
|
|
return svc, nil
|
|
}
|
|
|
|
// CreateService creates a new service in a managed swarm cluster.
|
|
func (c *Cluster) CreateService(s swarm.ServiceSpec, encodedAuth string, queryRegistry bool) (*types.ServiceCreateResponse, error) {
|
|
var resp *types.ServiceCreateResponse
|
|
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
err := c.populateNetworkID(ctx, state.controlClient, &s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
serviceSpec, err := convert.ServiceSpecToGRPC(s)
|
|
if err != nil {
|
|
return errdefs.InvalidParameter(err)
|
|
}
|
|
|
|
resp = &types.ServiceCreateResponse{}
|
|
|
|
switch serviceSpec.Task.Runtime.(type) {
|
|
case *swarmapi.TaskSpec_Attachment:
|
|
return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)
|
|
// handle other runtimes here
|
|
case *swarmapi.TaskSpec_Generic:
|
|
switch serviceSpec.Task.GetGeneric().Kind {
|
|
case string(swarm.RuntimePlugin):
|
|
if !c.config.Backend.HasExperimental() {
|
|
return fmt.Errorf("runtime type %q only supported in experimental", swarm.RuntimePlugin)
|
|
}
|
|
if s.TaskTemplate.PluginSpec == nil {
|
|
return errors.New("plugin spec must be set")
|
|
}
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported runtime type: %q", serviceSpec.Task.GetGeneric().Kind)
|
|
}
|
|
|
|
r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp.ID = r.Service.ID
|
|
case *swarmapi.TaskSpec_Container:
|
|
ctnr := serviceSpec.Task.GetContainer()
|
|
if ctnr == nil {
|
|
return errors.New("service does not use container tasks")
|
|
}
|
|
if encodedAuth != "" {
|
|
ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
|
|
}
|
|
|
|
// retrieve auth config from encoded auth
|
|
authConfig := ®istry.AuthConfig{}
|
|
if encodedAuth != "" {
|
|
authReader := strings.NewReader(encodedAuth)
|
|
dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
|
|
if err := dec.Decode(authConfig); err != nil {
|
|
logrus.Warnf("invalid authconfig: %v", err)
|
|
}
|
|
}
|
|
|
|
// pin image by digest for API versions < 1.30
|
|
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
|
|
// should be removed in the future. Since integration tests only use the
|
|
// latest API version, so this is no longer required.
|
|
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
|
|
digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
|
|
if err != nil {
|
|
logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
|
|
// warning in the client response should be concise
|
|
resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
|
|
} else if ctnr.Image != digestImage {
|
|
logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
|
|
ctnr.Image = digestImage
|
|
} else {
|
|
logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
|
|
}
|
|
|
|
// Replace the context with a fresh one.
|
|
// If we timed out while communicating with the
|
|
// registry, then "ctx" will already be expired, which
|
|
// would cause UpdateService below to fail. Reusing
|
|
// "ctx" could make it impossible to create a service
|
|
// if the registry is slow or unresponsive.
|
|
var cancel func()
|
|
ctx, cancel = c.getRequestContext()
|
|
defer cancel()
|
|
}
|
|
|
|
r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp.ID = r.Service.ID
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// UpdateService updates existing service to match new properties.
|
|
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec swarm.ServiceSpec, flags types.ServiceUpdateOptions, queryRegistry bool) (*types.ServiceUpdateResponse, error) {
|
|
var resp *types.ServiceUpdateResponse
|
|
|
|
err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
err := c.populateNetworkID(ctx, state.controlClient, &spec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
serviceSpec, err := convert.ServiceSpecToGRPC(spec)
|
|
if err != nil {
|
|
return errdefs.InvalidParameter(err)
|
|
}
|
|
|
|
currentService, err := getService(ctx, state.controlClient, serviceIDOrName, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp = &types.ServiceUpdateResponse{}
|
|
|
|
switch serviceSpec.Task.Runtime.(type) {
|
|
case *swarmapi.TaskSpec_Attachment:
|
|
return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)
|
|
case *swarmapi.TaskSpec_Generic:
|
|
switch serviceSpec.Task.GetGeneric().Kind {
|
|
case string(swarm.RuntimePlugin):
|
|
if spec.TaskTemplate.PluginSpec == nil {
|
|
return errors.New("plugin spec must be set")
|
|
}
|
|
}
|
|
case *swarmapi.TaskSpec_Container:
|
|
newCtnr := serviceSpec.Task.GetContainer()
|
|
if newCtnr == nil {
|
|
return errors.New("service does not use container tasks")
|
|
}
|
|
|
|
encodedAuth := flags.EncodedRegistryAuth
|
|
if encodedAuth != "" {
|
|
newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
|
|
} else {
|
|
// this is needed because if the encodedAuth isn't being updated then we
|
|
// shouldn't lose it, and continue to use the one that was already present
|
|
var ctnr *swarmapi.ContainerSpec
|
|
switch flags.RegistryAuthFrom {
|
|
case types.RegistryAuthFromSpec, "":
|
|
ctnr = currentService.Spec.Task.GetContainer()
|
|
case types.RegistryAuthFromPreviousSpec:
|
|
if currentService.PreviousSpec == nil {
|
|
return errors.New("service does not have a previous spec")
|
|
}
|
|
ctnr = currentService.PreviousSpec.Task.GetContainer()
|
|
default:
|
|
return errors.New("unsupported registryAuthFrom value")
|
|
}
|
|
if ctnr == nil {
|
|
return errors.New("service does not use container tasks")
|
|
}
|
|
newCtnr.PullOptions = ctnr.PullOptions
|
|
// update encodedAuth so it can be used to pin image by digest
|
|
if ctnr.PullOptions != nil {
|
|
encodedAuth = ctnr.PullOptions.RegistryAuth
|
|
}
|
|
}
|
|
|
|
// retrieve auth config from encoded auth
|
|
authConfig := ®istry.AuthConfig{}
|
|
if encodedAuth != "" {
|
|
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
|
|
logrus.Warnf("invalid authconfig: %v", err)
|
|
}
|
|
}
|
|
|
|
// pin image by digest for API versions < 1.30
|
|
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
|
|
// should be removed in the future. Since integration tests only use the
|
|
// latest API version, so this is no longer required.
|
|
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
|
|
digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
|
|
if err != nil {
|
|
logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
|
|
// warning in the client response should be concise
|
|
resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
|
|
} else if newCtnr.Image != digestImage {
|
|
logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
|
|
newCtnr.Image = digestImage
|
|
} else {
|
|
logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
|
|
}
|
|
|
|
// Replace the context with a fresh one.
|
|
// If we timed out while communicating with the
|
|
// registry, then "ctx" will already be expired, which
|
|
// would cause UpdateService below to fail. Reusing
|
|
// "ctx" could make it impossible to update a service
|
|
// if the registry is slow or unresponsive.
|
|
var cancel func()
|
|
ctx, cancel = c.getRequestContext()
|
|
defer cancel()
|
|
}
|
|
}
|
|
|
|
var rollback swarmapi.UpdateServiceRequest_Rollback
|
|
switch flags.Rollback {
|
|
case "", "none":
|
|
rollback = swarmapi.UpdateServiceRequest_NONE
|
|
case "previous":
|
|
rollback = swarmapi.UpdateServiceRequest_PREVIOUS
|
|
default:
|
|
return fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
|
|
}
|
|
|
|
_, err = state.controlClient.UpdateService(
|
|
ctx,
|
|
&swarmapi.UpdateServiceRequest{
|
|
ServiceID: currentService.ID,
|
|
Spec: &serviceSpec,
|
|
ServiceVersion: &swarmapi.Version{
|
|
Index: version,
|
|
},
|
|
Rollback: rollback,
|
|
},
|
|
)
|
|
return err
|
|
})
|
|
return resp, err
|
|
}
|
|
|
|
// RemoveService removes a service from a managed swarm cluster.
|
|
func (c *Cluster) RemoveService(input string) error {
|
|
return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
|
|
service, err := getService(ctx, state.controlClient, input, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
|
|
return err
|
|
})
|
|
}
|
|
|
|
// ServiceLogs collects service logs and writes them back to `config.OutStream`
|
|
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
state := c.currentNodeState()
|
|
if !state.IsActiveManager() {
|
|
return nil, c.errNoManager(state)
|
|
}
|
|
|
|
swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error making log selector")
|
|
}
|
|
|
|
// set the streams we'll use
|
|
stdStreams := []swarmapi.LogStream{}
|
|
if config.ShowStdout {
|
|
stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
|
|
}
|
|
if config.ShowStderr {
|
|
stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
|
|
}
|
|
|
|
// Get tail value squared away - the number of previous log lines we look at
|
|
var tail int64
|
|
// in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
|
|
// it to -1 (all). i don't agree with that, but i also think no tail value
|
|
// should be legitimate. if you don't pass tail, we assume you want "all"
|
|
if config.Tail == "all" || config.Tail == "" {
|
|
// tail of 0 means send all logs on the swarmkit side
|
|
tail = 0
|
|
} else {
|
|
t, err := strconv.Atoi(config.Tail)
|
|
if err != nil {
|
|
return nil, errors.New("tail value must be a positive integer or \"all\"")
|
|
}
|
|
if t < 0 {
|
|
return nil, errors.New("negative tail values not supported")
|
|
}
|
|
// we actually use negative tail in swarmkit to represent messages
|
|
// backwards starting from the beginning. also, -1 means no logs. so,
|
|
// basically, for api compat with docker container logs, add one and
|
|
// flip the sign. we error above if you try to negative tail, which
|
|
// isn't supported by docker (and would error deeper in the stack
|
|
// anyway)
|
|
//
|
|
// See the logs protobuf for more information
|
|
tail = int64(-(t + 1))
|
|
}
|
|
|
|
// get the since value - the time in the past we're looking at logs starting from
|
|
var sinceProto *gogotypes.Timestamp
|
|
if config.Since != "" {
|
|
s, n, err := timetypes.ParseTimestamps(config.Since, 0)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not parse since timestamp")
|
|
}
|
|
since := time.Unix(s, n)
|
|
sinceProto, err = gogotypes.TimestampProto(since)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not parse timestamp to proto")
|
|
}
|
|
}
|
|
|
|
stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
|
|
Selector: swarmSelector,
|
|
Options: &swarmapi.LogSubscriptionOptions{
|
|
Follow: config.Follow,
|
|
Streams: stdStreams,
|
|
Tail: tail,
|
|
Since: sinceProto,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
messageChan := make(chan *backend.LogMessage, 1)
|
|
go func() {
|
|
defer close(messageChan)
|
|
for {
|
|
// Check the context before doing anything.
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
subscribeMsg, err := stream.Recv()
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
// if we're not io.EOF, push the message in and return
|
|
if err != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
case messageChan <- &backend.LogMessage{Err: err}:
|
|
}
|
|
return
|
|
}
|
|
|
|
for _, msg := range subscribeMsg.Messages {
|
|
// make a new message
|
|
m := new(backend.LogMessage)
|
|
m.Attrs = make([]backend.LogAttr, 0, len(msg.Attrs)+3)
|
|
// add the timestamp, adding the error if it fails
|
|
m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
|
|
if err != nil {
|
|
m.Err = err
|
|
}
|
|
|
|
nodeKey := contextPrefix + ".node.id"
|
|
serviceKey := contextPrefix + ".service.id"
|
|
taskKey := contextPrefix + ".task.id"
|
|
|
|
// copy over all of the details
|
|
for _, d := range msg.Attrs {
|
|
switch d.Key {
|
|
case nodeKey, serviceKey, taskKey:
|
|
// we have the final say over context details (in case there
|
|
// is a conflict (if the user added a detail with a context's
|
|
// key for some reason))
|
|
default:
|
|
m.Attrs = append(m.Attrs, backend.LogAttr{Key: d.Key, Value: d.Value})
|
|
}
|
|
}
|
|
m.Attrs = append(m.Attrs,
|
|
backend.LogAttr{Key: nodeKey, Value: msg.Context.NodeID},
|
|
backend.LogAttr{Key: serviceKey, Value: msg.Context.ServiceID},
|
|
backend.LogAttr{Key: taskKey, Value: msg.Context.TaskID},
|
|
)
|
|
|
|
switch msg.Stream {
|
|
case swarmapi.LogStreamStdout:
|
|
m.Source = "stdout"
|
|
case swarmapi.LogStreamStderr:
|
|
m.Source = "stderr"
|
|
}
|
|
m.Line = msg.Data
|
|
|
|
// there could be a case where the reader stops accepting
|
|
// messages and the context is canceled. we need to check that
|
|
// here, or otherwise we risk blocking forever on the message
|
|
// send.
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case messageChan <- m:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return messageChan, nil
|
|
}
|
|
|
|
// convertSelector takes a backend.LogSelector, which contains raw names that
|
|
// may or may not be valid, and converts them to an api.LogSelector proto. It
|
|
// returns an error if something fails
|
|
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
|
|
// don't rely on swarmkit to resolve IDs, do it ourselves
|
|
swarmSelector := &swarmapi.LogSelector{}
|
|
for _, s := range selector.Services {
|
|
service, err := getService(ctx, cc, s, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c := service.Spec.Task.GetContainer()
|
|
if c == nil {
|
|
return nil, errors.New("logs only supported on container tasks")
|
|
}
|
|
swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
|
|
}
|
|
for _, t := range selector.Tasks {
|
|
task, err := getTask(ctx, cc, t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c := task.Spec.GetContainer()
|
|
if c == nil {
|
|
return nil, errors.New("logs only supported on container tasks")
|
|
}
|
|
swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
|
|
}
|
|
return swarmSelector, nil
|
|
}
|
|
|
|
// imageWithDigestString takes an image such as name or name:tag
|
|
// and returns the image pinned to a digest, such as name@sha256:34234
|
|
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *registry.AuthConfig) (string, error) {
|
|
ref, err := reference.ParseAnyReference(image)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
namedRef, ok := ref.(reference.Named)
|
|
if !ok {
|
|
if _, ok := ref.(reference.Digested); ok {
|
|
return image, nil
|
|
}
|
|
return "", errors.Errorf("unknown image reference format: %s", image)
|
|
}
|
|
// only query registry if not a canonical reference (i.e. with digest)
|
|
if _, ok := namedRef.(reference.Canonical); !ok {
|
|
namedRef = reference.TagNameOnly(namedRef)
|
|
|
|
taggedRef, ok := namedRef.(reference.NamedTagged)
|
|
if !ok {
|
|
return "", errors.Errorf("image reference not tagged: %s", image)
|
|
}
|
|
|
|
repo, err := c.config.ImageBackend.GetRepository(ctx, taggedRef, authConfig)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
// return familiar form until interface updated to return type
|
|
return reference.FamiliarString(namedDigestedRef), nil
|
|
}
|
|
// reference already contains a digest, so just return it
|
|
return reference.FamiliarString(ref), nil
|
|
}
|
|
|
|
// digestWarning constructs a formatted warning string
|
|
// using the image name that could not be pinned by digest. The
|
|
// formatting is hardcoded, but could me made smarter in the future
|
|
func digestWarning(image string) string {
|
|
return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image)
|
|
}
|