From da61b99b392657343df4dc221ba5cd9ad6b1c9e1 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 23 Jan 2014 12:17:28 -0800 Subject: [PATCH] Simplify logic for registering ports Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- network.go | 158 +++------------ network_test.go | 44 ---- networkdriver/portallocator/portallocator.go | 191 +++++++++--------- .../portallocator/portallocator_test.go | 88 +++++--- runtime.go | 5 +- 5 files changed, 187 insertions(+), 299 deletions(-) diff --git a/network.go b/network.go index ddb0af5bc7..85b0a588df 100644 --- a/network.go +++ b/network.go @@ -12,7 +12,6 @@ import ( "log" "net" "strconv" - "sync" "syscall" "unsafe" ) @@ -282,76 +281,6 @@ func newPortMapper(config *DaemonConfig) (*PortMapper, error) { return mapper, nil } -// Port allocator: Automatically allocate and release networking ports -type PortAllocator struct { - sync.Mutex - inUse map[string]struct{} - fountain chan int - quit chan bool -} - -func (alloc *PortAllocator) runFountain() { - for { - for port := portRangeStart; port < portRangeEnd; port++ { - select { - case alloc.fountain <- port: - case quit := <-alloc.quit: - if quit { - return - } - } - } - } -} - -// FIXME: Release can no longer fail, change its prototype to reflect that. -func (alloc *PortAllocator) Release(addr net.IP, port int) error { - mapKey := (&net.TCPAddr{Port: port, IP: addr}).String() - utils.Debugf("Releasing %d", port) - alloc.Lock() - delete(alloc.inUse, mapKey) - alloc.Unlock() - return nil -} - -func (alloc *PortAllocator) Acquire(addr net.IP, port int) (int, error) { - mapKey := (&net.TCPAddr{Port: port, IP: addr}).String() - utils.Debugf("Acquiring %s", mapKey) - if port == 0 { - // Allocate a port from the fountain - for port := range alloc.fountain { - if _, err := alloc.Acquire(addr, port); err == nil { - return port, nil - } - } - return -1, fmt.Errorf("Port generator ended unexpectedly") - } - alloc.Lock() - defer alloc.Unlock() - if _, inUse := alloc.inUse[mapKey]; inUse { - return -1, fmt.Errorf("Port already in use: %d", port) - } - alloc.inUse[mapKey] = struct{}{} - return port, nil -} - -func (alloc *PortAllocator) Close() error { - alloc.quit <- true - close(alloc.quit) - close(alloc.fountain) - return nil -} - -func newPortAllocator() (*PortAllocator, error) { - allocator := &PortAllocator{ - inUse: make(map[string]struct{}), - fountain: make(chan int), - quit: make(chan bool), - } - go allocator.runFountain() - return allocator, nil -} - // Network interface represents the networking stack of a container type NetworkInterface struct { IPNet net.IPNet @@ -389,30 +318,24 @@ func (iface *NetworkInterface) AllocatePort(port Port, binding PortBinding) (*Na hostPort, _ := parsePort(nat.Binding.HostPort) - if nat.Port.Proto() == "tcp" { - extPort, err := iface.manager.tcpPortAllocator.Acquire(ip, hostPort) - if err != nil { - return nil, err - } - - backend := &net.TCPAddr{IP: iface.IPNet.IP, Port: containerPort} - if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil { - iface.manager.tcpPortAllocator.Release(ip, extPort) - return nil, err - } - nat.Binding.HostPort = strconv.Itoa(extPort) - } else { - extPort, err := iface.manager.udpPortAllocator.Acquire(ip, hostPort) - if err != nil { - return nil, err - } - backend := &net.UDPAddr{IP: iface.IPNet.IP, Port: containerPort} - if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil { - iface.manager.udpPortAllocator.Release(ip, extPort) - return nil, err - } - nat.Binding.HostPort = strconv.Itoa(extPort) + extPort, err := portallocator.RequestPort(ip, nat.Port.Proto(), hostPort) + if err != nil { + return nil, err } + + var backend net.Addr + if nat.Port.Proto() == "tcp" { + backend = &net.TCPAddr{IP: iface.IPNet.IP, Port: containerPort} + } else { + backend = &net.UDPAddr{IP: iface.IPNet.IP, Port: containerPort} + } + + if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil { + portallocator.ReleasePort(ip, nat.Port.Proto(), extPort) + return nil, err + } + + nat.Binding.HostPort = strconv.Itoa(extPort) iface.extPorts = append(iface.extPorts, nat) return nat, nil @@ -445,14 +368,8 @@ func (iface *NetworkInterface) Release() { log.Printf("Unable to unmap port %s: %s", nat, err) } - if nat.Port.Proto() == "tcp" { - if err := iface.manager.tcpPortAllocator.Release(ip, hostPort); err != nil { - log.Printf("Unable to release port %s", nat) - } - } else if nat.Port.Proto() == "udp" { - if err := iface.manager.udpPortAllocator.Release(ip, hostPort); err != nil { - log.Printf("Unable to release port %s: %s", nat, err) - } + if err := portallocator.ReleasePort(ip, nat.Port.Proto(), hostPort); err != nil { + log.Printf("Unable to release port %s", nat) } } @@ -467,9 +384,7 @@ type NetworkManager struct { bridgeIface string bridgeNetwork *net.IPNet - tcpPortAllocator *PortAllocator - udpPortAllocator *PortAllocator - portMapper *PortMapper + portMapper *PortMapper disabled bool } @@ -497,21 +412,6 @@ func (manager *NetworkManager) Allocate() (*NetworkInterface, error) { return iface, nil } -func (manager *NetworkManager) Close() error { - if manager.disabled { - return nil - } - err1 := manager.tcpPortAllocator.Close() - err2 := manager.udpPortAllocator.Close() - if err1 != nil { - return err1 - } - if err2 != nil { - return err2 - } - return nil -} - func newNetworkManager(config *DaemonConfig) (*NetworkManager, error) { if config.BridgeIface == DisableNetworkBridge { manager := &NetworkManager{ @@ -599,27 +499,15 @@ func newNetworkManager(config *DaemonConfig) (*NetworkManager, error) { } } - tcpPortAllocator, err := newPortAllocator() - if err != nil { - return nil, err - } - - udpPortAllocator, err := newPortAllocator() - if err != nil { - return nil, err - } - portMapper, err := newPortMapper(config) if err != nil { return nil, err } manager := &NetworkManager{ - bridgeIface: config.BridgeIface, - bridgeNetwork: network, - tcpPortAllocator: tcpPortAllocator, - udpPortAllocator: udpPortAllocator, - portMapper: portMapper, + bridgeIface: config.BridgeIface, + bridgeNetwork: network, + portMapper: portMapper, } return manager, nil diff --git a/network_test.go b/network_test.go index 0d25ccb158..6cdf50ab6e 100644 --- a/network_test.go +++ b/network_test.go @@ -7,50 +7,6 @@ import ( "testing" ) -func TestPortAllocation(t *testing.T) { - ip := net.ParseIP("192.168.0.1") - ip2 := net.ParseIP("192.168.0.2") - allocator, err := newPortAllocator() - if err != nil { - t.Fatal(err) - } - if port, err := allocator.Acquire(ip, 80); err != nil { - t.Fatal(err) - } else if port != 80 { - t.Fatalf("Acquire(80) should return 80, not %d", port) - } - port, err := allocator.Acquire(ip, 0) - if err != nil { - t.Fatal(err) - } - if port <= 0 { - t.Fatalf("Acquire(0) should return a non-zero port") - } - if _, err := allocator.Acquire(ip, port); err == nil { - t.Fatalf("Acquiring a port already in use should return an error") - } - if newPort, err := allocator.Acquire(ip, 0); err != nil { - t.Fatal(err) - } else if newPort == port { - t.Fatalf("Acquire(0) allocated the same port twice: %d", port) - } - if _, err := allocator.Acquire(ip, 80); err == nil { - t.Fatalf("Acquiring a port already in use should return an error") - } - if _, err := allocator.Acquire(ip2, 80); err != nil { - t.Fatalf("It should be possible to allocate the same port on a different interface") - } - if _, err := allocator.Acquire(ip2, 80); err == nil { - t.Fatalf("Acquiring a port already in use should return an error") - } - if err := allocator.Release(ip, 80); err != nil { - t.Fatal(err) - } - if _, err := allocator.Acquire(ip, 80); err != nil { - t.Fatal(err) - } -} - type StubProxy struct { frontendAddr *net.Addr backendAddr *net.Addr diff --git a/networkdriver/portallocator/portallocator.go b/networkdriver/portallocator/portallocator.go index 5c7ad04caa..2566ea8500 100644 --- a/networkdriver/portallocator/portallocator.go +++ b/networkdriver/portallocator/portallocator.go @@ -7,20 +7,16 @@ import ( "sync" ) -type portMappings map[string]*collections.OrderedIntSet - -type ipData struct { - allocatedPorts portMappings - availablePorts portMappings -} - -type ipMapping map[net.IP]*ipData - const ( BeginPortRange = 49153 EndPortRange = 65535 ) +type ( + portMappings map[string]*collections.OrderedIntSet + ipMapping map[string]portMappings +) + var ( ErrPortAlreadyAllocated = errors.New("port has already been allocated") ErrPortExceedsRange = errors.New("port exceeds upper range") @@ -28,56 +24,19 @@ var ( ) var ( - defaultIPData *ipData - - lock = sync.Mutex{} - ips = ipMapping{} - defaultIP = net.ParseIP("0.0.0.0") + currentDynamicPort = map[string]int{ + "tcp": BeginPortRange - 1, + "udp": BeginPortRange - 1, + } + defaultIP = net.ParseIP("0.0.0.0") + defaultAllocatedPorts = portMappings{} + otherAllocatedPorts = ipMapping{} + lock = sync.Mutex{} ) func init() { - defaultIPData = newIpData() - ips[defaultIP] = defaultIP -} - -func newIpData() { - data := &ipData{ - allocatedPorts: portMappings{}, - availablePorts: portMappings{}, - } - - data.allocatedPorts["udp"] = collections.NewOrderedIntSet() - data.availablePorts["udp"] = collections.NewOrderedIntSet() - data.allocatedPorts["tcp"] = collections.NewOrderedIntSet() - data.availablePorts["tcp"] = collections.NewOrderedIntSet() - - return data -} - -func getData(ip net.IP) *ipData { - data, exists := ips[ip] - if !exists { - data = newIpData() - ips[ip] = data - } - return data -} - -func validateMapping(data *ipData, proto string, port int) error { - allocated := data.allocatedPorts[proto] - if allocated.Exists(proto) { - return ErrPortAlreadyAllocated - } - return nil -} - -func usePort(data *ipData, proto string, port int) { - allocated, available := data.allocatedPorts[proto], data.availablePorts[proto] - for i := 0; i < 2; i++ { - allocated.Push(port) - available.Remove(port) - allocated, available = defaultIPData.allocatedPorts[proto], defaultIPData.availablePorts[proto] - } + defaultAllocatedPorts["tcp"] = collections.NewOrderedIntSet() + defaultAllocatedPorts["udp"] = collections.NewOrderedIntSet() } // RequestPort returns an available port if the port is 0 @@ -91,43 +50,14 @@ func RequestPort(ip net.IP, proto string, port int) (int, error) { return 0, err } - data := getData(ip) - allocated, available := data.allocatedPorts[proto], data.availablePorts[proto] - // If the user requested a specific port to be allocated if port != 0 { - if err := validateMapping(defaultIP, proto, port); err != nil { + if err := registerSetPort(ip, proto, port); err != nil { return 0, err } - - if !defaultIP.Equal(ip) { - if err := validateMapping(data, proto, port); err != nil { - return 0, err - } - } - - available.Remove(port) - allocated.Push(port) - return port, nil } - - // Dynamic allocation - next := available.Pop() - if next == 0 { - next = allocated.PullBack() - if next == 0 { - next = BeginPortRange - } else { - next++ - } - if next > EndPortRange { - return 0, ErrPortExceedsRange - } - } - - allocated.Push(next) - return next, nil + return registerDynamicPort(ip, proto) } // ReleasePort will return the provided port back into the @@ -140,16 +70,95 @@ func ReleasePort(ip net.IP, proto string, port int) error { return err } - allocated, available := getCollection(ip, proto) - + allocated := defaultAllocatedPorts[proto] allocated.Remove(port) - available.Push(port) + + if !equalsDefault(ip) { + registerIP(ip) + + // Remove the port for the specific ip address + allocated = otherAllocatedPorts[ip.String()][proto] + allocated.Remove(port) + } + return nil +} + +func ReleaseAll() error { + lock.Lock() + defer lock.Unlock() + + currentDynamicPort["tcp"] = BeginPortRange - 1 + currentDynamicPort["udp"] = BeginPortRange - 1 + + defaultAllocatedPorts = portMappings{} + defaultAllocatedPorts["tcp"] = collections.NewOrderedIntSet() + defaultAllocatedPorts["udp"] = collections.NewOrderedIntSet() + + otherAllocatedPorts = ipMapping{} return nil } +func registerDynamicPort(ip net.IP, proto string) (int, error) { + allocated := defaultAllocatedPorts[proto] + + port := nextPort(proto) + if port > EndPortRange { + return 0, ErrPortExceedsRange + } + + if !equalsDefault(ip) { + registerIP(ip) + + ipAllocated := otherAllocatedPorts[ip.String()][proto] + ipAllocated.Push(port) + } else { + allocated.Push(port) + } + return port, nil +} + +func registerSetPort(ip net.IP, proto string, port int) error { + allocated := defaultAllocatedPorts[proto] + if allocated.Exists(port) { + return ErrPortAlreadyAllocated + } + + if !equalsDefault(ip) { + registerIP(ip) + + ipAllocated := otherAllocatedPorts[ip.String()][proto] + if ipAllocated.Exists(port) { + return ErrPortAlreadyAllocated + } + ipAllocated.Push(port) + } else { + allocated.Push(port) + } + return nil +} + +func equalsDefault(ip net.IP) bool { + return ip == nil || ip.Equal(defaultIP) +} + +func nextPort(proto string) int { + c := currentDynamicPort[proto] + 1 + currentDynamicPort[proto] = c + return c +} + +func registerIP(ip net.IP) { + if _, exists := otherAllocatedPorts[ip.String()]; !exists { + otherAllocatedPorts[ip.String()] = portMappings{ + "tcp": collections.NewOrderedIntSet(), + "udp": collections.NewOrderedIntSet(), + } + } +} + func validateProtocol(proto string) error { - if _, exists := allocatedPorts[proto]; !exists { + if _, exists := defaultAllocatedPorts[proto]; !exists { return ErrUnknownProtocol } return nil diff --git a/networkdriver/portallocator/portallocator_test.go b/networkdriver/portallocator/portallocator_test.go index 603bf5a15a..603bd03bd7 100644 --- a/networkdriver/portallocator/portallocator_test.go +++ b/networkdriver/portallocator/portallocator_test.go @@ -1,27 +1,18 @@ package portallocator import ( - "github.com/dotcloud/docker/pkg/collections" + "net" "testing" ) func reset() { - lock.Lock() - defer lock.Unlock() - - allocatedPorts = portMappings{} - availablePorts = portMappings{} - - allocatedPorts["udp"] = collections.NewOrderedIntSet() - availablePorts["udp"] = collections.NewOrderedIntSet() - allocatedPorts["tcp"] = collections.NewOrderedIntSet() - availablePorts["tcp"] = collections.NewOrderedIntSet() + ReleaseAll() } func TestRequestNewPort(t *testing.T) { defer reset() - port, err := RequestPort("tcp", 0) + port, err := RequestPort(defaultIP, "tcp", 0) if err != nil { t.Fatal(err) } @@ -34,7 +25,7 @@ func TestRequestNewPort(t *testing.T) { func TestRequestSpecificPort(t *testing.T) { defer reset() - port, err := RequestPort("tcp", 5000) + port, err := RequestPort(defaultIP, "tcp", 5000) if err != nil { t.Fatal(err) } @@ -46,7 +37,7 @@ func TestRequestSpecificPort(t *testing.T) { func TestReleasePort(t *testing.T) { defer reset() - port, err := RequestPort("tcp", 5000) + port, err := RequestPort(defaultIP, "tcp", 5000) if err != nil { t.Fatal(err) } @@ -54,7 +45,7 @@ func TestReleasePort(t *testing.T) { t.Fatalf("Expected port 5000 got %d", port) } - if err := ReleasePort("tcp", 5000); err != nil { + if err := ReleasePort(defaultIP, "tcp", 5000); err != nil { t.Fatal(err) } } @@ -62,7 +53,7 @@ func TestReleasePort(t *testing.T) { func TestReuseReleasedPort(t *testing.T) { defer reset() - port, err := RequestPort("tcp", 5000) + port, err := RequestPort(defaultIP, "tcp", 5000) if err != nil { t.Fatal(err) } @@ -70,11 +61,11 @@ func TestReuseReleasedPort(t *testing.T) { t.Fatalf("Expected port 5000 got %d", port) } - if err := ReleasePort("tcp", 5000); err != nil { + if err := ReleasePort(defaultIP, "tcp", 5000); err != nil { t.Fatal(err) } - port, err = RequestPort("tcp", 5000) + port, err = RequestPort(defaultIP, "tcp", 5000) if err != nil { t.Fatal(err) } @@ -83,7 +74,7 @@ func TestReuseReleasedPort(t *testing.T) { func TestReleaseUnreadledPort(t *testing.T) { defer reset() - port, err := RequestPort("tcp", 5000) + port, err := RequestPort(defaultIP, "tcp", 5000) if err != nil { t.Fatal(err) } @@ -91,7 +82,7 @@ func TestReleaseUnreadledPort(t *testing.T) { t.Fatalf("Expected port 5000 got %d", port) } - port, err = RequestPort("tcp", 5000) + port, err = RequestPort(defaultIP, "tcp", 5000) if err != ErrPortAlreadyAllocated { t.Fatalf("Expected error %s got %s", ErrPortAlreadyAllocated, err) } @@ -100,7 +91,7 @@ func TestReleaseUnreadledPort(t *testing.T) { func TestUnknowProtocol(t *testing.T) { defer reset() - if _, err := RequestPort("tcpp", 0); err != ErrUnknownProtocol { + if _, err := RequestPort(defaultIP, "tcpp", 0); err != ErrUnknownProtocol { t.Fatalf("Expected error %s got %s", ErrUnknownProtocol, err) } } @@ -109,7 +100,7 @@ func TestAllocateAllPorts(t *testing.T) { defer reset() for i := 0; i <= EndPortRange-BeginPortRange; i++ { - port, err := RequestPort("tcp", 0) + port, err := RequestPort(defaultIP, "tcp", 0) if err != nil { t.Fatal(err) } @@ -119,11 +110,11 @@ func TestAllocateAllPorts(t *testing.T) { } } - if _, err := RequestPort("tcp", 0); err != ErrPortExceedsRange { + if _, err := RequestPort(defaultIP, "tcp", 0); err != ErrPortExceedsRange { t.Fatalf("Expected error %s got %s", ErrPortExceedsRange, err) } - _, err := RequestPort("udp", 0) + _, err := RequestPort(defaultIP, "udp", 0) if err != nil { t.Fatal(err) } @@ -132,10 +123,9 @@ func TestAllocateAllPorts(t *testing.T) { func BenchmarkAllocatePorts(b *testing.B) { defer reset() - b.StartTimer() for i := 0; i < b.N; i++ { for i := 0; i <= EndPortRange-BeginPortRange; i++ { - port, err := RequestPort("tcp", 0) + port, err := RequestPort(defaultIP, "tcp", 0) if err != nil { b.Fatal(err) } @@ -146,5 +136,49 @@ func BenchmarkAllocatePorts(b *testing.B) { } reset() } - b.StopTimer() +} + +func TestPortAllocation(t *testing.T) { + defer reset() + + ip := net.ParseIP("192.168.0.1") + ip2 := net.ParseIP("192.168.0.2") + if port, err := RequestPort(ip, "tcp", 80); err != nil { + t.Fatal(err) + } else if port != 80 { + t.Fatalf("Acquire(80) should return 80, not %d", port) + } + port, err := RequestPort(ip, "tcp", 0) + if err != nil { + t.Fatal(err) + } + if port <= 0 { + t.Fatalf("Acquire(0) should return a non-zero port") + } + + if _, err := RequestPort(ip, "tcp", port); err == nil { + t.Fatalf("Acquiring a port already in use should return an error") + } + + if newPort, err := RequestPort(ip, "tcp", 0); err != nil { + t.Fatal(err) + } else if newPort == port { + t.Fatalf("Acquire(0) allocated the same port twice: %d", port) + } + + if _, err := RequestPort(ip, "tcp", 80); err == nil { + t.Fatalf("Acquiring a port already in use should return an error") + } + if _, err := RequestPort(ip2, "tcp", 80); err != nil { + t.Fatalf("It should be possible to allocate the same port on a different interface") + } + if _, err := RequestPort(ip2, "tcp", 80); err == nil { + t.Fatalf("Acquiring a port already in use should return an error") + } + if err := ReleasePort(ip, "tcp", 80); err != nil { + t.Fatal(err) + } + if _, err := RequestPort(ip, "tcp", 80); err != nil { + t.Fatal(err) + } } diff --git a/runtime.go b/runtime.go index 4eb6f476b0..3d47a50398 100644 --- a/runtime.go +++ b/runtime.go @@ -11,6 +11,7 @@ import ( "github.com/dotcloud/docker/graphdriver/aufs" _ "github.com/dotcloud/docker/graphdriver/devmapper" _ "github.com/dotcloud/docker/graphdriver/vfs" + "github.com/dotcloud/docker/networkdriver/portallocator" "github.com/dotcloud/docker/pkg/graphdb" "github.com/dotcloud/docker/pkg/sysinfo" "github.com/dotcloud/docker/utils" @@ -740,8 +741,8 @@ func NewRuntimeFromDirectory(config *DaemonConfig) (*Runtime, error) { func (runtime *Runtime) Close() error { errorsStrings := []string{} - if err := runtime.networkManager.Close(); err != nil { - utils.Errorf("runtime.networkManager.Close(): %s", err.Error()) + if err := portallocator.ReleaseAll(); err != nil { + utils.Errorf("portallocator.ReleaseAll(): %s", err) errorsStrings = append(errorsStrings, err.Error()) } if err := runtime.driver.Cleanup(); err != nil {