diff --git a/integration-cli/docker_cli_authz_unix_test.go b/integration-cli/docker_cli_authz_unix_test.go index 71a64f3af2..4ab83b9179 100644 --- a/integration-cli/docker_cli_authz_unix_test.go +++ b/integration-cli/docker_cli_authz_unix_test.go @@ -11,10 +11,15 @@ import ( "os" "strings" + "bufio" + "bytes" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/integration/checker" "github.com/docker/docker/pkg/plugins" "github.com/go-check/check" + "os/exec" + "strconv" + "time" ) const ( @@ -221,6 +226,71 @@ func (s *DockerAuthzSuite) TestAuthZPluginDenyResponse(c *check.C) { c.Assert(res, check.Equals, fmt.Sprintf("Error response from daemon: authorization denied by plugin %s: %s\n", testAuthZPlugin, unauthorizedMessage)) } +// TestAuthZPluginAllowEventStream verifies event stream propogates correctly after request pass through by the authorization plugin +func (s *DockerAuthzSuite) TestAuthZPluginAllowEventStream(c *check.C) { + testRequires(c, DaemonIsLinux) + + // Start the authorization plugin + err := s.d.Start("--authorization-plugin=" + testAuthZPlugin) + c.Assert(err, check.IsNil) + s.ctrl.reqRes.Allow = true + s.ctrl.resRes.Allow = true + + startTime := strconv.FormatInt(daemonTime(c).Unix(), 10) + // Add another command to to enable event pipelining + eventsCmd := exec.Command(s.d.cmd.Path, "--host", s.d.sock(), "events", "--since", startTime) + stdout, err := eventsCmd.StdoutPipe() + if err != nil { + c.Assert(err, check.IsNil) + } + + observer := eventObserver{ + buffer: new(bytes.Buffer), + command: eventsCmd, + scanner: bufio.NewScanner(stdout), + startTime: startTime, + } + + err = observer.Start() + c.Assert(err, checker.IsNil) + defer observer.Stop() + + // Create a container and wait for the creation events + _, err = s.d.Cmd("pull", "busybox") + c.Assert(err, check.IsNil) + out, err := s.d.Cmd("run", "-d", "busybox", "top") + c.Assert(err, check.IsNil) + + containerID := strings.TrimSpace(out) + + events := map[string]chan bool{ + "create": make(chan bool), + "start": make(chan bool), + } + + matcher := matchEventLine(containerID, "container", events) + processor := processEventMatch(events) + go observer.Match(matcher, processor) + + // Ensure all events are received + for event, eventChannel := range events { + + select { + case <-time.After(5 * time.Second): + // Fail the test + observer.CheckEventError(c, containerID, event, matcher) + c.FailNow() + case <-eventChannel: + // Ignore, event received + } + } + + // Ensure both events and container endpoints are passed to the authorization plugin + assertURIRecorded(c, s.ctrl.requestsURIs, "/events") + assertURIRecorded(c, s.ctrl.requestsURIs, "/containers/create") + assertURIRecorded(c, s.ctrl.requestsURIs, fmt.Sprintf("/containers/%s/start", containerID)) +} + func (s *DockerAuthzSuite) TestAuthZPluginErrorResponse(c *check.C) { err := s.d.Start("--authorization-plugin=" + testAuthZPlugin) c.Assert(err, check.IsNil) diff --git a/pkg/authorization/authz.go b/pkg/authorization/authz.go index c0ef3877fb..8a15b2b27e 100644 --- a/pkg/authorization/authz.go +++ b/pkg/authorization/authz.go @@ -116,7 +116,7 @@ func (ctx *Ctx) AuthZResponse(rm ResponseModifier, r *http.Request) error { } } - rm.Flush() + rm.FlushAll() return nil } diff --git a/pkg/authorization/authz_test.go b/pkg/authorization/authz_test.go index 75c549b7de..3a6a991511 100644 --- a/pkg/authorization/authz_test.go +++ b/pkg/authorization/authz_test.go @@ -118,7 +118,7 @@ func TestResponseModifier(t *testing.T) { m.Write([]byte("body")) m.WriteHeader(500) - m.Flush() + m.FlushAll() if r.Header().Get("h1") != "v1" { t.Fatalf("Header value must exists %s", r.Header().Get("h1")) } @@ -147,7 +147,7 @@ func TestResponseModifierOverride(t *testing.T) { m.OverrideHeader(overrideHeaderBytes) m.OverrideBody([]byte("override body")) m.OverrideStatusCode(404) - m.Flush() + m.FlushAll() if r.Header().Get("h1") != "v2" { t.Fatalf("Header value must exists %s", r.Header().Get("h1")) } diff --git a/pkg/authorization/response.go b/pkg/authorization/response.go index 8acbcd1844..abe3c9f471 100644 --- a/pkg/authorization/response.go +++ b/pkg/authorization/response.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/Sirupsen/logrus" "net" "net/http" ) @@ -12,6 +13,8 @@ import ( // ResponseModifier allows authorization plugins to read and modify the content of the http.response type ResponseModifier interface { http.ResponseWriter + http.Flusher + http.CloseNotifier // RawBody returns the current http content RawBody() []byte @@ -32,7 +35,10 @@ type ResponseModifier interface { OverrideStatusCode(statusCode int) // Flush flushes all data to the HTTP response - Flush() error + FlushAll() error + + // Hijacked indicates the response has been hijacked by the Docker daemon + Hijacked() bool } // NewResponseModifier creates a wrapper to an http.ResponseWriter to allow inspecting and modifying the content @@ -44,7 +50,10 @@ func NewResponseModifier(rw http.ResponseWriter) ResponseModifier { // the http request/response from docker daemon type responseModifier struct { // The original response writer - rw http.ResponseWriter + rw http.ResponseWriter + + r *http.Request + status int // body holds the response body body []byte @@ -52,15 +61,34 @@ type responseModifier struct { header http.Header // statusCode holds the response status code statusCode int + // hijacked indicates the request has been hijacked + hijacked bool +} + +func (rm *responseModifier) Hijacked() bool { + return rm.hijacked } // WriterHeader stores the http status code func (rm *responseModifier) WriteHeader(s int) { + + // Use original request if hijacked + if rm.hijacked { + rm.rw.WriteHeader(s) + return + } + rm.statusCode = s } // Header returns the internal http header func (rm *responseModifier) Header() http.Header { + + // Use original header if hijacked + if rm.hijacked { + return rm.rw.Header() + } + return rm.header } @@ -90,6 +118,11 @@ func (rm *responseModifier) OverrideHeader(b []byte) error { // Write stores the byte array inside content func (rm *responseModifier) Write(b []byte) (int, error) { + + if rm.hijacked { + return rm.rw.Write(b) + } + rm.body = append(rm.body, b...) return len(b), nil } @@ -109,6 +142,10 @@ func (rm *responseModifier) RawHeaders() ([]byte, error) { // Hijack returns the internal connection of the wrapped http.ResponseWriter func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) { + + rm.hijacked = true + rm.FlushAll() + hijacker, ok := rm.rw.(http.Hijacker) if !ok { return nil, nil, fmt.Errorf("Internal reponse writer doesn't support the Hijacker interface") @@ -116,8 +153,30 @@ func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) { return hijacker.Hijack() } -// Flush flushes all data to the HTTP response -func (rm *responseModifier) Flush() error { +// CloseNotify uses the internal close notify API of the wrapped http.ResponseWriter +func (rm *responseModifier) CloseNotify() <-chan bool { + closeNotifier, ok := rm.rw.(http.CloseNotifier) + if !ok { + logrus.Errorf("Internal reponse writer doesn't support the CloseNotifier interface") + return nil + } + return closeNotifier.CloseNotify() +} + +// Flush uses the internal flush API of the wrapped http.ResponseWriter +func (rm *responseModifier) Flush() { + flusher, ok := rm.rw.(http.Flusher) + if !ok { + logrus.Errorf("Internal reponse writer doesn't support the Flusher interface") + return + } + + rm.FlushAll() + flusher.Flush() +} + +// FlushAll flushes all data to the HTTP response +func (rm *responseModifier) FlushAll() error { // Copy the status code if rm.statusCode > 0 { rm.rw.WriteHeader(rm.statusCode) @@ -130,7 +189,15 @@ func (rm *responseModifier) Flush() error { } } - // Write body - _, err := rm.rw.Write(rm.body) + var err error + if len(rm.body) > 0 { + // Write body + _, err = rm.rw.Write(rm.body) + } + + // Clean previous data + rm.body = nil + rm.statusCode = 0 + rm.header = http.Header{} return err }