2018-02-05 16:05:59 -05:00
package cluster // import "github.com/docker/docker/daemon/cluster"
2017-02-11 13:40:14 -05:00
import (
2018-04-19 18:30:59 -04:00
"context"
2017-02-11 13:40:14 -05:00
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
2017-03-01 19:37:25 -05:00
"strconv"
2017-02-11 13:40:14 -05:00
"strings"
2017-03-01 19:37:25 -05:00
"time"
2017-02-11 13:40:14 -05:00
"github.com/docker/distribution/reference"
apitypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
types "github.com/docker/docker/api/types/swarm"
2017-03-01 19:37:25 -05:00
timetypes "github.com/docker/docker/api/types/time"
2017-02-11 13:40:14 -05:00
"github.com/docker/docker/daemon/cluster/convert"
2018-01-11 14:53:06 -05:00
"github.com/docker/docker/errdefs"
2017-03-03 17:08:49 -05:00
runconfigopts "github.com/docker/docker/runconfig/opts"
2017-02-11 13:40:14 -05:00
swarmapi "github.com/docker/swarmkit/api"
gogotypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
2017-07-26 17:42:13 -04:00
"github.com/sirupsen/logrus"
2018-10-29 20:44:11 -04:00
"google.golang.org/grpc"
2017-02-11 13:40:14 -05:00
)
// GetServices returns all services of a managed swarm cluster.
func ( c * Cluster ) GetServices ( options apitypes . ServiceListOptions ) ( [ ] types . Service , error ) {
c . mu . RLock ( )
defer c . mu . RUnlock ( )
state := c . currentNodeState ( )
if ! state . IsActiveManager ( ) {
return nil , c . errNoManager ( state )
}
2017-03-03 17:08:49 -05:00
// We move the accepted filter check here as "mode" filter
// is processed in the daemon, not in SwarmKit. So it might
// be good to have accepted file check in the same file as
// the filter processing (in the for loop below).
accepted := map [ string ] bool {
2017-03-24 15:19:59 -04:00
"name" : true ,
"id" : true ,
"label" : true ,
"mode" : true ,
"runtime" : true ,
2017-03-03 17:08:49 -05:00
}
if err := options . Filters . Validate ( accepted ) ; err != nil {
2017-02-11 13:40:14 -05:00
return nil , err
}
2017-03-24 12:26:15 -04:00
2017-06-07 13:07:01 -04:00
if len ( options . Filters . Get ( "runtime" ) ) == 0 {
// Default to using the container runtime filter
options . Filters . Add ( "runtime" , string ( types . RuntimeContainer ) )
}
2017-03-03 17:08:49 -05:00
filters := & swarmapi . ListServicesRequest_Filters {
NamePrefixes : options . Filters . Get ( "name" ) ,
IDPrefixes : options . Filters . Get ( "id" ) ,
Labels : runconfigopts . ConvertKVStringsToMap ( options . Filters . Get ( "label" ) ) ,
2017-06-07 13:07:01 -04:00
Runtimes : options . Filters . Get ( "runtime" ) ,
2017-03-03 17:08:49 -05:00
}
2017-02-11 13:40:14 -05:00
ctx , cancel := c . getRequestContext ( )
defer cancel ( )
r , err := state . controlClient . ListServices (
ctx ,
2018-10-29 20:44:11 -04:00
& swarmapi . ListServicesRequest { Filters : filters } ,
grpc . MaxCallRecvMsgSize ( defaultRecvSizeForListResponse ) ,
)
2017-02-11 13:40:14 -05:00
if err != nil {
return nil , err
}
2017-04-13 23:02:28 -04:00
services := make ( [ ] types . Service , 0 , len ( r . Services ) )
2017-02-11 13:40:14 -05:00
2019-05-16 18:43:48 -04:00
// if the user requests the service statuses, we'll store the IDs needed
// in this slice
var serviceIDs [ ] string
if options . Status {
serviceIDs = make ( [ ] string , 0 , len ( r . Services ) )
}
2017-02-11 13:40:14 -05:00
for _ , service := range r . Services {
2017-09-26 07:39:56 -04:00
if options . Filters . Contains ( "mode" ) {
2017-03-03 17:08:49 -05:00
var mode string
switch service . Spec . GetMode ( ) . ( type ) {
case * swarmapi . ServiceSpec_Global :
mode = "global"
case * swarmapi . ServiceSpec_Replicated :
mode = "replicated"
2019-12-11 11:05:03 -05:00
case * swarmapi . ServiceSpec_ReplicatedJob :
mode = "replicatedjob"
case * swarmapi . ServiceSpec_GlobalJob :
mode = "globaljob"
2017-03-03 17:08:49 -05:00
}
if ! options . Filters . ExactMatch ( "mode" , mode ) {
continue
}
}
2019-05-16 18:43:48 -04:00
if options . Status {
serviceIDs = append ( serviceIDs , service . ID )
}
2017-03-01 15:52:55 -05:00
svcs , err := convert . ServiceFromGRPC ( * service )
if err != nil {
return nil , err
}
services = append ( services , svcs )
2017-02-11 13:40:14 -05:00
}
2019-05-16 18:43:48 -04:00
if options . Status {
// Listing service statuses is a separate call because, while it is the
// most common UI operation, it is still just a UI operation, and it
// would be improper to include this data in swarm's Service object.
// We pay the cost with some complexity here, but this is still way
// more efficient than marshalling and unmarshalling all the JSON
// needed to list tasks and get this data otherwise client-side
resp , err := state . controlClient . ListServiceStatuses (
ctx ,
& swarmapi . ListServiceStatusesRequest { Services : serviceIDs } ,
grpc . MaxCallRecvMsgSize ( defaultRecvSizeForListResponse ) ,
)
if err != nil {
return nil , err
}
// we'll need to match up statuses in the response with the services in
// the list operation. if we did this by operating on two lists, the
// result would be quadratic. instead, make a mapping of service IDs to
// service statuses so that this is roughly linear. additionally,
// convert the status response to an engine api service status here.
serviceMap := map [ string ] * types . ServiceStatus { }
for _ , status := range resp . Statuses {
serviceMap [ status . ServiceID ] = & types . ServiceStatus {
2019-12-11 11:05:03 -05:00
RunningTasks : status . RunningTasks ,
DesiredTasks : status . DesiredTasks ,
CompletedTasks : status . CompletedTasks ,
2019-05-16 18:43:48 -04:00
}
}
// because this is a list of values and not pointers, make sure we
// actually alter the value when iterating.
for i , service := range services {
// the return value of the ListServiceStatuses operation is
// guaranteed to contain a value in the response for every argument
// in the request, so we can safely do this assignment. and even if
// it wasn't, and the service ID was for some reason absent from
// this map, the resulting value of service.Status would just be
// nil -- the same thing it was before
service . ServiceStatus = serviceMap [ service . ID ]
services [ i ] = service
}
}
2017-02-11 13:40:14 -05:00
return services , nil
}
// GetService returns a service based on an ID or name.
2017-03-30 20:15:54 -04:00
func ( c * Cluster ) GetService ( input string , insertDefaults bool ) ( types . Service , error ) {
2017-02-28 05:12:11 -05:00
var service * swarmapi . Service
if err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
2017-03-30 20:15:54 -04:00
s , err := getService ( ctx , state . controlClient , input , insertDefaults )
2017-02-28 05:12:11 -05:00
if err != nil {
return err
}
service = s
return nil
} ) ; err != nil {
2017-02-11 13:40:14 -05:00
return types . Service { } , err
}
2017-03-01 15:52:55 -05:00
svc , err := convert . ServiceFromGRPC ( * service )
if err != nil {
return types . Service { } , err
}
return svc , nil
2017-02-11 13:40:14 -05:00
}
// CreateService creates a new service in a managed swarm cluster.
2017-05-12 16:51:52 -04:00
func ( c * Cluster ) CreateService ( s types . ServiceSpec , encodedAuth string , queryRegistry bool ) ( * apitypes . ServiceCreateResponse , error ) {
2017-02-28 05:12:11 -05:00
var resp * apitypes . ServiceCreateResponse
err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
err := c . populateNetworkID ( ctx , state . controlClient , & s )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
serviceSpec , err := convert . ServiceSpecToGRPC ( s )
if err != nil {
2017-11-28 23:09:37 -05:00
return errdefs . InvalidParameter ( err )
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
2017-03-01 15:52:55 -05:00
resp = & apitypes . ServiceCreateResponse { }
2017-02-11 13:40:14 -05:00
2017-03-01 15:52:55 -05:00
switch serviceSpec . Task . Runtime . ( type ) {
2017-10-18 17:41:59 -04:00
case * swarmapi . TaskSpec_Attachment :
return fmt . Errorf ( "invalid task spec: spec type %q not supported" , types . RuntimeNetworkAttachment )
2017-03-01 15:52:55 -05:00
// handle other runtimes here
2017-06-07 13:07:01 -04:00
case * swarmapi . TaskSpec_Generic :
switch serviceSpec . Task . GetGeneric ( ) . Kind {
case string ( types . RuntimePlugin ) :
2019-02-28 19:52:30 -05:00
if ! c . config . Backend . HasExperimental ( ) {
2017-08-08 21:33:25 -04:00
return fmt . Errorf ( "runtime type %q only supported in experimental" , types . RuntimePlugin )
}
2017-06-07 13:07:01 -04:00
if s . TaskTemplate . PluginSpec == nil {
return errors . New ( "plugin spec must be set" )
}
2017-08-08 21:33:25 -04:00
default :
return fmt . Errorf ( "unsupported runtime type: %q" , serviceSpec . Task . GetGeneric ( ) . Kind )
2017-06-07 13:07:01 -04:00
}
r , err := state . controlClient . CreateService ( ctx , & swarmapi . CreateServiceRequest { Spec : & serviceSpec } )
if err != nil {
return err
}
resp . ID = r . Service . ID
2017-03-01 15:52:55 -05:00
case * swarmapi . TaskSpec_Container :
ctnr := serviceSpec . Task . GetContainer ( )
if ctnr == nil {
return errors . New ( "service does not use container tasks" )
}
if encodedAuth != "" {
ctnr . PullOptions = & swarmapi . ContainerSpec_PullOptions { RegistryAuth : encodedAuth }
}
2017-02-11 13:40:14 -05:00
2017-03-01 15:52:55 -05:00
// retrieve auth config from encoded auth
authConfig := & apitypes . AuthConfig { }
if encodedAuth != "" {
2017-06-07 13:07:01 -04:00
authReader := strings . NewReader ( encodedAuth )
dec := json . NewDecoder ( base64 . NewDecoder ( base64 . URLEncoding , authReader ) )
if err := dec . Decode ( authConfig ) ; err != nil {
2017-03-01 15:52:55 -05:00
logrus . Warnf ( "invalid authconfig: %v" , err )
}
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
2017-05-12 16:51:52 -04:00
// pin image by digest for API versions < 1.30
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
// should be removed in the future. Since integration tests only use the
// latest API version, so this is no longer required.
if os . Getenv ( "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" ) != "1" && queryRegistry {
2017-03-01 15:52:55 -05:00
digestImage , err := c . imageWithDigestString ( ctx , ctnr . Image , authConfig )
if err != nil {
logrus . Warnf ( "unable to pin image %s to digest: %s" , ctnr . Image , err . Error ( ) )
// warning in the client response should be concise
resp . Warnings = append ( resp . Warnings , digestWarning ( ctnr . Image ) )
} else if ctnr . Image != digestImage {
logrus . Debugf ( "pinning image %s by digest: %s" , ctnr . Image , digestImage )
ctnr . Image = digestImage
} else {
logrus . Debugf ( "creating service using supplied digest reference %s" , ctnr . Image )
}
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to create a service
// if the registry is slow or unresponsive.
var cancel func ( )
ctx , cancel = c . getRequestContext ( )
defer cancel ( )
}
2017-02-28 05:12:11 -05:00
2017-03-01 15:52:55 -05:00
r , err := state . controlClient . CreateService ( ctx , & swarmapi . CreateServiceRequest { Spec : & serviceSpec } )
2017-02-28 05:12:11 -05:00
if err != nil {
2017-03-01 15:52:55 -05:00
return err
2017-02-28 05:12:11 -05:00
}
2017-03-06 19:05:56 -05:00
2017-03-01 15:52:55 -05:00
resp . ID = r . Service . ID
2017-02-28 05:12:11 -05:00
}
return nil
} )
2017-03-01 15:52:55 -05:00
2017-02-28 05:12:11 -05:00
return resp , err
2017-02-11 13:40:14 -05:00
}
// UpdateService updates existing service to match new properties.
2017-05-12 16:51:52 -04:00
func ( c * Cluster ) UpdateService ( serviceIDOrName string , version uint64 , spec types . ServiceSpec , flags apitypes . ServiceUpdateOptions , queryRegistry bool ) ( * apitypes . ServiceUpdateResponse , error ) {
2017-02-28 05:12:11 -05:00
var resp * apitypes . ServiceUpdateResponse
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
err := c . populateNetworkID ( ctx , state . controlClient , & spec )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
serviceSpec , err := convert . ServiceSpecToGRPC ( spec )
if err != nil {
2017-11-28 23:09:37 -05:00
return errdefs . InvalidParameter ( err )
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
2017-03-30 20:15:54 -04:00
currentService , err := getService ( ctx , state . controlClient , serviceIDOrName , false )
2017-02-28 05:12:11 -05:00
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-06-07 13:07:01 -04:00
resp = & apitypes . ServiceUpdateResponse { }
2017-02-11 13:40:14 -05:00
2017-06-07 13:07:01 -04:00
switch serviceSpec . Task . Runtime . ( type ) {
2017-10-18 17:41:59 -04:00
case * swarmapi . TaskSpec_Attachment :
return fmt . Errorf ( "invalid task spec: spec type %q not supported" , types . RuntimeNetworkAttachment )
2017-06-07 13:07:01 -04:00
case * swarmapi . TaskSpec_Generic :
switch serviceSpec . Task . GetGeneric ( ) . Kind {
case string ( types . RuntimePlugin ) :
if spec . TaskTemplate . PluginSpec == nil {
return errors . New ( "plugin spec must be set" )
2017-02-28 05:12:11 -05:00
}
}
2017-06-07 13:07:01 -04:00
case * swarmapi . TaskSpec_Container :
newCtnr := serviceSpec . Task . GetContainer ( )
if newCtnr == nil {
2017-02-28 05:12:11 -05:00
return errors . New ( "service does not use container tasks" )
}
2017-06-07 13:07:01 -04:00
encodedAuth := flags . EncodedRegistryAuth
if encodedAuth != "" {
newCtnr . PullOptions = & swarmapi . ContainerSpec_PullOptions { RegistryAuth : encodedAuth }
} else {
// this is needed because if the encodedAuth isn't being updated then we
// shouldn't lose it, and continue to use the one that was already present
var ctnr * swarmapi . ContainerSpec
switch flags . RegistryAuthFrom {
case apitypes . RegistryAuthFromSpec , "" :
ctnr = currentService . Spec . Task . GetContainer ( )
case apitypes . RegistryAuthFromPreviousSpec :
if currentService . PreviousSpec == nil {
return errors . New ( "service does not have a previous spec" )
}
ctnr = currentService . PreviousSpec . Task . GetContainer ( )
default :
return errors . New ( "unsupported registryAuthFrom value" )
}
if ctnr == nil {
return errors . New ( "service does not use container tasks" )
}
newCtnr . PullOptions = ctnr . PullOptions
// update encodedAuth so it can be used to pin image by digest
if ctnr . PullOptions != nil {
encodedAuth = ctnr . PullOptions . RegistryAuth
}
2017-02-11 13:40:14 -05:00
}
2017-06-07 13:07:01 -04:00
// retrieve auth config from encoded auth
authConfig := & apitypes . AuthConfig { }
if encodedAuth != "" {
if err := json . NewDecoder ( base64 . NewDecoder ( base64 . URLEncoding , strings . NewReader ( encodedAuth ) ) ) . Decode ( authConfig ) ; err != nil {
logrus . Warnf ( "invalid authconfig: %v" , err )
}
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
2017-06-07 13:07:01 -04:00
// pin image by digest for API versions < 1.30
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
// should be removed in the future. Since integration tests only use the
// latest API version, so this is no longer required.
if os . Getenv ( "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" ) != "1" && queryRegistry {
digestImage , err := c . imageWithDigestString ( ctx , newCtnr . Image , authConfig )
if err != nil {
logrus . Warnf ( "unable to pin image %s to digest: %s" , newCtnr . Image , err . Error ( ) )
// warning in the client response should be concise
resp . Warnings = append ( resp . Warnings , digestWarning ( newCtnr . Image ) )
} else if newCtnr . Image != digestImage {
logrus . Debugf ( "pinning image %s by digest: %s" , newCtnr . Image , digestImage )
newCtnr . Image = digestImage
} else {
logrus . Debugf ( "updating service using supplied digest reference %s" , newCtnr . Image )
}
2017-02-28 05:12:11 -05:00
2017-06-07 13:07:01 -04:00
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to update a service
// if the registry is slow or unresponsive.
var cancel func ( )
ctx , cancel = c . getRequestContext ( )
defer cancel ( )
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
}
2017-02-16 12:27:01 -05:00
var rollback swarmapi . UpdateServiceRequest_Rollback
switch flags . Rollback {
case "" , "none" :
rollback = swarmapi . UpdateServiceRequest_NONE
case "previous" :
rollback = swarmapi . UpdateServiceRequest_PREVIOUS
default :
return fmt . Errorf ( "unrecognized rollback option %s" , flags . Rollback )
}
2017-02-28 05:12:11 -05:00
_ , err = state . controlClient . UpdateService (
ctx ,
& swarmapi . UpdateServiceRequest {
ServiceID : currentService . ID ,
Spec : & serviceSpec ,
ServiceVersion : & swarmapi . Version {
Index : version ,
} ,
2017-02-16 12:27:01 -05:00
Rollback : rollback ,
2017-02-11 13:40:14 -05:00
} ,
2017-02-28 05:12:11 -05:00
)
return err
} )
2017-02-11 13:40:14 -05:00
return resp , err
}
// RemoveService removes a service from a managed swarm cluster.
func ( c * Cluster ) RemoveService ( input string ) error {
2017-02-28 05:12:11 -05:00
return c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
2017-03-30 20:15:54 -04:00
service , err := getService ( ctx , state . controlClient , input , false )
2017-02-28 05:12:11 -05:00
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
_ , err = state . controlClient . RemoveService ( ctx , & swarmapi . RemoveServiceRequest { ServiceID : service . ID } )
2017-02-11 13:40:14 -05:00
return err
2017-02-28 05:12:11 -05:00
} )
2017-02-11 13:40:14 -05:00
}
// ServiceLogs collects service logs and writes them back to `config.OutStream`
2017-03-20 13:07:04 -04:00
func ( c * Cluster ) ServiceLogs ( ctx context . Context , selector * backend . LogSelector , config * apitypes . ContainerLogsOptions ) ( <- chan * backend . LogMessage , error ) {
2017-02-11 13:40:14 -05:00
c . mu . RLock ( )
2017-03-20 13:07:04 -04:00
defer c . mu . RUnlock ( )
2017-02-11 13:40:14 -05:00
state := c . currentNodeState ( )
if ! state . IsActiveManager ( ) {
2017-03-20 13:07:04 -04:00
return nil , c . errNoManager ( state )
2017-02-11 13:40:14 -05:00
}
2017-03-20 13:07:04 -04:00
swarmSelector , err := convertSelector ( ctx , state . controlClient , selector )
2017-02-11 13:40:14 -05:00
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , errors . Wrap ( err , "error making log selector" )
2017-03-08 21:00:41 -05:00
}
2017-02-11 13:40:14 -05:00
2017-02-23 18:09:09 -05:00
// set the streams we'll use
stdStreams := [ ] swarmapi . LogStream { }
2017-03-20 13:07:04 -04:00
if config . ShowStdout {
2017-02-23 18:09:09 -05:00
stdStreams = append ( stdStreams , swarmapi . LogStreamStdout )
}
2017-03-20 13:07:04 -04:00
if config . ShowStderr {
2017-02-23 18:09:09 -05:00
stdStreams = append ( stdStreams , swarmapi . LogStreamStderr )
}
2017-03-01 19:37:25 -05:00
// Get tail value squared away - the number of previous log lines we look at
var tail int64
2017-03-20 13:07:04 -04:00
// in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
// it to -1 (all). i don't agree with that, but i also think no tail value
// should be legitimate. if you don't pass tail, we assume you want "all"
2017-03-21 14:35:55 -04:00
if config . Tail == "all" || config . Tail == "" {
2017-03-01 19:37:25 -05:00
// tail of 0 means send all logs on the swarmkit side
tail = 0
} else {
t , err := strconv . Atoi ( config . Tail )
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , errors . New ( "tail value must be a positive integer or \"all\"" )
2017-03-01 19:37:25 -05:00
}
if t < 0 {
2017-03-20 13:07:04 -04:00
return nil , errors . New ( "negative tail values not supported" )
2017-03-01 19:37:25 -05:00
}
// we actually use negative tail in swarmkit to represent messages
// backwards starting from the beginning. also, -1 means no logs. so,
// basically, for api compat with docker container logs, add one and
// flip the sign. we error above if you try to negative tail, which
// isn't supported by docker (and would error deeper in the stack
// anyway)
//
// See the logs protobuf for more information
tail = int64 ( - ( t + 1 ) )
}
// get the since value - the time in the past we're looking at logs starting from
var sinceProto * gogotypes . Timestamp
if config . Since != "" {
s , n , err := timetypes . ParseTimestamps ( config . Since , 0 )
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , errors . Wrap ( err , "could not parse since timestamp" )
2017-03-01 19:37:25 -05:00
}
since := time . Unix ( s , n )
sinceProto , err = gogotypes . TimestampProto ( since )
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , errors . Wrap ( err , "could not parse timestamp to proto" )
2017-03-01 19:37:25 -05:00
}
}
2017-02-11 13:40:14 -05:00
stream , err := state . logsClient . SubscribeLogs ( ctx , & swarmapi . SubscribeLogsRequest {
2017-03-21 14:35:55 -04:00
Selector : swarmSelector ,
2017-02-11 13:40:14 -05:00
Options : & swarmapi . LogSubscriptionOptions {
2017-02-23 18:09:09 -05:00
Follow : config . Follow ,
Streams : stdStreams ,
2017-03-01 19:37:25 -05:00
Tail : tail ,
Since : sinceProto ,
2017-02-11 13:40:14 -05:00
} ,
} )
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , err
2017-02-11 13:40:14 -05:00
}
2017-03-20 13:07:04 -04:00
messageChan := make ( chan * backend . LogMessage , 1 )
go func ( ) {
defer close ( messageChan )
for {
// Check the context before doing anything.
select {
case <- ctx . Done ( ) :
return
default :
}
subscribeMsg , err := stream . Recv ( )
if err == io . EOF {
return
}
// if we're not io.EOF, push the message in and return
if err != nil {
select {
case <- ctx . Done ( ) :
case messageChan <- & backend . LogMessage { Err : err } :
2017-02-11 13:40:14 -05:00
}
2017-03-20 13:07:04 -04:00
return
2017-02-11 13:40:14 -05:00
}
2017-03-20 13:07:04 -04:00
for _ , msg := range subscribeMsg . Messages {
// make a new message
m := new ( backend . LogMessage )
2017-07-18 22:01:20 -04:00
m . Attrs = make ( [ ] backend . LogAttr , 0 , len ( msg . Attrs ) + 3 )
2017-03-20 13:07:04 -04:00
// add the timestamp, adding the error if it fails
m . Timestamp , err = gogotypes . TimestampFromProto ( msg . Timestamp )
if err != nil {
m . Err = err
}
2017-07-18 22:01:20 -04:00
nodeKey := contextPrefix + ".node.id"
serviceKey := contextPrefix + ".service.id"
taskKey := contextPrefix + ".task.id"
2017-04-04 18:52:19 -04:00
// copy over all of the details
for _ , d := range msg . Attrs {
2017-07-18 22:01:20 -04:00
switch d . Key {
case nodeKey , serviceKey , taskKey :
// we have the final say over context details (in case there
// is a conflict (if the user added a detail with a context's
// key for some reason))
default :
m . Attrs = append ( m . Attrs , backend . LogAttr { Key : d . Key , Value : d . Value } )
}
2017-04-04 18:52:19 -04:00
}
2017-07-18 22:01:20 -04:00
m . Attrs = append ( m . Attrs ,
backend . LogAttr { Key : nodeKey , Value : msg . Context . NodeID } ,
backend . LogAttr { Key : serviceKey , Value : msg . Context . ServiceID } ,
backend . LogAttr { Key : taskKey , Value : msg . Context . TaskID } ,
)
2017-04-04 18:52:19 -04:00
2017-03-20 13:07:04 -04:00
switch msg . Stream {
case swarmapi . LogStreamStdout :
m . Source = "stdout"
case swarmapi . LogStreamStderr :
m . Source = "stderr"
}
m . Line = msg . Data
// there could be a case where the reader stops accepting
// messages and the context is canceled. we need to check that
// here, or otherwise we risk blocking forever on the message
// send.
select {
case <- ctx . Done ( ) :
return
case messageChan <- m :
}
2017-02-11 13:40:14 -05:00
}
}
2017-03-20 13:07:04 -04:00
} ( )
return messageChan , nil
2017-02-11 13:40:14 -05:00
}
2017-03-21 14:35:55 -04:00
// convertSelector takes a backend.LogSelector, which contains raw names that
// may or may not be valid, and converts them to an api.LogSelector proto. It
2017-03-20 13:07:04 -04:00
// returns an error if something fails
func convertSelector ( ctx context . Context , cc swarmapi . ControlClient , selector * backend . LogSelector ) ( * swarmapi . LogSelector , error ) {
2017-03-21 14:35:55 -04:00
// don't rely on swarmkit to resolve IDs, do it ourselves
swarmSelector := & swarmapi . LogSelector { }
for _ , s := range selector . Services {
2017-03-30 20:15:54 -04:00
service , err := getService ( ctx , cc , s , false )
2017-03-21 14:35:55 -04:00
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , err
2017-03-21 14:35:55 -04:00
}
c := service . Spec . Task . GetContainer ( )
if c == nil {
2017-03-20 13:07:04 -04:00
return nil , errors . New ( "logs only supported on container tasks" )
2017-03-21 14:35:55 -04:00
}
swarmSelector . ServiceIDs = append ( swarmSelector . ServiceIDs , service . ID )
}
for _ , t := range selector . Tasks {
task , err := getTask ( ctx , cc , t )
if err != nil {
2017-03-20 13:07:04 -04:00
return nil , err
2017-03-21 14:35:55 -04:00
}
c := task . Spec . GetContainer ( )
if c == nil {
2017-03-20 13:07:04 -04:00
return nil , errors . New ( "logs only supported on container tasks" )
2017-03-21 14:35:55 -04:00
}
swarmSelector . TaskIDs = append ( swarmSelector . TaskIDs , task . ID )
}
2017-03-20 13:07:04 -04:00
return swarmSelector , nil
2017-03-21 14:35:55 -04:00
}
2017-02-11 13:40:14 -05:00
// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234
func ( c * Cluster ) imageWithDigestString ( ctx context . Context , image string , authConfig * apitypes . AuthConfig ) ( string , error ) {
ref , err := reference . ParseAnyReference ( image )
if err != nil {
return "" , err
}
namedRef , ok := ref . ( reference . Named )
if ! ok {
if _ , ok := ref . ( reference . Digested ) ; ok {
2017-03-09 17:36:45 -05:00
return image , nil
2017-02-11 13:40:14 -05:00
}
return "" , errors . Errorf ( "unknown image reference format: %s" , image )
}
// only query registry if not a canonical reference (i.e. with digest)
if _ , ok := namedRef . ( reference . Canonical ) ; ! ok {
namedRef = reference . TagNameOnly ( namedRef )
taggedRef , ok := namedRef . ( reference . NamedTagged )
if ! ok {
return "" , errors . Errorf ( "image reference not tagged: %s" , image )
}
2018-02-02 17:18:46 -05:00
repo , _ , err := c . config . ImageBackend . GetRepository ( ctx , taggedRef , authConfig )
2017-02-11 13:40:14 -05:00
if err != nil {
return "" , err
}
dscrptr , err := repo . Tags ( ctx ) . Get ( ctx , taggedRef . Tag ( ) )
if err != nil {
return "" , err
}
namedDigestedRef , err := reference . WithDigest ( taggedRef , dscrptr . Digest )
if err != nil {
return "" , err
}
// return familiar form until interface updated to return type
return reference . FamiliarString ( namedDigestedRef ) , nil
}
// reference already contains a digest, so just return it
return reference . FamiliarString ( ref ) , nil
}
2017-03-09 17:36:45 -05:00
// digestWarning builds the warning text shown when image could not be pinned
// by digest. The wording is hardcoded, but could be made smarter in the
// future.
func digestWarning(image string) string {
	// The image name appears twice in the message.
	const warningFormat = "image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n"
	return fmt.Sprintf(warningFormat, image, image)
}