Revendor swarmkit to 2eaae1ab6800f8521236e01bafb2667d2ec5371f

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2016-11-04 12:11:41 -07:00
parent 9322636c89
commit 014eeb539b
28 changed files with 4233 additions and 100 deletions

View File

@ -100,7 +100,7 @@ github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d
# cluster
github.com/docker/swarmkit 4dfc88ccce14ced6f0a6ea82d46dca004c6de0e2
github.com/docker/swarmkit 2eaae1ab6800f8521236e01bafb2667d2ec5371f
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
@ -48,11 +49,8 @@ func New(config *Config) (*Agent, error) {
return nil, err
}
worker := newWorker(config.DB, config.Executor)
a := &Agent{
config: config,
worker: worker,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
stopped: make(chan struct{}),
@ -60,6 +58,7 @@ func New(config *Config) (*Agent, error) {
ready: make(chan struct{}),
}
a.worker = newWorker(config.DB, config.Executor, a)
return a, nil
}
@ -147,11 +146,12 @@ func (a *Agent) run(ctx context.Context) {
defer nodeUpdateTicker.Stop()
var (
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
subscriptions = map[string]context.CancelFunc{}
)
if err := a.worker.Init(ctx); err != nil {
@ -159,6 +159,7 @@ func (a *Agent) run(ctx context.Context) {
a.err = err
return // fatal?
}
defer a.worker.Close()
// setup a reliable reporter to call back to us.
reporter := newStatusReporter(ctx, a)
@ -186,6 +187,23 @@ func (a *Agent) run(ctx context.Context) {
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
if sub.Close {
if cancel, ok := subscriptions[sub.ID]; ok {
cancel()
}
delete(subscriptions, sub.ID)
continue
}
if _, ok := subscriptions[sub.ID]; ok {
// Duplicate subscription
continue
}
subCtx, subCancel := context.WithCancel(ctx)
subscriptions[sub.ID] = subCancel
go a.worker.Subscribe(subCtx, sub)
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
@ -387,6 +405,40 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}
// Publisher returns a LogPublisher for the given subscription
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
var (
err error
client api.LogBroker_PublishLogsClient
)
err = a.withSession(ctx, func(session *session) error {
client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
return err
})
if err != nil {
return nil, err
}
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
select {
case <-ctx.Done():
client.CloseSend()
return ctx.Err()
default:
}
return client.Send(&api.PublishLogsMessage{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
})
}), nil
}
// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

View File

@ -45,6 +45,33 @@ type Controller interface {
Close() error
}
// ControllerLogs defines a component that makes logs accessible.
//
// Can usually be accessed on a controller instance via type assertion.
type ControllerLogs interface {
// Logs will write publisher until the context is cancelled or an error
// occurs.
Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
}
// LogPublisher defines the protocol for receiving a log message.
type LogPublisher interface {
Publish(ctx context.Context, message api.LogMessage) error
}
// LogPublisherFunc implements publisher with just a function.
type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
// Publish calls the wrapped function.
func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
return fn(ctx, message)
}
// LogPublisherProvider defines the protocol for receiving a log publisher
type LogPublisherProvider interface {
Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
}
// ContainerStatuser reports status of a container.
//
// This can be implemented by controllers or error types.

View File

