1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Simplify logic for registering ports

Docker-DCO-1.1-Signed-off-by: Michael Crosby <michael@crosbymichael.com> (github: crosbymichael)
This commit is contained in:
Michael Crosby 2014-01-23 12:17:28 -08:00
parent ac2a4e6410
commit da61b99b39
5 changed files with 187 additions and 299 deletions

View file

@ -12,7 +12,6 @@ import (
"log"
"net"
"strconv"
"sync"
"syscall"
"unsafe"
)
@ -282,76 +281,6 @@ func newPortMapper(config *DaemonConfig) (*PortMapper, error) {
return mapper, nil
}
// Port allocator: Automatically allocate and release networking ports
type PortAllocator struct {
sync.Mutex
inUse map[string]struct{}
fountain chan int
quit chan bool
}
func (alloc *PortAllocator) runFountain() {
for {
for port := portRangeStart; port < portRangeEnd; port++ {
select {
case alloc.fountain <- port:
case quit := <-alloc.quit:
if quit {
return
}
}
}
}
}
// FIXME: Release can no longer fail, change its prototype to reflect that.
func (alloc *PortAllocator) Release(addr net.IP, port int) error {
mapKey := (&net.TCPAddr{Port: port, IP: addr}).String()
utils.Debugf("Releasing %d", port)
alloc.Lock()
delete(alloc.inUse, mapKey)
alloc.Unlock()
return nil
}
func (alloc *PortAllocator) Acquire(addr net.IP, port int) (int, error) {
mapKey := (&net.TCPAddr{Port: port, IP: addr}).String()
utils.Debugf("Acquiring %s", mapKey)
if port == 0 {
// Allocate a port from the fountain
for port := range alloc.fountain {
if _, err := alloc.Acquire(addr, port); err == nil {
return port, nil
}
}
return -1, fmt.Errorf("Port generator ended unexpectedly")
}
alloc.Lock()
defer alloc.Unlock()
if _, inUse := alloc.inUse[mapKey]; inUse {
return -1, fmt.Errorf("Port already in use: %d", port)
}
alloc.inUse[mapKey] = struct{}{}
return port, nil
}
func (alloc *PortAllocator) Close() error {
alloc.quit <- true
close(alloc.quit)
close(alloc.fountain)
return nil
}
func newPortAllocator() (*PortAllocator, error) {
allocator := &PortAllocator{
inUse: make(map[string]struct{}),
fountain: make(chan int),
quit: make(chan bool),
}
go allocator.runFountain()
return allocator, nil
}
// Network interface represents the networking stack of a container
type NetworkInterface struct {
IPNet net.IPNet
@ -389,30 +318,24 @@ func (iface *NetworkInterface) AllocatePort(port Port, binding PortBinding) (*Na
hostPort, _ := parsePort(nat.Binding.HostPort)
if nat.Port.Proto() == "tcp" {
extPort, err := iface.manager.tcpPortAllocator.Acquire(ip, hostPort)
if err != nil {
return nil, err
}
backend := &net.TCPAddr{IP: iface.IPNet.IP, Port: containerPort}
if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil {
iface.manager.tcpPortAllocator.Release(ip, extPort)
return nil, err
}
nat.Binding.HostPort = strconv.Itoa(extPort)
} else {
extPort, err := iface.manager.udpPortAllocator.Acquire(ip, hostPort)
if err != nil {
return nil, err
}
backend := &net.UDPAddr{IP: iface.IPNet.IP, Port: containerPort}
if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil {
iface.manager.udpPortAllocator.Release(ip, extPort)
return nil, err
}
nat.Binding.HostPort = strconv.Itoa(extPort)
extPort, err := portallocator.RequestPort(ip, nat.Port.Proto(), hostPort)
if err != nil {
return nil, err
}
var backend net.Addr
if nat.Port.Proto() == "tcp" {
backend = &net.TCPAddr{IP: iface.IPNet.IP, Port: containerPort}
} else {
backend = &net.UDPAddr{IP: iface.IPNet.IP, Port: containerPort}
}
if err := iface.manager.portMapper.Map(ip, extPort, backend); err != nil {
portallocator.ReleasePort(ip, nat.Port.Proto(), extPort)
return nil, err
}
nat.Binding.HostPort = strconv.Itoa(extPort)
iface.extPorts = append(iface.extPorts, nat)
return nat, nil
@ -445,14 +368,8 @@ func (iface *NetworkInterface) Release() {
log.Printf("Unable to unmap port %s: %s", nat, err)
}
if nat.Port.Proto() == "tcp" {
if err := iface.manager.tcpPortAllocator.Release(ip, hostPort); err != nil {
log.Printf("Unable to release port %s", nat)
}
} else if nat.Port.Proto() == "udp" {
if err := iface.manager.udpPortAllocator.Release(ip, hostPort); err != nil {
log.Printf("Unable to release port %s: %s", nat, err)
}
if err := portallocator.ReleasePort(ip, nat.Port.Proto(), hostPort); err != nil {
log.Printf("Unable to release port %s", nat)
}
}
@ -467,9 +384,7 @@ type NetworkManager struct {
bridgeIface string
bridgeNetwork *net.IPNet
tcpPortAllocator *PortAllocator
udpPortAllocator *PortAllocator
portMapper *PortMapper
portMapper *PortMapper
disabled bool
}
@ -497,21 +412,6 @@ func (manager *NetworkManager) Allocate() (*NetworkInterface, error) {
return iface, nil
}
func (manager *NetworkManager) Close() error {
if manager.disabled {
return nil
}
err1 := manager.tcpPortAllocator.Close()
err2 := manager.udpPortAllocator.Close()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return nil
}
func newNetworkManager(config *DaemonConfig) (*NetworkManager, error) {
if config.BridgeIface == DisableNetworkBridge {
manager := &NetworkManager{
@ -599,27 +499,15 @@ func newNetworkManager(config *DaemonConfig) (*NetworkManager, error) {
}
}
tcpPortAllocator, err := newPortAllocator()
if err != nil {
return nil, err
}
udpPortAllocator, err := newPortAllocator()
if err != nil {
return nil, err
}
portMapper, err := newPortMapper(config)
if err != nil {
return nil, err
}
manager := &NetworkManager{
bridgeIface: config.BridgeIface,
bridgeNetwork: network,
tcpPortAllocator: tcpPortAllocator,
udpPortAllocator: udpPortAllocator,
portMapper: portMapper,
bridgeIface: config.BridgeIface,
bridgeNetwork: network,
portMapper: portMapper,
}
return manager, nil

View file

@ -7,50 +7,6 @@ import (
"testing"
)
func TestPortAllocation(t *testing.T) {
ip := net.ParseIP("192.168.0.1")
ip2 := net.ParseIP("192.168.0.2")
allocator, err := newPortAllocator()
if err != nil {
t.Fatal(err)
}
if port, err := allocator.Acquire(ip, 80); err != nil {
t.Fatal(err)
} else if port != 80 {
t.Fatalf("Acquire(80) should return 80, not %d", port)
}
port, err := allocator.Acquire(ip, 0)
if err != nil {
t.Fatal(err)
}
if port <= 0 {
t.Fatalf("Acquire(0) should return a non-zero port")
}
if _, err := allocator.Acquire(ip, port); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if newPort, err := allocator.Acquire(ip, 0); err != nil {
t.Fatal(err)
} else if newPort == port {
t.Fatalf("Acquire(0) allocated the same port twice: %d", port)
}
if _, err := allocator.Acquire(ip, 80); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if _, err := allocator.Acquire(ip2, 80); err != nil {
t.Fatalf("It should be possible to allocate the same port on a different interface")
}
if _, err := allocator.Acquire(ip2, 80); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if err := allocator.Release(ip, 80); err != nil {
t.Fatal(err)
}
if _, err := allocator.Acquire(ip, 80); err != nil {
t.Fatal(err)
}
}
type StubProxy struct {
frontendAddr *net.Addr
backendAddr *net.Addr

View file

@ -7,20 +7,16 @@ import (
"sync"
)
type portMappings map[string]*collections.OrderedIntSet
type ipData struct {
allocatedPorts portMappings
availablePorts portMappings
}
type ipMapping map[net.IP]*ipData
const (
BeginPortRange = 49153
EndPortRange = 65535
)
type (
portMappings map[string]*collections.OrderedIntSet
ipMapping map[string]portMappings
)
var (
ErrPortAlreadyAllocated = errors.New("port has already been allocated")
ErrPortExceedsRange = errors.New("port exceeds upper range")
@ -28,56 +24,19 @@ var (
)
var (
defaultIPData *ipData
lock = sync.Mutex{}
ips = ipMapping{}
defaultIP = net.ParseIP("0.0.0.0")
currentDynamicPort = map[string]int{
"tcp": BeginPortRange - 1,
"udp": BeginPortRange - 1,
}
defaultIP = net.ParseIP("0.0.0.0")
defaultAllocatedPorts = portMappings{}
otherAllocatedPorts = ipMapping{}
lock = sync.Mutex{}
)
func init() {
defaultIPData = newIpData()
ips[defaultIP] = defaultIP
}
func newIpData() {
data := &ipData{
allocatedPorts: portMappings{},
availablePorts: portMappings{},
}
data.allocatedPorts["udp"] = collections.NewOrderedIntSet()
data.availablePorts["udp"] = collections.NewOrderedIntSet()
data.allocatedPorts["tcp"] = collections.NewOrderedIntSet()
data.availablePorts["tcp"] = collections.NewOrderedIntSet()
return data
}
func getData(ip net.IP) *ipData {
data, exists := ips[ip]
if !exists {
data = newIpData()
ips[ip] = data
}
return data
}
func validateMapping(data *ipData, proto string, port int) error {
allocated := data.allocatedPorts[proto]
if allocated.Exists(proto) {
return ErrPortAlreadyAllocated
}
return nil
}
func usePort(data *ipData, proto string, port int) {
allocated, available := data.allocatedPorts[proto], data.availablePorts[proto]
for i := 0; i < 2; i++ {
allocated.Push(port)
available.Remove(port)
allocated, available = defaultIPData.allocatedPorts[proto], defaultIPData.availablePorts[proto]
}
defaultAllocatedPorts["tcp"] = collections.NewOrderedIntSet()
defaultAllocatedPorts["udp"] = collections.NewOrderedIntSet()
}
// RequestPort returns an available port if the port is 0
@ -91,43 +50,14 @@ func RequestPort(ip net.IP, proto string, port int) (int, error) {
return 0, err
}
data := getData(ip)
allocated, available := data.allocatedPorts[proto], data.availablePorts[proto]
// If the user requested a specific port to be allocated
if port != 0 {
if err := validateMapping(defaultIP, proto, port); err != nil {
if err := registerSetPort(ip, proto, port); err != nil {
return 0, err
}
if !defaultIP.Equal(ip) {
if err := validateMapping(data, proto, port); err != nil {
return 0, err
}
}
available.Remove(port)
allocated.Push(port)
return port, nil
}
// Dynamic allocation
next := available.Pop()
if next == 0 {
next = allocated.PullBack()
if next == 0 {
next = BeginPortRange
} else {
next++
}
if next > EndPortRange {
return 0, ErrPortExceedsRange
}
}
allocated.Push(next)
return next, nil
return registerDynamicPort(ip, proto)
}
// ReleasePort will return the provided port back into the
@ -140,16 +70,95 @@ func ReleasePort(ip net.IP, proto string, port int) error {
return err
}
allocated, available := getCollection(ip, proto)
allocated := defaultAllocatedPorts[proto]
allocated.Remove(port)
available.Push(port)
if !equalsDefault(ip) {
registerIP(ip)
// Remove the port for the specific ip address
allocated = otherAllocatedPorts[ip.String()][proto]
allocated.Remove(port)
}
return nil
}
func ReleaseAll() error {
lock.Lock()
defer lock.Unlock()
currentDynamicPort["tcp"] = BeginPortRange - 1
currentDynamicPort["udp"] = BeginPortRange - 1
defaultAllocatedPorts = portMappings{}
defaultAllocatedPorts["tcp"] = collections.NewOrderedIntSet()
defaultAllocatedPorts["udp"] = collections.NewOrderedIntSet()
otherAllocatedPorts = ipMapping{}
return nil
}
func registerDynamicPort(ip net.IP, proto string) (int, error) {
allocated := defaultAllocatedPorts[proto]
port := nextPort(proto)
if port > EndPortRange {
return 0, ErrPortExceedsRange
}
if !equalsDefault(ip) {
registerIP(ip)
ipAllocated := otherAllocatedPorts[ip.String()][proto]
ipAllocated.Push(port)
} else {
allocated.Push(port)
}
return port, nil
}
func registerSetPort(ip net.IP, proto string, port int) error {
allocated := defaultAllocatedPorts[proto]
if allocated.Exists(port) {
return ErrPortAlreadyAllocated
}
if !equalsDefault(ip) {
registerIP(ip)
ipAllocated := otherAllocatedPorts[ip.String()][proto]
if ipAllocated.Exists(port) {
return ErrPortAlreadyAllocated
}
ipAllocated.Push(port)
} else {
allocated.Push(port)
}
return nil
}
func equalsDefault(ip net.IP) bool {
return ip == nil || ip.Equal(defaultIP)
}
func nextPort(proto string) int {
c := currentDynamicPort[proto] + 1
currentDynamicPort[proto] = c
return c
}
func registerIP(ip net.IP) {
if _, exists := otherAllocatedPorts[ip.String()]; !exists {
otherAllocatedPorts[ip.String()] = portMappings{
"tcp": collections.NewOrderedIntSet(),
"udp": collections.NewOrderedIntSet(),
}
}
}
func validateProtocol(proto string) error {
if _, exists := allocatedPorts[proto]; !exists {
if _, exists := defaultAllocatedPorts[proto]; !exists {
return ErrUnknownProtocol
}
return nil

View file

@ -1,27 +1,18 @@
package portallocator
import (
"github.com/dotcloud/docker/pkg/collections"
"net"
"testing"
)
func reset() {
lock.Lock()
defer lock.Unlock()
allocatedPorts = portMappings{}
availablePorts = portMappings{}
allocatedPorts["udp"] = collections.NewOrderedIntSet()
availablePorts["udp"] = collections.NewOrderedIntSet()
allocatedPorts["tcp"] = collections.NewOrderedIntSet()
availablePorts["tcp"] = collections.NewOrderedIntSet()
ReleaseAll()
}
func TestRequestNewPort(t *testing.T) {
defer reset()
port, err := RequestPort("tcp", 0)
port, err := RequestPort(defaultIP, "tcp", 0)
if err != nil {
t.Fatal(err)
}
@ -34,7 +25,7 @@ func TestRequestNewPort(t *testing.T) {
func TestRequestSpecificPort(t *testing.T) {
defer reset()
port, err := RequestPort("tcp", 5000)
port, err := RequestPort(defaultIP, "tcp", 5000)
if err != nil {
t.Fatal(err)
}
@ -46,7 +37,7 @@ func TestRequestSpecificPort(t *testing.T) {
func TestReleasePort(t *testing.T) {
defer reset()
port, err := RequestPort("tcp", 5000)
port, err := RequestPort(defaultIP, "tcp", 5000)
if err != nil {
t.Fatal(err)
}
@ -54,7 +45,7 @@ func TestReleasePort(t *testing.T) {
t.Fatalf("Expected port 5000 got %d", port)
}
if err := ReleasePort("tcp", 5000); err != nil {
if err := ReleasePort(defaultIP, "tcp", 5000); err != nil {
t.Fatal(err)
}
}
@ -62,7 +53,7 @@ func TestReleasePort(t *testing.T) {
func TestReuseReleasedPort(t *testing.T) {
defer reset()
port, err := RequestPort("tcp", 5000)
port, err := RequestPort(defaultIP, "tcp", 5000)
if err != nil {
t.Fatal(err)
}
@ -70,11 +61,11 @@ func TestReuseReleasedPort(t *testing.T) {
t.Fatalf("Expected port 5000 got %d", port)
}
if err := ReleasePort("tcp", 5000); err != nil {
if err := ReleasePort(defaultIP, "tcp", 5000); err != nil {
t.Fatal(err)
}
port, err = RequestPort("tcp", 5000)
port, err = RequestPort(defaultIP, "tcp", 5000)
if err != nil {
t.Fatal(err)
}
@ -83,7 +74,7 @@ func TestReuseReleasedPort(t *testing.T) {
func TestReleaseUnreadledPort(t *testing.T) {
defer reset()
port, err := RequestPort("tcp", 5000)
port, err := RequestPort(defaultIP, "tcp", 5000)
if err != nil {
t.Fatal(err)
}
@ -91,7 +82,7 @@ func TestReleaseUnreadledPort(t *testing.T) {
t.Fatalf("Expected port 5000 got %d", port)
}
port, err = RequestPort("tcp", 5000)
port, err = RequestPort(defaultIP, "tcp", 5000)
if err != ErrPortAlreadyAllocated {
t.Fatalf("Expected error %s got %s", ErrPortAlreadyAllocated, err)
}
@ -100,7 +91,7 @@ func TestReleaseUnreadledPort(t *testing.T) {
func TestUnknowProtocol(t *testing.T) {
defer reset()
if _, err := RequestPort("tcpp", 0); err != ErrUnknownProtocol {
if _, err := RequestPort(defaultIP, "tcpp", 0); err != ErrUnknownProtocol {
t.Fatalf("Expected error %s got %s", ErrUnknownProtocol, err)
}
}
@ -109,7 +100,7 @@ func TestAllocateAllPorts(t *testing.T) {
defer reset()
for i := 0; i <= EndPortRange-BeginPortRange; i++ {
port, err := RequestPort("tcp", 0)
port, err := RequestPort(defaultIP, "tcp", 0)
if err != nil {
t.Fatal(err)
}
@ -119,11 +110,11 @@ func TestAllocateAllPorts(t *testing.T) {
}
}
if _, err := RequestPort("tcp", 0); err != ErrPortExceedsRange {
if _, err := RequestPort(defaultIP, "tcp", 0); err != ErrPortExceedsRange {
t.Fatalf("Expected error %s got %s", ErrPortExceedsRange, err)
}
_, err := RequestPort("udp", 0)
_, err := RequestPort(defaultIP, "udp", 0)
if err != nil {
t.Fatal(err)
}
@ -132,10 +123,9 @@ func TestAllocateAllPorts(t *testing.T) {
func BenchmarkAllocatePorts(b *testing.B) {
defer reset()
b.StartTimer()
for i := 0; i < b.N; i++ {
for i := 0; i <= EndPortRange-BeginPortRange; i++ {
port, err := RequestPort("tcp", 0)
port, err := RequestPort(defaultIP, "tcp", 0)
if err != nil {
b.Fatal(err)
}
@ -146,5 +136,49 @@ func BenchmarkAllocatePorts(b *testing.B) {
}
reset()
}
b.StopTimer()
}
func TestPortAllocation(t *testing.T) {
defer reset()
ip := net.ParseIP("192.168.0.1")
ip2 := net.ParseIP("192.168.0.2")
if port, err := RequestPort(ip, "tcp", 80); err != nil {
t.Fatal(err)
} else if port != 80 {
t.Fatalf("Acquire(80) should return 80, not %d", port)
}
port, err := RequestPort(ip, "tcp", 0)
if err != nil {
t.Fatal(err)
}
if port <= 0 {
t.Fatalf("Acquire(0) should return a non-zero port")
}
if _, err := RequestPort(ip, "tcp", port); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if newPort, err := RequestPort(ip, "tcp", 0); err != nil {
t.Fatal(err)
} else if newPort == port {
t.Fatalf("Acquire(0) allocated the same port twice: %d", port)
}
if _, err := RequestPort(ip, "tcp", 80); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if _, err := RequestPort(ip2, "tcp", 80); err != nil {
t.Fatalf("It should be possible to allocate the same port on a different interface")
}
if _, err := RequestPort(ip2, "tcp", 80); err == nil {
t.Fatalf("Acquiring a port already in use should return an error")
}
if err := ReleasePort(ip, "tcp", 80); err != nil {
t.Fatal(err)
}
if _, err := RequestPort(ip, "tcp", 80); err != nil {
t.Fatal(err)
}
}

View file

@ -11,6 +11,7 @@ import (
"github.com/dotcloud/docker/graphdriver/aufs"
_ "github.com/dotcloud/docker/graphdriver/devmapper"
_ "github.com/dotcloud/docker/graphdriver/vfs"
"github.com/dotcloud/docker/networkdriver/portallocator"
"github.com/dotcloud/docker/pkg/graphdb"
"github.com/dotcloud/docker/pkg/sysinfo"
"github.com/dotcloud/docker/utils"
@ -740,8 +741,8 @@ func NewRuntimeFromDirectory(config *DaemonConfig) (*Runtime, error) {
func (runtime *Runtime) Close() error {
errorsStrings := []string{}
if err := runtime.networkManager.Close(); err != nil {
utils.Errorf("runtime.networkManager.Close(): %s", err.Error())
if err := portallocator.ReleaseAll(); err != nil {
utils.Errorf("portallocator.ReleaseAll(): %s", err)
errorsStrings = append(errorsStrings, err.Error())
}
if err := runtime.driver.Cleanup(); err != nil {