
Merge pull request #28077 from aluzzardi/swarmkit-revendoring

Revendor swarmkit to 2eaae1ab6800f8521236e01bafb2667d2ec5371f

commit bb1153a613
Alexander Morozov, 2016-11-04 14:59:05 -07:00, committed by GitHub
31 changed files with 4408 additions and 187 deletions


@@ -100,7 +100,7 @@ github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
 github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d
 # cluster
-github.com/docker/swarmkit 4dfc88ccce14ced6f0a6ea82d46dca004c6de0e2
+github.com/docker/swarmkit 2eaae1ab6800f8521236e01bafb2667d2ec5371f
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
@@ -108,7 +108,7 @@ github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b65068
 golang.org/x/crypto 3fbbcd23f1cb824e69491a5930cfeff09b12f4d2
 golang.org/x/time a4bde12657593d5e90d0533a3e4fd95e635124cb
 github.com/mreiferson/go-httpclient 63fe23f7434723dc904c901043af07931f293c47
-github.com/hashicorp/go-memdb 98f52f52d7a476958fa9da671354d270c50661a7
+github.com/hashicorp/go-memdb 608dda3b1410a73eaf3ac8b517c9ae7ebab6aa87 https://github.com/floridoo/go-memdb
 github.com/hashicorp/go-immutable-radix 8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990
 github.com/hashicorp/golang-lru a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
 github.com/coreos/pkg fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8


@@ -7,6 +7,7 @@ import (
     "sync"
     "time"

+    "github.com/docker/swarmkit/agent/exec"
     "github.com/docker/swarmkit/api"
     "github.com/docker/swarmkit/log"
     "golang.org/x/net/context"
@@ -48,11 +49,8 @@ func New(config *Config) (*Agent, error) {
         return nil, err
     }

-    worker := newWorker(config.DB, config.Executor)
     a := &Agent{
         config:   config,
-        worker:   worker,
         sessionq: make(chan sessionOperation),
         started:  make(chan struct{}),
         stopped:  make(chan struct{}),
@@ -60,6 +58,7 @@ func New(config *Config) (*Agent, error) {
         ready:    make(chan struct{}),
     }

+    a.worker = newWorker(config.DB, config.Executor, a)
     return a, nil
 }
@@ -152,6 +151,7 @@ func (a *Agent) run(ctx context.Context) {
         registered    = session.registered
         ready         = a.ready // first session ready
         sessionq      chan sessionOperation
+        subscriptions = map[string]context.CancelFunc{}
     )

     if err := a.worker.Init(ctx); err != nil {
@@ -159,6 +159,7 @@ func (a *Agent) run(ctx context.Context) {
         a.err = err
         return // fatal?
     }
+    defer a.worker.Close()

     // setup a reliable reporter to call back to us.
     reporter := newStatusReporter(ctx, a)
@@ -186,6 +187,23 @@ func (a *Agent) run(ctx context.Context) {
             if err := a.handleSessionMessage(ctx, msg); err != nil {
                 log.G(ctx).WithError(err).Error("session message handler failed")
             }
+        case sub := <-session.subscriptions:
+            if sub.Close {
+                if cancel, ok := subscriptions[sub.ID]; ok {
+                    cancel()
+                }
+                delete(subscriptions, sub.ID)
+                continue
+            }
+
+            if _, ok := subscriptions[sub.ID]; ok {
+                // Duplicate subscription
+                continue
+            }
+
+            subCtx, subCancel := context.WithCancel(ctx)
+            subscriptions[sub.ID] = subCancel
+            go a.worker.Subscribe(subCtx, sub)
         case <-registered:
             log.G(ctx).Debugln("agent: registered")
             if ready != nil {
@@ -387,6 +405,40 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
     }
 }

+// Publisher returns a LogPublisher for the given subscription
+func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
+    // TODO(stevvooe): The level of coordination here is WAY too much for logs.
+    // These should only be best effort and really just buffer until a session is
+    // ready. Ideally, they would use a separate connection completely.
+    var (
+        err    error
+        client api.LogBroker_PublishLogsClient
+    )
+
+    err = a.withSession(ctx, func(session *session) error {
+        client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
+        return err
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
+        select {
+        case <-ctx.Done():
+            client.CloseSend()
+            return ctx.Err()
+        default:
+        }
+
+        return client.Send(&api.PublishLogsMessage{
+            SubscriptionID: subscriptionID,
+            Messages:       []api.LogMessage{message},
+        })
+    }), nil
+}
+
 // nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
 func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
     desc, err := a.config.Executor.Describe(ctx)
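Review note: the publisher above maps each Publish call onto one PublishLogsMessage on the current session's stream. A hedged sketch of a hypothetical caller; publishOne and the task ID are illustrative only, not part of this diff:

func publishOne(ctx context.Context, a *Agent, subscriptionID string) error {
    // Obtain a publisher bound to the subscription. Under the hood this
    // opens a PublishLogs stream on the session's manager connection.
    publisher, err := a.Publisher(ctx, subscriptionID)
    if err != nil {
        return err
    }

    // One Publish call becomes one PublishLogsMessage carrying one LogMessage.
    return publisher.Publish(ctx, api.LogMessage{
        Context: api.LogContext{TaskID: "task-1"}, // hypothetical task ID
        Stream:  api.LogStreamStdout,
        Data:    []byte("hello\n"),
    })
}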


@@ -45,6 +45,33 @@ type Controller interface {
     Close() error
 }

+// ControllerLogs defines a component that makes logs accessible.
+//
+// Can usually be accessed on a controller instance via type assertion.
+type ControllerLogs interface {
+    // Logs will write to publisher until the context is cancelled or an error
+    // occurs.
+    Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
+}
+
+// LogPublisher defines the protocol for receiving a log message.
+type LogPublisher interface {
+    Publish(ctx context.Context, message api.LogMessage) error
+}
+
+// LogPublisherFunc implements publisher with just a function.
+type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
+
+// Publish calls the wrapped function.
+func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
+    return fn(ctx, message)
+}
+
+// LogPublisherProvider defines the protocol for receiving a log publisher
+type LogPublisherProvider interface {
+    Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
+}
+
 // ContainerStatuser reports status of a container.
 //
 // This can be implemented by controllers or error types.
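Review note: to make the type-assertion contract concrete, here is a hedged sketch of a toy controller satisfying ControllerLogs; echoController is hypothetical and not part of this diff, and it assumes the api and context packages imported in this file:

// echoController publishes two fixed lines and returns, which ends the
// subscription unless the subscriber asked to follow.
type echoController struct{}

func (c *echoController) Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error {
    for _, line := range []string{"hello", "world"} {
        msg := api.LogMessage{Stream: api.LogStreamStdout, Data: []byte(line)}
        if err := publisher.Publish(ctx, msg); err != nil {
            return err // e.g. the subscriber cancelled the context
        }
    }
    return nil
}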


@@ -110,6 +110,100 @@ func (_mr *_MockControllerRecorder) Close() *gomock.Call {
     return _mr.mock.ctrl.RecordCall(_mr.mock, "Close")
 }

+// Mock of ControllerLogs interface
+type MockControllerLogs struct {
+    ctrl     *gomock.Controller
+    recorder *_MockControllerLogsRecorder
+}
+
+// Recorder for MockControllerLogs (not exported)
+type _MockControllerLogsRecorder struct {
+    mock *MockControllerLogs
+}
+
+func NewMockControllerLogs(ctrl *gomock.Controller) *MockControllerLogs {
+    mock := &MockControllerLogs{ctrl: ctrl}
+    mock.recorder = &_MockControllerLogsRecorder{mock}
+    return mock
+}
+
+func (_m *MockControllerLogs) EXPECT() *_MockControllerLogsRecorder {
+    return _m.recorder
+}
+
+func (_m *MockControllerLogs) Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error {
+    ret := _m.ctrl.Call(_m, "Logs", ctx, publisher, options)
+    ret0, _ := ret[0].(error)
+    return ret0
+}
+
+func (_mr *_MockControllerLogsRecorder) Logs(arg0, arg1, arg2 interface{}) *gomock.Call {
+    return _mr.mock.ctrl.RecordCall(_mr.mock, "Logs", arg0, arg1, arg2)
+}
+
+// Mock of LogPublisher interface
+type MockLogPublisher struct {
+    ctrl     *gomock.Controller
+    recorder *_MockLogPublisherRecorder
+}
+
+// Recorder for MockLogPublisher (not exported)
+type _MockLogPublisherRecorder struct {
+    mock *MockLogPublisher
+}
+
+func NewMockLogPublisher(ctrl *gomock.Controller) *MockLogPublisher {
+    mock := &MockLogPublisher{ctrl: ctrl}
+    mock.recorder = &_MockLogPublisherRecorder{mock}
+    return mock
+}
+
+func (_m *MockLogPublisher) EXPECT() *_MockLogPublisherRecorder {
+    return _m.recorder
+}
+
+func (_m *MockLogPublisher) Publish(ctx context.Context, message api.LogMessage) error {
+    ret := _m.ctrl.Call(_m, "Publish", ctx, message)
+    ret0, _ := ret[0].(error)
+    return ret0
+}
+
+func (_mr *_MockLogPublisherRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
+    return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1)
+}
+
+// Mock of LogPublisherProvider interface
+type MockLogPublisherProvider struct {
+    ctrl     *gomock.Controller
+    recorder *_MockLogPublisherProviderRecorder
+}
+
+// Recorder for MockLogPublisherProvider (not exported)
+type _MockLogPublisherProviderRecorder struct {
+    mock *MockLogPublisherProvider
+}
+
+func NewMockLogPublisherProvider(ctrl *gomock.Controller) *MockLogPublisherProvider {
+    mock := &MockLogPublisherProvider{ctrl: ctrl}
+    mock.recorder = &_MockLogPublisherProviderRecorder{mock}
+    return mock
+}
+
+func (_m *MockLogPublisherProvider) EXPECT() *_MockLogPublisherProviderRecorder {
+    return _m.recorder
+}
+
+func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error) {
+    ret := _m.ctrl.Call(_m, "Publisher", ctx, subscriptionID)
+    ret0, _ := ret[0].(LogPublisher)
+    ret1, _ := ret[1].(error)
+    return ret0, ret1
+}
+
+func (_mr *_MockLogPublisherProviderRecorder) Publisher(arg0, arg1 interface{}) *gomock.Call {
+    return _mr.mock.ctrl.RecordCall(_mr.mock, "Publisher", arg0, arg1)
+}
+
 // Mock of ContainerStatuser interface
 type MockContainerStatuser struct {
     ctrl *gomock.Controller


@@ -39,6 +39,7 @@ type session struct {
     errs          chan error
     messages      chan *api.SessionMessage
     assignments   chan *api.AssignmentsMessage
+    subscriptions chan *api.SubscriptionMessage

     registered chan struct{} // closed registration
     closed     chan struct{}
@@ -52,9 +53,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
         errs:          make(chan error, 1),
         messages:      make(chan *api.SessionMessage),
         assignments:   make(chan *api.AssignmentsMessage),
+        subscriptions: make(chan *api.SubscriptionMessage),
         registered:    make(chan struct{}),
         closed:        make(chan struct{}),
     }

+    // TODO(stevvooe): Need to move connection management up a level or create
+    // independent connection for log broker client.
+
     peer, err := agent.config.Managers.Select()
     if err != nil {
         s.errs <- err
@@ -98,6 +104,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api
     go runctx(ctx, s.closed, s.errs, s.heartbeat)
     go runctx(ctx, s.closed, s.errs, s.watch)
     go runctx(ctx, s.closed, s.errs, s.listen)
+    go runctx(ctx, s.closed, s.errs, s.logSubscriptions)

     close(s.registered)
 }
@@ -213,6 +220,33 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
     }
 }

+func (s *session) logSubscriptions(ctx context.Context) error {
+    log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
+    log.Debugf("")
+
+    client := api.NewLogBrokerClient(s.conn)
+    subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
+    if err != nil {
+        return err
+    }
+    defer subscriptions.CloseSend()
+
+    for {
+        resp, err := subscriptions.Recv()
+        if err != nil {
+            return err
+        }
+
+        select {
+        case s.subscriptions <- resp:
+        case <-s.closed:
+            return errSessionClosed
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+}
+
 func (s *session) watch(ctx context.Context) error {
     log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
     log.Debugf("")


@@ -64,6 +64,18 @@ func (tm *taskManager) Close() error {
     }
 }

+func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
+    ctx = log.WithModule(ctx, "taskmanager")
+
+    logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
+    if !ok {
+        return // no logs available
+    }
+    if err := logCtlr.Logs(ctx, publisher, options); err != nil {
+        log.G(ctx).WithError(err).Errorf("logs call failed")
+    }
+}
+
 func (tm *taskManager) run(ctx context.Context) {
     ctx, cancelAll := context.WithCancel(ctx)
     defer cancelAll() // cancel all child operations on exit.


@@ -8,6 +8,7 @@ import (
     "github.com/docker/swarmkit/agent/exec"
     "github.com/docker/swarmkit/api"
     "github.com/docker/swarmkit/log"
+    "github.com/docker/swarmkit/watch"
     "golang.org/x/net/context"
 )
@@ -17,6 +18,11 @@ type Worker interface {
     // Init prepares the worker for task assignment.
     Init(ctx context.Context) error

+    // Close performs worker cleanup when no longer needed.
+    //
+    // It is not safe to call any worker function after that.
+    Close()
+
     // Assign assigns a complete set of tasks and secrets to a worker. Any task or secrets not included in
     // this set will be removed.
     Assign(ctx context.Context, assignments []*api.AssignmentChange) error
@@ -31,6 +37,9 @@ type Worker interface {
     //
     // The listener will be removed if the context is cancelled.
     Listen(ctx context.Context, reporter StatusReporter)
+
+    // Subscribe to log messages matching the subscription.
+    Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error
 }

 // statusReporterKey protects removal map from panic.
@@ -41,16 +50,21 @@ type statusReporterKey struct {
 type worker struct {
     db        *bolt.DB
     executor  exec.Executor
+    publisher exec.LogPublisher
     listeners map[*statusReporterKey]struct{}
+    taskevents        *watch.Queue
+    publisherProvider exec.LogPublisherProvider

     taskManagers map[string]*taskManager
     mu           sync.RWMutex
 }

-func newWorker(db *bolt.DB, executor exec.Executor) *worker {
+func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {
     return &worker{
         db:       db,
         executor: executor,
+        publisherProvider: publisherProvider,
+        taskevents:        watch.NewQueue(),
         listeners:    make(map[*statusReporterKey]struct{}),
         taskManagers: make(map[string]*taskManager),
     }
@@ -90,6 +104,11 @@ func (w *worker) Init(ctx context.Context) error {
     })
 }

+// Close performs worker cleanup when no longer needed.
+func (w *worker) Close() {
+    w.taskevents.Close()
+}
+
 // Assign assigns a full set of tasks and secrets to the worker.
 // Any tasks not previously known will be started. Any tasks that are in the task set
 // and already running will be updated, if possible. Any tasks currently running on
@@ -319,6 +338,7 @@ func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
 }

 func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error {
+    w.taskevents.Publish(task.Copy())
     _, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation.

     if err != nil {
@@ -381,3 +401,63 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin
     return nil
 }

+// Subscribe to log messages matching the subscription.
+func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
+    log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)
+
+    publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID)
+    if err != nil {
+        return err
+    }
+    // Send a close once we're done
+    defer publisher.Publish(ctx, api.LogMessage{})
+
+    match := func(t *api.Task) bool {
+        // TODO(aluzzardi): Consider using maps to limit the iterations.
+        for _, tid := range subscription.Selector.TaskIDs {
+            if t.ID == tid {
+                return true
+            }
+        }
+
+        for _, sid := range subscription.Selector.ServiceIDs {
+            if t.ServiceID == sid {
+                return true
+            }
+        }
+
+        for _, nid := range subscription.Selector.NodeIDs {
+            if t.NodeID == nid {
+                return true
+            }
+        }
+
+        return false
+    }
+
+    ch, cancel := w.taskevents.Watch()
+    defer cancel()
+
+    w.mu.Lock()
+    for _, tm := range w.taskManagers {
+        if match(tm.task) {
+            go tm.Logs(ctx, *subscription.Options, publisher)
+        }
+    }
+    w.mu.Unlock()
+
+    for {
+        select {
+        case v := <-ch:
+            w.mu.Lock()
+            task := v.(*api.Task)
+            if match(task) {
+                go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher)
+            }
+            w.mu.Unlock()
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+}
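Review note: Subscribe leans on the watch.Queue publish/watch pattern; startTask publishes every new task and each live subscription watches for matches, and the Watch is registered before the existing taskManagers are scanned so tasks starting in between are not lost. A hedged stand-alone sketch of that pattern (queueSketch is hypothetical; it assumes the watch and api packages plus fmt):

func queueSketch() {
    q := watch.NewQueue()
    defer q.Close()

    ch, cancel := q.Watch() // receives only events published after Watch
    defer cancel()

    // startTask does the equivalent of this for every new task.
    go q.Publish(&api.Task{ID: "task-1"})

    task := (<-ch).(*api.Task)
    fmt.Println(task.ID) // "task-1"
}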


@@ -1,3 +1,3 @@
 package api

-//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
+//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto logbroker.proto

vendor/github.com/docker/swarmkit/api/logbroker.pb.go — generated vendored new file, 3130 lines (diff suppressed because it is too large)

vendor/github.com/docker/swarmkit/api/logbroker.proto — generated vendored new file, 170 lines

@@ -0,0 +1,170 @@
syntax = "proto3";

package docker.swarmkit.v1;

import "gogoproto/gogo.proto";
import "timestamp/timestamp.proto"; // TODO(stevvooe): use our own until we fix gogoproto/deepcopy
import "plugin/plugin.proto";

// LogStream defines the stream from which the log message came.
enum LogStream {
    option (gogoproto.goproto_enum_prefix) = false;
    option (gogoproto.enum_customname) = "LogStream";

    LOG_STREAM_UNKNOWN = 0 [(gogoproto.enumvalue_customname) = "LogStreamUnknown"];
    LOG_STREAM_STDOUT = 1 [(gogoproto.enumvalue_customname) = "LogStreamStdout"];
    LOG_STREAM_STDERR = 2 [(gogoproto.enumvalue_customname) = "LogStreamStderr"];
}

message LogSubscriptionOptions {
    // Streams defines which log streams should be sent from the task source.
    // Empty means send all the messages.
    repeated LogStream streams = 1;

    // Follow instructs the publisher to continue sending log messages as they
    // are produced, after satisfying the initial query.
    bool follow = 2;

    // Tail defines how many messages relative to the log stream to send when
    // starting the stream.
    //
    // Positive values will skip that number of messages from the start of the
    // stream before publishing.
    //
    // Negative values will specify messages relative to the end of the stream,
    // offset by one. We can say that the last (-n-1) lines are returned when n
    // < 0. As reference, -1 would mean send no log lines (typically used with
    // follow), -2 would return the last log line, -11 would return the last 10
    // and so on.
    //
    // The default value of zero will return all logs.
    //
    // Note that this is very different from the Docker API.
    int64 tail = 3;

    // Since indicates that only log messages produced after this timestamp
    // should be sent.
    Timestamp since = 4;
}

// LogSelector will match logs from ANY of the defined parameters.
//
// For the best effect, the client should use the least specific parameter
// possible. For example, if they want to listen to all the tasks of a service,
// they should use the service id, rather than specifying the individual tasks.
message LogSelector {
    repeated string service_ids = 1 [(gogoproto.customname) = "ServiceIDs"];
    repeated string node_ids = 2 [(gogoproto.customname) = "NodeIDs"];
    repeated string task_ids = 3 [(gogoproto.customname) = "TaskIDs"];
}

// LogContext marks the context from which a log message was generated.
message LogContext {
    string service_id = 1 [(gogoproto.customname) = "ServiceID"];
    string node_id = 2 [(gogoproto.customname) = "NodeID"];
    string task_id = 3 [(gogoproto.customname) = "TaskID"];
}

// LogMessage
message LogMessage {
    // Context identifies the source of the log message.
    LogContext context = 1 [(gogoproto.nullable) = false];

    // Timestamp is the time at which the message was generated.
    Timestamp timestamp = 2;

    // Stream identifies the stream of the log message, stdout or stderr.
    LogStream stream = 3;

    // Data is the raw log message, as generated by the application.
    bytes data = 4;
}

// Logs defines the methods for retrieving task logs messages from a cluster.
service Logs {
    // SubscribeLogs starts a subscription with the specified selector and options.
    //
    // The subscription will be distributed to relevant nodes and messages will
    // be collected and sent via the returned stream.
    //
    // The subscription will end with an EOF.
    rpc SubscribeLogs(SubscribeLogsRequest) returns (stream SubscribeLogsMessage) {
        option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-manager" };
    }
}

message SubscribeLogsRequest {
    // LogSelector describes the logs to which the subscriber is subscribed.
    LogSelector selector = 1;

    LogSubscriptionOptions options = 2;
}

message SubscribeLogsMessage {
    repeated LogMessage messages = 1 [(gogoproto.nullable) = false];
}

// LogBroker defines the API used by the worker to send task logs back to a
// manager. A client listens for subscriptions then optimistically retrieves
// logs satisfying said subscriptions, calling PublishLogs for results that are
// relevant.
//
// The structure of ListenSubscriptions is similar to the Dispatcher API but
// decoupled to allow log distribution to work outside of the regular task
// flow.
service LogBroker {
    // ListenSubscriptions starts a subscription stream for the node. For each
    // message received, the node should attempt to satisfy the subscription.
    //
    // Log messages that match the provided subscription should be sent via
    // PublishLogs.
    rpc ListenSubscriptions(ListenSubscriptionsRequest) returns (stream SubscriptionMessage) {
        option (docker.protobuf.plugin.tls_authorization) = {
            roles: "swarm-worker"
            roles: "swarm-manager"
        };
    }

    // PublishLogs receives sets of log messages destined for a single
    // subscription identifier.
    rpc PublishLogs(stream PublishLogsMessage) returns (PublishLogsResponse) {
        option (docker.protobuf.plugin.tls_authorization) = {
            roles: "swarm-worker"
            roles: "swarm-manager"
        };
    }
}

// ListenSubscriptionsRequest is a placeholder to begin listening for
// subscriptions.
message ListenSubscriptionsRequest { }

// SubscriptionMessage instructs the listener to start publishing messages for
// the stream or end a subscription.
//
// If Options.Follow == false, the worker should end the subscription on its own.
message SubscriptionMessage {
    // ID identifies the subscription.
    string id = 1 [(gogoproto.customname) = "ID"];

    // Selector defines which sources should be sent for the subscription.
    LogSelector selector = 2;

    // Options specify how the subscription should be satisfied.
    LogSubscriptionOptions options = 3;

    // Close will be true if the node should shutdown the subscription with the
    // provided identifier.
    bool close = 4;
}

message PublishLogsMessage {
    // SubscriptionID identifies which subscription the set of messages should
    // be sent to. We can think of this as a "mail box" for the subscription.
    string subscription_id = 1 [(gogoproto.customname) = "SubscriptionID"];

    // Messages is the log message for publishing.
    repeated LogMessage messages = 2 [(gogoproto.nullable) = false];
}

message PublishLogsResponse { }
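Review note: the Tail semantics above are easy to misread, so here is a hedged Go sketch of the arithmetic they describe; tailStart is a hypothetical helper, not swarmkit code:

// tailStart returns the index into msgs at which publishing should start:
// 0 sends everything, n > 0 skips the first n messages, and n < 0 keeps
// the last (-n-1) messages (-1 keeps none, -2 keeps one, -11 keeps ten).
func tailStart(msgs []api.LogMessage, tail int64) int {
    n := int64(len(msgs))
    switch {
    case tail == 0:
        return 0
    case tail > 0:
        if tail > n {
            return int(n)
        }
        return int(tail)
    default:
        keep := -tail - 1
        if keep >= n {
            return 0
        }
        return int(n - keep)
    }
}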


@@ -16,6 +16,7 @@
     raft.proto
     health.proto
     resource.proto
+    logbroker.proto

 It has these top-level messages:
     Version
@@ -168,6 +169,16 @@
     AttachNetworkResponse
     DetachNetworkRequest
     DetachNetworkResponse
+    LogSubscriptionOptions
+    LogSelector
+    LogContext
+    LogMessage
+    SubscribeLogsRequest
+    SubscribeLogsMessage
+    ListenSubscriptionsRequest
+    SubscriptionMessage
+    PublishLogsMessage
+    PublishLogsResponse
 */
 package api


@@ -145,36 +145,29 @@ func (s *Server) GetNetwork(ctx context.Context, request *api.GetNetworkRequest)
 // - Returns `NotFound` if the Network is not found.
 // - Returns an error if the deletion fails.
 func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRequest) (*api.RemoveNetworkResponse, error) {
-    var (
-        services []*api.Service
-        err      error
-    )
-
     if request.NetworkID == "" {
         return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
     }

-    s.store.View(func(tx store.ReadTx) {
-        services, err = store.FindServices(tx, store.All)
-    })
-    if err != nil {
-        return nil, grpc.Errorf(codes.Internal, "could not find services using network %s", request.NetworkID)
-    }
-
-    for _, s := range services {
-        specNetworks := s.Spec.Task.Networks
-        if len(specNetworks) == 0 {
-            specNetworks = s.Spec.Networks
-        }
-
-        for _, na := range specNetworks {
-            if na.Target == request.NetworkID {
-                return nil, grpc.Errorf(codes.FailedPrecondition, "network %s is in use", request.NetworkID)
-            }
-        }
-    }
-
-    err = s.store.Update(func(tx store.Tx) error {
+    err := s.store.Update(func(tx store.Tx) error {
+        services, err := store.FindServices(tx, store.ByReferencedNetworkID(request.NetworkID))
+        if err != nil {
+            return grpc.Errorf(codes.Internal, "could not find services using network %s: %v", request.NetworkID, err)
+        }
+
+        if len(services) != 0 {
+            return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", request.NetworkID, services[0].ID)
+        }
+
+        tasks, err := store.FindTasks(tx, store.ByReferencedNetworkID(request.NetworkID))
+        if err != nil {
+            return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
+        }
+
+        if len(tasks) != 0 {
+            return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
+        }
+
         nw := store.GetNetwork(tx, request.NetworkID)
         if _, ok := nw.Spec.Annotations.Labels["com.docker.swarm.internal"]; ok {
             networkDescription := nw.ID


@@ -2,6 +2,7 @@ package controlapi

 import (
     "regexp"
+    "strings"

     "github.com/Sirupsen/logrus"
     "github.com/docker/distribution/digest"
@@ -191,6 +192,7 @@ func (s *Server) CreateSecret(ctx context.Context, request *api.CreateSecretRequ
 // RemoveSecret removes the secret referenced by `RemoveSecretRequest.ID`.
 // - Returns `InvalidArgument` if `RemoveSecretRequest.ID` is empty.
 // - Returns `NotFound` if a secret named `RemoveSecretRequest.ID` is not found.
+// - Returns `SecretInUse` if the secret is currently in use
 // - Returns an error if the deletion fails.
 func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequest) (*api.RemoveSecretResponse, error) {
     if request.SecretID == "" {
@@ -198,6 +200,34 @@ func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequ
     }

     err := s.store.Update(func(tx store.Tx) error {
+        // Check if the secret exists
+        secret := store.GetSecret(tx, request.SecretID)
+        if secret == nil {
+            return grpc.Errorf(codes.NotFound, "could not find secret %s", request.SecretID)
+        }
+
+        // Check if any services currently reference this secret, return error if so
+        services, err := store.FindServices(tx, store.ByReferencedSecretID(request.SecretID))
+        if err != nil {
+            return grpc.Errorf(codes.Internal, "could not find services using secret %s: %v", request.SecretID, err)
+        }
+
+        if len(services) != 0 {
+            serviceNames := make([]string, 0, len(services))
+            for _, service := range services {
+                serviceNames = append(serviceNames, service.Spec.Annotations.Name)
+            }
+
+            secretName := secret.Spec.Annotations.Name
+            serviceNameStr := strings.Join(serviceNames, ", ")
+            serviceStr := "services"
+            if len(serviceNames) == 1 {
+                serviceStr = "service"
+            }
+
+            return grpc.Errorf(codes.InvalidArgument, "secret '%s' is in use by the following %s: %v", secretName, serviceStr, serviceNameStr)
+        }
+
         return store.DeleteSecret(tx, request.SecretID)
     })
     switch err {


@@ -6,6 +6,7 @@ import (
     "reflect"
     "regexp"
     "strconv"
+    "strings"

     "github.com/docker/distribution/reference"
     "github.com/docker/swarmkit/api"
@@ -208,6 +209,47 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
     return nil
 }

+// validateSecretRefsSpec finds if the secrets passed in spec are valid and have no
+// conflicting targets.
+func validateSecretRefsSpec(spec *api.ServiceSpec) error {
+    container := spec.Task.GetContainer()
+    if container == nil {
+        return nil
+    }
+
+    // Keep a map to track all the targets that will be exposed
+    // The string returned is only used for logging. It could as well be struct{}{}
+    existingTargets := make(map[string]string)
+    for _, secretRef := range container.Secrets {
+        // SecretID and SecretName are mandatory, we have invalid references without them
+        if secretRef.SecretID == "" || secretRef.SecretName == "" {
+            return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
+        }
+
+        // Every secret reference requires a Target
+        if secretRef.GetTarget() == nil {
+            return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
+        }
+
+        // If this is a file target, we will ensure filename uniqueness
+        if secretRef.GetFile() != nil {
+            fileName := secretRef.GetFile().Name
+            // Validate the file name
+            if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
+                return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
+            }
+
+            // If this target is already in use, we have conflicting targets
+            if prevSecretName, ok := existingTargets[fileName]; ok {
+                return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
+            }
+
+            existingTargets[fileName] = secretRef.SecretName
+        }
+    }
+
+    return nil
+}
+
 func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error {
     for _, na := range networks {
         var network *api.Network
@@ -242,6 +284,11 @@ func validateServiceSpec(spec *api.ServiceSpec) error {
     if err := validateEndpointSpec(spec.Endpoint); err != nil {
         return err
     }
+
+    // Check to see if the Secret Reference portion of the spec is valid
+    if err := validateSecretRefsSpec(spec); err != nil {
+        return err
+    }
     return nil
 }
@@ -305,42 +352,30 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
     return nil
 }

-// checkSecretValidity finds if the secrets passed in spec have any conflicting targets.
-func (s *Server) checkSecretValidity(spec *api.ServiceSpec) error {
+// checkSecretExistence finds if the secret exists
+func (s *Server) checkSecretExistence(tx store.Tx, spec *api.ServiceSpec) error {
     container := spec.Task.GetContainer()
     if container == nil {
         return nil
     }

-    // Keep a map to track all the targets that will be exposed
-    // The string returned is only used for logging. It could as well be struct{}{}
-    existingTargets := make(map[string]string)
+    var failedSecrets []string
     for _, secretRef := range container.Secrets {
-        // SecretID and SecretName are mandatory, we have invalid references without them
-        if secretRef.SecretID == "" || secretRef.SecretName == "" {
-            return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
-        }
-
-        // Every secret reference requires a Target
-        if secretRef.GetTarget() == nil {
-            return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
-        }
-
-        // If this is a file target, we will ensure filename uniqueness
-        if secretRef.GetFile() != nil {
-            fileName := secretRef.GetFile().Name
-            // Validate the file name
-            if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
-                return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
-            }
-
-            // If this target is already in use, we have conflicting targets
-            if prevSecretName, ok := existingTargets[fileName]; ok {
-                return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
-            }
-
-            existingTargets[fileName] = secretRef.SecretName
-        }
+        secret := store.GetSecret(tx, secretRef.SecretID)
+        // Check to see if the secret exists and secretRef.SecretName matches the actual secretName
+        if secret == nil || secret.Spec.Annotations.Name != secretRef.SecretName {
+            failedSecrets = append(failedSecrets, secretRef.SecretName)
+        }
+    }
+
+    if len(failedSecrets) > 0 {
+        secretStr := "secrets"
+        if len(failedSecrets) == 1 {
+            secretStr = "secret"
+        }
+
+        return grpc.Errorf(codes.InvalidArgument, "%s not found: %v", secretStr, strings.Join(failedSecrets, ", "))
     }

     return nil
@@ -364,10 +399,6 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
         return nil, err
     }

-    if err := s.checkSecretValidity(request.Spec); err != nil {
-        return nil, err
-    }
-
     // TODO(aluzzardi): Consider using `Name` as a primary key to handle
     // duplicate creations. See #65
     service := &api.Service{
@@ -376,6 +407,13 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
     }

     err := s.store.Update(func(tx store.Tx) error {
+        // Check to see if all the secrets being added exist as objects
+        // in our datastore
+        err := s.checkSecretExistence(tx, request.Spec)
+        if err != nil {
+            return err
+        }
+
         return store.CreateService(tx, service)
     })
     if err != nil {
@@ -435,10 +473,6 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
         }
     }

-    if err := s.checkSecretValidity(request.Spec); err != nil {
-        return nil, err
-    }
-
     err := s.store.Update(func(tx store.Tx) error {
         service = store.GetService(tx, request.ServiceID)
         if service == nil {
@@ -459,6 +493,13 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
             return errNetworkUpdateNotSupported
         }

+        // Check to see if all the secrets being added exist as objects
+        // in our datastore
+        err := s.checkSecretExistence(tx, request.Spec)
+        if err != nil {
+            return err
+        }
+
         // orchestrator is designed to be stateless, so it should not deal
         // with service mode change (comparing current config with previous config).
         // proper way to change service mode is to delete and re-add.
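Review note: the file-name rule in validateSecretRefsSpec accepts only bare base names, via a filepath.Base(filepath.Clean(...)) round-trip. A hedged sketch of that check in isolation; validTargetName and the example inputs are hypothetical:

// validTargetName mirrors the rule above: the name must be non-empty and
// survive Clean+Base unchanged, i.e. contain no directory components.
func validTargetName(name string) bool {
    return name != "" && name == filepath.Base(filepath.Clean(name))
}

// validTargetName("cert.pem")    == true
// validTargetName("../cert.pem") == false (Base strips the "../")
// validTargetName("a/b")         == false (directories are rejected)
// validTargetName("")            == false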


@@ -19,9 +19,9 @@ import (
     "github.com/docker/swarmkit/log"
     "github.com/docker/swarmkit/manager/state"
     "github.com/docker/swarmkit/manager/state/store"
-    "github.com/docker/swarmkit/manager/state/watch"
     "github.com/docker/swarmkit/protobuf/ptypes"
     "github.com/docker/swarmkit/remotes"
+    "github.com/docker/swarmkit/watch"
     "github.com/pkg/errors"
     "golang.org/x/net/context"
 )


@@ -0,0 +1,273 @@
package logbroker

import (
    "errors"
    "io"
    "sync"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"

    "github.com/Sirupsen/logrus"
    "github.com/docker/go-events"
    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/ca"
    "github.com/docker/swarmkit/identity"
    "github.com/docker/swarmkit/log"
    "github.com/docker/swarmkit/watch"
    "golang.org/x/net/context"
)

var (
    errAlreadyRunning = errors.New("broker is already running")
    errNotRunning     = errors.New("broker is not running")
)

// LogBroker coordinates log subscriptions to services and tasks. Clients can
// publish and subscribe to logs channels.
//
// Log subscriptions are pushed to the worker nodes by creating log subscription
// tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
type LogBroker struct {
    mu                      sync.RWMutex
    logQueue                *watch.Queue
    subscriptionQueue       *watch.Queue
    registeredSubscriptions map[string]*api.SubscriptionMessage

    pctx      context.Context
    cancelAll context.CancelFunc
}

// New initializes and returns a new LogBroker
func New() *LogBroker {
    return &LogBroker{}
}

// Run the log broker
func (lb *LogBroker) Run(ctx context.Context) error {
    lb.mu.Lock()

    if lb.cancelAll != nil {
        lb.mu.Unlock()
        return errAlreadyRunning
    }

    lb.pctx, lb.cancelAll = context.WithCancel(ctx)
    lb.logQueue = watch.NewQueue()
    lb.subscriptionQueue = watch.NewQueue()
    lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage)
    lb.mu.Unlock()

    select {
    case <-lb.pctx.Done():
        return lb.pctx.Err()
    }
}

// Stop stops the log broker
func (lb *LogBroker) Stop() error {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    if lb.cancelAll == nil {
        return errNotRunning
    }
    lb.cancelAll()
    lb.cancelAll = nil

    lb.logQueue.Close()
    lb.subscriptionQueue.Close()

    return nil
}

func validateSelector(selector *api.LogSelector) error {
    if selector == nil {
        return grpc.Errorf(codes.InvalidArgument, "log selector must be provided")
    }

    if len(selector.ServiceIDs) == 0 && len(selector.TaskIDs) == 0 && len(selector.NodeIDs) == 0 {
        return grpc.Errorf(codes.InvalidArgument, "log selector must not be empty")
    }

    return nil
}

func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    lb.registeredSubscriptions[subscription.ID] = subscription
    lb.subscriptionQueue.Publish(subscription)
}

func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) {
    subscription = subscription.Copy()
    subscription.Close = true

    lb.mu.Lock()
    defer lb.mu.Unlock()

    delete(lb.registeredSubscriptions, subscription.ID)
    lb.subscriptionQueue.Publish(subscription)
}

func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions))
    for _, sub := range lb.registeredSubscriptions {
        subs = append(subs, sub)
    }

    ch, cancel := lb.subscriptionQueue.Watch()
    return subs, ch, cancel
}

func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
        publish := event.(*api.PublishLogsMessage)
        return publish.SubscriptionID == id
    }))
}

func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    lb.logQueue.Publish(log)
}

// SubscribeLogs creates a log subscription and streams back logs
func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
    ctx := stream.Context()

    if err := validateSelector(request.Selector); err != nil {
        return err
    }

    subscription := &api.SubscriptionMessage{
        ID:       identity.NewID(),
        Selector: request.Selector,
        Options:  request.Options,
    }

    log := log.G(ctx).WithFields(
        logrus.Fields{
            "method":          "(*LogBroker).SubscribeLogs",
            "subscription.id": subscription.ID,
        },
    )
    log.Debug("subscribed")

    publishCh, publishCancel := lb.subscribe(subscription.ID)
    defer publishCancel()

    lb.registerSubscription(subscription)
    defer lb.unregisterSubscription(subscription)

    for {
        select {
        case event := <-publishCh:
            publish := event.(*api.PublishLogsMessage)
            if err := stream.Send(&api.SubscribeLogsMessage{
                Messages: publish.Messages,
            }); err != nil {
                return err
            }
        case <-ctx.Done():
            return ctx.Err()
        case <-lb.pctx.Done():
            return nil
        }
    }
}

// ListenSubscriptions returns a stream of matching subscriptions for the current node
func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
    remote, err := ca.RemoteNode(stream.Context())
    if err != nil {
        return err
    }

    log := log.G(stream.Context()).WithFields(
        logrus.Fields{
            "method": "(*LogBroker).ListenSubscriptions",
            "node":   remote.NodeID,
        },
    )
    subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions()
    defer subscriptionCancel()

    log.Debug("node registered")

    // Start by sending down all active subscriptions.
    for _, subscription := range subscriptions {
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-lb.pctx.Done():
            return nil
        default:
        }

        if err := stream.Send(subscription); err != nil {
            log.Error(err)
            return err
        }
    }

    // Send down new subscriptions.
    // TODO(aluzzardi): We should filter by relevant tasks for this node rather
    // than sending down every subscription.
    for {
        select {
        case v := <-subscriptionCh:
            subscription := v.(*api.SubscriptionMessage)
            if err := stream.Send(subscription); err != nil {
                log.Error(err)
                return err
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-lb.pctx.Done():
            return nil
        }
    }
}

// PublishLogs publishes log messages for a given subscription
func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
    remote, err := ca.RemoteNode(stream.Context())
    if err != nil {
        return err
    }

    for {
        log, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&api.PublishLogsResponse{})
        }
        if err != nil {
            return err
        }

        if log.SubscriptionID == "" {
            return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
        }

        // Make sure logs are emitted using the right Node ID to avoid impersonation.
        for _, msg := range log.Messages {
            if msg.Context.NodeID != remote.NodeID {
                return grpc.Errorf(codes.PermissionDenied, "invalid NodeID: expected=%s;received=%s", remote.NodeID, msg.Context.NodeID)
            }
        }

        lb.publish(log)
    }
}
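Review note: end to end, a client calls the Logs service on a manager, the broker fans the subscription out to nodes via ListenSubscriptions, and workers push matching messages back through PublishLogs. A hedged sketch of a hypothetical manager-side consumer; followServiceLogs and the ready *grpc.ClientConn are illustrative, not part of this diff:

func followServiceLogs(ctx context.Context, conn *grpc.ClientConn, serviceID string) error {
    // Subscribe to everything the service's tasks emit, following new output.
    stream, err := api.NewLogsClient(conn).SubscribeLogs(ctx, &api.SubscribeLogsRequest{
        Selector: &api.LogSelector{ServiceIDs: []string{serviceID}},
        Options:  &api.LogSubscriptionOptions{Follow: true},
    })
    if err != nil {
        return err
    }
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil // subscription ended
        }
        if err != nil {
            return err
        }
        for _, m := range msg.Messages {
            fmt.Printf("%s: %s", m.Context.TaskID, m.Data)
        }
    }
}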


@@ -22,6 +22,7 @@ import (
     "github.com/docker/swarmkit/manager/dispatcher"
     "github.com/docker/swarmkit/manager/health"
     "github.com/docker/swarmkit/manager/keymanager"
+    "github.com/docker/swarmkit/manager/logbroker"
    "github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
     "github.com/docker/swarmkit/manager/orchestrator/global"
     "github.com/docker/swarmkit/manager/orchestrator/replicated"
@@ -96,6 +97,7 @@ type Manager struct {
     caserver               *ca.Server
     dispatcher             *dispatcher.Dispatcher
+    logbroker              *logbroker.LogBroker
     replicatedOrchestrator *replicated.Orchestrator
     globalOrchestrator     *global.Orchestrator
     taskReaper             *taskreaper.TaskReaper
@@ -234,6 +236,7 @@ func New(config *Config) (*Manager, error) {
         listeners:   listeners,
         caserver:    ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
         dispatcher:  dispatcher.New(raftNode, dispatcherConfig),
+        logbroker:   logbroker.New(),
         server:      grpc.NewServer(opts...),
         localserver: grpc.NewServer(opts...),
         raftNode:    raftNode,
@@ -292,6 +295,8 @@ func (m *Manager) Run(parent context.Context) error {
     authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
     authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
+    authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
+    authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
     authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
     authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
     authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
@@ -304,6 +309,7 @@ func (m *Manager) Run(parent context.Context) error {
     proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
     proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
     proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
+    proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)

     // localProxyControlAPI is a special kind of proxy. It is only wired up
     // to receive requests from a trusted local socket, and these requests
@@ -313,6 +319,7 @@ func (m *Manager) Run(parent context.Context) error {
     // information to put in the metadata map).
     forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
     localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, forwardAsOwnRequest)
+    localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, forwardAsOwnRequest)

     // Everything registered on m.server should be an authenticated
     // wrapper, or a proxy wrapping an authenticated wrapper!
@@ -322,10 +329,13 @@ func (m *Manager) Run(parent context.Context) error {
     api.RegisterHealthServer(m.server, authenticatedHealthAPI)
     api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
     api.RegisterControlServer(m.server, authenticatedControlAPI)
+    api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
+    api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
     api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
     api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)

     api.RegisterControlServer(m.localserver, localProxyControlAPI)
+    api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
     api.RegisterHealthServer(m.localserver, localHealthServer)

     healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
@@ -419,6 +429,7 @@ func (m *Manager) Stop(ctx context.Context) {
     }()

     m.dispatcher.Stop()
+    m.logbroker.Stop()
     m.caserver.Stop()

     if m.allocator != nil {
@@ -664,6 +675,12 @@ func (m *Manager) becomeLeader(ctx context.Context) {
         }
     }(m.dispatcher)

+    go func(lb *logbroker.LogBroker) {
+        if err := lb.Run(ctx); err != nil {
+            log.G(ctx).WithError(err).Error("LogBroker exited with an error")
+        }
+    }(m.logbroker)
+
     go func(server *ca.Server) {
         if err := server.Run(ctx); err != nil {
             log.G(ctx).WithError(err).Error("CA signer exited with an error")
@@ -712,6 +729,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
 // becomeFollower shuts down the subsystems that are only run by the leader.
 func (m *Manager) becomeFollower() {
     m.dispatcher.Stop()
+    m.logbroker.Stop()
     m.caserver.Stop()

     if m.allocator != nil {


@@ -478,6 +478,9 @@ func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service
     task := orchestrator.NewTask(g.cluster, service, 0, nodeID)

     err := batch.Update(func(tx store.Tx) error {
+        if store.GetService(tx, service.ID) == nil {
+            return nil
+        }
         return store.CreateTask(tx, task)
     })
     if err != nil {


@@ -53,6 +53,7 @@ func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Even
         }
         orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
         r.restarts.ClearServiceHistory(v.Service.ID)
+        delete(r.reconcileServices, v.Service.ID)
     case state.EventCreateService:
         if !orchestrator.IsReplicatedService(v.Service) {
             return


@@ -16,8 +16,8 @@ import (
     "github.com/docker/swarmkit/manager/orchestrator/restart"
     "github.com/docker/swarmkit/manager/state"
     "github.com/docker/swarmkit/manager/state/store"
-    "github.com/docker/swarmkit/manager/state/watch"
     "github.com/docker/swarmkit/protobuf/ptypes"
+    "github.com/docker/swarmkit/watch"
 )

 const defaultMonitor = 30 * time.Second
@@ -383,6 +383,10 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update
     }

     err = batch.Update(func(tx store.Tx) error {
+        if store.GetService(tx, updated.ServiceID) == nil {
+            return errors.New("service was deleted")
+        }
+
         if err := store.CreateTask(tx, updated); err != nil {
             return err
         }


@@ -9,7 +9,7 @@ import (
     "github.com/coreos/etcd/raft/raftpb"
     "github.com/docker/swarmkit/api"
-    "github.com/docker/swarmkit/manager/state/watch"
+    "github.com/docker/swarmkit/watch"
     "github.com/gogo/protobuf/proto"
     "golang.org/x/net/context"
 )


@@ -29,7 +29,7 @@ import (
     "github.com/docker/swarmkit/manager/raftselector"
     "github.com/docker/swarmkit/manager/state/raft/membership"
     "github.com/docker/swarmkit/manager/state/store"
-    "github.com/docker/swarmkit/manager/state/watch"
+    "github.com/docker/swarmkit/watch"
     "github.com/gogo/protobuf/proto"
     "github.com/pivotal-golang/clock"
     "github.com/pkg/errors"


@@ -121,3 +121,25 @@ func (b byMembership) isBy() {
 func ByMembership(membership api.NodeSpec_Membership) By {
 	return byMembership(membership)
 }
+
+type byReferencedNetworkID string
+
+func (b byReferencedNetworkID) isBy() {
+}
+
+// ByReferencedNetworkID creates an object to pass to Find to search for a
+// service or task that references a network with the given ID.
+func ByReferencedNetworkID(networkID string) By {
+	return byReferencedNetworkID(networkID)
+}
+
+type byReferencedSecretID string
+
+func (b byReferencedSecretID) isBy() {
+}
+
+// ByReferencedSecretID creates an object to pass to Find to search for a
+// service or task that references a secret with the given ID.
+func ByReferencedSecretID(secretID string) By {
+	return byReferencedSecretID(secretID)
+}
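
For context, a minimal sketch of how these selectors plug into Find (the helper name and the call site are invented; FindServices, FindTasks, and the ByReferenced* selectors are what this diff adds):

package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state/store"
)

// networkInUse reports whether any service or task still references the
// given network ID, using the new secondary indexes instead of a full
// table scan. Hypothetical helper, for illustration only.
func networkInUse(s *store.MemoryStore, networkID string) (inUse bool, err error) {
	s.View(func(tx store.ReadTx) {
		var services []*api.Service
		services, err = store.FindServices(tx, store.ByReferencedNetworkID(networkID))
		if err != nil {
			return
		}
		if len(services) > 0 {
			inUse = true
			return
		}
		var tasks []*api.Task
		tasks, err = store.FindTasks(tx, store.ByReferencedNetworkID(networkID))
		inUse = len(tasks) > 0
	})
	return inUse, err
}

func main() {
	s := store.NewMemoryStore(nil)
	used, err := networkInUse(s, "some-network-id")
	fmt.Println(used, err) // false <nil> on an empty store
}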


@@ -13,8 +13,8 @@ import (
 	"github.com/docker/swarmkit/api"
 	pb "github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/manager/state"
-	"github.com/docker/swarmkit/manager/state/watch"
 	"github.com/docker/swarmkit/protobuf/ptypes"
+	"github.com/docker/swarmkit/watch"
 	memdb "github.com/hashicorp/go-memdb"
 	"golang.org/x/net/context"
 )

@@ -29,6 +29,8 @@ const (
 	indexDesiredState = "desiredstate"
 	indexRole         = "role"
 	indexMembership   = "membership"
+	indexNetwork      = "network"
+	indexSecret       = "secret"

 	prefix = "_prefix"

@@ -624,6 +626,18 @@ func (tx readTx) findIterators(table string, by By, checkType func(By) error) ([
 			return nil, err
 		}
 		return []memdb.ResultIterator{it}, nil
+	case byReferencedNetworkID:
+		it, err := tx.memDBTx.Get(table, indexNetwork, string(v))
+		if err != nil {
+			return nil, err
+		}
+		return []memdb.ResultIterator{it}, nil
+	case byReferencedSecretID:
+		it, err := tx.memDBTx.Get(table, indexSecret, string(v))
+		if err != nil {
+			return nil, err
+		}
+		return []memdb.ResultIterator{it}, nil
 	default:
 		return nil, ErrInvalidFindBy
 	}


@@ -26,6 +26,16 @@ func init() {
 					Unique:  true,
 					Indexer: serviceIndexerByName{},
 				},
+				indexNetwork: {
+					Name:         indexNetwork,
+					AllowMissing: true,
+					Indexer:      serviceIndexerByNetwork{},
+				},
+				indexSecret: {
+					Name:         indexSecret,
+					AllowMissing: true,
+					Indexer:      serviceIndexerBySecret{},
+				},
 			},
 		},
 		Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {

@@ -167,7 +177,7 @@ func GetService(tx ReadTx, id string) *api.Service {
 func FindServices(tx ReadTx, by By) ([]*api.Service, error) {
 	checkType := func(by By) error {
 		switch by.(type) {
-		case byName, byNamePrefix, byIDPrefix:
+		case byName, byNamePrefix, byIDPrefix, byReferencedNetworkID, byReferencedSecretID:
 			return nil
 		default:
 			return ErrInvalidFindBy

@@ -223,3 +233,58 @@ func (si serviceIndexerByName) FromObject(obj interface{}) (bool, []byte, error)
 func (si serviceIndexerByName) PrefixFromArgs(args ...interface{}) ([]byte, error) {
 	return prefixFromArgs(args...)
 }
+
+type serviceIndexerByNetwork struct{}
+
+func (si serviceIndexerByNetwork) FromArgs(args ...interface{}) ([]byte, error) {
+	return fromArgs(args...)
+}
+
+func (si serviceIndexerByNetwork) FromObject(obj interface{}) (bool, [][]byte, error) {
+	s, ok := obj.(serviceEntry)
+	if !ok {
+		panic("unexpected type passed to FromObject")
+	}
+
+	var networkIDs [][]byte
+
+	specNetworks := s.Spec.Task.Networks
+	if len(specNetworks) == 0 {
+		specNetworks = s.Spec.Networks
+	}
+
+	for _, na := range specNetworks {
+		// Add the null character as a terminator
+		networkIDs = append(networkIDs, []byte(na.Target+"\x00"))
+	}
+
+	return len(networkIDs) != 0, networkIDs, nil
+}
+
+type serviceIndexerBySecret struct{}
+
+func (si serviceIndexerBySecret) FromArgs(args ...interface{}) ([]byte, error) {
+	return fromArgs(args...)
+}
+
+func (si serviceIndexerBySecret) FromObject(obj interface{}) (bool, [][]byte, error) {
+	s, ok := obj.(serviceEntry)
+	if !ok {
+		panic("unexpected type passed to FromObject")
+	}
+
+	container := s.Spec.Task.GetContainer()
+	if container == nil {
+		return false, nil, nil
+	}
+
+	var secretIDs [][]byte
+
+	for _, secretRef := range container.Secrets {
+		// Add the null character as a terminator
+		secretIDs = append(secretIDs, []byte(secretRef.SecretID+"\x00"))
+	}

+	return len(secretIDs) != 0, secretIDs, nil
+}
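
A worked example (IDs invented) of what the network indexer above produces, as it would evaluate inside this package; the task-spec attachments win, and the deprecated service-level Networks list is only consulted when the task spec has none:

svc := serviceEntry{Service: &api.Service{
	Spec: api.ServiceSpec{
		Task: api.TaskSpec{
			Networks: []*api.NetworkAttachmentConfig{
				{Target: "net-1"},
				{Target: "net-2"},
			},
		},
	},
}}

ok, keys, _ := serviceIndexerByNetwork{}.FromObject(svc)
// ok == true
// keys == [][]byte{[]byte("net-1\x00"), []byte("net-2\x00")}
// The NUL terminator keeps "net-1" from also matching "net-10" during
// prefix iteration; for non-unique indexes go-memdb later appends the
// primary key to each entry.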


@@ -47,6 +47,16 @@ func init() {
 					Name:    indexDesiredState,
 					Indexer: taskIndexerByDesiredState{},
 				},
+				indexNetwork: {
+					Name:         indexNetwork,
+					AllowMissing: true,
+					Indexer:      taskIndexerByNetwork{},
+				},
+				indexSecret: {
+					Name:         indexSecret,
+					AllowMissing: true,
+					Indexer:      taskIndexerBySecret{},
+				},
 			},
 		},
 		Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {

@@ -176,7 +186,7 @@ func GetTask(tx ReadTx, id string) *api.Task {
 func FindTasks(tx ReadTx, by By) ([]*api.Task, error) {
 	checkType := func(by By) error {
 		switch by.(type) {
-		case byName, byNamePrefix, byIDPrefix, byDesiredState, byNode, byService, bySlot:
+		case byName, byNamePrefix, byIDPrefix, byDesiredState, byNode, byService, bySlot, byReferencedNetworkID, byReferencedSecretID:
 			return nil
 		default:
 			return ErrInvalidFindBy

@@ -288,16 +298,65 @@ func (ti taskIndexerBySlot) FromObject(obj interface{}) (bool, []byte, error) {

 type taskIndexerByDesiredState struct{}

-func (ni taskIndexerByDesiredState) FromArgs(args ...interface{}) ([]byte, error) {
+func (ti taskIndexerByDesiredState) FromArgs(args ...interface{}) ([]byte, error) {
 	return fromArgs(args...)
 }

-func (ni taskIndexerByDesiredState) FromObject(obj interface{}) (bool, []byte, error) {
-	n, ok := obj.(taskEntry)
+func (ti taskIndexerByDesiredState) FromObject(obj interface{}) (bool, []byte, error) {
+	t, ok := obj.(taskEntry)
 	if !ok {
 		panic("unexpected type passed to FromObject")
 	}

 	// Add the null character as a terminator
-	return true, []byte(strconv.FormatInt(int64(n.DesiredState), 10) + "\x00"), nil
+	return true, []byte(strconv.FormatInt(int64(t.DesiredState), 10) + "\x00"), nil
+}
+
+type taskIndexerByNetwork struct{}
+
+func (ti taskIndexerByNetwork) FromArgs(args ...interface{}) ([]byte, error) {
+	return fromArgs(args...)
+}
+
+func (ti taskIndexerByNetwork) FromObject(obj interface{}) (bool, [][]byte, error) {
+	t, ok := obj.(taskEntry)
+	if !ok {
+		panic("unexpected type passed to FromObject")
+	}
+
+	var networkIDs [][]byte
+
+	for _, na := range t.Spec.Networks {
+		// Add the null character as a terminator
+		networkIDs = append(networkIDs, []byte(na.Target+"\x00"))
+	}
+
+	return len(networkIDs) != 0, networkIDs, nil
+}
+
+type taskIndexerBySecret struct{}
+
+func (ti taskIndexerBySecret) FromArgs(args ...interface{}) ([]byte, error) {
+	return fromArgs(args...)
+}
+
+func (ti taskIndexerBySecret) FromObject(obj interface{}) (bool, [][]byte, error) {
+	t, ok := obj.(taskEntry)
+	if !ok {
+		panic("unexpected type passed to FromObject")
+	}
+
+	container := t.Spec.GetContainer()
+	if container == nil {
+		return false, nil, nil
+	}
+
+	var secretIDs [][]byte
+
+	for _, secretRef := range container.Secrets {
+		// Add the null character as a terminator
+		secretIDs = append(secretIDs, []byte(secretRef.SecretID+"\x00"))
+	}
+
+	return len(secretIDs) != 0, secretIDs, nil
 }


@@ -3,7 +3,7 @@ package state
 import (
 	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
-	"github.com/docker/swarmkit/manager/state/watch"
+	"github.com/docker/swarmkit/watch"
 )

 // Event is the type used for events passed over watcher channels, and also


@@ -9,15 +9,27 @@ import (
 // Indexer is an interface used for defining indexes
 type Indexer interface {
-	// FromObject is used to extract an index value from an
-	// object or to indicate that the index value is missing.
-	FromObject(raw interface{}) (bool, []byte, error)
-
 	// ExactFromArgs is used to build an exact index lookup
 	// based on arguments
 	FromArgs(args ...interface{}) ([]byte, error)
 }

+// SingleIndexer is an interface used for defining indexes
+// generating a single entry per object
+type SingleIndexer interface {
+	// FromObject is used to extract an index value from an
+	// object or to indicate that the index value is missing.
+	FromObject(raw interface{}) (bool, []byte, error)
+}
+
+// MultiIndexer is an interface used for defining indexes
+// generating multiple entries per object
+type MultiIndexer interface {
+	// FromObject is used to extract index values from an
+	// object or to indicate that the index value is missing.
+	FromObject(raw interface{}) (bool, [][]byte, error)
+}
+
 // PrefixIndexer can optionally be implemented for any
 // indexes that support prefix based iteration. This may
 // not apply to all indexes.

@@ -88,6 +100,79 @@ func (s *StringFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
 	return val, nil
 }

+// StringSliceFieldIndex is used to extract a field from an object
+// using reflection and builds an index on that field.
+type StringSliceFieldIndex struct {
+	Field     string
+	Lowercase bool
+}
+
+func (s *StringSliceFieldIndex) FromObject(obj interface{}) (bool, [][]byte, error) {
+	v := reflect.ValueOf(obj)
+	v = reflect.Indirect(v) // Dereference the pointer if any
+
+	fv := v.FieldByName(s.Field)
+	if !fv.IsValid() {
+		return false, nil,
+			fmt.Errorf("field '%s' for %#v is invalid", s.Field, obj)
+	}
+
+	if fv.Kind() != reflect.Slice || fv.Type().Elem().Kind() != reflect.String {
+		return false, nil, fmt.Errorf("field '%s' is not a string slice", s.Field)
+	}
+
+	length := fv.Len()
+	vals := make([][]byte, 0, length)
+	for i := 0; i < fv.Len(); i++ {
+		val := fv.Index(i).String()
+		if val == "" {
+			continue
+		}
+
+		if s.Lowercase {
+			val = strings.ToLower(val)
+		}
+
+		// Add the null character as a terminator
+		val += "\x00"
+		vals = append(vals, []byte(val))
+	}
+	if len(vals) == 0 {
+		return false, nil, nil
+	}
+	return true, vals, nil
+}
+
+func (s *StringSliceFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
+	if len(args) != 1 {
+		return nil, fmt.Errorf("must provide only a single argument")
+	}
+	arg, ok := args[0].(string)
+	if !ok {
+		return nil, fmt.Errorf("argument must be a string: %#v", args[0])
+	}
+	if s.Lowercase {
+		arg = strings.ToLower(arg)
+	}
+	// Add the null character as a terminator
+	arg += "\x00"
+	return []byte(arg), nil
+}
+
+func (s *StringSliceFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
+	val, err := s.FromArgs(args...)
+	if err != nil {
+		return nil, err
+	}
+
+	// Strip the null terminator, the rest is a prefix
+	n := len(val)
+	if n > 0 {
+		return val[:n-1], nil
+	}
+	return val, nil
+}
+
 // UUIDFieldIndex is used to extract a field from an object
 // using reflection and builds an index on that field by treating
 // it as a UUID. This is an optimization to using a StringFieldIndex
@@ -270,7 +355,11 @@ type CompoundIndex struct {
 func (c *CompoundIndex) FromObject(raw interface{}) (bool, []byte, error) {
 	var out []byte
-	for i, idx := range c.Indexes {
+	for i, idxRaw := range c.Indexes {
+		idx, ok := idxRaw.(SingleIndexer)
+		if !ok {
+			return false, nil, fmt.Errorf("sub-index %d error: %s", i, "sub-index must be a SingleIndexer")
+		}
 		ok, val, err := idx.FromObject(raw)
 		if err != nil {
 			return false, nil, fmt.Errorf("sub-index %d error: %v", i, err)

@@ -291,40 +380,17 @@ func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) {
 	if len(args) != len(c.Indexes) {
 		return nil, fmt.Errorf("less arguments than index fields")
 	}
-	var out []byte
-	for i, arg := range args {
-		val, err := c.Indexes[i].FromArgs(arg)
-		if err != nil {
-			return nil, fmt.Errorf("sub-index %d error: %v", i, err)
-		}
-		out = append(out, val...)
-	}
-	return out, nil
+	return c.PrefixFromArgs(args...)
 }

 func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
-	if len(args) > len(c.Indexes) {
-		return nil, fmt.Errorf("more arguments than index fields")
-	}
 	var out []byte
 	for i, arg := range args {
-		if i+1 < len(args) {
-			val, err := c.Indexes[i].FromArgs(arg)
-			if err != nil {
-				return nil, fmt.Errorf("sub-index %d error: %v", i, err)
-			}
-			out = append(out, val...)
-		} else {
-			prefixIndexer, ok := c.Indexes[i].(PrefixIndexer)
-			if !ok {
-				return nil, fmt.Errorf("sub-index %d does not support prefix scanning", i)
-			}
-			val, err := prefixIndexer.PrefixFromArgs(arg)
-			if err != nil {
-				return nil, fmt.Errorf("sub-index %d error: %v", i, err)
-			}
-			out = append(out, val...)
-		}
+		val, err := c.Indexes[i].FromArgs(arg)
+		if err != nil {
+			return nil, fmt.Errorf("sub-index %d error: %v", i, err)
+		}
+		out = append(out, val...)
 	}
 	return out, nil
 }
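
Taken together, SingleIndexer, MultiIndexer, and StringSliceFieldIndex make multi-valued indexes first-class in this forked go-memdb. A self-contained sketch (the doc table and its values are invented for illustration):

package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// doc is an invented record type; its Tags slice fans out to one index
// entry per element via StringSliceFieldIndex, a MultiIndexer.
type doc struct {
	ID   string
	Tags []string
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"doc": {
				Name: "doc",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					"tags": {
						Name:         "tags",
						AllowMissing: true,
						Indexer:      &memdb.StringSliceFieldIndex{Field: "Tags"},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema) // schema validation accepts the MultiIndexer
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	if err := txn.Insert("doc", &doc{ID: "a", Tags: []string{"red", "blue"}}); err != nil {
		panic(err)
	}
	txn.Commit()

	// The one object is reachable under each of its tag values.
	read := db.Txn(false)
	it, err := read.Get("doc", "tags", "red")
	if err != nil {
		panic(err)
	}
	for obj := it.Next(); obj != nil; obj = it.Next() {
		fmt.Println(obj.(*doc).ID) // prints "a"
	}
}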


@@ -46,6 +46,9 @@ func (s *TableSchema) Validate() error {
 	if !s.Indexes["id"].Unique {
 		return fmt.Errorf("id index must be unique")
 	}
+	if _, ok := s.Indexes["id"].Indexer.(SingleIndexer); !ok {
+		return fmt.Errorf("id index must be a SingleIndexer")
+	}
 	for name, index := range s.Indexes {
 		if name != index.Name {
 			return fmt.Errorf("index name mis-match for '%s'", name)

@@ -72,5 +75,11 @@ func (s *IndexSchema) Validate() error {
 	if s.Indexer == nil {
 		return fmt.Errorf("missing index function for '%s'", s.Name)
 	}
+	switch s.Indexer.(type) {
+	case SingleIndexer:
+	case MultiIndexer:
+	default:
+		return fmt.Errorf("indexer for '%s' must be a SingleIndexer or MultiIndexer", s.Name)
+	}
 	return nil
 }
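
A small sketch (names invented) of what the tightened validation now rejects: the primary "id" index must map each object to exactly one key, so a MultiIndexer there fails fast at schema time instead of panicking later in Insert:

bad := &memdb.TableSchema{
	Name: "doc",
	Indexes: map[string]*memdb.IndexSchema{
		"id": {
			Name:   "id",
			Unique: true,
			// StringSliceFieldIndex is a MultiIndexer, which is not
			// allowed for the primary index.
			Indexer: &memdb.StringSliceFieldIndex{Field: "Tags"},
		},
	},
}
fmt.Println(bad.Validate()) // id index must be a SingleIndexer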


@@ -148,7 +148,8 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
 	// Get the primary ID of the object
 	idSchema := tableSchema.Indexes[id]
-	ok, idVal, err := idSchema.Indexer.FromObject(obj)
+	idIndexer := idSchema.Indexer.(SingleIndexer)
+	ok, idVal, err := idIndexer.FromObject(obj)
 	if err != nil {
 		return fmt.Errorf("failed to build primary index: %v", err)
 	}

@@ -167,7 +168,19 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
 		indexTxn := txn.writableIndex(table, name)

 		// Determine the new index value
-		ok, val, err := indexSchema.Indexer.FromObject(obj)
+		var (
+			ok   bool
+			vals [][]byte
+			err  error
+		)
+		switch indexer := indexSchema.Indexer.(type) {
+		case SingleIndexer:
+			var val []byte
+			ok, val, err = indexer.FromObject(obj)
+			vals = [][]byte{val}
+		case MultiIndexer:
+			ok, vals, err = indexer.FromObject(obj)
+		}
 		if err != nil {
 			return fmt.Errorf("failed to build index '%s': %v", name, err)
 		}
@@ -176,16 +189,31 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
 		// Handle non-unique index by computing a unique index.
 		// This is done by appending the primary key which must
 		// be unique anyways.
 		if ok && !indexSchema.Unique {
-			val = append(val, idVal...)
+			for i := range vals {
+				vals[i] = append(vals[i], idVal...)
+			}
 		}

 		// Handle the update by deleting from the index first
 		if update {
-			okExist, valExist, err := indexSchema.Indexer.FromObject(existing)
+			var (
+				okExist   bool
+				valsExist [][]byte
+				err       error
+			)
+			switch indexer := indexSchema.Indexer.(type) {
+			case SingleIndexer:
+				var valExist []byte
+				okExist, valExist, err = indexer.FromObject(existing)
+				valsExist = [][]byte{valExist}
+			case MultiIndexer:
+				okExist, valsExist, err = indexer.FromObject(existing)
+			}
 			if err != nil {
 				return fmt.Errorf("failed to build index '%s': %v", name, err)
 			}
 			if okExist {
+				for i, valExist := range valsExist {
 					// Handle non-unique index by computing a unique index.
 					// This is done by appending the primary key which must
 					// be unique anyways.
@@ -196,11 +224,12 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
 					// If we are writing to the same index with the same value,
 					// we can avoid the delete as the insert will overwrite the
 					// value anyways.
-					if !bytes.Equal(valExist, val) {
+					if i >= len(vals) || !bytes.Equal(valExist, vals[i]) {
 						indexTxn.Delete(valExist)
 					}
 				}
 			}
+		}

 		// If there is no index value, either this is an error or an expected
 		// case and we can skip updating
@@ -213,8 +242,10 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
 		}

 		// Update the value of the index
+		for _, val := range vals {
 			indexTxn.Insert(val, obj)
 		}
+	}

 	return nil
 }
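
Reusing the doc table from the earlier go-memdb sketch, the update branch above can be exercised directly: re-inserting the same primary key with a different Tags slice deletes every stale multi-index entry before the new keys are written (assumed behavior, read off the code above):

txn := db.Txn(true)
if err := txn.Insert("doc", &doc{ID: "a", Tags: []string{"green"}}); err != nil {
	panic(err)
}
txn.Commit()

read := db.Txn(false)
it, _ := read.Get("doc", "tags", "red")
fmt.Println(it.Next() == nil) // true: the stale "red" entry was removed
it, _ = read.Get("doc", "tags", "green")
fmt.Println(it.Next() != nil) // true: the "green" entry was written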
@@ -233,7 +264,8 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
 	// Get the primary ID of the object
 	idSchema := tableSchema.Indexes[id]
-	ok, idVal, err := idSchema.Indexer.FromObject(obj)
+	idIndexer := idSchema.Indexer.(SingleIndexer)
+	ok, idVal, err := idIndexer.FromObject(obj)
 	if err != nil {
 		return fmt.Errorf("failed to build primary index: %v", err)
 	}

@@ -253,7 +285,19 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
 		indexTxn := txn.writableIndex(table, name)

 		// Handle the update by deleting from the index first
-		ok, val, err := indexSchema.Indexer.FromObject(existing)
+		var (
+			ok   bool
+			vals [][]byte
+			err  error
+		)
+		switch indexer := indexSchema.Indexer.(type) {
+		case SingleIndexer:
+			var val []byte
+			ok, val, err = indexer.FromObject(existing)
+			vals = [][]byte{val}
+		case MultiIndexer:
+			ok, vals, err = indexer.FromObject(existing)
+		}
 		if err != nil {
 			return fmt.Errorf("failed to build index '%s': %v", name, err)
 		}
@@ -261,12 +305,14 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
 		// Handle non-unique index by computing a unique index.
 		// This is done by appending the primary key which must
 		// be unique anyways.
+		for _, val := range vals {
 			if !indexSchema.Unique {
 				val = append(val, idVal...)
 			}
 			indexTxn.Delete(val)
 		}
 	}
+	}

 	return nil
 }
@@ -334,39 +380,6 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er
 	return value, nil
 }

-// LongestPrefix is used to fetch the longest prefix match for the given
-// constraints on the index. Note that this will not work with the memdb
-// StringFieldIndex because it adds null terminators which prevent the
-// algorithm from correctly finding a match (it will get to right before the
-// null and fail to find a leaf node). This should only be used where the prefix
-// given is capable of matching indexed entries directly, which typically only
-// applies to a custom indexer. See the unit test for an example.
-func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
-	// Enforce that this only works on prefix indexes.
-	if !strings.HasSuffix(index, "_prefix") {
-		return nil, fmt.Errorf("must use '%s_prefix' on index", index)
-	}
-
-	// Get the index value.
-	indexSchema, val, err := txn.getIndexValue(table, index, args...)
-	if err != nil {
-		return nil, err
-	}
-
-	// This algorithm only makes sense against a unique index, otherwise the
-	// index keys will have the IDs appended to them.
-	if !indexSchema.Unique {
-		return nil, fmt.Errorf("index '%s' is not unique", index)
-	}
-
-	// Find the longest prefix match with the given index.
-	indexTxn := txn.readableIndex(table, indexSchema.Name)
-	if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
-		return value, nil
-	}
-	return nil, nil
-}
-
 // getIndexValue is used to get the IndexSchema and the value
 // used to scan the index given the parameters. This handles prefix based
 // scans when the index has the "_prefix" suffix. The index must support