From d745067487fe2a750cfa2625ed7f652a56abc17c Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 5 Aug 2014 10:00:05 +0000 Subject: [PATCH] Subsystems can register cleanup handlers with Engine.OnShutdown Signed-off-by: Solomon Hykes --- engine/engine.go | 84 +++++++++++++++++++++++++++++++++++++++++++----- engine/job.go | 7 ++++ 2 files changed, 83 insertions(+), 8 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 0008d605d0..299b0c05fb 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -7,6 +7,8 @@ import ( "os" "sort" "strings" + "sync" + "time" "github.com/docker/docker/utils" ) @@ -43,14 +45,18 @@ func unregister(name string) { // It acts as a store for *containers*, and allows manipulation of these // containers by executing *jobs*. type Engine struct { - handlers map[string]Handler - catchall Handler - hack Hack // data for temporary hackery (see hack.go) - id string - Stdout io.Writer - Stderr io.Writer - Stdin io.Reader - Logging bool + handlers map[string]Handler + catchall Handler + hack Hack // data for temporary hackery (see hack.go) + id string + Stdout io.Writer + Stderr io.Writer + Stdin io.Reader + Logging bool + tasks sync.WaitGroup + l sync.RWMutex // lock for shutdown + shutdown bool + onShutdown []func() // shutdown handlers } func (eng *Engine) Register(name string, handler Handler) error { @@ -130,6 +136,68 @@ func (eng *Engine) Job(name string, args ...string) *Job { return job } +// OnShutdown registers a new callback to be called by Shutdown. +// This is typically used by services to perform cleanup. +func (eng *Engine) OnShutdown(h func()) { + eng.l.Lock() + eng.onShutdown = append(eng.onShutdown, h) + eng.l.Unlock() +} + +// Shutdown permanently shuts down eng as follows: +// - It refuses all new jobs, permanently. +// - It waits for all active jobs to complete (with no timeout) +// - It calls all shutdown handlers concurrently (if any) +// - It returns when all handlers complete, or after 15 seconds, +// whichever happens first. +func (eng *Engine) Shutdown() { + eng.l.Lock() + if eng.shutdown { + eng.l.Unlock() + return + } + eng.shutdown = true + eng.l.Unlock() + // We don't need to protect the rest with a lock, to allow + // for other calls to immediately fail with "shutdown" instead + // of hanging for 15 seconds. + // This requires all concurrent calls to check for shutdown, otherwise + // it might cause a race. + + // Wait for all jobs to complete + eng.tasks.Wait() + + // Call shutdown handlers, if any. + // Timeout after 15 seconds. + var wg sync.WaitGroup + for _, h := range eng.onShutdown { + wg.Add(1) + go func(h func()) { + defer wg.Done() + h() + }(h) + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-time.After(time.Second * 15): + case <-done: + } + return +} + +// IsShutdown returns true if the engine is in the process +// of shutting down, or already shut down. +// Otherwise it returns false. +func (eng *Engine) IsShutdown() bool { + eng.l.RLock() + defer eng.l.RUnlock() + return eng.shutdown +} + // ParseJob creates a new job from a text description using a shell-like syntax. // // The following syntax is used to parse `input`: diff --git a/engine/job.go b/engine/job.go index ab8120dd44..efcb3058d7 100644 --- a/engine/job.go +++ b/engine/job.go @@ -47,6 +47,13 @@ const ( // If the job returns a failure status, an error is returned // which includes the status. func (job *Job) Run() error { + if job.Eng.IsShutdown() { + return fmt.Errorf("engine is shutdown") + } + job.Eng.l.Lock() + job.Eng.tasks.Add(1) + job.Eng.l.Unlock() + defer job.Eng.tasks.Done() // FIXME: make this thread-safe // FIXME: implement wait if !job.end.IsZero() {