1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #34061 from cyli/re-vendor-swarmkit

Re-vendor swarmkit.
This commit is contained in:
Sebastiaan van Stijn 2017-07-11 20:30:56 -07:00 committed by GitHub
commit c0e6da7637
34 changed files with 1155 additions and 734 deletions

View file

@ -77,7 +77,7 @@ func (c *containerConfig) setTask(t *api.Task) error {
c.task = t
if t.Spec.GetContainer() != nil {
preparedSpec, err := template.ExpandContainerSpec(t)
preparedSpec, err := template.ExpandContainerSpec(nil, t)
if err != nil {
return err
}

View file

@ -106,7 +106,7 @@ github.com/stevvooe/continuity cd7a8e21e2b6f84799f5dd4b65faf49c8d3ee02d
github.com/tonistiigi/fsutil 0ac4c11b053b9c5c7c47558f81f96c7100ce50fb
# cluster
github.com/docker/swarmkit 79381d0840be27f8b3f5c667b348a4467d866eeb
github.com/docker/swarmkit a3d96fe13e30e46c3d4cfc3f316ebdd8446a079d
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e

View file

@ -35,7 +35,7 @@ func GetTask(tx *bolt.Tx, id string) (*api.Task, error) {
var t api.Task
if err := withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
p := bkt.Get([]byte("data"))
p := bkt.Get(bucketKeyData)
if p == nil {
return errTaskUnknown
}
@ -136,7 +136,7 @@ func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
if err != nil {
return err
}
return bkt.Put([]byte("status"), p)
return bkt.Put(bucketKeyStatus, p)
})
}
@ -154,9 +154,9 @@ func DeleteTask(tx *bolt.Tx, id string) error {
func SetTaskAssignment(tx *bolt.Tx, id string, assigned bool) error {
return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
if assigned {
return bkt.Put([]byte("assigned"), []byte{0xFF})
return bkt.Put(bucketKeyAssigned, []byte{0xFF})
}
return bkt.Delete([]byte("assigned"))
return bkt.Delete(bucketKeyAssigned)
})
}

View file

