package cluster

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution/reference"
	apierrors "github.com/docker/docker/api/errors"
	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	types "github.com/docker/docker/api/types/swarm"
	timetypes "github.com/docker/docker/api/types/time"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/stdcopy"
	runconfigopts "github.com/docker/docker/runconfig/opts"
	swarmapi "github.com/docker/swarmkit/api"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)
// GetServices returns all services of a managed swarm cluster.
func ( c * Cluster ) GetServices ( options apitypes . ServiceListOptions ) ( [ ] types . Service , error ) {
c . mu . RLock ( )
defer c . mu . RUnlock ( )
state := c . currentNodeState ( )
if ! state . IsActiveManager ( ) {
return nil , c . errNoManager ( state )
}
2017-03-03 17:08:49 -05:00
// We move the accepted filter check here as "mode" filter
// is processed in the daemon, not in SwarmKit. So it might
// be good to have accepted file check in the same file as
// the filter processing (in the for loop below).
accepted := map [ string ] bool {
"name" : true ,
"id" : true ,
"label" : true ,
"mode" : true ,
}
if err := options . Filters . Validate ( accepted ) ; err != nil {
2017-02-11 13:40:14 -05:00
return nil , err
}
2017-03-03 17:08:49 -05:00
filters := & swarmapi . ListServicesRequest_Filters {
NamePrefixes : options . Filters . Get ( "name" ) ,
IDPrefixes : options . Filters . Get ( "id" ) ,
Labels : runconfigopts . ConvertKVStringsToMap ( options . Filters . Get ( "label" ) ) ,
}
2017-02-11 13:40:14 -05:00
ctx , cancel := c . getRequestContext ( )
defer cancel ( )
r , err := state . controlClient . ListServices (
ctx ,
& swarmapi . ListServicesRequest { Filters : filters } )
if err != nil {
return nil , err
}
services := [ ] types . Service { }
for _ , service := range r . Services {
2017-03-03 17:08:49 -05:00
if options . Filters . Include ( "mode" ) {
var mode string
switch service . Spec . GetMode ( ) . ( type ) {
case * swarmapi . ServiceSpec_Global :
mode = "global"
case * swarmapi . ServiceSpec_Replicated :
mode = "replicated"
}
if ! options . Filters . ExactMatch ( "mode" , mode ) {
continue
}
}
2017-02-11 13:40:14 -05:00
services = append ( services , convert . ServiceFromGRPC ( * service ) )
}
return services , nil
}
// GetService returns a service based on an ID or name.
func ( c * Cluster ) GetService ( input string ) ( types . Service , error ) {
2017-02-28 05:12:11 -05:00
var service * swarmapi . Service
if err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
s , err := getService ( ctx , state . controlClient , input )
if err != nil {
return err
}
service = s
return nil
} ) ; err != nil {
2017-02-11 13:40:14 -05:00
return types . Service { } , err
}
return convert . ServiceFromGRPC ( * service ) , nil
}
// CreateService creates a new service in a managed swarm cluster.
func ( c * Cluster ) CreateService ( s types . ServiceSpec , encodedAuth string ) ( * apitypes . ServiceCreateResponse , error ) {
2017-02-28 05:12:11 -05:00
var resp * apitypes . ServiceCreateResponse
err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
err := c . populateNetworkID ( ctx , state . controlClient , & s )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
serviceSpec , err := convert . ServiceSpecToGRPC ( s )
if err != nil {
return apierrors . NewBadRequestError ( err )
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
ctnr := serviceSpec . Task . GetContainer ( )
if ctnr == nil {
return errors . New ( "service does not use container tasks" )
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
if encodedAuth != "" {
ctnr . PullOptions = & swarmapi . ContainerSpec_PullOptions { RegistryAuth : encodedAuth }
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
// retrieve auth config from encoded auth
authConfig := & apitypes . AuthConfig { }
if encodedAuth != "" {
if err := json . NewDecoder ( base64 . NewDecoder ( base64 . URLEncoding , strings . NewReader ( encodedAuth ) ) ) . Decode ( authConfig ) ; err != nil {
logrus . Warnf ( "invalid authconfig: %v" , err )
}
2017-02-11 13:40:14 -05:00
}
2017-02-28 05:12:11 -05:00
resp = & apitypes . ServiceCreateResponse { }
// pin image by digest
if os . Getenv ( "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" ) != "1" {
digestImage , err := c . imageWithDigestString ( ctx , ctnr . Image , authConfig )
if err != nil {
logrus . Warnf ( "unable to pin image %s to digest: %s" , ctnr . Image , err . Error ( ) )
2017-03-09 17:36:45 -05:00
// warning in the client response should be concise
resp . Warnings = append ( resp . Warnings , digestWarning ( ctnr . Image ) )
2017-02-28 05:12:11 -05:00
} else if ctnr . Image != digestImage {
logrus . Debugf ( "pinning image %s by digest: %s" , ctnr . Image , digestImage )
ctnr . Image = digestImage
} else {
logrus . Debugf ( "creating service using supplied digest reference %s" , ctnr . Image )
}
2017-03-06 19:05:56 -05:00
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to create a service
// if the registry is slow or unresponsive.
var cancel func ( )
ctx , cancel = c . getRequestContext ( )
defer cancel ( )
2017-02-28 05:12:11 -05:00
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
r , err := state . controlClient . CreateService ( ctx , & swarmapi . CreateServiceRequest { Spec : & serviceSpec } )
2017-02-11 13:40:14 -05:00
if err != nil {
2017-02-28 05:12:11 -05:00
return err
2017-02-11 13:40:14 -05:00
}
2017-02-28 05:12:11 -05:00
resp . ID = r . Service . ID
return nil
} )
return resp , err
2017-02-11 13:40:14 -05:00
}
// UpdateService updates existing service to match new properties.
2017-02-16 12:27:01 -05:00
func ( c * Cluster ) UpdateService ( serviceIDOrName string , version uint64 , spec types . ServiceSpec , flags apitypes . ServiceUpdateOptions ) ( * apitypes . ServiceUpdateResponse , error ) {
2017-02-28 05:12:11 -05:00
var resp * apitypes . ServiceUpdateResponse
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
err := c . populateNetworkID ( ctx , state . controlClient , & spec )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
serviceSpec , err := convert . ServiceSpecToGRPC ( spec )
if err != nil {
return apierrors . NewBadRequestError ( err )
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
currentService , err := getService ( ctx , state . controlClient , serviceIDOrName )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
newCtnr := serviceSpec . Task . GetContainer ( )
if newCtnr == nil {
return errors . New ( "service does not use container tasks" )
}
2017-02-11 13:40:14 -05:00
2017-02-16 12:27:01 -05:00
encodedAuth := flags . EncodedRegistryAuth
2017-02-28 05:12:11 -05:00
if encodedAuth != "" {
newCtnr . PullOptions = & swarmapi . ContainerSpec_PullOptions { RegistryAuth : encodedAuth }
} else {
// this is needed because if the encodedAuth isn't being updated then we
// shouldn't lose it, and continue to use the one that was already present
var ctnr * swarmapi . ContainerSpec
2017-02-16 12:27:01 -05:00
switch flags . RegistryAuthFrom {
2017-02-28 05:12:11 -05:00
case apitypes . RegistryAuthFromSpec , "" :
ctnr = currentService . Spec . Task . GetContainer ( )
case apitypes . RegistryAuthFromPreviousSpec :
if currentService . PreviousSpec == nil {
return errors . New ( "service does not have a previous spec" )
}
ctnr = currentService . PreviousSpec . Task . GetContainer ( )
default :
return errors . New ( "unsupported registryAuthFrom value" )
}
if ctnr == nil {
return errors . New ( "service does not use container tasks" )
}
newCtnr . PullOptions = ctnr . PullOptions
// update encodedAuth so it can be used to pin image by digest
if ctnr . PullOptions != nil {
encodedAuth = ctnr . PullOptions . RegistryAuth
2017-02-11 13:40:14 -05:00
}
}
2017-02-28 05:12:11 -05:00
// retrieve auth config from encoded auth
authConfig := & apitypes . AuthConfig { }
if encodedAuth != "" {
if err := json . NewDecoder ( base64 . NewDecoder ( base64 . URLEncoding , strings . NewReader ( encodedAuth ) ) ) . Decode ( authConfig ) ; err != nil {
logrus . Warnf ( "invalid authconfig: %v" , err )
}
2017-02-11 13:40:14 -05:00
}
2017-03-09 20:58:12 -05:00
resp = & apitypes . ServiceUpdateResponse { }
2017-02-28 05:12:11 -05:00
// pin image by digest
if os . Getenv ( "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" ) != "1" {
digestImage , err := c . imageWithDigestString ( ctx , newCtnr . Image , authConfig )
if err != nil {
logrus . Warnf ( "unable to pin image %s to digest: %s" , newCtnr . Image , err . Error ( ) )
2017-03-09 17:36:45 -05:00
// warning in the client response should be concise
resp . Warnings = append ( resp . Warnings , digestWarning ( newCtnr . Image ) )
2017-02-28 05:12:11 -05:00
} else if newCtnr . Image != digestImage {
logrus . Debugf ( "pinning image %s by digest: %s" , newCtnr . Image , digestImage )
newCtnr . Image = digestImage
} else {
logrus . Debugf ( "updating service using supplied digest reference %s" , newCtnr . Image )
}
2017-03-06 19:05:56 -05:00
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to update a service
// if the registry is slow or unresponsive.
var cancel func ( )
ctx , cancel = c . getRequestContext ( )
defer cancel ( )
2017-02-11 13:40:14 -05:00
}
2017-02-16 12:27:01 -05:00
var rollback swarmapi . UpdateServiceRequest_Rollback
switch flags . Rollback {
case "" , "none" :
rollback = swarmapi . UpdateServiceRequest_NONE
case "previous" :
rollback = swarmapi . UpdateServiceRequest_PREVIOUS
default :
return fmt . Errorf ( "unrecognized rollback option %s" , flags . Rollback )
}
2017-02-28 05:12:11 -05:00
_ , err = state . controlClient . UpdateService (
ctx ,
& swarmapi . UpdateServiceRequest {
ServiceID : currentService . ID ,
Spec : & serviceSpec ,
ServiceVersion : & swarmapi . Version {
Index : version ,
} ,
2017-02-16 12:27:01 -05:00
Rollback : rollback ,
2017-02-11 13:40:14 -05:00
} ,
2017-02-28 05:12:11 -05:00
)
return err
} )
2017-02-11 13:40:14 -05:00
return resp , err
}
// RemoveService removes a service from a managed swarm cluster.
func ( c * Cluster ) RemoveService ( input string ) error {
2017-02-28 05:12:11 -05:00
return c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
service , err := getService ( ctx , state . controlClient , input )
if err != nil {
return err
}
2017-02-11 13:40:14 -05:00
2017-02-28 05:12:11 -05:00
_ , err = state . controlClient . RemoveService ( ctx , & swarmapi . RemoveServiceRequest { ServiceID : service . ID } )
2017-02-11 13:40:14 -05:00
return err
2017-02-28 05:12:11 -05:00
} )
2017-02-11 13:40:14 -05:00
}
// ServiceLogs collects service logs and writes them back to `config.OutStream`
func ( c * Cluster ) ServiceLogs ( ctx context . Context , input string , config * backend . ContainerLogsConfig , started chan struct { } ) error {
c . mu . RLock ( )
state := c . currentNodeState ( )
if ! state . IsActiveManager ( ) {
c . mu . RUnlock ( )
return c . errNoManager ( state )
}
service , err := getService ( ctx , state . controlClient , input )
if err != nil {
c . mu . RUnlock ( )
return err
}
2017-03-08 21:00:41 -05:00
container := service . Spec . Task . GetContainer ( )
if container == nil {
return errors . New ( "service logs only supported for container tasks" )
}
if container . TTY {
return errors . New ( "service logs not supported on tasks with a TTY attached" )
}
2017-02-11 13:40:14 -05:00
2017-02-23 18:09:09 -05:00
// set the streams we'll use
stdStreams := [ ] swarmapi . LogStream { }
if config . ContainerLogsOptions . ShowStdout {
stdStreams = append ( stdStreams , swarmapi . LogStreamStdout )
}
if config . ContainerLogsOptions . ShowStderr {
stdStreams = append ( stdStreams , swarmapi . LogStreamStderr )
}
2017-03-01 19:37:25 -05:00
// Get tail value squared away - the number of previous log lines we look at
var tail int64
if config . Tail == "all" {
// tail of 0 means send all logs on the swarmkit side
tail = 0
} else {
t , err := strconv . Atoi ( config . Tail )
if err != nil {
return errors . New ( "tail value must be a positive integer or \"all\"" )
}
if t < 0 {
return errors . New ( "negative tail values not supported" )
}
// we actually use negative tail in swarmkit to represent messages
// backwards starting from the beginning. also, -1 means no logs. so,
// basically, for api compat with docker container logs, add one and
// flip the sign. we error above if you try to negative tail, which
// isn't supported by docker (and would error deeper in the stack
// anyway)
//
// See the logs protobuf for more information
tail = int64 ( - ( t + 1 ) )
}
// get the since value - the time in the past we're looking at logs starting from
var sinceProto * gogotypes . Timestamp
if config . Since != "" {
s , n , err := timetypes . ParseTimestamps ( config . Since , 0 )
if err != nil {
return errors . Wrap ( err , "could not parse since timestamp" )
}
since := time . Unix ( s , n )
sinceProto , err = gogotypes . TimestampProto ( since )
if err != nil {
return errors . Wrap ( err , "could not parse timestamp to proto" )
}
}
2017-02-11 13:40:14 -05:00
stream , err := state . logsClient . SubscribeLogs ( ctx , & swarmapi . SubscribeLogsRequest {
Selector : & swarmapi . LogSelector {
ServiceIDs : [ ] string { service . ID } ,
} ,
Options : & swarmapi . LogSubscriptionOptions {
2017-02-23 18:09:09 -05:00
Follow : config . Follow ,
Streams : stdStreams ,
2017-03-01 19:37:25 -05:00
Tail : tail ,
Since : sinceProto ,
2017-02-11 13:40:14 -05:00
} ,
} )
if err != nil {
c . mu . RUnlock ( )
return err
}
wf := ioutils . NewWriteFlusher ( config . OutStream )
defer wf . Close ( )
close ( started )
wf . Flush ( )
outStream := stdcopy . NewStdWriter ( wf , stdcopy . Stdout )
errStream := stdcopy . NewStdWriter ( wf , stdcopy . Stderr )
// Release the lock before starting the stream.
c . mu . RUnlock ( )
for {
// Check the context before doing anything.
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
subscribeMsg , err := stream . Recv ( )
if err == io . EOF {
return nil
}
if err != nil {
return err
}
for _ , msg := range subscribeMsg . Messages {
data := [ ] byte { }
if config . Timestamps {
ts , err := gogotypes . TimestampFromProto ( msg . Timestamp )
if err != nil {
return err
}
data = append ( data , [ ] byte ( ts . Format ( logger . TimeFormat ) + " " ) ... )
}
data = append ( data , [ ] byte ( fmt . Sprintf ( "%s.node.id=%s,%s.service.id=%s,%s.task.id=%s " ,
contextPrefix , msg . Context . NodeID ,
contextPrefix , msg . Context . ServiceID ,
contextPrefix , msg . Context . TaskID ,
) ) ... )
data = append ( data , msg . Data ... )
switch msg . Stream {
case swarmapi . LogStreamStdout :
outStream . Write ( data )
case swarmapi . LogStreamStderr :
errStream . Write ( data )
}
}
}
}
// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234
func ( c * Cluster ) imageWithDigestString ( ctx context . Context , image string , authConfig * apitypes . AuthConfig ) ( string , error ) {
ref , err := reference . ParseAnyReference ( image )
if err != nil {
return "" , err
}
namedRef , ok := ref . ( reference . Named )
if ! ok {
if _ , ok := ref . ( reference . Digested ) ; ok {
2017-03-09 17:36:45 -05:00
return image , nil
2017-02-11 13:40:14 -05:00
}
return "" , errors . Errorf ( "unknown image reference format: %s" , image )
}
// only query registry if not a canonical reference (i.e. with digest)
if _ , ok := namedRef . ( reference . Canonical ) ; ! ok {
namedRef = reference . TagNameOnly ( namedRef )
taggedRef , ok := namedRef . ( reference . NamedTagged )
if ! ok {
return "" , errors . Errorf ( "image reference not tagged: %s" , image )
}
repo , _ , err := c . config . Backend . GetRepository ( ctx , taggedRef , authConfig )
if err != nil {
return "" , err
}
dscrptr , err := repo . Tags ( ctx ) . Get ( ctx , taggedRef . Tag ( ) )
if err != nil {
return "" , err
}
namedDigestedRef , err := reference . WithDigest ( taggedRef , dscrptr . Digest )
if err != nil {
return "" , err
}
// return familiar form until interface updated to return type
return reference . FamiliarString ( namedDigestedRef ) , nil
}
// reference already contains a digest, so just return it
return reference . FamiliarString ( ref ) , nil
}
// digestWarning constructs a formatted warning string
// using the image name that could not be pinned by digest. The
// formatting is hardcoded, but could me made smarter in the future
func digestWarning(image string) string {
	const tmpl = "image %[1]s could not be accessed on a registry to record\n" +
		"its digest. Each node will access %[1]s independently,\n" +
		"possibly leading to different nodes running different\n" +
		"versions of the image.\n"
	return fmt.Sprintf(tmpl, image)
}