From fe204e6f48eb47a1deb3553003eaf9863a66fd1a Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Mon, 17 Jun 2013 16:10:00 -0700 Subject: [PATCH] - Runtime: Forbid parralel push/pull for a single image/repo. Fixes #311 --- runtime_test.go | 6 ++++- server.go | 62 +++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/runtime_test.go b/runtime_test.go index d7d9a5a315..db6367dfaf 100644 --- a/runtime_test.go +++ b/runtime_test.go @@ -65,7 +65,11 @@ func init() { // Create the "Server" srv := &Server{ - runtime: runtime, + runtime: runtime, + enableCors: false, + lock: &sync.Mutex{}, + pullingPool: make(map[string]struct{}), + pushingPool: make(map[string]struct{}), } // Retrieve the Image if err := srv.ImagePull(unitTestImageName, "", "", os.Stdout, utils.NewStreamFormatter(false), nil); err != nil { diff --git a/server.go b/server.go index 30e3ec6b3a..34040df3a0 100644 --- a/server.go +++ b/server.go @@ -15,6 +15,7 @@ import ( "path" "runtime" "strings" + "sync" ) func (srv *Server) DockerVersion() APIVersion { @@ -401,7 +402,47 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, local, re return nil } +func (srv *Server) poolAdd(kind, key string) error { + srv.lock.Lock() + defer srv.lock.Unlock() + + if _, exists := srv.pullingPool[key]; exists { + return fmt.Errorf("%s %s is already in progress", key, kind) + } + + switch kind { + case "pull": + srv.pullingPool[key] = struct{}{} + break + case "push": + srv.pushingPool[key] = struct{}{} + break + default: + return fmt.Errorf("Unkown pool type") + } + return nil +} + +func (srv *Server) poolRemove(kind, key string) error { + switch kind { + case "pull": + delete(srv.pullingPool, key) + break + case "push": + delete(srv.pushingPool, key) + break + default: + return fmt.Errorf("Unkown pool type") + } + return nil +} + func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error { + if err := srv.poolAdd("pull", name+":"+tag); err != nil { + return err + } + defer srv.poolRemove("pull", name+":"+tag) + r := registry.NewRegistry(srv.runtime.root, authConfig) out = utils.NewWriteFlusher(out) if endpoint != "" { @@ -418,7 +459,6 @@ func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *util if err := srv.pullRepository(r, out, name, remote, tag, sf); err != nil { return err } - return nil } @@ -593,7 +633,13 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgId, return nil } +// FIXME: Allow to interupt current push when new push of same image is done. func (srv *Server) ImagePush(name, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error { + if err := srv.poolAdd("push", name); err != nil { + return err + } + defer srv.poolRemove("push", name) + out = utils.NewWriteFlusher(out) img, err := srv.runtime.graph.Get(name) r := registry.NewRegistry(srv.runtime.root, authConfig) @@ -991,14 +1037,20 @@ func NewServer(autoRestart, enableCors bool, dns ListOpts) (*Server, error) { return nil, err } srv := &Server{ - runtime: runtime, - enableCors: enableCors, + runtime: runtime, + enableCors: enableCors, + lock: &sync.Mutex{}, + pullingPool: make(map[string]struct{}), + pushingPool: make(map[string]struct{}), } runtime.srv = srv return srv, nil } type Server struct { - runtime *Runtime - enableCors bool + runtime *Runtime + enableCors bool + lock *sync.Mutex + pullingPool map[string]struct{} + pushingPool map[string]struct{} }