@ -18,8 +18,8 @@ func NewSet(key string, vals ...string) []*api.GenericResource {
// NewString creates a String resource
func NewString(key, val string) *api.GenericResource {
return &api.GenericResource{
Resource: &api.GenericResource_Str{
Str: &api.GenericString{
Resource: &api.GenericResource_NamedResourceSpec{
NamedResourceSpec: &api.NamedGenericResource{
Kind: key,
Value: val,
},
@ -30,8 +30,8 @@ func NewString(key, val string) *api.GenericResource {
// NewDiscrete creates a Discrete resource
func NewDiscrete(key string, val int64) *api.GenericResource {
return &api.GenericResource{
Resource: &api.GenericResource_Discrete{
Discrete: &api.GenericDiscrete{
Resource: &api.GenericResource_DiscreteResourceSpec{
DiscreteResourceSpec: &api.DiscreteGenericResource{
Kind: key,
Value: val,
},
@ -86,21 +86,21 @@ loop:
// Returns true if the element is to be removed from the list
func remove(na, r *api.GenericResource) bool {
switch tr := r.Resource.(type) {
case *api.GenericResource_Discrete:
if na.GetDiscrete() == nil {
case *api.GenericResource_DiscreteResourceSpec:
if na.GetDiscreteResourceSpec() == nil {
return false // Type change, ignore
}
na.GetDiscrete().Value -= tr.Discrete.Value
if na.GetDiscrete().Value <= 0 {
na.GetDiscreteResourceSpec().Value -= tr.DiscreteResourceSpec.Value
if na.GetDiscreteResourceSpec().Value <= 0 {
return true
}
case *api.GenericResource_Str:
if na.GetStr() == nil {
case *api.GenericResource_NamedResourceSpec:
if na.GetNamedResourceSpec() == nil {
return false // Type change, ignore
}
if tr.Str.Value != na.GetStr().Value {
if tr.NamedResourceSpec.Value != na.GetNamedResourceSpec().Value {
return false // not the right item, ignore
}

View file

@ -12,7 +12,7 @@ func Claim(nodeAvailableResources, taskAssigned *[]*api.GenericResource,
var resSelected []*api.GenericResource
for _, res := range taskReservations {
tr := res.GetDiscrete()
tr := res.GetDiscreteResourceSpec()
if tr == nil {
return fmt.Errorf("task should only hold Discrete type")
}
@ -39,7 +39,7 @@ func ClaimResources(nodeAvailableResources, taskAssigned *[]*api.GenericResource
}
func selectNodeResources(nodeRes []*api.GenericResource,
tr *api.GenericDiscrete) ([]*api.GenericResource, error) {
tr *api.DiscreteGenericResource) ([]*api.GenericResource, error) {
var nrs []*api.GenericResource
for _, res := range nodeRes {
@ -48,13 +48,13 @@ func selectNodeResources(nodeRes []*api.GenericResource,
}
switch nr := res.Resource.(type) {
case *api.GenericResource_Discrete:
if nr.Discrete.Value >= tr.Value && tr.Value != 0 {
case *api.GenericResource_DiscreteResourceSpec:
if nr.DiscreteResourceSpec.Value >= tr.Value && tr.Value != 0 {
nrs = append(nrs, NewDiscrete(tr.Kind, tr.Value))
}
return nrs, nil
case *api.GenericResource_Str:
case *api.GenericResource_NamedResourceSpec:
nrs = append(nrs, res.Copy())
if int64(len(nrs)) == tr.Value {
@ -90,8 +90,8 @@ func reclaimResources(nodeAvailableResources *[]*api.GenericResource, taskAssign
for _, res := range taskAssigned {
switch tr := res.Resource.(type) {
case *api.GenericResource_Discrete:
nrs := GetResource(tr.Discrete.Kind, *nodeAvailableResources)
case *api.GenericResource_DiscreteResourceSpec:
nrs := GetResource(tr.DiscreteResourceSpec.Kind, *nodeAvailableResources)
// If the resource went down to 0 it's no longer in the
// available list
@ -103,13 +103,13 @@ func reclaimResources(nodeAvailableResources *[]*api.GenericResource, taskAssign
continue // Type change
}
nr := nrs[0].GetDiscrete()
nr := nrs[0].GetDiscreteResourceSpec()
if nr == nil {
continue // Type change
}
nr.Value += tr.Discrete.Value
case *api.GenericResource_Str:
nr.Value += tr.DiscreteResourceSpec.Value
case *api.GenericResource_NamedResourceSpec:
*nodeAvailableResources = append(*nodeAvailableResources, res.Copy())
}
}
@ -157,8 +157,8 @@ func sanitize(nodeRes []*api.GenericResource, nodeAvailableResources *[]*api.Gen
// Returns false if the element isn't in nodeRes and "sane" and the element(s) that should be replacing it
func sanitizeResource(nodeRes []*api.GenericResource, res *api.GenericResource) (ok bool, nrs []*api.GenericResource) {
switch na := res.Resource.(type) {
case *api.GenericResource_Discrete:
nrs := GetResource(na.Discrete.Kind, nodeRes)
case *api.GenericResource_DiscreteResourceSpec:
nrs := GetResource(na.DiscreteResourceSpec.Kind, nodeRes)
// Type change or removed: reset
if len(nrs) != 1 {
@ -166,17 +166,17 @@ func sanitizeResource(nodeRes []*api.GenericResource, res *api.GenericResource)
}
// Type change: reset
nr := nrs[0].GetDiscrete()
nr := nrs[0].GetDiscreteResourceSpec()
if nr == nil {
return false, nrs
}
// Amount change: reset
if na.Discrete.Value > nr.Value {
if na.DiscreteResourceSpec.Value > nr.Value {
return false, nrs
}
case *api.GenericResource_Str:
nrs := GetResource(na.Str.Kind, nodeRes)
case *api.GenericResource_NamedResourceSpec:
nrs := GetResource(na.NamedResourceSpec.Kind, nodeRes)
// Type change
if len(nrs) == 0 {
@ -185,11 +185,11 @@ func sanitizeResource(nodeRes []*api.GenericResource, res *api.GenericResource)
for _, nr := range nrs {
// Type change: reset
if nr.GetDiscrete() != nil {
if nr.GetDiscreteResourceSpec() != nil {
return false, nrs
}
if na.Str.Value == nr.GetStr().Value {
if na.NamedResourceSpec.Value == nr.GetNamedResourceSpec().Value {
return true, nil
}
}

View file

@ -7,17 +7,17 @@ import (
"github.com/docker/swarmkit/api"
)
func discreteToString(d *api.GenericResource_Discrete) string {
return strconv.FormatInt(d.Discrete.Value, 10)
func discreteToString(d *api.GenericResource_DiscreteResourceSpec) string {
return strconv.FormatInt(d.DiscreteResourceSpec.Value, 10)
}
// Kind returns the kind key as a string
func Kind(res *api.GenericResource) string {
switch r := res.Resource.(type) {
case *api.GenericResource_Discrete:
return r.Discrete.Kind
case *api.GenericResource_Str:
return r.Str.Kind
case *api.GenericResource_DiscreteResourceSpec:
return r.DiscreteResourceSpec.Kind
case *api.GenericResource_NamedResourceSpec:
return r.NamedResourceSpec.Kind
}
return ""
@ -26,10 +26,10 @@ func Kind(res *api.GenericResource) string {
// Value returns the value key as a string
func Value(res *api.GenericResource) string {
switch res := res.Resource.(type) {
case *api.GenericResource_Discrete:
case *api.GenericResource_DiscreteResourceSpec:
return discreteToString(res)
case *api.GenericResource_Str:
return res.Str.Value
case *api.GenericResource_NamedResourceSpec:
return res.NamedResourceSpec.Value
}
return ""

View file

@ -9,7 +9,7 @@ import (
// for generic resources
func ValidateTask(resources *api.Resources) error {
for _, v := range resources.Generic {
if v.GetDiscrete() != nil {
if v.GetDiscreteResourceSpec() != nil {
continue
}
@ -21,7 +21,7 @@ func ValidateTask(resources *api.Resources) error {
// HasEnough returns true if node can satisfy the task's GenericResource request
func HasEnough(nodeRes []*api.GenericResource, taskRes *api.GenericResource) (bool, error) {
t := taskRes.GetDiscrete()
t := taskRes.GetDiscreteResourceSpec()
if t == nil {
return false, fmt.Errorf("task should only hold Discrete type")
}
@ -36,11 +36,11 @@ func HasEnough(nodeRes []*api.GenericResource, taskRes *api.GenericResource) (bo
}
switch nr := nrs[0].Resource.(type) {
case *api.GenericResource_Discrete:
if t.Value > nr.Discrete.Value {
case *api.GenericResource_DiscreteResourceSpec:
if t.Value > nr.DiscreteResourceSpec.Value {
return false, nil
}
case *api.GenericResource_Str:
case *api.GenericResource_NamedResourceSpec:
if t.Value > int64(len(nrs)) {
return false, nil
}
@ -57,22 +57,22 @@ func HasResource(res *api.GenericResource, resources []*api.GenericResource) boo
}
switch rtype := r.Resource.(type) {
case *api.GenericResource_Discrete:
if res.GetDiscrete() == nil {
case *api.GenericResource_DiscreteResourceSpec:
if res.GetDiscreteResourceSpec() == nil {
return false
}
if res.GetDiscrete().Value < rtype.Discrete.Value {
if res.GetDiscreteResourceSpec().Value < rtype.DiscreteResourceSpec.Value {
return false
}
return true
case *api.GenericResource_Str:
if res.GetStr() == nil {
case *api.GenericResource_NamedResourceSpec:
if res.GetNamedResourceSpec() == nil {
return false
}
if res.GetStr().Value != rtype.Str.Value {
if res.GetNamedResourceSpec().Value != rtype.NamedResourceSpec.Value {
continue
}

File diff suppressed because it is too large Load diff

View file

@ -30,20 +30,30 @@ message Annotations {
repeated IndexEntry indices = 4 [(gogoproto.nullable) = false];
}
message GenericString {
// NamedGenericResource represents a "user defined" resource which is defined
// as a string.
// "Kind" is used to describe the Kind of a resource (e.g: "GPU", "FPGA", "SSD", ...)
// Value is used to identify the resource (GPU="UUID-1", FPGA="/dev/sdb5", ...)
message NamedGenericResource {
string kind = 1;
string value = 2;
}
message GenericDiscrete {
// DiscreteGenericResource represents a "user defined" resource which is defined
// as an integer
// "Kind" is used to describe the Kind of a resource (e.g: "GPU", "FPGA", "SSD", ...)
// Value is used to count the resource (SSD=5, HDD=3, ...)
message DiscreteGenericResource {
string kind = 1;
int64 value = 2;
}
// GenericResource represents a "user defined" resource which can
// be either an integer (e.g: SSD=3) or a string (e.g: SSD=sda1)
message GenericResource {
oneof resource {
GenericString str = 1;
GenericDiscrete discrete = 2;
NamedGenericResource named_resource_spec = 1;
DiscreteGenericResource discrete_resource_spec = 2;
}
}

View file

@ -14,6 +14,7 @@ import (
"github.com/Sirupsen/logrus"
cfconfig "github.com/cloudflare/cfssl/config"
events "github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/identity"
@ -123,11 +124,11 @@ func validateRootCAAndTLSCert(rootCA *RootCA, externalCARootPool *x509.CertPool,
}
// NewSecurityConfig initializes and returns a new SecurityConfig.
func NewSecurityConfig(rootCA *RootCA, krw *KeyReadWriter, tlsKeyPair *tls.Certificate, issuerInfo *IssuerInfo) (*SecurityConfig, error) {
func NewSecurityConfig(rootCA *RootCA, krw *KeyReadWriter, tlsKeyPair *tls.Certificate, issuerInfo *IssuerInfo) (*SecurityConfig, func() error, error) {
// Create the Server TLS Credentials for this node. These will not be used by workers.
serverTLSCreds, err := rootCA.NewServerTLSCredentials(tlsKeyPair)
if err != nil {
return nil, err
return nil, nil, err
}
// Create a TLSConfig to be used when this node connects as a client to another remote node.
@ -135,7 +136,7 @@ func NewSecurityConfig(rootCA *RootCA, krw *KeyReadWriter, tlsKeyPair *tls.Certi
// and managers always connect to remote managers.
clientTLSCreds, err := rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole)
if err != nil {
return nil, err
return nil, nil, err
}
// Make a new TLS config for the external CA client without a
@ -146,18 +147,21 @@ func NewSecurityConfig(rootCA *RootCA, krw *KeyReadWriter, tlsKeyPair *tls.Certi
MinVersion: tls.VersionTLS12,
}
q := watch.NewQueue()
return &SecurityConfig{
rootCA: rootCA,
keyReadWriter: krw,
certificate: tlsKeyPair,
issuerInfo: issuerInfo,
queue: q,
externalCA: NewExternalCA(rootCA, externalCATLSConfig),
ClientTLSCreds: clientTLSCreds,
ServerTLSCreds: serverTLSCreds,
externalCAClientRootPool: rootCA.Pool,
}, nil
}, q.Close, nil
}
// RootCA returns the root CA.
@ -200,11 +204,9 @@ func (s *SecurityConfig) UpdateRootCA(rootCA *RootCA, externalCARootPool *x509.C
return s.updateTLSCredentials(s.certificate, s.issuerInfo)
}
// SetWatch allows you to set a watch on the security config, in order to be notified of any changes
func (s *SecurityConfig) SetWatch(q *watch.Queue) {
s.mu.Lock()
defer s.mu.Unlock()
s.queue = q
// Watch allows you to set a watch on the security config, in order to be notified of any changes
func (s *SecurityConfig) Watch() (chan events.Event, func()) {
return s.queue.Watch()
}
// IssuerInfo returns the issuer subject and issuer public key
@ -382,7 +384,7 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, connBrok
// LoadSecurityConfig loads TLS credentials from disk, or returns an error if
// these credentials do not exist or are unusable.
func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter, allowExpired bool) (*SecurityConfig, error) {
func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter, allowExpired bool) (*SecurityConfig, func() error, error) {
ctx = log.WithModule(ctx, "tls")
// At this point we've successfully loaded the CA details from disk, or
@ -392,13 +394,13 @@ func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter,
// Read both the Cert and Key from disk
cert, key, err := krw.Read()
if err != nil {
return nil, err
return nil, nil, err
}
// Check to see if this certificate was signed by our CA, and isn't expired
_, chains, err := ValidateCertChain(rootCA.Pool, cert, allowExpired)
if err != nil {
return nil, err
return nil, nil, err
}
// ValidateChain, if successful, will always return at least 1 chain containing
// at least 2 certificates: the leaf and the root.
@ -408,10 +410,10 @@ func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter,
// credentials
keyPair, err := tls.X509KeyPair(cert, key)
if err != nil {
return nil, err
return nil, nil, err
}
secConfig, err := NewSecurityConfig(&rootCA, krw, &keyPair, &IssuerInfo{
secConfig, cleanup, err := NewSecurityConfig(&rootCA, krw, &keyPair, &IssuerInfo{
Subject: issuer.RawSubject,
PublicKey: issuer.RawSubjectPublicKeyInfo,
})
@ -421,7 +423,7 @@ func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter,
"node.role": secConfig.ClientTLSCreds.Role(),
}).Debug("loaded node credentials")
}
return secConfig, err
return secConfig, cleanup, err
}
// CertificateRequestConfig contains the information needed to request a
@ -450,7 +452,7 @@ type CertificateRequestConfig struct {
// CreateSecurityConfig creates a new key and cert for this node, either locally
// or via a remote CA.
func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWriter, config CertificateRequestConfig) (*SecurityConfig, error) {
func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWriter, config CertificateRequestConfig) (*SecurityConfig, func() error, error) {
ctx = log.WithModule(ctx, "tls")
// Create a new random ID for this certificate
@ -467,7 +469,7 @@ func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWrite
tlsKeyPair, issuerInfo, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, config)
if err != nil {
log.G(ctx).WithError(err).Error("failed to request and save new certificate")
return nil, err
return nil, nil, err
}
case nil:
log.G(ctx).WithFields(logrus.Fields{
@ -479,17 +481,17 @@ func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWrite
"node.id": cn,
"node.role": proposedRole,
}).WithError(err).Errorf("failed to issue and save new certificate")
return nil, err
return nil, nil, err
}
secConfig, err := NewSecurityConfig(&rootCA, krw, tlsKeyPair, issuerInfo)
secConfig, cleanup, err := NewSecurityConfig(&rootCA, krw, tlsKeyPair, issuerInfo)
if err == nil {
log.G(ctx).WithFields(logrus.Fields{
"node.id": secConfig.ClientTLSCreds.NodeID(),
"node.role": secConfig.ClientTLSCreds.Role(),
}).Debugf("new node credentials generated: %s", krw.Target())
}
return secConfig, err
return secConfig, cleanup, err
}
// TODO(cyli): currently we have to only update if it's a worker role - if we have a single root CA update path for

View file

@ -8,6 +8,7 @@ import (
"encoding/hex"
"encoding/json"
"encoding/pem"
"io"
"io/ioutil"
"net/http"
"sync"
@ -24,8 +25,18 @@ import (
"golang.org/x/net/context/ctxhttp"
)
// ExternalCrossSignProfile is the profile that we will be sending cross-signing CSR sign requests with
const ExternalCrossSignProfile = "CA"
const (
// ExternalCrossSignProfile is the profile that we will be sending cross-signing CSR sign requests with
ExternalCrossSignProfile = "CA"
// CertificateMaxSize is the maximum expected size of a certificate.
// While there is no specced upper limit to the size of an x509 certificate in PEM format,
// one with a ridiculous RSA key size (16384) and 26 256-character DNS SAN fields is about 14k.
// While there is no upper limit on the length of certificate chains, long chains are impractical.
// To be conservative, and to also account for external CA certificate responses in JSON format
// from CFSSL, we'll set the max to be 256KiB.
CertificateMaxSize int64 = 256 << 10
)
// ErrNoExternalCAURLs is an error used it indicate that an ExternalCA is
// configured with no URLs to which it can proxy certificate signing requests.
@ -191,7 +202,8 @@ func makeExternalSignRequest(ctx context.Context, client *http.Client, url strin
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
b := io.LimitReader(resp.Body, CertificateMaxSize)
body, err := ioutil.ReadAll(b)
if err != nil {
return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
}

View file

@ -128,7 +128,13 @@ func validateContainerSpec(taskSpec api.TaskSpec) error {
// Building a empty/dummy Task to validate the templating and
// the resulting container spec as well. This is a *best effort*
// validation.
container, err := template.ExpandContainerSpec(&api.Task{
container, err := template.ExpandContainerSpec(&api.NodeDescription{
Hostname: "nodeHostname",
Platform: &api.Platform{
OS: "os",
Architecture: "architecture",
},
}, &api.Task{
Spec: taskSpec,
ServiceID: "serviceid",
Slot: 1,

View file

@ -475,7 +475,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
addr, err := nodeIPFromContext(ctx)
if err != nil {
log.G(ctx).Debug(err.Error())
log.G(ctx).WithError(err).Debug("failed to get remote node IP")
}
if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
@ -483,7 +483,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
}
expireFunc := func() {
log.G(ctx).Debugf("heartbeat expiration")
log.G(ctx).Debug("heartbeat expiration")
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
}
@ -703,7 +703,7 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log.G(stream.Context()).WithFields(fields).Debugf("")
log.G(stream.Context()).WithFields(fields).Debug("")
if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
return err
@ -827,7 +827,7 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log := log.G(stream.Context()).WithFields(fields)
log.Debugf("")
log.Debug("")
if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
return err
@ -1118,7 +1118,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
// get the node IP addr
addr, err := nodeIPFromContext(stream.Context())
if err != nil {
log.G(ctx).Debugf(err.Error())
log.G(ctx).WithError(err).Debug("failed to get remote node IP")
}
// update the node description
if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil {

View file

@ -159,13 +159,13 @@ func (g *Orchestrator) Run(ctx context.Context) error {
switch v.Node.Status.State {
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
case api.NodeStatus_DOWN:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.shutdownTask)
case api.NodeStatus_READY:
// node could come back to READY from DOWN or DISCONNECT
g.reconcileOneNode(ctx, v.Node)
}
case api.EventDeleteNode:
g.removeTasksFromNode(ctx, v.Node)
g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
@ -201,7 +201,7 @@ func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.T
}
// if the node no longer valid, remove the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
return
}
@ -236,7 +236,7 @@ func (g *Orchestrator) Stop() {
g.restarts.CancelAll()
}
func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) {
func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
var (
tasks []*api.Task
err error
@ -245,7 +245,7 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed finding tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
return
}
@ -253,13 +253,13 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
for _, t := range tasks {
// Global orchestrator only removes tasks from globalServices
if _, exists := g.globalServices[t.ServiceID]; exists {
g.removeTask(ctx, batch, t)
cb(ctx, batch, t)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
}
}
@ -314,7 +314,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
continue
}
@ -340,7 +340,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
g.removeTasks(ctx, batch, ntasks)
g.shutdownTasks(ctx, batch, ntasks)
}
}
return nil
@ -382,7 +382,7 @@ func (g *Orchestrator) updateService(service *api.Service) {
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
g.removeTasksFromNode(ctx, node)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}
@ -447,7 +447,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
g.removeTasks(ctx, batch, tasks[serviceID])
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}
@ -491,7 +491,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
} else {
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
g.removeTasks(ctx, batch, dirtyTasks)
g.shutdownTasks(ctx, batch, dirtyTasks)
}
}
return nil
@ -542,7 +542,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
g.restartTasks = make(map[string]struct{})
}
func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *api.Task) {
func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
// set existing task DesiredState to TaskStateShutdown
// TODO(aaronl): optimistic update?
err := batch.Update(func(tx store.Tx) error {
@ -554,7 +554,7 @@ func (g *Orchestrator) removeTask(ctx context.Context, batch *store.Batch, t *ap
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTask failed to remove %s", t.ID)
log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
}
}
@ -572,9 +572,18 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
}
}
func (g *Orchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
g.removeTask(ctx, batch, t)
g.shutdownTask(ctx, batch, t)
}
}
func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
err := batch.Update(func(tx store.Tx) error {
return store.DeleteTask(tx, t.ID)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
}
}

View file

@ -15,6 +15,15 @@ type hostPortSpec struct {
publishedPort uint32
}
// versionedService defines a tuple that contains a service ID and a spec
// version, so that failures can be tracked per spec version. Note that if the
// task predates spec versioning, specVersion will contain the zero value, and
// this will still work correctly.
type versionedService struct {
serviceID string
specVersion api.Version
}
// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
*api.Node
@ -24,12 +33,14 @@ type NodeInfo struct {
AvailableResources *api.Resources
usedHostPorts map[hostPortSpec]struct{}
// recentFailures is a map from service ID to the timestamps of the
// most recent failures the node has experienced from replicas of that
// service.
// TODO(aaronl): When spec versioning is supported, this should track
// the version of the spec that failed.
recentFailures map[string][]time.Time
// recentFailures is a map from service ID/version to the timestamps of
// the most recent failures the node has experienced from replicas of
// that service.
recentFailures map[versionedService][]time.Time
// lastCleanup is the last time recentFailures was cleaned up. This is
// done periodically to avoid recentFailures growing without any limit.
lastCleanup time.Time
}
func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
@ -39,7 +50,8 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
ActiveTasksCountByService: make(map[string]int),
AvailableResources: availableResources.Copy(),
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[string][]time.Time),
recentFailures: make(map[versionedService][]time.Time),
lastCleanup: time.Now(),
}
for _, t := range tasks {
@ -148,30 +160,58 @@ func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
return
}
func (nodeInfo *NodeInfo) cleanupFailures(now time.Time) {
entriesLoop:
for key, failuresEntry := range nodeInfo.recentFailures {
for _, timestamp := range failuresEntry {
if now.Sub(timestamp) < monitorFailures {
continue entriesLoop
}
}
delete(nodeInfo.recentFailures, key)
}
nodeInfo.lastCleanup = now
}
// taskFailed records a task failure from a given service.
func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, t *api.Task) {
expired := 0
now := time.Now()
for _, timestamp := range nodeInfo.recentFailures[serviceID] {
if now.Sub(nodeInfo.lastCleanup) >= monitorFailures {
nodeInfo.cleanupFailures(now)
}
versionedService := versionedService{serviceID: t.ServiceID}
if t.SpecVersion != nil {
versionedService.specVersion = *t.SpecVersion
}
for _, timestamp := range nodeInfo.recentFailures[versionedService] {
if now.Sub(timestamp) < monitorFailures {
break
}
expired++
}
if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 {
log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
if len(nodeInfo.recentFailures[versionedService])-expired == maxFailures-1 {
log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, t.ServiceID, maxFailures, monitorFailures.String())
}
nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now)
nodeInfo.recentFailures[versionedService] = append(nodeInfo.recentFailures[versionedService][expired:], now)
}
// countRecentFailures returns the number of times the service has failed on
// this node within the lookback window monitorFailures.
func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
recentFailureCount := len(nodeInfo.recentFailures[serviceID])
func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int {
versionedService := versionedService{serviceID: t.ServiceID}
if t.SpecVersion != nil {
versionedService.specVersion = *t.SpecVersion
}
recentFailureCount := len(nodeInfo.recentFailures[versionedService])
for i := recentFailureCount - 1; i >= 0; i-- {
if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures {
if now.Sub(nodeInfo.recentFailures[versionedService][i]) > monitorFailures {
recentFailureCount -= i + 1
break
}

View file

@ -261,7 +261,7 @@ func (s *Scheduler) updateTask(ctx context.Context, t *api.Task) bool {
if _, wasPreassigned := s.preassignedTasks[t.ID]; !wasPreassigned {
nodeInfo, err := s.nodeSet.nodeInfo(t.NodeID)
if err == nil {
nodeInfo.taskFailed(ctx, t.ServiceID)
nodeInfo.taskFailed(ctx, t)
s.nodeSet.updateNode(nodeInfo)
}
}
@ -543,8 +543,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
nodeLess := func(a *NodeInfo, b *NodeInfo) bool {
// If either node has at least maxFailures recent failures,
// that's the deciding factor.
recentFailuresA := a.countRecentFailures(now, t.ServiceID)
recentFailuresB := b.countRecentFailures(now, t.ServiceID)
recentFailuresA := a.countRecentFailures(now, t)
recentFailuresB := b.countRecentFailures(now, t)
if recentFailuresA >= maxFailures || recentFailuresB >= maxFailures {
if recentFailuresA > recentFailuresB {

View file

@ -43,21 +43,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
clusters, err := FindClusters(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Clusters))
for i, x := range snapshot.Clusters {
toStoreObj[i] = x
}
for _, n := range clusters {
if err := DeleteCluster(tx, n.ID); err != nil {
return err
}
}
for _, n := range snapshot.Clusters {
if err := CreateCluster(tx, n); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableCluster, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -37,21 +37,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
configs, err := FindConfigs(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Configs))
for i, x := range snapshot.Configs {
toStoreObj[i] = x
}
for _, s := range configs {
if err := DeleteConfig(tx, s.ID); err != nil {
return err
}
}
for _, s := range snapshot.Configs {
if err := CreateConfig(tx, s); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableConfig, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -38,21 +38,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
extensions, err := FindExtensions(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Extensions))
for i, x := range snapshot.Extensions {
toStoreObj[i] = extensionEntry{x}
}
for _, e := range extensions {
if err := DeleteExtension(tx, e.ID); err != nil {
return err
}
}
for _, e := range snapshot.Extensions {
if err := CreateExtension(tx, e); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableExtension, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {
@ -80,6 +70,14 @@ func (e extensionEntry) CopyStoreObject() api.StoreObject {
return extensionEntry{Extension: e.Extension.Copy()}
}
// ensure that when update events are emitted, we unwrap extensionEntry
func (e extensionEntry) EventUpdate(oldObject api.StoreObject) api.Event {
if oldObject != nil {
return api.EventUpdateExtension{Extension: e.Extension, OldExtension: oldObject.(extensionEntry).Extension}
}
return api.EventUpdateExtension{Extension: e.Extension}
}
// CreateExtension adds a new extension to the store.
// Returns ErrExist if the ID is already taken.
func CreateExtension(tx Tx, e *api.Extension) error {

View file

@ -37,21 +37,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
networks, err := FindNetworks(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Networks))
for i, x := range snapshot.Networks {
toStoreObj[i] = x
}
for _, n := range networks {
if err := DeleteNetwork(tx, n.ID); err != nil {
return err
}
}
for _, n := range snapshot.Networks {
if err := CreateNetwork(tx, n); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableNetwork, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -47,21 +47,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
nodes, err := FindNodes(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Nodes))
for i, x := range snapshot.Nodes {
toStoreObj[i] = x
}
for _, n := range nodes {
if err := DeleteNode(tx, n.ID); err != nil {
return err
}
}
for _, n := range snapshot.Nodes {
if err := CreateNode(tx, n); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableNode, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -13,3 +13,46 @@ type ObjectStoreConfig struct {
Restore func(Tx, *api.StoreSnapshot) error
ApplyStoreAction func(Tx, api.StoreAction) error
}
// RestoreTable takes a list of new objects of a particular type (e.g. clusters,
// nodes, etc., which conform to the StoreObject interface) and replaces the
// existing objects in the store of that type with the new objects.
func RestoreTable(tx Tx, table string, newObjects []api.StoreObject) error {
checkType := func(by By) error {
return nil
}
var oldObjects []api.StoreObject
appendResult := func(o api.StoreObject) {
oldObjects = append(oldObjects, o)
}
err := tx.find(table, All, checkType, appendResult)
if err != nil {
return nil
}
updated := make(map[string]struct{})
for _, o := range newObjects {
objectID := o.GetID()
if existing := tx.lookup(table, indexID, objectID); existing != nil {
if err := tx.update(table, o); err != nil {
return err
}
updated[objectID] = struct{}{}
} else {
if err := tx.create(table, o); err != nil {
return err
}
}
}
for _, o := range oldObjects {
objectID := o.GetID()
if _, ok := updated[objectID]; !ok {
if err := tx.delete(table, objectID); err != nil {
return err
}
}
}
return nil
}

View file

@ -40,21 +40,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
resources, err := FindResources(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Resources))
for i, x := range snapshot.Resources {
toStoreObj[i] = resourceEntry{x}
}
for _, r := range resources {
if err := DeleteResource(tx, r.ID); err != nil {
return err
}
}
for _, r := range snapshot.Resources {
if err := CreateResource(tx, r); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableResource, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {
@ -82,6 +72,14 @@ func (r resourceEntry) CopyStoreObject() api.StoreObject {
return resourceEntry{Resource: r.Resource.Copy()}
}
// ensure that when update events are emitted, we unwrap resourceEntry
func (r resourceEntry) EventUpdate(oldObject api.StoreObject) api.Event {
if oldObject != nil {
return api.EventUpdateResource{Resource: r.Resource, OldResource: oldObject.(resourceEntry).Resource}
}
return api.EventUpdateResource{Resource: r.Resource}
}
func confirmExtension(tx Tx, r *api.Resource) error {
// There must be an extension corresponding to the Kind field.
extensions, err := FindExtensions(tx, ByName(r.Kind))

View file

@ -37,21 +37,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
secrets, err := FindSecrets(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Secrets))
for i, x := range snapshot.Secrets {
toStoreObj[i] = x
}
for _, s := range secrets {
if err := DeleteSecret(tx, s.ID); err != nil {
return err
}
}
for _, s := range snapshot.Secrets {
if err := CreateSecret(tx, s); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableSecret, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -58,21 +58,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
services, err := FindServices(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Services))
for i, x := range snapshot.Services {
toStoreObj[i] = x
}
for _, s := range services {
if err := DeleteService(tx, s.ID); err != nil {
return err
}
}
for _, s := range snapshot.Services {
if err := CreateService(tx, s); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableService, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -82,21 +82,11 @@ func init() {
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
tasks, err := FindTasks(tx, All)
if err != nil {
return err
toStoreObj := make([]api.StoreObject, len(snapshot.Tasks))
for i, x := range snapshot.Tasks {
toStoreObj[i] = x
}
for _, t := range tasks {
if err := DeleteTask(tx, t.ID); err != nil {
return err
}
}
for _, t := range snapshot.Tasks {
if err := CreateTask(tx, t); err != nil {
return err
}
}
return nil
return RestoreTable(tx, tableTask, toStoreObj)
},
ApplyStoreAction: func(tx Tx, sa api.StoreAction) error {
switch v := sa.Target.(type) {

View file

@ -28,7 +28,6 @@ import (
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/watch"
"github.com/docker/swarmkit/xnet"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
@ -276,10 +275,11 @@ func (n *Node) run(ctx context.Context) (err error) {
}(ctx)
paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
securityConfig, err := n.loadSecurityConfig(ctx, paths)
securityConfig, secConfigCancel, err := n.loadSecurityConfig(ctx, paths)
if err != nil {
return err
}
defer secConfigCancel()
renewer := ca.NewTLSRenewer(securityConfig, n.connBroker, paths.RootCA)
@ -509,11 +509,8 @@ waitPeer:
default:
}
secChangeQueue := watch.NewQueue()
defer secChangeQueue.Close()
secChangesCh, secChangesCancel := secChangeQueue.Watch()
secChangesCh, secChangesCancel := securityConfig.Watch()
defer secChangesCancel()
securityConfig.SetWatch(secChangeQueue)
rootCA := securityConfig.RootCA()
issuer := securityConfig.IssuerInfo()
@ -668,28 +665,31 @@ func (n *Node) Remotes() []api.Peer {
return remotes
}
func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, error) {
var securityConfig *ca.SecurityConfig
func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, func() error, error) {
var (
securityConfig *ca.SecurityConfig
cancel func() error
)
krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
if err := krw.Migrate(); err != nil {
return nil, err
return nil, nil, err
}
// Check if we already have a valid certificates on disk.
rootCA, err := ca.GetLocalRootCA(paths.RootCA)
if err != nil && err != ca.ErrNoLocalRootCA {
return nil, err
return nil, nil, err
}
if err == nil {
// if forcing a new cluster, we allow the certificates to be expired - a new set will be generated
securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
if err != nil {
_, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
if isInvalidKEK {
return nil, ErrInvalidUnlockKey
return nil, nil, ErrInvalidUnlockKey
} else if !os.IsNotExist(err) {
return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
return nil, nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
}
}
}
@ -704,16 +704,16 @@ func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigP
krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
rootCA, err = ca.CreateRootCA(ca.DefaultRootCN)
if err != nil {
return nil, err
return nil, nil, err
}
if err := ca.SaveRootCA(rootCA, paths.RootCA); err != nil {
return nil, err
return nil, nil, err
}
log.G(ctx).Debug("generated CA key and certificate")
} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
if err != nil {
return nil, err
return nil, nil, err
}
log.G(ctx).Debug("downloaded CA certificate")
}
@ -724,25 +724,25 @@ func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigP
// - We wait for CreateSecurityConfig to finish since we need a certificate to operate.
// Attempt to load certificate from disk
securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
if err == nil {
log.G(ctx).WithFields(logrus.Fields{
"node.id": securityConfig.ClientTLSCreds.NodeID(),
}).Debugf("loaded TLS certificate")
} else {
if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
return nil, ErrInvalidUnlockKey
return nil, nil, ErrInvalidUnlockKey
}
log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
securityConfig, cancel, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
Token: n.config.JoinToken,
Availability: n.config.Availability,
ConnBroker: n.connBroker,
})
if err != nil {
return nil, err
return nil, nil, err
}
}
}
@ -753,7 +753,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigP
n.roleCond.Broadcast()
n.Unlock()
return securityConfig, nil
return securityConfig, cancel, nil
}
func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {

View file

@ -14,6 +14,12 @@ import (
"github.com/pkg/errors"
)
// Platform holds information about the underlying platform of the node
type Platform struct {
Architecture string
OS string
}
// Context defines the strict set of values that can be injected into a
// template expression in SwarmKit data structure.
// NOTE: Be very careful adding any fields to this structure with types
@ -27,7 +33,9 @@ type Context struct {
}
Node struct {
ID string
ID string
Hostname string
Platform Platform
}
Task struct {
@ -41,16 +49,25 @@ type Context struct {
}
}
// NewContextFromTask returns a new template context from the data available in
// task. The provided context can then be used to populate runtime values in a
// NewContext returns a new template context from the data available in the
// task and the node where it is scheduled to run.
// The provided context can then be used to populate runtime values in a
// ContainerSpec.
func NewContextFromTask(t *api.Task) (ctx Context) {
func NewContext(n *api.NodeDescription, t *api.Task) (ctx Context) {
ctx.Service.ID = t.ServiceID
ctx.Service.Name = t.ServiceAnnotations.Name
ctx.Service.Labels = t.ServiceAnnotations.Labels
ctx.Node.ID = t.NodeID
// Add node information to context only if we have them available
if n != nil {
ctx.Node.Hostname = n.Hostname
ctx.Node.Platform = Platform{
Architecture: n.Platform.Architecture,
OS: n.Platform.OS,
}
}
ctx.Task.ID = t.ID
ctx.Task.Name = naming.Task(t)
@ -157,12 +174,13 @@ func (ctx PayloadContext) envGetter(variable string) (string, error) {
}
// NewPayloadContextFromTask returns a new template context from the data
// available in the task. This context also provides access to the configs
// available in the task and the node where it is scheduled to run.
// This context also provides access to the configs
// and secrets that the task has access to. The provided context can then
// be used to populate runtime values in a templated config or secret.
func NewPayloadContextFromTask(t *api.Task, dependencies exec.DependencyGetter) (ctx PayloadContext) {
func NewPayloadContextFromTask(node *api.NodeDescription, t *api.Task, dependencies exec.DependencyGetter) (ctx PayloadContext) {
return PayloadContext{
Context: NewContextFromTask(t),
Context: NewContext(node, t),
t: t,
restrictedSecrets: secrets.Restrict(dependencies.Secrets(), t),
restrictedConfigs: configs.Restrict(dependencies.Configs(), t),

View file

@ -10,18 +10,19 @@ import (
)
// ExpandContainerSpec expands templated fields in the runtime using the task
// state. Templating is all evaluated on the agent-side, before execution.
// state and the node where it is scheduled to run.
// Templating is all evaluated on the agent-side, before execution.
//
// Note that these are projected only on runtime values, since active task
// values are typically manipulated in the manager.
func ExpandContainerSpec(t *api.Task) (*api.ContainerSpec, error) {
func ExpandContainerSpec(n *api.NodeDescription, t *api.Task) (*api.ContainerSpec, error) {
container := t.Spec.GetContainer()
if container == nil {
return nil, errors.Errorf("task missing ContainerSpec to expand")
}
container = container.Copy()
ctx := NewContextFromTask(t)
ctx := NewContext(n, t)
var err error
container.Env, err = expandEnv(ctx, container.Env)
@ -128,12 +129,12 @@ func expandPayload(ctx PayloadContext, payload []byte) ([]byte, error) {
// ExpandSecretSpec expands the template inside the secret payload, if any.
// Templating is evaluated on the agent-side.
func ExpandSecretSpec(s *api.Secret, t *api.Task, dependencies exec.DependencyGetter) (*api.SecretSpec, error) {
func ExpandSecretSpec(s *api.Secret, node *api.NodeDescription, t *api.Task, dependencies exec.DependencyGetter) (*api.SecretSpec, error) {
if s.Spec.Templating == nil {
return &s.Spec, nil
}
if s.Spec.Templating.Name == "golang" {
ctx := NewPayloadContextFromTask(t, dependencies)
ctx := NewPayloadContextFromTask(node, t, dependencies)
secretSpec := s.Spec.Copy()
var err error
@ -145,12 +146,12 @@ func ExpandSecretSpec(s *api.Secret, t *api.Task, dependencies exec.DependencyGe
// ExpandConfigSpec expands the template inside the config payload, if any.
// Templating is evaluated on the agent-side.
func ExpandConfigSpec(c *api.Config, t *api.Task, dependencies exec.DependencyGetter) (*api.ConfigSpec, error) {
func ExpandConfigSpec(c *api.Config, node *api.NodeDescription, t *api.Task, dependencies exec.DependencyGetter) (*api.ConfigSpec, error) {
if c.Spec.Templating == nil {
return &c.Spec, nil
}
if c.Spec.Templating.Name == "golang" {
ctx := NewPayloadContextFromTask(t, dependencies)
ctx := NewPayloadContextFromTask(node, t, dependencies)
configSpec := c.Spec.Copy()
var err error

View file

@ -9,11 +9,12 @@ import (
type templatedSecretGetter struct {
dependencies exec.DependencyGetter
t *api.Task
node *api.NodeDescription
}
// NewTemplatedSecretGetter returns a SecretGetter that evaluates templates.
func NewTemplatedSecretGetter(dependencies exec.DependencyGetter, t *api.Task) exec.SecretGetter {
return templatedSecretGetter{dependencies: dependencies, t: t}
func NewTemplatedSecretGetter(dependencies exec.DependencyGetter, t *api.Task, node *api.NodeDescription) exec.SecretGetter {
return templatedSecretGetter{dependencies: dependencies, t: t, node: node}
}
func (t templatedSecretGetter) Get(secretID string) (*api.Secret, error) {
@ -31,7 +32,7 @@ func (t templatedSecretGetter) Get(secretID string) (*api.Secret, error) {
return secret, err
}
newSpec, err := ExpandSecretSpec(secret, t.t, t.dependencies)
newSpec, err := ExpandSecretSpec(secret, t.node, t.t, t.dependencies)
if err != nil {
return secret, errors.Wrapf(err, "failed to expand templated secret %s", secretID)
}
@ -44,11 +45,12 @@ func (t templatedSecretGetter) Get(secretID string) (*api.Secret, error) {
type templatedConfigGetter struct {
dependencies exec.DependencyGetter
t *api.Task
node *api.NodeDescription
}
// NewTemplatedConfigGetter returns a ConfigGetter that evaluates templates.
func NewTemplatedConfigGetter(dependencies exec.DependencyGetter, t *api.Task) exec.ConfigGetter {
return templatedConfigGetter{dependencies: dependencies, t: t}
func NewTemplatedConfigGetter(dependencies exec.DependencyGetter, t *api.Task, node *api.NodeDescription) exec.ConfigGetter {
return templatedConfigGetter{dependencies: dependencies, t: t, node: node}
}
func (t templatedConfigGetter) Get(configID string) (*api.Config, error) {
@ -66,7 +68,7 @@ func (t templatedConfigGetter) Get(configID string) (*api.Config, error) {
return config, err
}
newSpec, err := ExpandConfigSpec(config, t.t, t.dependencies)
newSpec, err := ExpandConfigSpec(config, t.node, t.t, t.dependencies)
if err != nil {
return config, errors.Wrapf(err, "failed to expand templated config %s", configID)
}
@ -82,10 +84,10 @@ type templatedDependencyGetter struct {
}
// NewTemplatedDependencyGetter returns a DependencyGetter that evaluates templates.
func NewTemplatedDependencyGetter(dependencies exec.DependencyGetter, t *api.Task) exec.DependencyGetter {
func NewTemplatedDependencyGetter(dependencies exec.DependencyGetter, t *api.Task, node *api.NodeDescription) exec.DependencyGetter {
return templatedDependencyGetter{
secrets: NewTemplatedSecretGetter(dependencies, t),
configs: NewTemplatedConfigGetter(dependencies, t),
secrets: NewTemplatedSecretGetter(dependencies, t, node),
configs: NewTemplatedConfigGetter(dependencies, t, node),
}
}

View file

@ -10,7 +10,7 @@ github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f
github.com/docker/go-metrics d466d4f6fd960e01820085bd7e1a24426ee7ef18
# etcd/raft
github.com/coreos/etcd ea5389a79f40206170582c1ea076191b8622cb8e https://github.com/aaronlehmann/etcd # for https://github.com/coreos/etcd/pull/7830
github.com/coreos/etcd v3.2.1
github.com/coreos/go-systemd v12
github.com/coreos/pkg v3
github.com/prometheus/client_golang 52437c81da6b127a9925d17eb3a382a2e5fd395e
@ -28,18 +28,16 @@ github.com/docker/libnetwork 37e20af882e13dd01ade3658b7aabdae3412118b
github.com/docker/libtrust 9cbd2a1374f46905c68a4eb3694a130610adc62a
github.com/opencontainers/runc b6b70e53451794e8333e9b602cc096b47a20bd0f
github.com/opencontainers/go-digest a6d0ee40d4207ea02364bd3b9e8e77b9159ba1eb
github.com/opencontainers/image-spec f03dbe35d449c54915d235f1a3cf8f585a24babe
github.com/opencontainers/image-spec 372ad780f63454fbbbbcc7cf80e5b90245c13e13
# containerd executor
github.com/containerd/containerd 7fc91b05917e93d474fab9465547d44eacd10ce3
github.com/containerd/continuity f4ad4294c92f596c9241947c416d1297f9faf3ea
github.com/containerd/containerd 76697ac8cbf357a19beb58e4805a81fe48cf7974
github.com/containerd/fifo 69b99525e472735860a5269b75af1970142b3062
github.com/opencontainers/runtime-spec v1.0.0-rc5
github.com/nightlyone/lockfile 1d49c987357a327b5b03aa84cbddd582c328615d
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/Microsoft/go-winio f778f05015353be65d242f3fedc18695756153bb
github.com/Microsoft/go-winio v0.4.2
github.com/Sirupsen/logrus v0.11.0
github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
github.com/boltdb/bolt e72f08ddb5a52992c0a44c7dda9316c7333938b2
@ -52,7 +50,7 @@ github.com/hashicorp/golang-lru a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
github.com/inconshreveable/mousetrap 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75
github.com/phayes/permbits f7e3ac5e859d0b919c5068d581cc4c5d4f4f9bc5
github.com/pivotal-golang/clock 3fd3c1944c59d9742e1cd333672181cd1a6f9fa0
github.com/pkg/errors 01fa4104b9c248c8945d14d9f128454d5b28d595
github.com/pkg/errors 645ef00459ed84a119197bfb8d8205042c6df63d
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/rcrowley/go-metrics 51425a2415d21afadfd55cd93432c0bc69e9598d
github.com/spf13/cobra 8e91712f174ced10270cf66615e0a9127e7c4de5

158
vendor/github.com/docker/swarmkit/watch/queue/queue.go generated vendored Normal file
View file

@ -0,0 +1,158 @@
package queue
import (
"container/list"
"fmt"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
)
// ErrQueueFull is returned by a Write operation when that Write causes the
// queue to reach its size limit.
var ErrQueueFull = fmt.Errorf("queue closed due to size limit")
// LimitQueue accepts all messages into a queue for asynchronous consumption by
// a sink until an upper limit of messages is reached. When that limit is
// reached, the entire Queue is Closed. It is thread safe but the
// sink must be reliable or events will be dropped.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
dst events.Sink
events *list.List
limit uint64
cond *sync.Cond
mu sync.Mutex
closed bool
full chan struct{}
fullClosed bool
}
// NewLimitQueue returns a queue to the provided Sink dst.
func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
eq := LimitQueue{
dst: dst,
events: list.New(),
limit: limit,
full: make(chan struct{}),
}
eq.cond = sync.NewCond(&eq.mu)
go eq.run()
return &eq
}
// Write accepts the events into the queue, only failing if the queue has
// been closed or has reached its size limit.
func (eq *LimitQueue) Write(event events.Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return events.ErrSinkClosed
}
if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
// If the limit has been reached, don't write the event to the queue,
// and close the Full channel. This notifies listeners that the queue
// is now full, but the sink is still permitted to consume events. It's
// the responsibility of the listener to decide whether they want to
// live with dropped events or whether they want to Close() the
// LimitQueue
if !eq.fullClosed {
eq.fullClosed = true
close(eq.full)
}
return ErrQueueFull
}
eq.events.PushBack(event)
eq.cond.Signal() // signal waiters
return nil
}
// Full returns a channel that is closed when the queue becomes full for the
// first time.
func (eq *LimitQueue) Full() chan struct{} {
return eq.full
}
// Close shuts down the event queue, flushing all events
func (eq *LimitQueue) Close() error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return nil
}
// set the closed flag
eq.closed = true
eq.cond.Signal() // signal flushes queue
eq.cond.Wait() // wait for signal from last flush
return eq.dst.Close()
}
// run is the main goroutine to flush events to the target sink.
func (eq *LimitQueue) run() {
for {
event := eq.next()
if event == nil {
return // nil block means event queue is closed.
}
if err := eq.dst.Write(event); err != nil {
// TODO(aaronl): Dropping events could be bad depending
// on the application. We should have a way of
// communicating this condition. However, logging
// at a log level above debug may not be appropriate.
// Eventually, go-events should not use logrus at all,
// and should bubble up conditions like this through
// error values.
logrus.WithFields(logrus.Fields{
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
}
}
}
// Len returns the number of items that are currently stored in the queue and
// not consumed by its sink.
func (eq *LimitQueue) Len() int {
eq.mu.Lock()
defer eq.mu.Unlock()
return eq.events.Len()
}
func (eq *LimitQueue) String() string {
eq.mu.Lock()
defer eq.mu.Unlock()
return fmt.Sprintf("%v", eq.events)
}
// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *LimitQueue) next() events.Event {
eq.mu.Lock()
defer eq.mu.Unlock()
for eq.events.Len() < 1 {
if eq.closed {
eq.cond.Broadcast()
return nil
}
eq.cond.Wait()
}
front := eq.events.Front()
block := front.Value.(events.Event)
eq.events.Remove(front)
return block
}

95
vendor/github.com/docker/swarmkit/watch/sinks.go generated vendored Normal file
View file

@ -0,0 +1,95 @@
package watch
import (
"fmt"
"time"
events "github.com/docker/go-events"
)
// ErrSinkTimeout is returned from the Write method when a sink times out.
var ErrSinkTimeout = fmt.Errorf("timeout exceeded, tearing down sink")
// timeoutSink is a sink that wraps another sink with a timeout. If the
// embedded sink fails to complete a Write operation within the specified
// timeout, the Write operation of the timeoutSink fails.
type timeoutSink struct {
timeout time.Duration
sink events.Sink
}
func (s timeoutSink) Write(event events.Event) error {
errChan := make(chan error)
go func(c chan<- error) {
c <- s.sink.Write(event)
}(errChan)
timer := time.NewTimer(s.timeout)
select {
case err := <-errChan:
timer.Stop()
return err
case <-timer.C:
s.sink.Close()
return ErrSinkTimeout
}
}
func (s timeoutSink) Close() error {
return s.sink.Close()
}
// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
// debug log messages that may be confusing. It is possible that the queue
// will try to write an event to its destination channel while the queue is
// being removed from the broadcaster. Since the channel is closed before the
// queue, there is a narrow window when this is possible. In some event-based
// dropping events when a sink is removed from a broadcaster is a problem, but
// for the usage in this watch package that's the expected behavior.
type dropErrClosed struct {
sink events.Sink
}
func (s dropErrClosed) Write(event events.Event) error {
err := s.sink.Write(event)
if err == events.ErrSinkClosed {
return nil
}
return err
}
func (s dropErrClosed) Close() error {
return s.sink.Close()
}
// dropErrClosedChanGen is a ChannelSinkGenerator for dropErrClosed sinks wrapping
// unbuffered channels.
type dropErrClosedChanGen struct{}
func (s *dropErrClosedChanGen) NewChannelSink() (events.Sink, *events.Channel) {
ch := events.NewChannel(0)
return dropErrClosed{sink: ch}, ch
}
// TimeoutDropErrChanGen is a ChannelSinkGenerator that creates a channel,
// wrapped by the dropErrClosed sink and a timeout.
type TimeoutDropErrChanGen struct {
timeout time.Duration
}
// NewChannelSink creates a new sink chain of timeoutSink->dropErrClosed->Channel
func (s *TimeoutDropErrChanGen) NewChannelSink() (events.Sink, *events.Channel) {
ch := events.NewChannel(0)
return timeoutSink{
timeout: s.timeout,
sink: dropErrClosed{
sink: ch,
},
}, ch
}
// NewTimeoutDropErrSinkGen returns a generator of timeoutSinks wrapping dropErrClosed
// sinks, wrapping unbuffered channel sinks.
func NewTimeoutDropErrSinkGen(timeout time.Duration) ChannelSinkGenerator {
return &TimeoutDropErrChanGen{timeout: timeout}
}

View file

@ -1,48 +1,81 @@
package watch
import (
"context"
"fmt"
"sync"
"time"
"github.com/docker/go-events"
"github.com/docker/swarmkit/watch/queue"
)
// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
// debug log messages that may be confusing. It is possible that the queue
// will try to write an event to its destination channel while the queue is
// being removed from the broadcaster. Since the channel is closed before the
// queue, there is a narrow window when this is possible. In some event-based
// dropping events when a sink is removed from a broadcaster is a problem, but
// for the usage in this watch package that's the expected behavior.
type dropErrClosed struct {
sink events.Sink
}
func (s dropErrClosed) Write(event events.Event) error {
err := s.sink.Write(event)
if err == events.ErrSinkClosed {
return nil
}
return err
}
func (s dropErrClosed) Close() error {
return s.sink.Close()
// ChannelSinkGenerator is a constructor of sinks that eventually lead to a
// channel.
type ChannelSinkGenerator interface {
NewChannelSink() (events.Sink, *events.Channel)
}
// Queue is the structure used to publish events and watch for them.
type Queue struct {
sinkGen ChannelSinkGenerator
// limit is the max number of items to be held in memory for a watcher
limit uint64
mu sync.Mutex
broadcast *events.Broadcaster
cancelFuncs map[*events.Channel]func()
cancelFuncs map[events.Sink]func()
// closeOutChan indicates whether the watchers' channels should be closed
// when a watcher queue reaches its limit or when the Close method of the
// sink is called.
closeOutChan bool
}
// NewQueue creates a new publish/subscribe queue which supports watchers.
// The channels that it will create for subscriptions will have the buffer
// size specified by buffer.
func NewQueue() *Queue {
return &Queue{
broadcast: events.NewBroadcaster(),
cancelFuncs: make(map[*events.Channel]func()),
func NewQueue(options ...func(*Queue) error) *Queue {
// Create a queue with the default values
q := &Queue{
sinkGen: &dropErrClosedChanGen{},
broadcast: events.NewBroadcaster(),
cancelFuncs: make(map[events.Sink]func()),
limit: 0,
closeOutChan: false,
}
for _, option := range options {
err := option(q)
if err != nil {
panic(fmt.Sprintf("Failed to apply options to queue: %s", err))
}
}
return q
}
// WithTimeout returns a functional option for a queue that sets a write timeout
func WithTimeout(timeout time.Duration) func(*Queue) error {
return func(q *Queue) error {
q.sinkGen = NewTimeoutDropErrSinkGen(timeout)
return nil
}
}
// WithCloseOutChan returns a functional option for a queue whose watcher
// channel is closed when no more events are expected to be sent to the watcher.
func WithCloseOutChan() func(*Queue) error {
return func(q *Queue) error {
q.closeOutChan = true
return nil
}
}
// WithLimit returns a functional option for a queue with a max size limit.
func WithLimit(limit uint64) func(*Queue) error {
return func(q *Queue) error {
q.limit = limit
return nil
}
}
@ -52,13 +85,21 @@ func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
return q.CallbackWatch(nil)
}
// WatchContext returns a channel where all items published to the queue will
// be received. The channel will be closed when the provided context is
// cancelled.
func (q *Queue) WatchContext(ctx context.Context) (eventq chan events.Event) {
return q.CallbackWatchContext(ctx, nil)
}
// CallbackWatch returns a channel which will receive all events published to
// the queue from this point that pass the check in the provided callback
// function. The returned cancel function will stop the flow of events and
// close the channel.
func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func()) {
ch := events.NewChannel(0)
sink := events.Sink(events.NewQueue(dropErrClosed{sink: ch}))
chanSink, ch := q.sinkGen.NewChannelSink()
lq := queue.NewLimitQueue(chanSink, q.limit)
sink := events.Sink(lq)
if matcher != nil {
sink = events.NewFilter(sink, matcher)
@ -72,19 +113,68 @@ func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event,
sink.Close()
}
q.mu.Lock()
q.cancelFuncs[ch] = cancelFunc
q.mu.Unlock()
return ch.C, func() {
externalCancelFunc := func() {
q.mu.Lock()
cancelFunc := q.cancelFuncs[ch]
delete(q.cancelFuncs, ch)
cancelFunc := q.cancelFuncs[sink]
delete(q.cancelFuncs, sink)
q.mu.Unlock()
if cancelFunc != nil {
cancelFunc()
}
}
q.mu.Lock()
q.cancelFuncs[sink] = cancelFunc
q.mu.Unlock()
// If the output channel shouldn't be closed and the queue is limitless,
// there's no need for an additional goroutine.
if !q.closeOutChan && q.limit == 0 {
return ch.C, externalCancelFunc
}
outChan := make(chan events.Event)
go func() {
for {
select {
case <-ch.Done():
// Close the output channel if the ChannelSink is Done for any
// reason. This can happen if the cancelFunc is called
// externally or if it has been closed by a wrapper sink, such
// as the TimeoutSink.
if q.closeOutChan {
close(outChan)
}
externalCancelFunc()
return
case <-lq.Full():
// Close the output channel and tear down the Queue if the
// LimitQueue becomes full.
if q.closeOutChan {
close(outChan)
}
externalCancelFunc()
return
case event := <-ch.C:
outChan <- event
}
}
}()
return outChan, externalCancelFunc
}
// CallbackWatchContext returns a channel where all items published to the queue will
// be received. The channel will be closed when the provided context is
// cancelled.
func (q *Queue) CallbackWatchContext(ctx context.Context, matcher events.Matcher) (eventq chan events.Event) {
c, cancel := q.CallbackWatch(matcher)
go func() {
<-ctx.Done()
cancel()
}()
return c
}
// Publish adds an item to the queue.
@ -100,7 +190,7 @@ func (q *Queue) Close() error {
for _, cancelFunc := range q.cancelFuncs {
cancelFunc()
}
q.cancelFuncs = make(map[*events.Channel]func())
q.cancelFuncs = make(map[events.Sink]func())
q.mu.Unlock()
return q.broadcast.Close()