1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #11963 from crosbymichael/api-server

API Server Socket Refactoring
This commit is contained in:
Alexander Morozov 2015-03-31 14:25:32 -07:00
commit e4545ed8cb
7 changed files with 286 additions and 228 deletions

52
api/server/profiler.go Normal file
View file

@ -0,0 +1,52 @@
package server
import (
"expvar"
"fmt"
"net/http"
"net/http/pprof"
"github.com/gorilla/mux"
)
func NewProfiler() http.Handler {
var (
p = &Profiler{}
r = mux.NewRouter()
)
r.HandleFunc("/vars", p.expVars)
r.HandleFunc("/pprof/", pprof.Index)
r.HandleFunc("/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/pprof/profile", pprof.Profile)
r.HandleFunc("/pprof/symbol", pprof.Symbol)
r.HandleFunc("/pprof/block", pprof.Handler("block").ServeHTTP)
r.HandleFunc("/pprof/heap", pprof.Handler("heap").ServeHTTP)
r.HandleFunc("/pprof/goroutine", pprof.Handler("goroutine").ServeHTTP)
r.HandleFunc("/pprof/threadcreate", pprof.Handler("threadcreate").ServeHTTP)
p.r = r
return p
}
// Profiler enables pprof and expvar support via a HTTP API.
type Profiler struct {
r *mux.Router
}
func (p *Profiler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p.r.ServeHTTP(w, r)
}
// Replicated from expvar.go as not public.
func (p *Profiler) expVars(w http.ResponseWriter, r *http.Request) {
first := true
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
expvar.Do(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}

View file

@ -6,22 +6,16 @@ import (
"encoding/base64"
"encoding/json"
"expvar"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/pprof"
"os"
"strconv"
"strings"
"crypto/tls"
"crypto/x509"
"code.google.com/p/go.net/websocket"
"github.com/docker/libcontainer/user"
"github.com/gorilla/mux"
"github.com/Sirupsen/logrus"
@ -29,7 +23,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/listenbuffer"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/pkg/streamformatter"
@ -1308,38 +1301,11 @@ func makeHttpHandler(eng *engine.Engine, logging bool, localMethod string, local
}
}
// Replicated from expvar.go as not public.
func expvarHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}
func AttachProfiler(router *mux.Router) {
router.HandleFunc("/debug/vars", expvarHandler)
router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/block", pprof.Handler("block").ServeHTTP)
router.HandleFunc("/debug/pprof/heap", pprof.Handler("heap").ServeHTTP)
router.HandleFunc("/debug/pprof/goroutine", pprof.Handler("goroutine").ServeHTTP)
router.HandleFunc("/debug/pprof/threadcreate", pprof.Handler("threadcreate").ServeHTTP)
}
// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(eng *engine.Engine, logging, enableCors bool, corsHeaders string, dockerVersion string) *mux.Router {
r := mux.NewRouter()
if os.Getenv("DEBUG") != "" {
AttachProfiler(r)
r.Handle("/debug", NewProfiler())
}
m := map[string]map[string]HttpApiFunc{
"GET": {
@ -1438,91 +1404,6 @@ func ServeRequest(eng *engine.Engine, apiversion version.Version, w http.Respons
router.ServeHTTP(w, req)
}
func lookupGidByName(nameOrGid string) (int, error) {
groupFile, err := user.GetGroupPath()
if err != nil {
return -1, err
}
groups, err := user.ParseGroupFileFilter(groupFile, func(g user.Group) bool {
return g.Name == nameOrGid || strconv.Itoa(g.Gid) == nameOrGid
})
if err != nil {
return -1, err
}
if groups != nil && len(groups) > 0 {
return groups[0].Gid, nil
}
gid, err := strconv.Atoi(nameOrGid)
if err == nil {
logrus.Warnf("Could not find GID %d", gid)
return gid, nil
}
return -1, fmt.Errorf("Group %s not found", nameOrGid)
}
func setupTls(cert, key, ca string, l net.Listener) (net.Listener, error) {
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("Could not load X509 key pair (%s, %s): %v", cert, key, err)
}
return nil, fmt.Errorf("Error reading X509 key pair (%s, %s): %q. Make sure the key is encrypted.",
cert, key, err)
}
tlsConfig := &tls.Config{
NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{tlsCert},
// Avoid fallback on insecure SSL protocols
MinVersion: tls.VersionTLS10,
}
if ca != "" {
certPool := x509.NewCertPool()
file, err := ioutil.ReadFile(ca)
if err != nil {
return nil, fmt.Errorf("Could not read CA certificate: %v", err)
}
certPool.AppendCertsFromPEM(file)
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
tlsConfig.ClientCAs = certPool
}
return tls.NewListener(l, tlsConfig), nil
}
func newListener(proto, addr string, bufferRequests bool) (net.Listener, error) {
if bufferRequests {
return listenbuffer.NewListenBuffer(proto, addr, activationLock)
}
return net.Listen(proto, addr)
}
func changeGroup(addr string, nameOrGid string) error {
gid, err := lookupGidByName(nameOrGid)
if err != nil {
return err
}
logrus.Debugf("%s group found. gid: %d", nameOrGid, gid)
return os.Chown(addr, 0, gid)
}
func setSocketGroup(addr, group string) error {
if group == "" {
return nil
}
if err := changeGroup(addr, group); err != nil {
if group != "docker" {
return err
}
logrus.Debugf("Warning: could not chgrp %s to docker: %v", addr, err)
}
return nil
}
func allocateDaemonPort(addr string) error {
host, port, err := net.SplitHostPort(addr)
if err != nil {
@ -1549,35 +1430,6 @@ func allocateDaemonPort(addr string) error {
return nil
}
func setupTcpHttp(addr string, job *engine.Job) (*HttpServer, error) {
if !job.GetenvBool("TlsVerify") {
logrus.Infof("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
}
r := createRouter(job.Eng, job.GetenvBool("Logging"), job.GetenvBool("EnableCors"), job.Getenv("CorsHeaders"), job.Getenv("Version"))
l, err := newListener("tcp", addr, job.GetenvBool("BufferRequests"))
if err != nil {
return nil, err
}
if err := allocateDaemonPort(addr); err != nil {
return nil, err
}
if job.GetenvBool("Tls") || job.GetenvBool("TlsVerify") {
var tlsCa string
if job.GetenvBool("TlsVerify") {
tlsCa = job.Getenv("TlsCa")
}
l, err = setupTls(job.Getenv("TlsCert"), job.Getenv("TlsKey"), tlsCa, l)
if err != nil {
return nil, err
}
}
return &HttpServer{&http.Server{Addr: addr, Handler: r}, l}, nil
}
type Server interface {
Serve() error
Close() error

View file

@ -4,100 +4,86 @@ package server
import (
"fmt"
"net"
"net/http"
"os"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/systemd"
)
// NewServer sets up the required Server and does protocol specific checking.
func NewServer(proto, addr string, job *engine.Job) (Server, error) {
// Basic error and sanity checking
var (
err error
l net.Listener
r = createRouter(
job.Eng,
job.GetenvBool("Logging"),
job.GetenvBool("EnableCors"),
job.Getenv("CorsHeaders"),
job.Getenv("Version"),
)
)
switch proto {
case "fd":
return nil, serveFd(addr, job)
case "tcp":
return setupTcpHttp(addr, job)
case "unix":
return setupUnixHttp(addr, job)
default:
return nil, fmt.Errorf("Invalid protocol format.")
}
}
func setupUnixHttp(addr string, job *engine.Job) (*HttpServer, error) {
r := createRouter(job.Eng, job.GetenvBool("Logging"), job.GetenvBool("EnableCors"), job.Getenv("CorsHeaders"), job.Getenv("Version"))
if err := syscall.Unlink(addr); err != nil && !os.IsNotExist(err) {
return nil, err
}
mask := syscall.Umask(0777)
defer syscall.Umask(mask)
l, err := newListener("unix", addr, job.GetenvBool("BufferRequests"))
if err != nil {
return nil, err
}
if err := setSocketGroup(addr, job.Getenv("SocketGroup")); err != nil {
return nil, err
}
if err := os.Chmod(addr, 0660); err != nil {
return nil, err
}
return &HttpServer{&http.Server{Addr: addr, Handler: r}, l}, nil
}
// serveFd creates an http.Server and sets it up to serve given a socket activated
// argument.
func serveFd(addr string, job *engine.Job) error {
r := createRouter(job.Eng, job.GetenvBool("Logging"), job.GetenvBool("EnableCors"), job.Getenv("CorsHeaders"), job.Getenv("Version"))
ls, e := systemd.ListenFD(addr)
if e != nil {
return e
}
chErrors := make(chan error, len(ls))
// We don't want to start serving on these sockets until the
// daemon is initialized and installed. Otherwise required handlers
// won't be ready.
<-activationLock
// Since ListenFD will return one or more sockets we have
// to create a go func to spawn off multiple serves
for i := range ls {
listener := ls[i]
go func() {
httpSrv := http.Server{Handler: r}
chErrors <- httpSrv.Serve(listener)
}()
}
for i := 0; i < len(ls); i++ {
err := <-chErrors
ls, err := systemd.ListenFD(addr)
if err != nil {
return err
return nil, err
}
chErrors := make(chan error, len(ls))
// We don't want to start serving on these sockets until the
// daemon is initialized and installed. Otherwise required handlers
// won't be ready.
<-activationLock
// Since ListenFD will return one or more sockets we have
// to create a go func to spawn off multiple serves
for i := range ls {
listener := ls[i]
go func() {
httpSrv := http.Server{Handler: r}
chErrors <- httpSrv.Serve(listener)
}()
}
for i := 0; i < len(ls); i++ {
if err := <-chErrors; err != nil {
return nil, err
}
}
return nil, nil
case "tcp":
if !job.GetenvBool("TlsVerify") {
logrus.Infof("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
}
if l, err = NewTcpSocket(addr, tlsConfigFromJob(job)); err != nil {
return nil, err
}
if err := allocateDaemonPort(addr); err != nil {
return nil, err
}
case "unix":
if l, err = NewUnixSocket(addr, job.Getenv("SocketGroup")); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("Invalid protocol format: %q", proto)
}
return nil
return &HttpServer{
&http.Server{
Addr: addr,
Handler: r,
},
l,
}, nil
}
// Called through eng.Job("acceptconnections")
func AcceptConnections(job *engine.Job) error {
// Tell the init daemon we are accepting requests
go systemd.SdNotify("READY=1")
// close the lock so the listeners start accepting connections
if activationLock != nil {
close(activationLock)
}
return nil
}

View file

@ -1,19 +1,38 @@
// +build windows
package server
import (
"fmt"
"errors"
"net"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
)
// NewServer sets up the required Server and does protocol specific checking.
func NewServer(proto, addr string, job *engine.Job) (Server, error) {
// Basic error and sanity checking
var (
err error
l net.Listener
r = createRouter(
job.Eng,
job.GetenvBool("Logging"),
job.GetenvBool("EnableCors"),
job.Getenv("CorsHeaders"),
job.Getenv("Version"),
)
)
switch proto {
case "tcp":
return setupTcpHttp(addr, job)
if !job.GetenvBool("TlsVerify") {
logrus.Infof("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
}
if l, err = NewTcpSocket(addr, tlsConfigFromJob(job)); err != nil {
return nil, err
}
if err := allocateDaemonPort(addr); err != nil {
return nil, err
}
default:
return nil, errors.New("Invalid protocol format. Windows only supports tcp.")
}
@ -21,11 +40,9 @@ func NewServer(proto, addr string, job *engine.Job) (Server, error) {
// Called through eng.Job("acceptconnections")
func AcceptConnections(job *engine.Job) engine.Status {
// close the lock so the listeners start accepting connections
if activationLock != nil {
close(activationLock)
}
return engine.StatusOK
}

74
api/server/tcp_socket.go Normal file
View file

@ -0,0 +1,74 @@
package server
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"os"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/listenbuffer"
)
type tlsConfig struct {
CA string
Certificate string
Key string
Verify bool
}
func tlsConfigFromJob(job *engine.Job) *tlsConfig {
verify := job.GetenvBool("TlsVerify")
if !job.GetenvBool("Tls") && !verify {
return nil
}
return &tlsConfig{
Verify: verify,
Certificate: job.Getenv("TlsCert"),
Key: job.Getenv("TlsKey"),
CA: job.Getenv("TlsCa"),
}
}
func NewTcpSocket(addr string, config *tlsConfig) (net.Listener, error) {
l, err := listenbuffer.NewListenBuffer("tcp", addr, activationLock)
if err != nil {
return nil, err
}
if config != nil {
if l, err = setupTls(l, config); err != nil {
return nil, err
}
}
return l, nil
}
func setupTls(l net.Listener, config *tlsConfig) (net.Listener, error) {
tlsCert, err := tls.LoadX509KeyPair(config.Certificate, config.Key)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("Could not load X509 key pair (%s, %s): %v", config.Certificate, config.Key, err)
}
return nil, fmt.Errorf("Error reading X509 key pair (%s, %s): %q. Make sure the key is encrypted.",
config.Certificate, config.Key, err)
}
tlsConfig := &tls.Config{
NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{tlsCert},
// Avoid fallback on insecure SSL protocols
MinVersion: tls.VersionTLS10,
}
if config.CA != "" {
certPool := x509.NewCertPool()
file, err := ioutil.ReadFile(config.CA)
if err != nil {
return nil, fmt.Errorf("Could not read CA certificate: %v", err)
}
certPool.AppendCertsFromPEM(file)
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
tlsConfig.ClientCAs = certPool
}
return tls.NewListener(l, tlsConfig), nil
}

78
api/server/unix_socket.go Normal file
View file

@ -0,0 +1,78 @@
package server
import (
"fmt"
"net"
"os"
"strconv"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/listenbuffer"
"github.com/docker/libcontainer/user"
)
func NewUnixSocket(path, group string) (net.Listener, error) {
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
mask := syscall.Umask(0777)
defer syscall.Umask(mask)
l, err := listenbuffer.NewListenBuffer("unix", path, activationLock)
if err != nil {
return nil, err
}
if err := setSocketGroup(path, group); err != nil {
l.Close()
return nil, err
}
if err := os.Chmod(path, 0660); err != nil {
l.Close()
return nil, err
}
return l, nil
}
func setSocketGroup(path, group string) error {
if group == "" {
return nil
}
if err := changeGroup(path, group); err != nil {
if group != "docker" {
return err
}
logrus.Debugf("Warning: could not change group %s to docker: %v", path, err)
}
return nil
}
func changeGroup(path string, nameOrGid string) error {
gid, err := lookupGidByName(nameOrGid)
if err != nil {
return err
}
logrus.Debugf("%s group found. gid: %d", nameOrGid, gid)
return os.Chown(path, 0, gid)
}
func lookupGidByName(nameOrGid string) (int, error) {
groupFile, err := user.GetGroupPath()
if err != nil {
return -1, err
}
groups, err := user.ParseGroupFileFilter(groupFile, func(g user.Group) bool {
return g.Name == nameOrGid || strconv.Itoa(g.Gid) == nameOrGid
})
if err != nil {
return -1, err
}
if groups != nil && len(groups) > 0 {
return groups[0].Gid, nil
}
gid, err := strconv.Atoi(nameOrGid)
if err == nil {
logrus.Warnf("Could not find GID %d", gid)
return gid, nil
}
return -1, fmt.Errorf("Group %s not found", nameOrGid)
}

View file

@ -151,7 +151,6 @@ func mainDaemon() {
job.Setenv("TlsCa", *flCa)
job.Setenv("TlsCert", *flCert)
job.Setenv("TlsKey", *flKey)
job.SetenvBool("BufferRequests", true)
// The serve API job never exits unless an error occurs
// We need to start it as a goroutine and wait on it so