@ -110,6 +110,100 @@ func (_mr *_MockControllerRecorder) Close() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Close")
}
// Mock of ControllerLogs interface
type MockControllerLogs struct {
ctrl *gomock.Controller
recorder *_MockControllerLogsRecorder
}
// Recorder for MockControllerLogs (not exported)
type _MockControllerLogsRecorder struct {
mock *MockControllerLogs
}
func NewMockControllerLogs(ctrl *gomock.Controller) *MockControllerLogs {
mock := &MockControllerLogs{ctrl: ctrl}
mock.recorder = &_MockControllerLogsRecorder{mock}
return mock
}
func (_m *MockControllerLogs) EXPECT() *_MockControllerLogsRecorder {
return _m.recorder
}
func (_m *MockControllerLogs) Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error {
ret := _m.ctrl.Call(_m, "Logs", ctx, publisher, options)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockControllerLogsRecorder) Logs(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Logs", arg0, arg1, arg2)
}
// Mock of LogPublisher interface
type MockLogPublisher struct {
ctrl *gomock.Controller
recorder *_MockLogPublisherRecorder
}
// Recorder for MockLogPublisher (not exported)
type _MockLogPublisherRecorder struct {
mock *MockLogPublisher
}
func NewMockLogPublisher(ctrl *gomock.Controller) *MockLogPublisher {
mock := &MockLogPublisher{ctrl: ctrl}
mock.recorder = &_MockLogPublisherRecorder{mock}
return mock
}
func (_m *MockLogPublisher) EXPECT() *_MockLogPublisherRecorder {
return _m.recorder
}
func (_m *MockLogPublisher) Publish(ctx context.Context, message api.LogMessage) error {
ret := _m.ctrl.Call(_m, "Publish", ctx, message)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockLogPublisherRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1)
}
// Mock of LogPublisherProvider interface
type MockLogPublisherProvider struct {
ctrl *gomock.Controller
recorder *_MockLogPublisherProviderRecorder
}
// Recorder for MockLogPublisherProvider (not exported)
type _MockLogPublisherProviderRecorder struct {
mock *MockLogPublisherProvider
}
func NewMockLogPublisherProvider(ctrl *gomock.Controller) *MockLogPublisherProvider {
mock := &MockLogPublisherProvider{ctrl: ctrl}
mock.recorder = &_MockLogPublisherProviderRecorder{mock}
return mock
}
func (_m *MockLogPublisherProvider) EXPECT() *_MockLogPublisherProviderRecorder {
return _m.recorder
}
func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error) {
ret := _m.ctrl.Call(_m, "Publisher", ctx, subscriptionID)
ret0, _ := ret[0].(LogPublisher)
ret1, _ := ret[1].(error)
return ret0, ret1
}
func (_mr *_MockLogPublisherProviderRecorder) Publisher(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publisher", arg0, arg1)
}
// Mock of ContainerStatuser interface
type MockContainerStatuser struct {
ctrl *gomock.Controller

View File

@ -33,12 +33,13 @@ type session struct {
conn *grpc.ClientConn
addr string
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
subscriptions chan *api.SubscriptionMessage
registered chan struct{} // closed registration
closed chan struct{}
@ -47,14 +48,19 @@ type session struct {
func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
subscriptions: make(chan *api.SubscriptionMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
// TODO(stevvooe): Need to move connection management up a level or create
// independent connection for log broker client.
peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
@ -98,6 +104,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
go runctx(ctx, s.closed, s.errs, s.logSubscriptions)
close(s.registered)
}
@ -213,6 +220,33 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}
}
func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")
client := api.NewLogBrokerClient(s.conn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
}
defer subscriptions.CloseSend()
for {
resp, err := subscriptions.Recv()
if err != nil {
return err
}
select {
case s.subscriptions <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}
func (s *session) watch(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")

View File

@ -64,6 +64,18 @@ func (tm *taskManager) Close() error {
}
}
func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
ctx = log.WithModule(ctx, "taskmanager")
logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
if !ok {
return // no logs available
}
if err := logCtlr.Logs(ctx, publisher, options); err != nil {
log.G(ctx).WithError(err).Errorf("logs call failed")
}
}
func (tm *taskManager) run(ctx context.Context) {
ctx, cancelAll := context.WithCancel(ctx)
defer cancelAll() // cancel all child operations on exit.

View File

@ -8,6 +8,7 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/watch"
"golang.org/x/net/context"
)
@ -17,6 +18,11 @@ type Worker interface {
// Init prepares the worker for task assignment.
Init(ctx context.Context) error
// Close performs worker cleanup when no longer needed.
//
// It is not safe to call any worker function after that.
Close()
// Assign assigns a complete set of tasks and secrets to a worker. Any task or secrets not included in
// this set will be removed.
Assign(ctx context.Context, assignments []*api.AssignmentChange) error
@ -31,6 +37,9 @@ type Worker interface {
//
// The listener will be removed if the context is cancelled.
Listen(ctx context.Context, reporter StatusReporter)
// Subscribe to log messages matching the subscription.
Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error
}
// statusReporterKey protects removal map from panic.
@ -39,20 +48,25 @@ type statusReporterKey struct {
}
type worker struct {
db *bolt.DB
executor exec.Executor
listeners map[*statusReporterKey]struct{}
db *bolt.DB
executor exec.Executor
publisher exec.LogPublisher
listeners map[*statusReporterKey]struct{}
taskevents *watch.Queue
publisherProvider exec.LogPublisherProvider
taskManagers map[string]*taskManager
mu sync.RWMutex
}
func newWorker(db *bolt.DB, executor exec.Executor) *worker {
func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
return &worker{
db: db,
executor: executor,
listeners: make(map[*statusReporterKey]struct{}),
taskManagers: make(map[string]*taskManager),
db: db,
executor: executor,
publisherProvider: publisherProvider,
taskevents: watch.NewQueue(),
listeners: make(map[*statusReporterKey]struct{}),
taskManagers: make(map[string]*taskManager),
}
}
@ -90,6 +104,11 @@ func (w *worker) Init(ctx context.Context) error {
})
}
// Close performs worker cleanup when no longer needed.
func (w *worker) Close() {
w.taskevents.Close()
}
// Assign assigns a full set of tasks and secrets to the worker.
// Any tasks not previously known will be started. Any tasks that are in the task set
// and already running will be updated, if possible. Any tasks currently running on
@ -319,6 +338,7 @@ func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
}
func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error {
w.taskevents.Publish(task.Copy())
_, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation.
if err != nil {
@ -381,3 +401,63 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin
return nil
}
// Subscribe to log messages matching the subscription.
func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)
publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID)
if err != nil {
return err
}
// Send a close once we're done
defer publisher.Publish(ctx, api.LogMessage{})
match := func(t *api.Task) bool {
// TODO(aluzzardi): Consider using maps to limit the iterations.
for _, tid := range subscription.Selector.TaskIDs {
if t.ID == tid {
return true
}
}
for _, sid := range subscription.Selector.ServiceIDs {
if t.ServiceID == sid {
return true
}
}
for _, nid := range subscription.Selector.NodeIDs {
if t.NodeID == nid {
return true
}
}
return false
}
ch, cancel := w.taskevents.Watch()
defer cancel()
w.mu.Lock()
for _, tm := range w.taskManagers {
if match(tm.task) {
go tm.Logs(ctx, *subscription.Options, publisher)
}
}
w.mu.Unlock()
for {
select {
case v := <-ch:
w.mu.Lock()
task := v.(*api.Task)
if match(task) {
go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher)
}
w.mu.Unlock()
case <-ctx.Done():
return ctx.Err()
}
}
}

