diff --git a/api/api.go b/api/api.go index 2ba27288c0..8d9bae978f 100644 --- a/api/api.go +++ b/api/api.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/dotcloud/docker/auth" "github.com/dotcloud/docker/engine" + "github.com/dotcloud/docker/pkg/listenbuffer" "github.com/dotcloud/docker/pkg/systemd" "github.com/dotcloud/docker/utils" "github.com/gorilla/mux" @@ -25,6 +26,7 @@ import ( "strconv" "strings" "syscall" + "time" ) // FIXME: move code common to client and server to common.go @@ -34,6 +36,10 @@ const ( DEFAULTUNIXSOCKET = "/var/run/docker.sock" ) +var ( + activationLock chan struct{} +) + func ValidateHost(val string) (string, error) { host, err := utils.ParseHost(DEFAULTHTTPHOST, DEFAULTUNIXSOCKET, val) if err != nil { @@ -1159,7 +1165,7 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors } } - l, err := net.Listen(proto, addr) + l, err := listenbuffer.NewListenBuffer(proto, addr, activationLock, 15*time.Minute) if err != nil { return err } @@ -1201,8 +1207,15 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors // ServeApi loops through all of the protocols sent in to docker and spawns // off a go routine to setup a serving http.Server for each. func ServeApi(job *engine.Job) engine.Status { - protoAddrs := job.Args - chErrors := make(chan error, len(protoAddrs)) + var ( + protoAddrs = job.Args + chErrors = make(chan error, len(protoAddrs)) + ) + activationLock = make(chan struct{}) + + if err := job.Eng.Register("acceptconnections", AcceptConnections); err != nil { + return job.Error(err) + } for _, protoAddr := range protoAddrs { protoAddrParts := strings.SplitN(protoAddr, "://", 2) @@ -1219,8 +1232,15 @@ func ServeApi(job *engine.Job) engine.Status { } } + return engine.StatusOK +} + +func AcceptConnections(job *engine.Job) engine.Status { // Tell the init daemon we are accepting requests go systemd.SdNotify("READY=1") + // close the lock so the listeners start accepting connections + close(activationLock) + return engine.StatusOK } diff --git a/docker/docker.go b/docker/docker.go index a552b8318a..02c99b9316 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -81,25 +81,36 @@ func main() { if err != nil { log.Fatal(err) } - // Load plugin: httpapi - job := eng.Job("initserver") - job.Setenv("Pidfile", *pidfile) - job.Setenv("Root", *flRoot) - job.SetenvBool("AutoRestart", *flAutoRestart) - job.SetenvList("Dns", flDns.GetAll()) - job.SetenvBool("EnableIptables", *flEnableIptables) - job.SetenvBool("EnableIpForward", *flEnableIpForward) - job.Setenv("BridgeIface", *bridgeName) - job.Setenv("BridgeIP", *bridgeIp) - job.Setenv("DefaultIp", *flDefaultIp) - job.SetenvBool("InterContainerCommunication", *flInterContainerComm) - job.Setenv("GraphDriver", *flGraphDriver) - job.SetenvInt("Mtu", *flMtu) - if err := job.Run(); err != nil { - log.Fatal(err) - } + // load the daemon in the background so we can immediately start + // the http api so that connections don't fail while the daemon + // is booting + go func() { + // Load plugin: httpapi + job := eng.Job("initserver") + job.Setenv("Pidfile", *pidfile) + job.Setenv("Root", *flRoot) + job.SetenvBool("AutoRestart", *flAutoRestart) + job.SetenvList("Dns", flDns.GetAll()) + job.SetenvBool("EnableIptables", *flEnableIptables) + job.SetenvBool("EnableIpForward", *flEnableIpForward) + job.Setenv("BridgeIface", *bridgeName) + job.Setenv("BridgeIP", *bridgeIp) + job.Setenv("DefaultIp", *flDefaultIp) + job.SetenvBool("InterContainerCommunication", *flInterContainerComm) + job.Setenv("GraphDriver", *flGraphDriver) + job.SetenvInt("Mtu", *flMtu) + if err := job.Run(); err != nil { + log.Fatal(err) + } + // after the daemon is done setting up we can tell the api to start + // accepting connections + if err := eng.Job("acceptconnections").Run(); err != nil { + log.Fatal(err) + } + }() + // Serve api - job = eng.Job("serveapi", flHosts.GetAll()...) + job := eng.Job("serveapi", flHosts.GetAll()...) job.SetenvBool("Logging", true) job.SetenvBool("EnableCors", *flEnableCors) job.Setenv("Version", dockerversion.VERSION) diff --git a/integration/runtime_test.go b/integration/runtime_test.go index 170f4c9638..ca2119ce1f 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -171,9 +171,14 @@ func spawnGlobalDaemon() { log.Fatalf("Unable to spawn the test daemon: %s", err) } }() + // Give some time to ListenAndServer to actually start // FIXME: use inmem transports instead of tcp time.Sleep(time.Second) + + if err := eng.Job("acceptconnections").Run(); err != nil { + log.Fatalf("Unable to accept connections for test api: %s", err) + } } // FIXME: test that ImagePull(json=true) send correct json output diff --git a/pkg/listenbuffer/buffer.go b/pkg/listenbuffer/buffer.go new file mode 100644 index 0000000000..c350805a7d --- /dev/null +++ b/pkg/listenbuffer/buffer.go @@ -0,0 +1,61 @@ +/* + Package to allow go applications to immediately start + listening on a socket, unix, tcp, udp but hold connections + until the application has booted and is ready to accept them +*/ +package listenbuffer + +import ( + "fmt" + "net" + "time" +) + +// NewListenBuffer returns a listener listening on addr with the protocol. It sets the +// timeout to wait on first connection before an error is returned +func NewListenBuffer(proto, addr string, activate chan struct{}, timeout time.Duration) (net.Listener, error) { + wrapped, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + return &defaultListener{ + wrapped: wrapped, + activate: activate, + timeout: timeout, + }, nil +} + +type defaultListener struct { + wrapped net.Listener // the real listener to wrap + ready bool // is the listner ready to start accpeting connections + activate chan struct{} + timeout time.Duration // how long to wait before we consider this an error +} + +func (l *defaultListener) Close() error { + return l.wrapped.Close() +} + +func (l *defaultListener) Addr() net.Addr { + return l.wrapped.Addr() +} + +func (l *defaultListener) Accept() (net.Conn, error) { + // if the listen has been told it is ready then we can go ahead and + // start returning connections + if l.ready { + return l.wrapped.Accept() + } + + select { + case <-time.After(l.timeout): + // close the connection so any clients are disconnected + l.Close() + return nil, fmt.Errorf("timeout (%s) reached waiting for listener to become ready", l.timeout.String()) + case <-l.activate: + l.ready = true + return l.Accept() + } + panic("unreachable") +}