Delay port redirect until packet reaches container

With port redirect in the ingress path happening before ipvs in the
ingess sandbox, there is a chance of 5-tuple collision in the ipvs
connection table for two entirely different services have different
PublishedPorts but the same TargetPort. To disambiguate the ipvs
connection table, delay the port redirect from PublishedPort to
TargetPort until after the loadbalancing has happened in ipvs. To be
specific, perform the redirect after the packet enters the real backend
container namespace.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2016-09-21 12:15:14 -07:00
parent 093a017f68
commit 3c9d05fba5
1 changed files with 95 additions and 34 deletions

View File

@ -26,6 +26,7 @@ import (
func init() { func init() {
reexec.Register("fwmarker", fwMarker) reexec.Register("fwmarker", fwMarker)
reexec.Register("redirecter", redirecter)
} }
func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork() n := ep.getNetwork()
eIP := ep.Iface().Address() eIP := ep.Iface().Address()
if n.ingress {
if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
}
}
if sb.ingress { if sb.ingress {
// For the ingress sandbox if this is not gateway // For the ingress sandbox if this is not gateway
// endpoint do nothing. // endpoint do nothing.
@ -390,7 +397,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
} }
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil { if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return return
} }
@ -461,7 +468,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
} }
} }
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil { if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
} }
} }
@ -755,11 +762,8 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {
// Invoke fwmarker reexec routine to mark vip destined packets with // Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark. // the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ( var ingressPortsFile string
ingressPortsFile string
filteredPortsFile string
)
if len(ingressPorts) != 0 { if len(ingressPorts) != 0 {
var err error var err error
@ -767,14 +771,8 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
if err != nil { if err != nil {
return err return err
} }
}
if len(filteredPorts) != 0 { defer os.Remove(ingressPortsFile)
var err error
filteredPortsFile, err = writePortsToFile(filteredPorts)
if err != nil {
return err
}
} }
addDelOpt := "-A" addDelOpt := "-A"
@ -784,7 +782,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
cmd := &exec.Cmd{ cmd := &exec.Cmd{
Path: reexec.Self(), Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()), Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Stdout: os.Stdout, Stdout: os.Stdout,
Stderr: os.Stderr, Stderr: os.Stderr,
} }
@ -801,13 +799,12 @@ func fwMarker() {
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
if len(os.Args) < 8 { if len(os.Args) < 7 {
logrus.Error("invalid number of arguments..") logrus.Error("invalid number of arguments..")
os.Exit(1) os.Exit(1)
} }
var ingressPorts []*PortConfig var ingressPorts []*PortConfig
var filteredPorts []*PortConfig
if os.Args[5] != "" { if os.Args[5] != "" {
var err error var err error
ingressPorts, err = readPortsFromFile(os.Args[5]) ingressPorts, err = readPortsFromFile(os.Args[5])
@ -817,15 +814,6 @@ func fwMarker() {
} }
} }
if os.Args[6] != "" {
var err error
filteredPorts, err = readPortsFromFile(os.Args[6])
if err != nil {
logrus.Errorf("Failed reading filtered ports file: %v", err)
os.Exit(7)
}
}
vip := os.Args[2] vip := os.Args[2]
fwMark, err := strconv.ParseUint(os.Args[3], 10, 32) fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
if err != nil { if err != nil {
@ -835,12 +823,6 @@ func fwMarker() {
addDelOpt := os.Args[4] addDelOpt := os.Args[4]
rules := [][]string{} rules := [][]string{}
for _, iPort := range filteredPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}
for _, iPort := range ingressPorts { for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
@ -860,9 +842,9 @@ func fwMarker() {
} }
if addDelOpt == "-A" { if addDelOpt == "-A" {
eIP, subnet, err := net.ParseCIDR(os.Args[7]) eIP, subnet, err := net.ParseCIDR(os.Args[6])
if err != nil { if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err) logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
os.Exit(9) os.Exit(9)
} }
@ -889,3 +871,82 @@ func fwMarker() {
} }
} }
} }
func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
defer os.Remove(ingressPortsFile)
}
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}
return nil
}
// Redirecter reexec function.
func redirecter() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 4 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
var ingressPorts []*PortConfig
if os.Args[3] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[3])
if err != nil {
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(2)
}
}
eIP, _, err := net.ParseCIDR(os.Args[2])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
os.Exit(3)
}
rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}
ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(4)
}
defer ns.Close()
if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(5)
}
for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}
}