View File

@ -1,3 +1,3 @@
package api
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto logbroker.proto

3130
vendor/github.com/docker/swarmkit/api/logbroker.pb.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

170
vendor/github.com/docker/swarmkit/api/logbroker.proto generated vendored Normal file
View File

@ -0,0 +1,170 @@
syntax = "proto3";
package docker.swarmkit.v1;
import "gogoproto/gogo.proto";
import "timestamp/timestamp.proto"; // TODO(stevvooe): use our own until we fix gogoproto/deepcopy
import "plugin/plugin.proto";
// LogStream defines the stream from which the log message came.
enum LogStream {
option (gogoproto.goproto_enum_prefix) = false;
option (gogoproto.enum_customname) = "LogStream";
LOG_STREAM_UNKNOWN = 0 [(gogoproto.enumvalue_customname) = "LogStreamUnknown"];
LOG_STREAM_STDOUT = 1 [(gogoproto.enumvalue_customname) = "LogStreamStdout"];
LOG_STREAM_STDERR = 2 [(gogoproto.enumvalue_customname) = "LogStreamStderr"];
}
message LogSubscriptionOptions {
// Streams defines which log streams should be sent from the task source.
// Empty means send all the messages.
repeated LogStream streams = 1;
// Follow instructs the publisher to continue sending log messages as they
// are produced, after satisfying the initial query.
bool follow = 2;
// Tail defines how many messages relative to the log stream to send when
// starting the stream.
//
// Positive values will skip that number of messages from the start of the
// stream before publishing.
//
// Negative values will specify messages relative to the end of the stream,
// offset by one. We can say that the last (-n-1) lines are returned when n
// < 0. As reference, -1 would mean send no log lines (typically used with
// follow), -2 would return the last log line, -11 would return the last 10
// and so on.
//
// The default value of zero will return all logs.
//
// Note that this is very different from the Docker API.
int64 tail = 3;
// Since indicates that only log messages produced after this timestamp
// should be sent.
Timestamp since = 4;
}
// LogSelector will match logs from ANY of the defined parameters.
//
// For the best effect, the client should use the least specific parameter
// possible. For example, if they want to listen to all the tasks of a service,
// they should use the service id, rather than specifying the individual tasks.
message LogSelector {
repeated string service_ids = 1 [(gogoproto.customname) = "ServiceIDs"];
repeated string node_ids = 2 [(gogoproto.customname) = "NodeIDs"];
repeated string task_ids = 3 [(gogoproto.customname) = "TaskIDs"];
}
// LogContext marks the context from which a log message was generated.
message LogContext {
string service_id = 1 [(gogoproto.customname) = "ServiceID"];
string node_id = 2 [(gogoproto.customname) = "NodeID"];
string task_id = 3 [(gogoproto.customname) = "TaskID"];
}
// LogMessage
message LogMessage {
// Context identifies the source of the log message.
LogContext context = 1 [(gogoproto.nullable) = false];
// Timestamp is the time at which the message was generated.
Timestamp timestamp = 2;
// Stream identifies the stream of the log message, stdout or stderr.
LogStream stream = 3;
// Data is the raw log message, as generated by the application.
bytes data = 4;
}
// Logs defines the methods for retrieving task logs messages from a cluster.
service Logs {
// SubscribeLogs starts a subscription with the specified selector and options.
//
// The subscription will be distributed to relevant nodes and messages will
// be collected and sent via the returned stream.
//
// The subscription will end with an EOF.
rpc SubscribeLogs(SubscribeLogsRequest) returns (stream SubscribeLogsMessage) {
option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-manager" };
}
}
message SubscribeLogsRequest {
// LogSelector describes the logs to which the subscriber is
LogSelector selector = 1;
LogSubscriptionOptions options = 2;
}
message SubscribeLogsMessage {
repeated LogMessage messages = 1 [(gogoproto.nullable) = false];
}
// LogBroker defines the API used by the worker to send task logs back to a
// manager. A client listens for subscriptions then optimistically retrieves
// logs satisfying said subscriptions, calling PublishLogs for results that are
// relevant.
//
// The structure of ListenSubscriptions is similar to the Dispatcher API but
// decoupled to allow log distribution to work outside of the regular task
// flow.
service LogBroker {
// ListenSubscriptions starts a subscription stream for the node. For each
// message received, the node should attempt to satisfy the subscription.
//
// Log messages that match the provided subscription should be sent via
// PublishLogs.
rpc ListenSubscriptions(ListenSubscriptionsRequest) returns (stream SubscriptionMessage) {
option (docker.protobuf.plugin.tls_authorization) = {
roles: "swarm-worker"
roles: "swarm-manager"
};
}
// PublishLogs receives sets of log messages destined for a single
// subscription identifier.
rpc PublishLogs(stream PublishLogsMessage) returns (PublishLogsResponse) {
option (docker.protobuf.plugin.tls_authorization) = {
roles: "swarm-worker"
roles: "swarm-manager"
};
}
}
// ListenSubscriptionsRequest is a placeholder to begin listening for
// subscriptions.
message ListenSubscriptionsRequest { }
// SubscriptionMessage instructs the listener to start publishing messages for
// the stream or end a subscription.
//
// If Options.Follow == false, the worker should end the subscription on its own.
message SubscriptionMessage {
// ID identifies the subscription.
string id = 1 [(gogoproto.customname) = "ID"];
// Selector defines which sources should be sent for the subscription.
LogSelector selector = 2;
// Options specify how the subscription should be satisfied.
LogSubscriptionOptions options = 3;
// Close will be true if the node should shutdown the subscription with the
// provided identifier.
bool close = 4;
}
message PublishLogsMessage {
// SubscriptionID identifies which subscription the set of messages should
// be sent to. We can think of this as a "mail box" for the subscription.
string subscription_id = 1 [(gogoproto.customname) = "SubscriptionID"];
// Messages is the log message for publishing.
repeated LogMessage messages = 2 [(gogoproto.nullable) = false];
}
message PublishLogsResponse { }

