diff --git a/builtins/builtins.go b/builtins/builtins.go
index 75f9d4ce34..ea1e7c6b84 100644
--- a/builtins/builtins.go
+++ b/builtins/builtins.go
@@ -54,9 +54,6 @@ func remote(eng *engine.Engine) error {
 // These components should be broken off into plugins of their own.
 //
 func daemon(eng *engine.Engine) error {
-	if err := eng.Register("initserverpidfile", server.InitPidfile); err != nil {
-		return err
-	}
 	if err := eng.Register("initserver", server.InitServer); err != nil {
 		return err
 	}
diff --git a/daemon/container.go b/daemon/container.go
index 1eaac8f3b5..612ef6a733 100644
--- a/daemon/container.go
+++ b/daemon/container.go
@@ -514,13 +514,10 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
 	if container.Config.OpenStdin {
 		container.stdin, container.stdinPipe = io.Pipe()
 	}
-	if container.daemon != nil && container.daemon.srv != nil {
-		container.LogEvent("die")
-	}
-	if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
-		// FIXME: here is race condition between two RUN instructions in Dockerfile
-		// because they share same runconfig and change image. Must be fixed
-		// in builder/builder.go
+	container.LogEvent("die")
+	// If the engine is shutting down, don't save the container state as stopped.
+	// This will cause it to be restarted when the engine is restarted.
+	if container.daemon != nil && container.daemon.eng != nil && !container.daemon.eng.IsShutdown() {
 		if err := container.toDisk(); err != nil {
 			utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
 		}
diff --git a/daemon/daemon.go b/daemon/daemon.go
index 4753219652..2409b1dc13 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -94,7 +94,6 @@ type Daemon struct {
 	idIndex        *truncindex.TruncIndex
 	sysInfo        *sysinfo.SysInfo
 	volumes        *graph.Graph
-	srv            Server
 	eng            *engine.Engine
 	config         *daemonconfig.Config
 	containerGraph *graphdb.Database
@@ -667,6 +666,20 @@ func NewDaemon(config *daemonconfig.Config, eng *engine.Engine) (*Daemon, error)
 }
 
 func NewDaemonFromDirectory(config *daemonconfig.Config, eng *engine.Engine) (*Daemon, error) {
+	// Claim the pidfile first, to avoid any and all unexpected race conditions.
+	// Some of the init doesn't need a pidfile lock - but let's not try to be smart.
+	if config.Pidfile != "" {
+		if err := utils.CreatePidFile(config.Pidfile); err != nil {
+			return nil, err
+		}
+		eng.OnShutdown(func() {
+			// Always release the pidfile last, just in case
+			utils.RemovePidFile(config.Pidfile)
+		})
+	}
+
+	// Check that the system is supported and we have sufficient privileges
+	// FIXME: return errors instead of calling Fatal
 	if runtime.GOOS != "linux" {
 		log.Fatalf("The Docker daemon is only supported on linux")
 	}
@@ -819,13 +832,32 @@ func NewDaemonFromDirectory(config *daemonconfig.Config, eng *engine.Engine) (*D
 		eng:            eng,
 		Sockets:        config.Sockets,
 	}
-
 	if err := daemon.checkLocaldns(); err != nil {
 		return nil, err
 	}
 	if err := daemon.restore(); err != nil {
 		return nil, err
 	}
+	// Setup shutdown handlers
+	// FIXME: can these shutdown handlers be registered closer to their source?
+	eng.OnShutdown(func() {
+		// FIXME: if these cleanup steps can be called concurrently, register
+		// them as separate handlers to speed up total shutdown time
+		// FIXME: use engine logging instead of utils.Errorf
+		if err := daemon.shutdown(); err != nil {
+			utils.Errorf("daemon.shutdown(): %s", err)
+		}
+		if err := portallocator.ReleaseAll(); err != nil {
+			utils.Errorf("portallocator.ReleaseAll(): %s", err)
+		}
+		if err := daemon.driver.Cleanup(); err != nil {
+			utils.Errorf("daemon.driver.Cleanup(): %s", err.Error())
+		}
+		if err := daemon.containerGraph.Close(); err != nil {
+			utils.Errorf("daemon.containerGraph.Close(): %s", err.Error())
+		}
+	})
+
 	return daemon, nil
 }
 
@@ -853,30 +885,6 @@ func (daemon *Daemon) shutdown() error {
 	return nil
 }
 
-func (daemon *Daemon) Close() error {
-	errorsStrings := []string{}
-	if err := daemon.shutdown(); err != nil {
-		utils.Errorf("daemon.shutdown(): %s", err)
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := portallocator.ReleaseAll(); err != nil {
-		utils.Errorf("portallocator.ReleaseAll(): %s", err)
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := daemon.driver.Cleanup(); err != nil {
-		utils.Errorf("daemon.driver.Cleanup(): %s", err.Error())
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if err := daemon.containerGraph.Close(); err != nil {
-		utils.Errorf("daemon.containerGraph.Close(): %s", err.Error())
-		errorsStrings = append(errorsStrings, err.Error())
-	}
-	if len(errorsStrings) > 0 {
-		return fmt.Errorf("%s", strings.Join(errorsStrings, ", "))
-	}
-	return nil
-}
-
 func (daemon *Daemon) Mount(container *Container) error {
 	dir, err := daemon.driver.Get(container.ID, container.GetMountLabel())
 	if err != nil {
@@ -967,6 +975,8 @@ func (daemon *Daemon) Kill(c *Container, sig int) error {
 // from the content root, including images, volumes and
 // container filesystems.
 // Again: this will remove your entire docker daemon!
+// FIXME: this is deprecated, and only used in legacy
+// tests. Please remove.
 func (daemon *Daemon) Nuke() error {
 	var wg sync.WaitGroup
 	for _, container := range daemon.List() {
@@ -977,7 +987,6 @@ func (daemon *Daemon) Nuke() error {
 		}(container)
 	}
 	wg.Wait()
-	daemon.Close()
 
 	return os.RemoveAll(daemon.config.Root)
 }
@@ -1022,10 +1031,6 @@ func (daemon *Daemon) ContainerGraph() *graphdb.Database {
 	return daemon.containerGraph
 }
 
-func (daemon *Daemon) SetServer(server Server) {
-	daemon.srv = server
-}
-
 func (daemon *Daemon) checkLocaldns() error {
 	resolvConf, err := resolvconf.Get()
 	if err != nil {
diff --git a/daemon/server.go b/daemon/server.go
deleted file mode 100644
index 12fb0f57c8..0000000000
--- a/daemon/server.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package daemon
-
-type Server interface {
-	IsRunning() bool // returns true if the server is currently in operation
-}
diff --git a/docker/daemon.go b/docker/daemon.go
index 4b994e9629..15d9c4856f 100644
--- a/docker/daemon.go
+++ b/docker/daemon.go
@@ -10,6 +10,7 @@ import (
 	"github.com/docker/docker/dockerversion"
 	"github.com/docker/docker/engine"
 	flag "github.com/docker/docker/pkg/mflag"
+	"github.com/docker/docker/pkg/signal"
 	"github.com/docker/docker/sysinit"
 )
 
@@ -39,19 +40,12 @@ func mainDaemon() {
 	}
 
 	eng := engine.New()
+	signal.Trap(eng.Shutdown)
 	// Load builtins
 	if err := builtins.Register(eng); err != nil {
 		log.Fatal(err)
 	}
 
-	// handle the pidfile early. https://github.com/docker/docker/issues/6973
-	if len(*pidfile) > 0 {
-		job := eng.Job("initserverpidfile", *pidfile)
-		if err := job.Run(); err != nil {
-			log.Fatal(err)
-		}
-	}
-
 	// load the daemon in the background so we can immediately start
 	// the http api so that connections don't fail while the daemon
 	// is booting
diff --git a/engine/engine.go b/engine/engine.go
index 0008d605d0..4550df9d1d 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -7,6 +7,8 @@ import (
 	"os"
 	"sort"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/docker/docker/utils"
 )
@@ -43,14 +45,18 @@ func unregister(name string) {
 // It acts as a store for *containers*, and allows manipulation of these
 // containers by executing *jobs*.
 type Engine struct {
-	handlers map[string]Handler
-	catchall Handler
-	hack     Hack // data for temporary hackery (see hack.go)
-	id       string
-	Stdout   io.Writer
-	Stderr   io.Writer
-	Stdin    io.Reader
-	Logging  bool
+	handlers   map[string]Handler
+	catchall   Handler
+	hack       Hack // data for temporary hackery (see hack.go)
+	id         string
+	Stdout     io.Writer
+	Stderr     io.Writer
+	Stdin      io.Reader
+	Logging    bool
+	tasks      sync.WaitGroup
+	l          sync.RWMutex // lock for shutdown
+	shutdown   bool
+	onShutdown []func() // shutdown handlers
 }
 
 func (eng *Engine) Register(name string, handler Handler) error {
@@ -130,6 +136,77 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 	return job
 }
 
+// OnShutdown registers a new callback to be called by Shutdown.
+// This is typically used by services to perform cleanup.
+func (eng *Engine) OnShutdown(h func()) {
+	eng.l.Lock()
+	eng.onShutdown = append(eng.onShutdown, h)
+	eng.l.Unlock()
+}
+
+// Shutdown permanently shuts down eng as follows:
+// - It refuses all new jobs, permanently.
+// - It waits for all active jobs to complete, for up to 5 seconds
+// - It calls all shutdown handlers concurrently (if any)
+// - It returns when all handlers complete, or after another 10 seconds,
+// whichever happens first.
+func (eng *Engine) Shutdown() {
+	eng.l.Lock()
+	if eng.shutdown {
+		eng.l.Unlock()
+		return
+	}
+	eng.shutdown = true
+	eng.l.Unlock()
+	// We don't need to protect the rest with a lock, to allow
+	// for other calls to immediately fail with "shutdown" instead
+	// of hanging for 15 seconds.
+	// This requires all concurrent calls to check for shutdown, otherwise
+	// it might cause a race.
+
+	// Wait for all jobs to complete.
+	// Timeout after 5 seconds.
+	tasksDone := make(chan struct{})
+	go func() {
+		eng.tasks.Wait()
+		close(tasksDone)
+	}()
+	select {
+	case <-time.After(time.Second * 5):
+	case <-tasksDone:
+	}
+
+	// Call shutdown handlers, if any.
+	// Timeout after 10 seconds.
+	var wg sync.WaitGroup
+	for _, h := range eng.onShutdown {
+		wg.Add(1)
+		go func(h func()) {
+			defer wg.Done()
+			h()
+		}(h)
+	}
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-time.After(time.Second * 10):
+	case <-done:
+	}
+	return
+}
+
+// IsShutdown returns true if the engine is in the process
+// of shutting down, or already shut down.
+// Otherwise it returns false.
+func (eng *Engine) IsShutdown() bool {
+	eng.l.RLock()
+	defer eng.l.RUnlock()
+	return eng.shutdown
+}
+
 // ParseJob creates a new job from a text description using a shell-like syntax.
 //
 // The following syntax is used to parse `input`:
diff --git a/engine/job.go b/engine/job.go
index ab8120dd44..c05db5616e 100644
--- a/engine/job.go
+++ b/engine/job.go
@@ -32,7 +32,6 @@ type Job struct {
 	handler Handler
 	status  Status
 	end     time.Time
-	onExit  []func()
 }
 
 type Status int
@@ -47,6 +46,13 @@ const (
 // If the job returns a failure status, an error is returned
 // which includes the status.
 func (job *Job) Run() error {
+	if job.Eng.IsShutdown() {
+		return fmt.Errorf("engine is shutdown")
+	}
+	job.Eng.l.Lock()
+	job.Eng.tasks.Add(1)
+	job.Eng.l.Unlock()
+	defer job.Eng.tasks.Done()
 	// FIXME: make this thread-safe
 	// FIXME: implement wait
 	if !job.end.IsZero() {
diff --git a/engine/shutdown_test.go b/engine/shutdown_test.go
new file mode 100644
index 0000000000..13d8049267
--- /dev/null
+++ b/engine/shutdown_test.go
@@ -0,0 +1,80 @@
+package engine
+
+import (
+	"testing"
+	"time"
+)
+
+func TestShutdownEmpty(t *testing.T) {
+	eng := New()
+	if eng.IsShutdown() {
+		t.Fatalf("IsShutdown should be false")
+	}
+	eng.Shutdown()
+	if !eng.IsShutdown() {
+		t.Fatalf("IsShutdown should be true")
+	}
+}
+
+func TestShutdownAfterRun(t *testing.T) {
+	eng := New()
+	var called bool
+	eng.Register("foo", func(job *Job) Status {
+		called = true
+		return StatusOK
+	})
+	if err := eng.Job("foo").Run(); err != nil {
+		t.Fatal(err)
+	}
+	eng.Shutdown()
+	if err := eng.Job("foo").Run(); err == nil {
+		t.Fatalf("%#v", *eng)
+	}
+}
+
+// An approximate and racy, but better-than-nothing test that
+// Shutdown() blocks until currently running jobs complete.
+func TestShutdownDuringRun(t *testing.T) {
+	var (
+		jobDelay     time.Duration = 500 * time.Millisecond
+		jobDelayLow  time.Duration = 100 * time.Millisecond
+		jobDelayHigh time.Duration = 700 * time.Millisecond
+	)
+	eng := New()
+	var completed bool
+	eng.Register("foo", func(job *Job) Status {
+		time.Sleep(jobDelay)
+		completed = true
+		return StatusOK
+	})
+	go eng.Job("foo").Run()
+	time.Sleep(50 * time.Millisecond)
+	done := make(chan struct{})
+	var startShutdown time.Time
+	go func() {
+		startShutdown = time.Now()
+		eng.Shutdown()
+		close(done)
+	}()
+	time.Sleep(50 * time.Millisecond)
+	if err := eng.Job("foo").Run(); err == nil {
+		t.Fatalf("run on shutdown should fail: %#v", *eng)
+	}
+	<-done
+	// Verify that Shutdown() blocks for roughly 500ms, instead
+	// of returning almost instantly.
+	//
+	// We use >100ms to leave ample margin for race conditions between
+	// goroutines. It's possible (but unlikely in reasonable testing
+	// conditions), that this test will cause a false positive or false
+	// negative. But it's probably better than not having any test
+	// for the 99.999% of time where testing conditions are reasonable.
+	if d := time.Since(startShutdown); d.Nanoseconds() < jobDelayLow.Nanoseconds() {
+		t.Fatalf("shutdown did not block long enough: %v", d)
+	} else if d.Nanoseconds() > jobDelayHigh.Nanoseconds() {
+		t.Fatalf("shutdown blocked too long: %v", d)
+	}
+	if !completed {
+		t.Fatalf("job did not complete")
+	}
+}
diff --git a/pkg/signal/trap.go b/pkg/signal/trap.go
new file mode 100644
index 0000000000..0f44481c2f
--- /dev/null
+++ b/pkg/signal/trap.go
@@ -0,0 +1,53 @@
+package signal
+
+import (
+	"log"
+	"os"
+	gosignal "os/signal"
+	"sync/atomic"
+	"syscall"
+)
+
+// Trap sets up a simplified signal "trap", appropriate for common
+// behavior expected from a vanilla unix command-line tool in general
+// (and the Docker engine in particular).
+//
+// * If SIGINT or SIGTERM are received, `cleanup` is called, then the process is terminated.
+// * If SIGINT or SIGTERM are repeated 3 times before cleanup is complete, then cleanup is
+// skipped and the process terminated directly.
+// * If "DEBUG" is set in the environment, SIGQUIT causes an exit without cleanup.
+//
+func Trap(cleanup func()) {
+	c := make(chan os.Signal, 1)
+	signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
+	if os.Getenv("DEBUG") == "" {
+		signals = append(signals, syscall.SIGQUIT)
+	}
+	gosignal.Notify(c, signals...)
+	go func() {
+		interruptCount := uint32(0)
+		for sig := range c {
+			go func(sig os.Signal) {
+				log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
+				switch sig {
+				case os.Interrupt, syscall.SIGTERM:
+					// If the user really wants to interrupt, let him do so.
+					if atomic.LoadUint32(&interruptCount) < 3 {
+						atomic.AddUint32(&interruptCount, 1)
+						// Initiate the cleanup only once
+						if atomic.LoadUint32(&interruptCount) == 1 {
+							// Call cleanup handler
+							cleanup()
+						} else {
+							return
+						}
+					} else {
+						log.Printf("Force shutdown of docker, interrupting cleanup\n")
+					}
+				case syscall.SIGQUIT:
+				}
+				os.Exit(128 + int(sig.(syscall.Signal)))
+			}(sig)
+		}
+	}()
+}
diff --git a/server/init.go b/server/init.go
index 082fb0d1cd..3738d0c541 100644
--- a/server/init.go
+++ b/server/init.go
@@ -5,83 +5,29 @@
 package server
 
 import (
-	"fmt"
-	"log"
-	"os"
-	gosignal "os/signal"
-	"sync/atomic"
-	"syscall"
-
 	"github.com/docker/docker/daemon"
 	"github.com/docker/docker/daemonconfig"
 	"github.com/docker/docker/engine"
-	"github.com/docker/docker/utils"
 )
 
 func (srv *Server) handlerWrap(h engine.Handler) engine.Handler {
 	return func(job *engine.Job) engine.Status {
-		if !srv.IsRunning() {
-			return job.Errorf("Server is not running")
-		}
 		srv.tasks.Add(1)
 		defer srv.tasks.Done()
 		return h(job)
 	}
 }
 
-func InitPidfile(job *engine.Job) engine.Status {
-	if len(job.Args) == 0 {
-		return job.Error(fmt.Errorf("no pidfile provided to initialize"))
-	}
-	job.Logf("Creating pidfile")
-	if err := utils.CreatePidFile(job.Args[0]); err != nil {
-		return job.Error(err)
-	}
-	return engine.StatusOK
-}
-
 // jobInitApi runs the remote api server `srv` as a daemon,
 // Only one api server can run at the same time - this is enforced by a pidfile.
 // The signals SIGINT, SIGQUIT and SIGTERM are intercepted for cleanup.
 func InitServer(job *engine.Job) engine.Status {
 	job.Logf("Creating server")
-	srv, err := NewServer(job.Eng, daemonconfig.ConfigFromJob(job))
+	cfg := daemonconfig.ConfigFromJob(job)
+	srv, err := NewServer(job.Eng, cfg)
 	if err != nil {
 		return job.Error(err)
 	}
-	job.Logf("Setting up signal traps")
-	c := make(chan os.Signal, 1)
-	signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
-	if os.Getenv("DEBUG") == "" {
-		signals = append(signals, syscall.SIGQUIT)
-	}
-	gosignal.Notify(c, signals...)
-	go func() {
-		interruptCount := uint32(0)
-		for sig := range c {
-			go func(sig os.Signal) {
-				log.Printf("Received signal '%v', starting shutdown of docker...\n", sig)
-				switch sig {
-				case os.Interrupt, syscall.SIGTERM:
-					// If the user really wants to interrupt, let him do so.
-					if atomic.LoadUint32(&interruptCount) < 3 {
-						atomic.AddUint32(&interruptCount, 1)
-						// Initiate the cleanup only once
-						if atomic.LoadUint32(&interruptCount) == 1 {
-							utils.RemovePidFile(srv.daemon.Config().Pidfile)
-							srv.Close()
-						} else {
-							return
-						}
-					} else {
-						log.Printf("Force shutdown of docker, interrupting cleanup\n")
-					}
-				case syscall.SIGQUIT:
-				}
-				os.Exit(128 + int(sig.(syscall.Signal)))
-			}(sig)
-		}
-	}()
 
 	job.Eng.Hack_SetGlobalVar("httpapi.server", srv)
 	job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon)
@@ -104,7 +50,6 @@ func InitServer(job *engine.Job) engine.Status {
 	if err := srv.daemon.Install(job.Eng); err != nil {
 		return job.Error(err)
 	}
-	srv.SetRunning(true)
 	return engine.StatusOK
 }
 
@@ -119,6 +64,5 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
 		pullingPool: make(map[string]chan struct{}),
 		pushingPool: make(map[string]chan struct{}),
 	}
-	daemon.SetServer(srv)
 	return srv, nil
 }
diff --git a/server/server.go b/server/server.go
index e5233a604c..21b271d87c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -23,52 +23,16 @@ package server
 
 import (
 	"sync"
-	"time"
 
 	"github.com/docker/docker/daemon"
 	"github.com/docker/docker/engine"
 )
 
-func (srv *Server) SetRunning(status bool) {
-	srv.Lock()
-	defer srv.Unlock()
-
-	srv.running = status
-}
-
-func (srv *Server) IsRunning() bool {
-	srv.RLock()
-	defer srv.RUnlock()
-	return srv.running
-}
-
-func (srv *Server) Close() error {
-	if srv == nil {
-		return nil
-	}
-	srv.SetRunning(false)
-	done := make(chan struct{})
-	go func() {
-		srv.tasks.Wait()
-		close(done)
-	}()
-	select {
-	// Waiting server jobs for 15 seconds, shutdown immediately after that time
-	case <-time.After(time.Second * 15):
-	case <-done:
-	}
-	if srv.daemon == nil {
-		return nil
-	}
-	return srv.daemon.Close()
-}
-
 type Server struct {
 	sync.RWMutex
 	daemon      *daemon.Daemon
 	pullingPool map[string]chan struct{}
 	pushingPool map[string]chan struct{}
 	Eng         *engine.Engine
-	running     bool
 	tasks       sync.WaitGroup
}
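
For illustration only (this sketch is not part of the patch; the "hello" job and handler body are made up), the pattern introduced above is used roughly as follows, mirroring docker/daemon.go and daemon/daemon.go: signal.Trap forwards SIGINT/SIGTERM to Engine.Shutdown, which refuses new jobs, waits briefly for running ones, then runs every registered OnShutdown handler concurrently.

package main

import (
	"log"

	"github.com/docker/docker/engine"
	"github.com/docker/docker/pkg/signal"
)

func main() {
	eng := engine.New()
	// On SIGINT/SIGTERM, refuse new jobs, wait for running ones,
	// then run the shutdown handlers registered below.
	signal.Trap(eng.Shutdown)

	// Cleanup work belongs in OnShutdown handlers; they run
	// concurrently during Shutdown, bounded by its timeouts.
	eng.OnShutdown(func() {
		log.Printf("releasing resources")
	})

	// "hello" is a hypothetical job; jobs started after Shutdown
	// fail fast in Job.Run instead of hanging.
	eng.Register("hello", func(job *engine.Job) engine.Status {
		job.Logf("hello %s", job.Args[0])
		return engine.StatusOK
	})
	if err := eng.Job("hello", "world").Run(); err != nil {
		log.Fatal(err)
	}
}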