Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

509 lines
13 KiB
Raw Normal View History

package libnetwork
import (
func init() {
reexec.Register("fwmarker", fwMarker)
func newService(name string, id string, ingressPorts []*PortConfig) *service {
return &service{
name: name,
id: id,
ingressPorts: ingressPorts,
loadBalancers: make(map[string]*loadBalancer),
// addServiceBinding records endpoint eid (address ip) as a backend of
// service sid on network nid.
//
// It resolves the network through c.NetworkByID and returns that error
// unchanged on failure. A service entry is created in c.serviceBindings the
// first time sid is seen, and a load balancer entry is created in
// s.loadBalancers the first time the service is seen on nid. The backend is
// stored in lb.backEnds under eid, a "tasks.<name>" DNS record is added for
// ip, and, only when vip is non-empty, the backend (plus the service, if the
// load balancer was just created) is pushed to the network via addLBBackend.
//
// NOTE(review): the closing delimiters of this function are not visible in
// this view, so the exact extent of each "if" body below should be confirmed
// against the full file before relying on these comments.
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
var (
s *service
addService bool
n, err := c.NetworkByID(nid)
if err != nil {
return err
// NOTE(review): c.serviceBindings is read and written here and no locking
// is visible in this block; confirm access is serialized.
s, ok := c.serviceBindings[sid]
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(name, sid, ingressPorts)
c.serviceBindings[sid] = s
lb, ok := s.loadBalancers[nid]
if !ok {
// Create a new load balancer if we are seeing this
// network attachment on the service for the first
// time.
// NOTE(review): fwMark is copied from fwMarkCtr and no update of that
// counter is visible in this block; confirm each load balancer ends up
// with a distinct mark.
lb = &loadBalancer{
vip: vip,
fwMark: fwMarkCtr,
backEnds: make(map[string]net.IP),
service: s,
s.loadBalancers[nid] = lb
// Since we just created this load balancer make sure
// we add a new service in IPVS rules.
addService = true
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
// by registering the endpoint's own IP under the service name.
// NOTE(review): it is not visible here whether this record is added only
// for a newly created load balancer or for every backend; confirm.
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
n.(*network).addSvcRecords(name, svcIP, nil, false)
// Remember this endpoint's IP as a backend of the load balancer.
lb.backEnds[eid] = ip
// Add endpoint IP to special "tasks.svc_name" so that the
// applications have access to DNS RR.
n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
// Add loadbalancer service and backend in all sandboxes in
// the network only if vip is valid.
if len(vip) != 0 {
n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService)
return nil
// rmServiceBinding removes endpoint eid (address ip) from the backends of
// service sid on network nid.
//
// It resolves the network through c.NetworkByID and returns that error
// unchanged on failure. If the service or its load balancer on nid is not
// known, it returns nil without doing anything. Otherwise it deletes the
// "tasks.<name>" DNS record for ip, drops eid from lb.backEnds, and, only
// when vip is non-empty, removes the backend from the network via
// rmLBBackend, passing rmService so the service entry is removed as well
// once no backends remain.
//
// NOTE(review): the closing delimiters of this function are not visible in
// this view, so the exact extent of each "if" body below should be confirmed
// against the full file before relying on these comments.
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
var rmService bool
n, err := c.NetworkByID(nid)
if err != nil {
return err
// NOTE(review): c.serviceBindings is read (and later modified) here and no
// locking is visible in this block; confirm access is serialized.
s, ok := c.serviceBindings[sid]
if !ok {
return nil
lb, ok := s.loadBalancers[nid]
if !ok {
return nil
// Delete the special "tasks.svc_name" backend record.
n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
delete(lb.backEnds, eid)
if len(lb.backEnds) == 0 {
// All the backends for this service have been
// removed. Time to remove the load balancer and also
// remove the service entry in IPVS.
rmService = true
// Make sure to remove the right IP since if vip is
// not valid we would have added a DNS RR record.
// NOTE(review): it is not visible here whether the record removal and the
// load balancer deletion below happen only when the last backend is gone
// or on every call; confirm.
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
n.(*network).deleteSvcRecords(name, svcIP, nil, false)
delete(s.loadBalancers, nid)
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
delete(c.serviceBindings, sid)
// Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid.
if len(vip) != 0 {
n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
return nil
// Get all loadbalancers on this network that is currently discovered
// on this node.
func (n *network) connectedLoadbalancers() []*loadBalancer {
c := n.getController()
defer c.Unlock()
var lbs []*loadBalancer
for _, s := range c.serviceBindings {
if lb, ok := s.loadBalancers[n.ID()]; ok {
lbs = append(lbs, lb)
return lbs
// Populate all loadbalancers on the network that the passed endpoint
// belongs to, into this sandbox.
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
var gwIP net.IP
n := ep.getNetwork()
eIP := ep.Iface().Address()
if sb.ingress {
// For the ingress sandbox if this is not gateway
// endpoint do nothing.
if ep != sb.getGatewayEndpoint() {
// This is the gateway endpoint. Now get the ingress
// network and plumb the loadbalancers.
gwIP = ep.Iface().Address().IP
for _, ep := range sb.getConnectedEndpoints() {
if !ep.endpointInGWNetwork() {
n = ep.getNetwork()
eIP = ep.Iface().Address()
for _, lb := range n.connectedLoadbalancers() {
// Skip if vip is not valid.
if len(lb.vip) == 0 {
addService := true
for _, ip := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
eIP, gwIP, addService)
addService = false
// Add loadbalancer backend to all sandboxes which has a connection to
// this network. If needed add the service as well, as specified by
// the addService bool.
func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
var gwIP net.IP
if ep := sb.getGatewayEndpoint(); ep != nil {
gwIP = ep.Iface().Address().IP
sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, addService)
return false
// Remove loadbalancer backend from all sandboxes which has a
// connection to this network. If needed remove the service entry as
// well, as specified by the rmService bool.
func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
var gwIP net.IP
if ep := sb.getGatewayEndpoint(); ep != nil {
gwIP = ep.Iface().Address().IP
sb.rmLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService)
return false
// Add loadbalancer backend into one connected sandbox.
func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, addService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
SchedName: ipvs.RoundRobin,
if addService {
var iPorts []*PortConfig
if sb.ingress {
iPorts = ingressPorts
if err := programIngress(gwIP, iPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
if err := i.NewService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
// Remove the sched name before using the service to add
// destination.
s.SchedName = ""
if err := i.NewDestination(s, d); err != nil {
logrus.Errorf("Failed to create real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
// Remove loadbalancer backend from one connected sandbox.
func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
if err := i.DelDestination(s, d); err != nil {
logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
if rmService {
s.SchedName = ipvs.RoundRobin
if err := i.DelService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
var iPorts []*PortConfig
if sb.ingress {
iPorts = ingressPorts
if err := programIngress(gwIP, iPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
addDelOpt := "-A"
if isDelete {
addDelOpt = "-D"
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j DNAT --to-destination %s:%d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, gwIP, iPort.NodePort))
if err := iptables.RawCombinedOutput(rule...); err != nil {
return fmt.Errorf("setting up rule failed, %v: %v", rule, err)
return nil
// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return err
buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
n, err := f.Write(buf)
if err != nil {
return err
if n < len(buf) {
return io.ErrShortWrite
ingressPortsFile = f.Name()
addDelOpt := "-A"
if isDelete {
addDelOpt = "-D"
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.IP.String()),
Stdout: os.Stdout,
Stderr: os.Stderr,
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
return nil
// Firewall marker reexec function.
func fwMarker() {
defer runtime.UnlockOSThread()
if len(os.Args) < 7 {
logrus.Error("invalid number of arguments..")
var ingressPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
ingressPorts = epRec.IngressPorts
vip := os.Args[2]
fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
if err != nil {
logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err)
addDelOpt := os.Args[4]
rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, iPort.Port))
rules = append(rules, rule)
rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, fwMark))
rules = append(rules, rule)
ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
defer ns.Close()
if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
if len(ingressPorts) != 0 && addDelOpt == "-A" {
ruleParams := strings.Fields(fmt.Sprintf("-m ipvs --ipvs -j SNAT --to-source %s", os.Args[6]))
if !iptables.Exists("nat", "POSTROUTING", ruleParams...) {
rule := append(strings.Fields("-t nat -A POSTROUTING"), ruleParams...)
rules = append(rules, rule)
err := ioutil.WriteFile("/proc/sys/net/ipv4/vs/conntrack", []byte{'1', '\n'}, 0644)
if err != nil {
logrus.Errorf("Failed to write to /proc/sys/net/ipv4/vs/conntrack: %v", err)
rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
rules = append(rules, rule)
for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)