diff --git a/libnetwork/cmd/dnet/dnet.go b/libnetwork/cmd/dnet/dnet.go index 1c556895bf..ef1a6d18c2 100644 --- a/libnetwork/cmd/dnet/dnet.go +++ b/libnetwork/cmd/dnet/dnet.go @@ -12,6 +12,7 @@ import ( flag "github.com/docker/docker/pkg/mflag" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/reexec" "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/term" @@ -31,6 +32,10 @@ var ( ) func main() { + if reexec.Init() { + return + } + _, stdout, stderr := term.StdStreams() logrus.SetOutput(stderr) diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 34ec9630d5..d11ffda526 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -102,7 +102,6 @@ type controller struct { sandboxes sandboxTable cfg *config.Config store datastore.DataStore - stopChan chan struct{} sync.Mutex } @@ -132,7 +131,6 @@ func New(configFile string) (NetworkController, error) { // But without that, datastore cannot be initialized. log.Debugf("Unable to Parse LibNetwork Config file : %v", err) } - c.stopChan = make(chan struct{}) return c, nil } @@ -226,7 +224,10 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti return nil, err } - if err := c.addNetworkToStore(network); err != nil { + if err := c.updateNetworkToStore(network); err != nil { + if e := network.Delete(); e != nil { + log.Warnf("couldnt cleanup network %s: %v", network.name, err) + } return nil, err } diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 125be0b953..ece0fb4115 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -2,10 +2,11 @@ package datastore import ( "encoding/json" - "errors" + "reflect" "strings" "github.com/docker/libnetwork/config" + "github.com/docker/libnetwork/types" "github.com/docker/swarm/pkg/store" ) @@ -52,17 +53,25 @@ const ( EndpointKeyPrefix = "endpoint" ) +var rootChain = []string{"docker", "libnetwork"} + //Key provides convenient method to create a Key func Key(key ...string) string { - keychain := []string{"docker", "libnetwork"} - keychain = append(keychain, key...) + keychain := append(rootChain, key...) str := strings.Join(keychain, "/") return str + "/" } -var errNewDatastore = errors.New("Error creating new Datastore") -var errInvalidConfiguration = errors.New("Invalid Configuration passed to Datastore") -var errInvalidAtomicRequest = errors.New("Invalid Atomic Request") +//ParseKey provides convenient method to unpack the key to complement the Key function +func ParseKey(key string) ([]string, error) { + chain := strings.Split(strings.Trim(key, "/"), "/") + + // The key must atleast be equal to the rootChain in order to be considered as valid + if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) { + return nil, types.BadRequestErrorf("invalid Key : %s", key) + } + return chain[len(rootChain):], nil +} // newClient used to connect to KV Store func newClient(kv string, addrs string) (DataStore, error) { @@ -77,7 +86,7 @@ func newClient(kv string, addrs string) (DataStore, error) { // NewDataStore creates a new instance of LibKV data store func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) { if cfg == nil { - return nil, errInvalidConfiguration + return nil, types.BadRequestErrorf("invalid configuration passed to datastore") } // TODO : cfg.Embedded case return newClient(cfg.Client.Provider, cfg.Client.Address) @@ -95,12 +104,12 @@ func (ds *datastore) KVStore() store.Store { // PutObjectAtomic adds a new Record based on an object into the datastore func (ds *datastore) PutObjectAtomic(kvObject KV) error { if kvObject == nil { - return errors.New("kvObject is nil") + return types.BadRequestErrorf("invalid KV Object : nil") } kvObjValue := kvObject.Value() if kvObjValue == nil { - return errInvalidAtomicRequest + return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...)) } previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} @@ -116,7 +125,7 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error { // PutObject adds a new Record based on an object into the datastore func (ds *datastore) PutObject(kvObject KV) error { if kvObject == nil { - return errors.New("kvObject is nil") + return types.BadRequestErrorf("invalid KV Object : nil") } return ds.putObjectWithKey(kvObject, kvObject.Key()...) } @@ -125,7 +134,7 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { kvObjValue := kvObject.Value() if kvObjValue == nil { - return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...)) + return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...)) } return ds.store.Put(Key(key...), kvObjValue, nil) } @@ -147,16 +156,12 @@ func (ds *datastore) DeleteObject(kvObject KV) error { // DeleteObjectAtomic performs atomic delete on a record func (ds *datastore) DeleteObjectAtomic(kvObject KV) error { if kvObject == nil { - return errors.New("kvObject is nil") + return types.BadRequestErrorf("invalid KV Object : nil") } previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous) - if err != nil { - return err - } - - return nil + return err } // DeleteTree unconditionally deletes a record from the store diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go index 08731be9a6..32dc2f1ad5 100644 --- a/libnetwork/datastore/datastore_test.go +++ b/libnetwork/datastore/datastore_test.go @@ -2,6 +2,7 @@ package datastore import ( "encoding/json" + "reflect" "testing" "github.com/docker/libnetwork/config" @@ -16,6 +17,25 @@ func NewTestDataStore() DataStore { return &datastore{store: NewMockStore()} } +func TestKey(t *testing.T) { + eKey := []string{"hello", "world"} + sKey := Key(eKey...) + if sKey != "docker/libnetwork/hello/world/" { + t.Fatalf("unexpected key : %s", sKey) + } +} + +func TestParseKey(t *testing.T) { + keySlice, err := ParseKey("/docker/libnetwork/hello/world/") + if err != nil { + t.Fatal(err) + } + eKey := []string{"hello", "world"} + if len(keySlice) < 2 || !reflect.DeepEqual(eKey, keySlice) { + t.Fatalf("unexpected unkey : %s", keySlice) + } +} + func TestInvalidDataStore(t *testing.T) { config := &config.DatastoreCfg{} config.Embedded = false @@ -94,6 +114,11 @@ type dummyObject struct { func (n *dummyObject) Key() []string { return []string{dummyKey, n.ID} } + +func (n *dummyObject) KeyPrefix() []string { + return []string{dummyKey} +} + func (n *dummyObject) Value() []byte { if !n.ReturnValue { return nil diff --git a/libnetwork/datastore/mock_store.go b/libnetwork/datastore/mock_store.go index 84084c915c..14192d3653 100644 --- a/libnetwork/datastore/mock_store.go +++ b/libnetwork/datastore/mock_store.go @@ -3,6 +3,7 @@ package datastore import ( "errors" + "github.com/docker/libnetwork/types" "github.com/docker/swarm/pkg/store" ) @@ -69,7 +70,8 @@ func (s *MockStore) List(prefix string) ([]*store.KVPair, error) { // DeleteTree deletes a range of values at "directory" func (s *MockStore) DeleteTree(prefix string) error { - return ErrNotImplmented + delete(s.db, prefix) + return nil } // Watch a single key for modifications @@ -92,7 +94,7 @@ func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locke func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { mData := s.db[key] if mData != nil && mData.Index != previous.LastIndex { - return false, nil, errInvalidAtomicRequest + return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index") } err := s.Put(key, newValue, nil) if err != nil { @@ -106,7 +108,7 @@ func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPai func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) { mData := s.db[key] if mData != nil && mData.Index != previous.LastIndex { - return false, errInvalidAtomicRequest + return false, types.BadRequestErrorf("atomic delete failed due to mismatched Index") } return true, s.Delete(key) } diff --git a/libnetwork/drivers/bridge/bridge.go b/libnetwork/drivers/bridge/bridge.go index d64733eba6..ce58ddb2ef 100644 --- a/libnetwork/drivers/bridge/bridge.go +++ b/libnetwork/drivers/bridge/bridge.go @@ -110,7 +110,6 @@ func Init(dc driverapi.DriverCallback) error { if out, err := exec.Command("modprobe", "-va", "bridge", "nf_nat", "br_netfilter").Output(); err != nil { logrus.Warnf("Running modprobe bridge nf_nat failed with message: %s, error: %v", out, err) } - return dc.RegisterDriver(networkType, newDriver()) } diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index d4bb7a66a2..77c9adcbe2 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -3,6 +3,7 @@ package libnetwork import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "os" "path" @@ -98,6 +99,7 @@ type containerInfo struct { id string config containerConfig data ContainerData + sync.Mutex } type endpoint struct { @@ -114,11 +116,33 @@ type endpoint struct { sync.Mutex } +func (ci *containerInfo) MarshalJSON() ([]byte, error) { + ci.Lock() + defer ci.Unlock() + + // We are just interested in the container ID. This can be expanded to include all of containerInfo if there is a need + return json.Marshal(ci.id) +} + +func (ci *containerInfo) UnmarshalJSON(b []byte) (err error) { + ci.Lock() + defer ci.Unlock() + + var id string + if err := json.Unmarshal(b, &id); err != nil { + return err + } + ci.id = id + return nil +} + func (ep *endpoint) MarshalJSON() ([]byte, error) { + ep.Lock() + defer ep.Unlock() + epMap := make(map[string]interface{}) epMap["name"] = ep.name epMap["id"] = string(ep.id) - epMap["network"] = ep.network epMap["ep_iface"] = ep.iFaces epMap["exposed_ports"] = ep.exposedPorts epMap["generic"] = ep.generic @@ -129,6 +153,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) { } func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { + ep.Lock() + defer ep.Unlock() + var epMap map[string]interface{} if err := json.Unmarshal(b, &epMap); err != nil { return err @@ -136,11 +163,6 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { ep.name = epMap["name"].(string) ep.id = types.UUID(epMap["id"].(string)) - nb, _ := json.Marshal(epMap["network"]) - var n network - json.Unmarshal(nb, &n) - ep.network = &n - ib, _ := json.Marshal(epMap["ep_iface"]) var ifaces []endpointInterface json.Unmarshal(ib, &ifaces) @@ -191,12 +213,30 @@ func (ep *endpoint) Network() string { return ep.network.name } +// endpoint Key structure : endpoint/network-id/endpoint-id func (ep *endpoint) Key() []string { - return []string{datastore.EndpointKeyPrefix, string(ep.network.id), string(ep.id)} + ep.Lock() + n := ep.network + defer ep.Unlock() + return []string{datastore.EndpointKeyPrefix, string(n.id), string(ep.id)} } func (ep *endpoint) KeyPrefix() []string { - return []string{datastore.EndpointKeyPrefix, string(ep.network.id)} + ep.Lock() + n := ep.network + defer ep.Unlock() + return []string{datastore.EndpointKeyPrefix, string(n.id)} +} + +func (ep *endpoint) networkIDFromKey(key []string) (types.UUID, error) { + // endpoint Key structure : endpoint/network-id/endpoint-id + // its an invalid key if the key doesnt have all the 3 key elements above + if key == nil || len(key) < 3 || key[0] != datastore.EndpointKeyPrefix { + return types.UUID(""), fmt.Errorf("invalid endpoint key : %v", key) + } + + // network-id is placed at index=1. pls refer to endpoint.Key() method + return types.UUID(key[1]), nil } func (ep *endpoint) Value() []byte { @@ -208,10 +248,14 @@ func (ep *endpoint) Value() []byte { } func (ep *endpoint) Index() uint64 { + ep.Lock() + defer ep.Unlock() return ep.dbIndex } func (ep *endpoint) SetIndex(index uint64) { + ep.Lock() + defer ep.Unlock() ep.dbIndex = index } @@ -292,7 +336,14 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error { } ep.joinLeaveStart() - defer ep.joinLeaveEnd() + defer func() { + ep.joinLeaveEnd() + if err != nil { + if e := ep.Leave(containerID, options...); e != nil { + log.Warnf("couldnt leave endpoint : %v", ep.name, err) + } + } + }() ep.Lock() if ep.container != nil { @@ -321,8 +372,6 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error { ep.Lock() ep.container = nil ep.Unlock() - } else { - ep.network.ctrlr.addEndpointToStore(ep) } }() @@ -369,8 +418,11 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error { } }() - container.data.SandboxKey = sb.Key() + if err := network.ctrlr.updateEndpointToStore(ep); err != nil { + return err + } + container.data.SandboxKey = sb.Key() return nil } @@ -399,7 +451,7 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error { container := ep.container n := ep.network - if container == nil || container.id == "" || + if container == nil || container.id == "" || container.data.SandboxKey == "" || containerID == "" || container.id != containerID { if container == nil { err = ErrNoContainer{} @@ -418,6 +470,13 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error { ctrlr := n.ctrlr n.Unlock() + if err := ctrlr.updateEndpointToStore(ep); err != nil { + ep.Lock() + ep.container = container + ep.Unlock() + return err + } + err = driver.Leave(n.id, ep.id) ctrlr.sandboxRm(container.data.SandboxKey, ep) @@ -426,25 +485,54 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error { } func (ep *endpoint) Delete() error { + var err error ep.Lock() + epid := ep.id + name := ep.name + n := ep.network if ep.container != nil { ep.Unlock() - return &ActiveContainerError{name: ep.name, id: string(ep.id)} + return &ActiveContainerError{name: name, id: string(epid)} } + n.Lock() + ctrlr := n.ctrlr + n.Unlock() ep.Unlock() - if err := ep.deleteEndpoint(); err != nil { + if err = ctrlr.deleteEndpointFromStore(ep); err != nil { + return err + } + defer func() { + if err != nil { + ep.SetIndex(0) + if e := ctrlr.updateEndpointToStore(ep); e != nil { + log.Warnf("failed to recreate endpoint in store %s : %v", name, err) + } + } + }() + + // Update the endpoint count in network and update it in the datastore + n.DecEndpointCnt() + if err = ctrlr.updateNetworkToStore(n); err != nil { + return err + } + defer func() { + if err != nil { + n.IncEndpointCnt() + if e := ctrlr.updateNetworkToStore(n); e != nil { + log.Warnf("failed to update network %s : %v", n.name, e) + } + } + }() + + if err = ep.deleteEndpoint(); err != nil { return err } - if err := ep.network.ctrlr.deleteEndpointFromStore(ep); err != nil { - return err - } return nil } func (ep *endpoint) deleteEndpoint() error { - var err error ep.Lock() n := ep.network name := ep.name diff --git a/libnetwork/endpoint_info.go b/libnetwork/endpoint_info.go index bd989f291b..38d5bdc365 100644 --- a/libnetwork/endpoint_info.go +++ b/libnetwork/endpoint_info.go @@ -1,6 +1,7 @@ package libnetwork import ( + "encoding/json" "net" "github.com/docker/libnetwork/driverapi" @@ -49,6 +50,59 @@ type endpointInterface struct { routes []*net.IPNet } +func (epi *endpointInterface) MarshalJSON() ([]byte, error) { + epMap := make(map[string]interface{}) + epMap["id"] = epi.id + epMap["mac"] = epi.mac.String() + epMap["addr"] = epi.addr.String() + epMap["addrv6"] = epi.addrv6.String() + epMap["srcName"] = epi.srcName + epMap["dstPrefix"] = epi.dstPrefix + var routes []string + for _, route := range epi.routes { + routes = append(routes, route.String()) + } + epMap["routes"] = routes + return json.Marshal(epMap) +} + +func (epi *endpointInterface) UnmarshalJSON(b []byte) (err error) { + var epMap map[string]interface{} + if err := json.Unmarshal(b, &epMap); err != nil { + return err + } + epi.id = int(epMap["id"].(float64)) + + mac, _ := net.ParseMAC(epMap["mac"].(string)) + epi.mac = mac + + _, ipnet, _ := net.ParseCIDR(epMap["addr"].(string)) + if ipnet != nil { + epi.addr = *ipnet + } + + _, ipnet, _ = net.ParseCIDR(epMap["addrv6"].(string)) + if ipnet != nil { + epi.addrv6 = *ipnet + } + + epi.srcName = epMap["srcName"].(string) + epi.dstPrefix = epMap["dstPrefix"].(string) + + rb, _ := json.Marshal(epMap["routes"]) + var routes []string + json.Unmarshal(rb, &routes) + epi.routes = make([]*net.IPNet, 0) + for _, route := range routes { + _, ipr, err := net.ParseCIDR(route) + if err == nil { + epi.routes = append(epi.routes, ipr) + } + } + + return nil +} + type endpointJoinInfo struct { gw net.IP gw6 net.IP @@ -116,25 +170,25 @@ func (ep *endpoint) AddInterface(id int, mac net.HardwareAddr, ipv4 net.IPNet, i return nil } -func (i *endpointInterface) ID() int { - return i.id +func (epi *endpointInterface) ID() int { + return epi.id } -func (i *endpointInterface) MacAddress() net.HardwareAddr { - return types.GetMacCopy(i.mac) +func (epi *endpointInterface) MacAddress() net.HardwareAddr { + return types.GetMacCopy(epi.mac) } -func (i *endpointInterface) Address() net.IPNet { - return (*types.GetIPNetCopy(&i.addr)) +func (epi *endpointInterface) Address() net.IPNet { + return (*types.GetIPNetCopy(&epi.addr)) } -func (i *endpointInterface) AddressIPv6() net.IPNet { - return (*types.GetIPNetCopy(&i.addrv6)) +func (epi *endpointInterface) AddressIPv6() net.IPNet { + return (*types.GetIPNetCopy(&epi.addrv6)) } -func (i *endpointInterface) SetNames(srcName string, dstPrefix string) error { - i.srcName = srcName - i.dstPrefix = dstPrefix +func (epi *endpointInterface) SetNames(srcName string, dstPrefix string) error { + epi.srcName = srcName + epi.dstPrefix = dstPrefix return nil } diff --git a/libnetwork/hostdiscovery/hostdiscovery.go b/libnetwork/hostdiscovery/hostdiscovery.go index 509592ba0d..aa39baa834 100644 --- a/libnetwork/hostdiscovery/hostdiscovery.go +++ b/libnetwork/hostdiscovery/hostdiscovery.go @@ -25,6 +25,7 @@ import ( ) const defaultHeartbeat = time.Duration(10) * time.Second +const TTLFactor = 3 type hostDiscovery struct { discovery discovery.Discovery @@ -47,7 +48,7 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join if hb == 0 { hb = defaultHeartbeat } - d, err := discovery.New(cfg.Discovery, hb, 3*hb) + d, err := discovery.New(cfg.Discovery, hb, TTLFactor*hb) if err != nil { return err } diff --git a/libnetwork/network.go b/libnetwork/network.go index 2dec288f40..88398f8d82 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -12,6 +12,7 @@ import ( "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" "github.com/docker/libnetwork/types" + "github.com/docker/swarm/pkg/store" ) // A Network represents a logical connectivity zone that containers may @@ -58,6 +59,7 @@ type network struct { id types.UUID driver driverapi.Driver enableIPv6 bool + endpointCnt uint64 endpoints endpointTable generic options.Generic dbIndex uint64 @@ -90,6 +92,8 @@ func (n *network) Type() string { } func (n *network) Key() []string { + n.Lock() + defer n.Unlock() return []string{datastore.NetworkKeyPrefix, string(n.id)} } @@ -98,6 +102,8 @@ func (n *network) KeyPrefix() []string { } func (n *network) Value() []byte { + n.Lock() + defer n.Unlock() b, err := json.Marshal(n) if err != nil { return nil @@ -106,11 +112,33 @@ func (n *network) Value() []byte { } func (n *network) Index() uint64 { + n.Lock() + defer n.Unlock() return n.dbIndex } func (n *network) SetIndex(index uint64) { + n.Lock() n.dbIndex = index + n.Unlock() +} + +func (n *network) EndpointCnt() uint64 { + n.Lock() + defer n.Unlock() + return n.endpointCnt +} + +func (n *network) IncEndpointCnt() { + n.Lock() + n.endpointCnt++ + n.Unlock() +} + +func (n *network) DecEndpointCnt() { + n.Lock() + n.endpointCnt-- + n.Unlock() } // TODO : Can be made much more generic with the help of reflection (but has some golang limitations) @@ -119,6 +147,7 @@ func (n *network) MarshalJSON() ([]byte, error) { netMap["name"] = n.name netMap["id"] = string(n.id) netMap["networkType"] = n.networkType + netMap["endpointCnt"] = n.endpointCnt netMap["enableIPv6"] = n.enableIPv6 netMap["generic"] = n.generic return json.Marshal(netMap) @@ -133,6 +162,7 @@ func (n *network) UnmarshalJSON(b []byte) (err error) { n.name = netMap["name"].(string) n.id = types.UUID(netMap["id"].(string)) n.networkType = netMap["networkType"].(string) + n.endpointCnt = uint64(netMap["endpointCnt"].(float64)) n.enableIPv6 = netMap["enableIPv6"].(bool) if netMap["generic"] != nil { n.generic = netMap["generic"].(map[string]interface{}) @@ -165,39 +195,51 @@ func (n *network) processOptions(options ...NetworkOption) { } func (n *network) Delete() error { - n.ctrlr.Lock() - _, ok := n.ctrlr.networks[n.id] + var err error + + n.Lock() + ctrlr := n.ctrlr + n.Unlock() + + ctrlr.Lock() + _, ok := ctrlr.networks[n.id] + ctrlr.Unlock() + if !ok { - n.ctrlr.Unlock() return &UnknownNetworkError{name: n.name, id: string(n.id)} } - n.Lock() - numEps := len(n.endpoints) - n.Unlock() + numEps := n.EndpointCnt() if numEps != 0 { - n.ctrlr.Unlock() return &ActiveEndpointsError{name: n.name, id: string(n.id)} } - n.ctrlr.Unlock() - if err = n.deleteNetwork(); err != nil { + // deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help + // prevent any possible race between endpoint join and network delete + if err = ctrlr.deleteNetworkFromStore(n); err != nil { + if err == store.ErrKeyModified { + return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.") + } return err } - if err = n.ctrlr.deleteNetworkFromStore(n); err != nil { - log.Warnf("Delete network (%s - %v) failed from datastore : %v", n.name, n.id, err) + if err = n.deleteNetwork(); err != nil { + return err } return nil } func (n *network) deleteNetwork() error { - var err error + n.Lock() + id := n.id + d := n.driver n.ctrlr.Lock() - delete(n.ctrlr.networks, n.id) + delete(n.ctrlr.networks, id) n.ctrlr.Unlock() - if err := n.driver.DeleteNetwork(n.id); err != nil { + n.Unlock() + + if err := d.DeleteNetwork(n.id); err != nil { // Forbidden Errors should be honored if _, ok := err.(types.ForbiddenError); ok { n.ctrlr.Lock() @@ -233,11 +275,12 @@ func (n *network) addEndpoint(ep *endpoint) error { } func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) { + var err error if name == "" { return nil, ErrInvalidName(name) } - if _, err := n.EndpointByName(name); err == nil { + if _, err = n.EndpointByName(name); err == nil { return nil, types.ForbiddenErrorf("service endpoint with name %s already exists", name) } @@ -246,11 +289,34 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep.network = n ep.processOptions(options...) - if err := n.addEndpoint(ep); err != nil { + n.Lock() + ctrlr := n.ctrlr + n.Unlock() + + n.IncEndpointCnt() + if err = ctrlr.updateNetworkToStore(n); err != nil { return nil, err } + defer func() { + if err != nil { + n.DecEndpointCnt() + if err = ctrlr.updateNetworkToStore(n); err != nil { + log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err) + } + } + }() + if err = n.addEndpoint(ep); err != nil { + return nil, err + } + defer func() { + if err != nil { + if e := ep.Delete(); ep != nil { + log.Warnf("cleaning up endpoint failed %s : %v", name, e) + } + } + }() - if err := n.ctrlr.addEndpointToStore(ep); err != nil { + if err = ctrlr.updateEndpointToStore(ep); err != nil { return nil, err } diff --git a/libnetwork/store.go b/libnetwork/store.go index 4fe98d233c..a7c47c833c 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -10,11 +10,14 @@ import ( ) func (c *controller) initDataStore() error { - if c.cfg == nil { + c.Lock() + cfg := c.cfg + c.Unlock() + if cfg == nil { return fmt.Errorf("datastore initialization requires a valid configuration") } - store, err := datastore.NewDataStore(&c.cfg.Datastore) + store, err := datastore.NewDataStore(&cfg.Datastore) if err != nil { return err } @@ -25,15 +28,15 @@ func (c *controller) initDataStore() error { } func (c *controller) newNetworkFromStore(n *network) error { - c.Lock() + n.Lock() n.ctrlr = c - c.Unlock() n.endpoints = endpointTable{} + n.Unlock() return c.addNetwork(n) } -func (c *controller) addNetworkToStore(n *network) error { +func (c *controller) updateNetworkToStore(n *network) error { if isReservedNetwork(n.Name()) { return nil } @@ -45,11 +48,7 @@ func (c *controller) addNetworkToStore(n *network) error { return nil } - // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, - // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. - // return cs.PutObjectAtomic(n) - - return cs.PutObject(n) + return cs.PutObjectAtomic(n) } func (c *controller) deleteNetworkFromStore(n *network) error { @@ -64,11 +63,7 @@ func (c *controller) deleteNetworkFromStore(n *network) error { return nil } - if err := cs.DeleteObject(n); err != nil { - return err - } - - if err := cs.DeleteTree(&endpoint{network: n}); err != nil { + if err := cs.DeleteObjectAtomic(n); err != nil { return err } @@ -83,36 +78,42 @@ func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) { return &n, nil } -func (c *controller) newEndpointFromStore(ep *endpoint) { - c.Lock() - n, ok := c.networks[ep.network.id] - c.Unlock() - - if !ok { +func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { + ep.Lock() + n := ep.network + id := ep.id + ep.Unlock() + if n == nil { // Possibly the watch event for the network has not shown up yet // Try to get network from the store - var err error - n, err = c.getNetworkFromStore(ep.network.id) + nid, err := networkIDFromEndpointKey(key, ep) if err != nil { - log.Warnf("Network (%s) unavailable for endpoint=%s", ep.network.id, ep.name) - return + return err + } + n, err = c.getNetworkFromStore(nid) + if err != nil { + return err } if err := c.newNetworkFromStore(n); err != nil { - log.Warnf("Failed to add Network (%s - %s) from store", n.name, n.id) - return + return err + } + n = c.networks[nid] + } + + _, err := n.EndpointByID(string(id)) + if err != nil { + if _, ok := err.(ErrNoSuchEndpoint); ok { + return n.addEndpoint(ep) } } - - ep.network = n - _, err := n.EndpointByID(string(ep.id)) - if _, ok := err.(ErrNoSuchEndpoint); ok { - n.addEndpoint(ep) - } + return err } -func (c *controller) addEndpointToStore(ep *endpoint) error { +func (c *controller) updateEndpointToStore(ep *endpoint) error { ep.Lock() + name := ep.name if isReservedNetwork(ep.network.name) { + ep.Unlock() return nil } ep.Unlock() @@ -120,15 +121,11 @@ func (c *controller) addEndpointToStore(ep *endpoint) error { cs := c.store c.Unlock() if cs == nil { - log.Debugf("datastore not initialized. endpoint %s is not added to the store", ep.name) + log.Debugf("datastore not initialized. endpoint %s is not added to the store", name) return nil } - // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, - // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. - // return cs.PutObjectAtomic(ep) - - return cs.PutObject(ep) + return cs.PutObjectAtomic(ep) } func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) { @@ -151,7 +148,7 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error { return nil } - if err := cs.DeleteObject(ep); err != nil { + if err := cs.DeleteObjectAtomic(ep); err != nil { return err } @@ -163,11 +160,11 @@ func (c *controller) watchStore() error { cs := c.store c.Unlock() - nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan) + nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), nil) if err != nil { return err } - epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), c.stopChan) + epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil) if err != nil { return err } @@ -187,16 +184,18 @@ func (c *controller) watchStore() error { existing, ok := c.networks[n.id] c.Unlock() if ok { + existing.Lock() // Skip existing network update if existing.dbIndex != n.dbIndex { existing.dbIndex = n.dbIndex + existing.endpointCnt = n.endpointCnt } + existing.Unlock() continue } if err = c.newNetworkFromStore(&n); err != nil { log.Error(err) - continue } } case eps := <-epPairs: @@ -208,26 +207,78 @@ func (c *controller) watchStore() error { continue } ep.dbIndex = epe.LastIndex - c.Lock() - n, ok := c.networks[ep.network.id] - c.Unlock() - if ok { - existing, _ := n.EndpointByID(string(ep.id)) - if existing != nil { - ee := existing.(*endpoint) - // Skip existing endpoint update - if ee.dbIndex != ep.dbIndex { - ee.dbIndex = ep.dbIndex - ee.container = ep.container - } + n, err := c.networkFromEndpointKey(epe.Key, &ep) + if err != nil { + if _, ok := err.(ErrNoSuchNetwork); !ok { + log.Error(err) continue } } - - c.newEndpointFromStore(&ep) + if n != nil { + ep.network = n.(*network) + } + if c.processEndpointUpdate(&ep) { + err = c.newEndpointFromStore(epe.Key, &ep) + if err != nil { + log.Error(err) + } + } } } } }() return nil } + +func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) { + nid, err := networkIDFromEndpointKey(key, ep) + if err != nil { + return nil, err + } + return c.NetworkByID(string(nid)) +} + +func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) { + eKey, err := datastore.ParseKey(key) + if err != nil { + return types.UUID(""), err + } + return ep.networkIDFromKey(eKey) +} + +func (c *controller) processEndpointUpdate(ep *endpoint) bool { + nw := ep.network + if nw == nil { + return true + } + nw.Lock() + id := nw.id + nw.Unlock() + + c.Lock() + n, ok := c.networks[id] + c.Unlock() + if !ok { + return true + } + existing, _ := n.EndpointByID(string(ep.id)) + if existing == nil { + return true + } + + ee := existing.(*endpoint) + ee.Lock() + if ee.dbIndex != ep.dbIndex { + ee.dbIndex = ep.dbIndex + if ee.container != nil && ep.container != nil { + // we care only about the container id + ee.container.id = ep.container.id + } else { + // we still care only about the container id, but this is a short-cut to communicate join or leave operation + ee.container = ep.container + } + } + ee.Unlock() + + return false +}