View File

@ -16,6 +16,7 @@
raft.proto
health.proto
resource.proto
logbroker.proto
It has these top-level messages:
Version
@ -168,6 +169,16 @@
AttachNetworkResponse
DetachNetworkRequest
DetachNetworkResponse
LogSubscriptionOptions
LogSelector
LogContext
LogMessage
SubscribeLogsRequest
SubscribeLogsMessage
ListenSubscriptionsRequest
SubscriptionMessage
PublishLogsMessage
PublishLogsResponse
*/
package api

View File

@ -145,36 +145,29 @@ func (s *Server) GetNetwork(ctx context.Context, request *api.GetNetworkRequest)
// - Returns `NotFound` if the Network is not found.
// - Returns an error if the deletion fails.
func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRequest) (*api.RemoveNetworkResponse, error) {
var (
services []*api.Service
err error
)
if request.NetworkID == "" {
return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
}
s.store.View(func(tx store.ReadTx) {
services, err = store.FindServices(tx, store.All)
})
if err != nil {
return nil, grpc.Errorf(codes.Internal, "could not find services using network %s", request.NetworkID)
}
for _, s := range services {
specNetworks := s.Spec.Task.Networks
if len(specNetworks) == 0 {
specNetworks = s.Spec.Networks
err := s.store.Update(func(tx store.Tx) error {
services, err := store.FindServices(tx, store.ByReferencedNetworkID(request.NetworkID))
if err != nil {
return grpc.Errorf(codes.Internal, "could not find services using network %s: %v", request.NetworkID, err)
}
for _, na := range specNetworks {
if na.Target == request.NetworkID {
return nil, grpc.Errorf(codes.FailedPrecondition, "network %s is in use", request.NetworkID)
}
if len(services) != 0 {
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", request.NetworkID, services[0].ID)
}
tasks, err := store.FindTasks(tx, store.ByReferencedNetworkID(request.NetworkID))
if err != nil {
return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
}
if len(tasks) != 0 {
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
}
}
err = s.store.Update(func(tx store.Tx) error {
nw := store.GetNetwork(tx, request.NetworkID)
if _, ok := nw.Spec.Annotations.Labels["com.docker.swarm.internal"]; ok {
networkDescription := nw.ID

View File

@ -2,6 +2,7 @@ package controlapi
import (
"regexp"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
@ -191,6 +192,7 @@ func (s *Server) CreateSecret(ctx context.Context, request *api.CreateSecretRequ
// RemoveSecret removes the secret referenced by `RemoveSecretRequest.ID`.
// - Returns `InvalidArgument` if `RemoveSecretRequest.ID` is empty.
// - Returns `NotFound` if the a secret named `RemoveSecretRequest.ID` is not found.
// - Returns `SecretInUse` if the secret is currently in use
// - Returns an error if the deletion fails.
func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequest) (*api.RemoveSecretResponse, error) {
if request.SecretID == "" {
@ -198,6 +200,34 @@ func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequ
}
err := s.store.Update(func(tx store.Tx) error {
// Check if the secret exists
secret := store.GetSecret(tx, request.SecretID)
if secret == nil {
return grpc.Errorf(codes.NotFound, "could not find secret %s", request.SecretID)
}
// Check if any services currently reference this secret, return error if so
services, err := store.FindServices(tx, store.ByReferencedSecretID(request.SecretID))
if err != nil {
return grpc.Errorf(codes.Internal, "could not find services using secret %s: %v", request.SecretID, err)
}
if len(services) != 0 {
serviceNames := make([]string, 0, len(services))
for _, service := range services {
serviceNames = append(serviceNames, service.Spec.Annotations.Name)
}
secretName := secret.Spec.Annotations.Name
serviceNameStr := strings.Join(serviceNames, ", ")
serviceStr := "services"
if len(serviceNames) == 1 {
serviceStr = "service"
}
return grpc.Errorf(codes.InvalidArgument, "secret '%s' is in use by the following %s: %v", secretName, serviceStr, serviceNameStr)
}
return store.DeleteSecret(tx, request.SecretID)
})
switch err {

View File

@ -6,6 +6,7 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"github.com/docker/distribution/reference"
"github.com/docker/swarmkit/api"
@ -208,6 +209,47 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
return nil
}
// validateSecretRefsSpec finds if the secrets passed in spec are valid and have no
// conflicting targets.
func validateSecretRefsSpec(spec *api.ServiceSpec) error {
container := spec.Task.GetContainer()
if container == nil {
return nil
}
// Keep a map to track all the targets that will be exposed
// The string returned is only used for logging. It could as well be struct{}{}
existingTargets := make(map[string]string)
for _, secretRef := range container.Secrets {
// SecretID and SecretName are mandatory, we have invalid references without them
if secretRef.SecretID == "" || secretRef.SecretName == "" {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
}
// Every secret referece requires a Target
if secretRef.GetTarget() == nil {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
}
// If this is a file target, we will ensure filename uniqueness
if secretRef.GetFile() != nil {
fileName := secretRef.GetFile().Name
// Validate the file name
if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
}
// If this target is already in use, we have conflicting targets
if prevSecretName, ok := existingTargets[fileName]; ok {
return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
}
existingTargets[fileName] = secretRef.SecretName
}
}
return nil
}
func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error {
for _, na := range networks {
var network *api.Network
@ -242,6 +284,11 @@ func validateServiceSpec(spec *api.ServiceSpec) error {
if err := validateEndpointSpec(spec.Endpoint); err != nil {
return err
}
// Check to see if the Secret Reference portion of the spec is valid
if err := validateSecretRefsSpec(spec); err != nil {
return err
}
return nil
}
@ -305,42 +352,30 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return nil
}
// checkSecretValidity finds if the secrets passed in spec have any conflicting targets.
func (s *Server) checkSecretValidity(spec *api.ServiceSpec) error {
// checkSecretExistence finds if the secret exists
func (s *Server) checkSecretExistence(tx store.Tx, spec *api.ServiceSpec) error {
container := spec.Task.GetContainer()
if container == nil {
return nil
}
// Keep a map to track all the targets that will be exposed
// The string returned is only used for logging. It could as well be struct{}{}
existingTargets := make(map[string]string)
var failedSecrets []string
for _, secretRef := range container.Secrets {
// SecretID and SecretName are mandatory, we have invalid references without them
if secretRef.SecretID == "" || secretRef.SecretName == "" {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
secret := store.GetSecret(tx, secretRef.SecretID)
// Check to see if the secret exists and secretRef.SecretName matches the actual secretName
if secret == nil || secret.Spec.Annotations.Name != secretRef.SecretName {
failedSecrets = append(failedSecrets, secretRef.SecretName)
}
}
if len(failedSecrets) > 0 {
secretStr := "secrets"
if len(failedSecrets) == 1 {
secretStr = "secret"
}
// Every secret referece requires a Target
if secretRef.GetTarget() == nil {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
}
return grpc.Errorf(codes.InvalidArgument, "%s not found: %v", secretStr, strings.Join(failedSecrets, ", "))
// If this is a file target, we will ensure filename uniqueness
if secretRef.GetFile() != nil {
fileName := secretRef.GetFile().Name
// Validate the file name
if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
}
// If this target is already in use, we have conflicting targets
if prevSecretName, ok := existingTargets[fileName]; ok {
return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
}
existingTargets[fileName] = secretRef.SecretName
}
}
return nil
@ -364,10 +399,6 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.checkSecretValidity(request.Spec); err != nil {
return nil, err
}
// TODO(aluzzardi): Consider using `Name` as a primary key to handle
// duplicate creations. See #65
service := &api.Service{
@ -376,6 +407,13 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
}
err := s.store.Update(func(tx store.Tx) error {
// Check to see if all the secrets being added exist as objects
// in our datastore
err := s.checkSecretExistence(tx, request.Spec)
if err != nil {
return err
}
return store.CreateService(tx, service)
})
if err != nil {
@ -435,10 +473,6 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
}
if err := s.checkSecretValidity(request.Spec); err != nil {
return nil, err
}
err := s.store.Update(func(tx store.Tx) error {
service = store.GetService(tx, request.ServiceID)
if service == nil {
@ -459,6 +493,13 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
return errNetworkUpdateNotSupported
}
// Check to see if all the secrets being added exist as objects
// in our datastore
err := s.checkSecretExistence(tx, request.Spec)
if err != nil {
return err
}
// orchestrator is designed to be stateless, so it should not deal
// with service mode change (comparing current config with previous config).
// proper way to change service mode is to delete and re-add.

View File

@ -19,9 +19,9 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/watch"
"github.com/pkg/errors"
"golang.org/x/net/context"
)

View File

@ -0,0 +1,273 @@
package logbroker
import (
"errors"
"io"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/watch"
"golang.org/x/net/context"
)
var (
errAlreadyRunning = errors.New("broker is already running")
errNotRunning = errors.New("broker is not running")
)
// LogBroker coordinates log subscriptions to services and tasks. Çlients can
// publish and subscribe to logs channels.
//
// Log subscriptions are pushed to the work nodes by creating log subscsription
// tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
type LogBroker struct {
mu sync.RWMutex
logQueue *watch.Queue
subscriptionQueue *watch.Queue
registeredSubscriptions map[string]*api.SubscriptionMessage
pctx context.Context
cancelAll context.CancelFunc
}
// New initializes and returns a new LogBroker
func New() *LogBroker {
return &LogBroker{}
}
// Run the log broker
func (lb *LogBroker) Run(ctx context.Context) error {
lb.mu.Lock()
if lb.cancelAll != nil {
lb.mu.Unlock()
return errAlreadyRunning
}
lb.pctx, lb.cancelAll = context.WithCancel(ctx)
lb.logQueue = watch.NewQueue()
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage)
lb.mu.Unlock()
select {
case <-lb.pctx.Done():
return lb.pctx.Err()
}
}
// Stop stops the log broker
func (lb *LogBroker) Stop() error {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.cancelAll == nil {
return errNotRunning
}
lb.cancelAll()
lb.cancelAll = nil
lb.logQueue.Close()
lb.subscriptionQueue.Close()
return nil
}
func validateSelector(selector *api.LogSelector) error {
if selector == nil {
return grpc.Errorf(codes.InvalidArgument, "log selector must be provided")
}
if len(selector.ServiceIDs) == 0 && len(selector.TaskIDs) == 0 && len(selector.NodeIDs) == 0 {
return grpc.Errorf(codes.InvalidArgument, "log selector must not be empty")
}
return nil
}
func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.registeredSubscriptions[subscription.ID] = subscription
lb.subscriptionQueue.Publish(subscription)
}
func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) {
subscription = subscription.Copy()
subscription.Close = true
lb.mu.Lock()
defer lb.mu.Unlock()
delete(lb.registeredSubscriptions, subscription.ID)
lb.subscriptionQueue.Publish(subscription)
}
func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) {
lb.mu.RLock()
defer lb.mu.RUnlock()
subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions))
for _, sub := range lb.registeredSubscriptions {
subs = append(subs, sub)
}
ch, cancel := lb.subscriptionQueue.Watch()
return subs, ch, cancel
}
func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
lb.mu.RLock()
defer lb.mu.RUnlock()
return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
publish := event.(*api.PublishLogsMessage)
return publish.SubscriptionID == id
}))
}
func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
lb.mu.RLock()
defer lb.mu.RUnlock()
lb.logQueue.Publish(log)
}
// SubscribeLogs creates a log subscription and streams back logs
func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
ctx := stream.Context()
if err := validateSelector(request.Selector); err != nil {
return err
}
subscription := &api.SubscriptionMessage{
ID: identity.NewID(),
Selector: request.Selector,
Options: request.Options,
}
log := log.G(ctx).WithFields(
logrus.Fields{
"method": "(*LogBroker).SubscribeLogs",
"subscription.id": subscription.ID,
},
)
log.Debug("subscribed")
publishCh, publishCancel := lb.subscribe(subscription.ID)
defer publishCancel()
lb.registerSubscription(subscription)
defer lb.unregisterSubscription(subscription)
for {
select {
case event := <-publishCh:
publish := event.(*api.PublishLogsMessage)
if err := stream.Send(&api.SubscribeLogsMessage{
Messages: publish.Messages,
}); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-lb.pctx.Done():
return nil
}
}
}
// ListenSubscriptions returns a stream of matching subscriptions for the current node
func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
remote, err := ca.RemoteNode(stream.Context())
if err != nil {
return err
}
log := log.G(stream.Context()).WithFields(
logrus.Fields{
"method": "(*LogBroker).ListenSubscriptions",
"node": remote.NodeID,
},
)
subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions()
defer subscriptionCancel()
log.Debug("node registered")
// Start by sending down all active subscriptions.
for _, subscription := range subscriptions {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
return nil
default:
}
if err := stream.Send(subscription); err != nil {
log.Error(err)
return err
}
}
// Send down new subscriptions.
// TODO(aluzzardi): We should filter by relevant tasks for this node rather
for {
select {
case v := <-subscriptionCh:
subscription := v.(*api.SubscriptionMessage)
if err := stream.Send(subscription); err != nil {
log.Error(err)
return err
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
return nil
}
}
}
// PublishLogs publishes log messages for a given subscription
func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
remote, err := ca.RemoteNode(stream.Context())
if err != nil {
return err
}
for {
log, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&api.PublishLogsResponse{})
}
if err != nil {
return err
}
if log.SubscriptionID == "" {
return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
}
// Make sure logs are emitted using the right Node ID to avoid impersonation.
for _, msg := range log.Messages {
if msg.Context.NodeID != remote.NodeID {
return grpc.Errorf(codes.PermissionDenied, "invalid NodeID: expected=%s;received=%s", remote.NodeID, msg.Context.NodeID)
}
}
lb.publish(log)
}
}

