mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
![Ying Li](/assets/img/avatar_default.png)
- https://github.com/docker/swarmkit/pull/2266 (support for templating Node.Hostname in docker executor) - https://github.com/docker/swarmkit/pull/2281 (change restore action on objects to be update, not delete/create) - https://github.com/docker/swarmkit/pull/2285 (extend watch queue with timeout and size limit) - https://github.com/docker/swarmkit/pull/2253 (version-aware failure tracking in the scheduler) - https://github.com/docker/swarmkit/pull/2275 (update containerd and port executor to container client library) - https://github.com/docker/swarmkit/pull/2292 (rename some generic resources) - https://github.com/docker/swarmkit/pull/2300 (limit the size of the external CA response) - https://github.com/docker/swarmkit/pull/2301 (delete global tasks when the node running them is deleted) Minor cleanups, dependency bumps, and vendoring: - https://github.com/docker/swarmkit/pull/2271 - https://github.com/docker/swarmkit/pull/2279 - https://github.com/docker/swarmkit/pull/2283 - https://github.com/docker/swarmkit/pull/2282 - https://github.com/docker/swarmkit/pull/2274 - https://github.com/docker/swarmkit/pull/2296 (dependency bump of etcd, go-winio) Signed-off-by: Ying Li <ying.li@docker.com>
95 lines
2.6 KiB
Go
95 lines
2.6 KiB
Go
package watch
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
events "github.com/docker/go-events"
|
|
)
|
|
|
|
// ErrSinkTimeout is returned from the Write method when a sink times out.
|
|
var ErrSinkTimeout = fmt.Errorf("timeout exceeded, tearing down sink")
|
|
|
|
// timeoutSink is a sink that wraps another sink with a timeout. If the
|
|
// embedded sink fails to complete a Write operation within the specified
|
|
// timeout, the Write operation of the timeoutSink fails.
|
|
type timeoutSink struct {
|
|
timeout time.Duration
|
|
sink events.Sink
|
|
}
|
|
|
|
func (s timeoutSink) Write(event events.Event) error {
|
|
errChan := make(chan error)
|
|
go func(c chan<- error) {
|
|
c <- s.sink.Write(event)
|
|
}(errChan)
|
|
|
|
timer := time.NewTimer(s.timeout)
|
|
select {
|
|
case err := <-errChan:
|
|
timer.Stop()
|
|
return err
|
|
case <-timer.C:
|
|
s.sink.Close()
|
|
return ErrSinkTimeout
|
|
}
|
|
}
|
|
|
|
func (s timeoutSink) Close() error {
|
|
return s.sink.Close()
|
|
}
|
|
|
|
// dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
|
|
// debug log messages that may be confusing. It is possible that the queue
|
|
// will try to write an event to its destination channel while the queue is
|
|
// being removed from the broadcaster. Since the channel is closed before the
|
|
// queue, there is a narrow window when this is possible. In some event-based
|
|
// dropping events when a sink is removed from a broadcaster is a problem, but
|
|
// for the usage in this watch package that's the expected behavior.
|
|
type dropErrClosed struct {
|
|
sink events.Sink
|
|
}
|
|
|
|
func (s dropErrClosed) Write(event events.Event) error {
|
|
err := s.sink.Write(event)
|
|
if err == events.ErrSinkClosed {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s dropErrClosed) Close() error {
|
|
return s.sink.Close()
|
|
}
|
|
|
|
// dropErrClosedChanGen is a ChannelSinkGenerator for dropErrClosed sinks wrapping
|
|
// unbuffered channels.
|
|
type dropErrClosedChanGen struct{}
|
|
|
|
func (s *dropErrClosedChanGen) NewChannelSink() (events.Sink, *events.Channel) {
|
|
ch := events.NewChannel(0)
|
|
return dropErrClosed{sink: ch}, ch
|
|
}
|
|
|
|
// TimeoutDropErrChanGen is a ChannelSinkGenerator that creates a channel,
|
|
// wrapped by the dropErrClosed sink and a timeout.
|
|
type TimeoutDropErrChanGen struct {
|
|
timeout time.Duration
|
|
}
|
|
|
|
// NewChannelSink creates a new sink chain of timeoutSink->dropErrClosed->Channel
|
|
func (s *TimeoutDropErrChanGen) NewChannelSink() (events.Sink, *events.Channel) {
|
|
ch := events.NewChannel(0)
|
|
return timeoutSink{
|
|
timeout: s.timeout,
|
|
sink: dropErrClosed{
|
|
sink: ch,
|
|
},
|
|
}, ch
|
|
}
|
|
|
|
// NewTimeoutDropErrSinkGen returns a generator of timeoutSinks wrapping dropErrClosed
|
|
// sinks, wrapping unbuffered channel sinks.
|
|
func NewTimeoutDropErrSinkGen(timeout time.Duration) ChannelSinkGenerator {
|
|
return &TimeoutDropErrChanGen{timeout: timeout}
|
|
}
|