Make sure to close the network allocators

This commit is contained in:
Guillaume J. Charmes 2013-10-08 15:42:02 -07:00
parent 2d425af1b1
commit 9107565d06
No known key found for this signature in database
GPG Key ID: B33E4642CB6E3FF3
3 changed files with 48 additions and 4 deletions

View File

@ -336,13 +336,20 @@ func newPortMapper() (*PortMapper, error) {
type PortAllocator struct {
sync.Mutex
inUse map[int]struct{}
fountain chan (int)
fountain chan int
quit chan bool
}
func (alloc *PortAllocator) runFountain() {
for {
for port := portRangeStart; port < portRangeEnd; port++ {
alloc.fountain <- port
select {
case alloc.fountain <- port:
case quit := <-alloc.quit:
if quit {
return
}
}
}
}
}
@ -376,10 +383,18 @@ func (alloc *PortAllocator) Acquire(port int) (int, error) {
return port, nil
}
func (alloc *PortAllocator) Close() error {
alloc.quit <- true
close(alloc.quit)
close(alloc.fountain)
return nil
}
func newPortAllocator() (*PortAllocator, error) {
allocator := &PortAllocator{
inUse: make(map[int]struct{}),
fountain: make(chan int),
quit: make(chan bool),
}
go allocator.runFountain()
return allocator, nil
@ -391,6 +406,7 @@ type IPAllocator struct {
queueAlloc chan allocatedIP
queueReleased chan net.IP
inUse map[int32]struct{}
quit chan bool
}
type allocatedIP struct {
@ -435,6 +451,10 @@ func (alloc *IPAllocator) run() {
}
select {
case quit := <-alloc.quit:
if quit {
return
}
case alloc.queueAlloc <- ip:
alloc.inUse[newNum] = struct{}{}
case released := <-alloc.queueReleased:
@ -467,12 +487,21 @@ func (alloc *IPAllocator) Release(ip net.IP) {
alloc.queueReleased <- ip
}
func (alloc *IPAllocator) Close() error {
alloc.quit <- true
close(alloc.quit)
close(alloc.queueAlloc)
close(alloc.queueReleased)
return nil
}
func newIPAllocator(network *net.IPNet) *IPAllocator {
alloc := &IPAllocator{
network: network,
queueAlloc: make(chan allocatedIP),
queueReleased: make(chan net.IP),
inUse: make(map[int32]struct{}),
quit: make(chan bool),
}
go alloc.run()
@ -662,6 +691,19 @@ func (manager *NetworkManager) Allocate() (*NetworkInterface, error) {
return iface, nil
}
func (manager *NetworkManager) Close() error {
err1 := manager.tcpPortAllocator.Close()
err2 := manager.udpPortAllocator.Close()
err3 := manager.ipAllocator.Close()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return err3
}
func newNetworkManager(bridgeIface string) (*NetworkManager, error) {
if bridgeIface == DisableNetworkBridge {
@ -708,5 +750,6 @@ func newNetworkManager(bridgeIface string) (*NetworkManager, error) {
udpPortAllocator: udpPortAllocator,
portMapper: portMapper,
}
return manager, nil
}

View File

@ -61,10 +61,9 @@ func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
var broker = func(to, from *net.TCPConn) {
written, err := io.Copy(to, from)
if err != nil {
err, ok := err.(*net.OpError)
// If the socket we are writing to is shutdown with
// SHUT_WR, forward it to the other end of the pipe:
if ok && err.Err == syscall.EPIPE {
if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
from.CloseWrite()
}
}
@ -99,6 +98,7 @@ done:
func (proxy *TCPProxy) Run() {
quit := make(chan bool)
defer close(quit)
utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr)
for {
client, err := proxy.listener.Accept()

View File

@ -42,6 +42,7 @@ func nuke(runtime *Runtime) error {
}(container)
}
wg.Wait()
runtime.networkManager.Close()
return os.RemoveAll(runtime.root)
}