View File

@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/manager/dispatcher"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
"github.com/docker/swarmkit/manager/logbroker"
"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
"github.com/docker/swarmkit/manager/orchestrator/global"
"github.com/docker/swarmkit/manager/orchestrator/replicated"
@ -96,6 +97,7 @@ type Manager struct {
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
logbroker *logbroker.LogBroker
replicatedOrchestrator *replicated.Orchestrator
globalOrchestrator *global.Orchestrator
taskReaper *taskreaper.TaskReaper
@ -234,6 +236,7 @@ func New(config *Config) (*Manager, error) {
listeners: listeners,
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
dispatcher: dispatcher.New(raftNode, dispatcherConfig),
logbroker: logbroker.New(),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
@ -292,6 +295,8 @@ func (m *Manager) Run(parent context.Context) error {
authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
@ -304,6 +309,7 @@ func (m *Manager) Run(parent context.Context) error {
proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
@ -313,6 +319,7 @@ func (m *Manager) Run(parent context.Context) error {
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, forwardAsOwnRequest)
localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, forwardAsOwnRequest)
// Everything registered on m.server should be an authenticated
// wrapper, or a proxy wrapping an authenticated wrapper!
@ -322,10 +329,13 @@ func (m *Manager) Run(parent context.Context) error {
api.RegisterHealthServer(m.server, authenticatedHealthAPI)
api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
api.RegisterControlServer(m.server, authenticatedControlAPI)
api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
api.RegisterHealthServer(m.localserver, localHealthServer)
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
@ -419,6 +429,7 @@ func (m *Manager) Stop(ctx context.Context) {
}()
m.dispatcher.Stop()
m.logbroker.Stop()
m.caserver.Stop()
if m.allocator != nil {
@ -664,6 +675,12 @@ func (m *Manager) becomeLeader(ctx context.Context) {
}
}(m.dispatcher)
go func(lb *logbroker.LogBroker) {
if err := lb.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker exited with an error")
}
}(m.logbroker)
go func(server *ca.Server) {
if err := server.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("CA signer exited with an error")
@ -712,6 +729,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
m.dispatcher.Stop()
m.logbroker.Stop()
m.caserver.Stop()
if m.allocator != nil {

View File

@ -478,6 +478,9 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
task := orchestrator.NewTask(g.cluster, service, 0, nodeID)
err := batch.Update(func(tx store.Tx) error {
if store.GetService(tx, service.ID) == nil {
return nil
}
return store.CreateTask(tx, task)
})
if err != nil {

View File

@ -53,6 +53,7 @@ func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Even
}
orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
r.restarts.ClearServiceHistory(v.Service.ID)
delete(r.reconcileServices, v.Service.ID)
case state.EventCreateService:
if !orchestrator.IsReplicatedService(v.Service) {
return

View File

@ -16,8 +16,8 @@ import (
"github.com/docker/swarmkit/manager/orchestrator/restart"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/watch"
)
const defaultMonitor = 30 * time.Second
@ -383,6 +383,10 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update
}
err = batch.Update(func(tx store.Tx) error {
if store.GetService(tx, updated.ServiceID) == nil {
return errors.New("service was deleted")
}
if err := store.CreateTask(tx, updated); err != nil {
return err
}

View File

@ -9,7 +9,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

View File

@ -29,7 +29,7 @@ import (
"github.com/docker/swarmkit/manager/raftselector"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"github.com/pivotal-golang/clock"
"github.com/pkg/errors"

View File

@ -121,3 +121,25 @@ func (b byMembership) isBy() {
func ByMembership(membership api.NodeSpec_Membership) By {
return byMembership(membership)
}
type byReferencedNetworkID string
func (b byReferencedNetworkID) isBy() {
}
// ByReferencedNetworkID creates an object to pass to Find to search for a
// service or task that references a network with the given ID.
func ByReferencedNetworkID(networkID string) By {
return byReferencedNetworkID(networkID)
}
type byReferencedSecretID string
func (b byReferencedSecretID) isBy() {
}
// ByReferencedSecretID creates an object to pass to Find to search for a
// service or task that references a secret with the given ID.
func ByReferencedSecretID(secretID string) By {
return byReferencedSecretID(secretID)
}

View File

@ -13,8 +13,8 @@ import (
"github.com/docker/swarmkit/api"
pb "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/watch"
memdb "github.com/hashicorp/go-memdb"
"golang.org/x/net/context"
)
@ -29,6 +29,8 @@ const (
indexDesiredState = "desiredstate"
indexRole = "role"
indexMembership = "membership"
indexNetwork = "network"
indexSecret = "secret"
prefix = "_prefix"
@ -624,6 +626,18 @@ func (tx readTx) findIterators(table string, by By, checkType func(By) error) ([
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedNetworkID:
it, err := tx.memDBTx.Get(table, indexNetwork, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedSecretID:
it, err := tx.memDBTx.Get(table, indexSecret, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
default:
return nil, ErrInvalidFindBy
}

View File

@ -26,6 +26,16 @@ func init() {
Unique: true,
Indexer: serviceIndexerByName{},
},
indexNetwork: {
Name: indexNetwork,
AllowMissing: true,
Indexer: serviceIndexerByNetwork{},
},
indexSecret: {
Name: indexSecret,
AllowMissing: true,
Indexer: serviceIndexerBySecret{},
},
},
},
Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
@ -167,7 +177,7 @@ func GetService(tx ReadTx, id string) *api.Service {
func FindServices(tx ReadTx, by By) ([]*api.Service, error) {
checkType := func(by By) error {
switch by.(type) {
case byName, byNamePrefix, byIDPrefix:
case byName, byNamePrefix, byIDPrefix, byReferencedNetworkID, byReferencedSecretID:
return nil
default:
return ErrInvalidFindBy
@ -223,3 +233,58 @@ func (si serviceIndexerByName) FromObject(obj interface{}) (bool, []byte, error)
func (si serviceIndexerByName) PrefixFromArgs(args ...interface{}) ([]byte, error) {
return prefixFromArgs(args...)
}
type serviceIndexerByNetwork struct{}
func (si serviceIndexerByNetwork) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (si serviceIndexerByNetwork) FromObject(obj interface{}) (bool, [][]byte, error) {
s, ok := obj.(serviceEntry)
if !ok {
panic("unexpected type passed to FromObject")
}
var networkIDs [][]byte
specNetworks := s.Spec.Task.Networks
if len(specNetworks) == 0 {
specNetworks = s.Spec.Networks
}
for _, na := range specNetworks {
// Add the null character as a terminator
networkIDs = append(networkIDs, []byte(na.Target+"\x00"))
}
return len(networkIDs) != 0, networkIDs, nil
}
type serviceIndexerBySecret struct{}
func (si serviceIndexerBySecret) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (si serviceIndexerBySecret) FromObject(obj interface{}) (bool, [][]byte, error) {
s, ok := obj.(serviceEntry)
if !ok {
panic("unexpected type passed to FromObject")
}
container := s.Spec.Task.GetContainer()
if container == nil {
return false, nil, nil
}
var secretIDs [][]byte
for _, secretRef := range container.Secrets {
// Add the null character as a terminator
secretIDs = append(secretIDs, []byte(secretRef.SecretID+"\x00"))
}
return len(secretIDs) != 0, secretIDs, nil
}

View File

@ -47,6 +47,16 @@ func init() {
Name: indexDesiredState,
Indexer: taskIndexerByDesiredState{},
},
indexNetwork: {
Name: indexNetwork,
AllowMissing: true,
Indexer: taskIndexerByNetwork{},
},
indexSecret: {
Name: indexSecret,
AllowMissing: true,
Indexer: taskIndexerBySecret{},
},
},
},
Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
@ -176,7 +186,7 @@ func GetTask(tx ReadTx, id string) *api.Task {
func FindTasks(tx ReadTx, by By) ([]*api.Task, error) {
checkType := func(by By) error {
switch by.(type) {
case byName, byNamePrefix, byIDPrefix, byDesiredState, byNode, byService, bySlot:
case byName, byNamePrefix, byIDPrefix, byDesiredState, byNode, byService, bySlot, byReferencedNetworkID, byReferencedSecretID:
return nil
default:
return ErrInvalidFindBy
@ -288,16 +298,65 @@ func (ti taskIndexerBySlot) FromObject(obj interface{}) (bool, []byte, error) {
type taskIndexerByDesiredState struct{}
func (ni taskIndexerByDesiredState) FromArgs(args ...interface{}) ([]byte, error) {
func (ti taskIndexerByDesiredState) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ni taskIndexerByDesiredState) FromObject(obj interface{}) (bool, []byte, error) {
n, ok := obj.(taskEntry)
func (ti taskIndexerByDesiredState) FromObject(obj interface{}) (bool, []byte, error) {
t, ok := obj.(taskEntry)
if !ok {
panic("unexpected type passed to FromObject")
}
// Add the null character as a terminator
return true, []byte(strconv.FormatInt(int64(n.DesiredState), 10) + "\x00"), nil
return true, []byte(strconv.FormatInt(int64(t.DesiredState), 10) + "\x00"), nil
}
type taskIndexerByNetwork struct{}
func (ti taskIndexerByNetwork) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerByNetwork) FromObject(obj interface{}) (bool, [][]byte, error) {
t, ok := obj.(taskEntry)
if !ok {
panic("unexpected type passed to FromObject")
}
var networkIDs [][]byte
for _, na := range t.Spec.Networks {
// Add the null character as a terminator
networkIDs = append(networkIDs, []byte(na.Target+"\x00"))
}
return len(networkIDs) != 0, networkIDs, nil
}
type taskIndexerBySecret struct{}
func (ti taskIndexerBySecret) FromArgs(args ...interface{}) ([]byte, error) {
return fromArgs(args...)
}
func (ti taskIndexerBySecret) FromObject(obj interface{}) (bool, [][]byte, error) {
t, ok := obj.(taskEntry)
if !ok {
panic("unexpected type passed to FromObject")
}
container := t.Spec.GetContainer()
if container == nil {
return false, nil, nil
}
var secretIDs [][]byte
for _, secretRef := range container.Secrets {
// Add the null character as a terminator
secretIDs = append(secretIDs, []byte(secretRef.SecretID+"\x00"))
}
return len(secretIDs) != 0, secretIDs, nil
}

View File

@ -3,7 +3,7 @@ package state
import (
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/watch"
)
// Event is the type used for events passed over watcher channels, and also