Use complete port configs when plumbing mark rules

Currently, a reference counting scheme is used to reference count all
individual port configs that need to be plumbed in the ingress to make
sure that in situations where a service with the same set of port
configs is getting added or removed doesn't accidentally remove the port
config plumbing if the add/remove notifications come out of order. This
same reference counting scheme is also used for plumbing the port-based
marking rules. But marking rules should not be plumbed based on that
because marks are always different for different instantiations of the
same service. So fixed the code to plumb port-based mark rules based on
the complete set of port configs, while plumbing pure port rules and
proxies based on a filter set of port configs based on the reference
count.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2016-09-07 10:10:00 -07:00
parent f34bd07737
commit 093a017f68
1 changed files with 76 additions and 39 deletions

View File

@ -380,17 +380,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}
if addService {
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, iPorts, false); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, filteredPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
return
}
}
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
@ -453,15 +453,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
}
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, iPorts, true); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, filteredPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
}
}
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
@ -715,33 +715,66 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
return nil
}
func writePortsToFile(ports []*PortConfig) (string, error) {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return "", err
}
defer f.Close()
buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ports,
})
n, err := f.Write(buf)
if err != nil {
return "", err
}
if n < len(buf) {
return "", io.ErrShortWrite
}
return f.Name(), nil
}
func readPortsFromFile(fileName string) ([]*PortConfig, error) {
buf, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}
var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
return nil, err
}
return epRec.IngressPorts, nil
}
// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var (
ingressPortsFile string
filteredPortsFile string
)
if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
}
buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
})
n, err := f.Write(buf)
if len(filteredPorts) != 0 {
var err error
filteredPortsFile, err = writePortsToFile(filteredPorts)
if err != nil {
f.Close()
return err
}
if n < len(buf) {
f.Close()
return io.ErrShortWrite
}
ingressPortsFile = f.Name()
f.Close()
}
addDelOpt := "-A"
@ -751,7 +784,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
@ -768,27 +801,29 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 7 {
if len(os.Args) < 8 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
var ingressPorts []*PortConfig
var filteredPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(6)
}
}
var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if os.Args[6] != "" {
var err error
filteredPorts, err = readPortsFromFile(os.Args[6])
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
logrus.Errorf("Failed reading filtered ports file: %v", err)
os.Exit(7)
}
ingressPorts = epRec.IngressPorts
}
vip := os.Args[2]
@ -800,12 +835,14 @@ func fwMarker() {
addDelOpt := os.Args[4]
rules := [][]string{}
for _, iPort := range ingressPorts {
for _, iPort := range filteredPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}
rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
rules = append(rules, rule)
}
@ -823,9 +860,9 @@ func fwMarker() {
}
if addDelOpt == "-A" {
eIP, subnet, err := net.ParseCIDR(os.Args[6])
eIP, subnet, err := net.ParseCIDR(os.Args[7])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err)
os.Exit(9)
}