Commit c5f4c00d authored by Aaron U'Ren

feat(.golangci.yml): enable dupl and remediate

Showing with 451 additions and 380 deletions
......@@ -5,6 +5,7 @@ linters:
- bodyclose
- depguard
- dogsled
- dupl
- errcheck
- exportloopref
- gofmt
......
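The dupl linter reports structurally similar Go blocks. Where the duplication is deliberate (the mirrored ingress/egress processors and the table-driven router tests touched later in this diff), the remediation is a //nolint:dupl directive with a justification rather than a forced abstraction. Below is a minimal, self-contained sketch of that pattern; the functions are hypothetical and deliberately tiny (real dupl findings need much larger blocks), but the directive placement matches what this commit adds above processIngressRules and processEgressRules.

package main

import "fmt"

//nolint:dupl // the two walkers intentionally mirror each other; deduplicating them would hurt readability
func printForwards(items []string) {
	for i := 0; i < len(items); i++ {
		fmt.Println(items[i])
	}
}

//nolint:dupl // see printForwards
func printBackwards(items []string) {
	for i := len(items) - 1; i >= 0; i-- {
		fmt.Println(items[i])
	}
}

func main() {
	printForwards([]string{"a", "b"})
	printBackwards([]string{"a", "b"})
}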
......@@ -106,19 +106,15 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
activePolicyChains[policyChainName] = true
currnetPodIps := make([]string, 0, len(policy.targetPods))
currentPodIPs := make([]string, 0, len(policy.targetPods))
for ip := range policy.targetPods {
currnetPodIps = append(currnetPodIps, ip)
currentPodIPs = append(currentPodIPs, ip)
}
if policy.policyType == "both" || policy.policyType == "ingress" {
// create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name)
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP)
npc.createGenericHashIPSet(targetDestPodIPSetName, utils.TypeHashIP, currentPodIPs)
err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
......@@ -128,11 +124,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
if policy.policyType == "both" || policy.policyType == "egress" {
// create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name)
setEntries := make([][]string, 0)
for _, podIP := range currnetPodIps {
setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP)
npc.createGenericHashIPSet(targetSourcePodIPSetName, utils.TypeHashIP, currentPodIPs)
err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version)
if err != nil {
return nil, nil, err
......@@ -151,6 +143,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
return activePolicyChains, activePolicyIPSets, nil
}
//nolint:dupl // This is as simple as this function gets even though it repeats some of processEgressRules
func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error {
......@@ -164,47 +157,41 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
// run through all the ingress rules in the spec and create iptables rules
// in the chain for the network policy
for i, ingressRule := range policy.ingressRules {
for ruleIdx, ingressRule := range policy.ingressRules {
if len(ingressRule.srcPods) != 0 {
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i)
activePolicyIPSets[srcPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range ingressRule.srcPods {
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries, utils.TypeHashIP)
srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, ruleIdx)
// Create policy based ipset with source pod IPs
npc.createPolicyIndexedIPSet(activePolicyIPSets, srcPodIPSetName, utils.TypeHashIP,
getIPsFromPods(ingressRule.srcPods))
// If the ingress policy contains port declarations, we need to make sure that we match on pod IP and port
if len(ingressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the ingress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
if err := npc.createPodWithPortPolicyRule(ingressRule.ports, policy, policyChainName,
srcPodIPSetName, targetDestPodIPSetName); err != nil {
return err
}
}
// If the ingress policy contains named port declarations, we need to make sure that we match on pod IP and
// the resolved port number
if len(ingressRule.namedPorts) != 0 {
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
for portIdx, eps := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx,
portIdx)
npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName,
eps.protocol, eps.port, eps.endport); err != nil {
return err
}
}
}
// If the ingress policy contains no ports at all create the policy based only on IP
if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 {
// case where no 'ports' details specified in the ingress rule but 'from' details specified
// so match on specified source and destination ip with all port and protocol
......@@ -216,8 +203,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
}
}
// case where only 'ports' details specified but no 'from' details in the ingress rule
// so match on all sources, with specified port (if any) and protocol
// case where only 'ports' details specified but no 'from' details in the ingress rule so match on all sources,
// with specified port (if any) and protocol
if ingressRule.matchAllSource && !ingressRule.matchAllPorts {
for _, portProtocol := range ingressRule.ports {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
......@@ -227,25 +214,21 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
for portIdx, eps := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx,
portIdx)
npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips)
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil {
return err
}
}
}
// case where nether ports nor from details are speified in the ingress rule
// so match on all ports, protocol, source IP's
// case where neither ports nor from details are specified in the ingress rule so match on all ports, protocol,
// source IP's
if ingressRule.matchAllSource && ingressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
......@@ -255,7 +238,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
}
if len(ingressRule.srcIPBlocks) != 0 {
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i)
srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, ruleIdx)
activePolicyIPSets[srcIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet)
......@@ -268,17 +251,14 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
}
}
for j, endPoints := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j)
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashNet)
for portIdx, eps := range ingressRule.namedPorts {
namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx,
portIdx)
npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashNet, eps.ips)
comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil {
return err
}
}
......@@ -296,6 +276,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
return nil
}
//nolint:dupl // This is as simple as this function gets even though it repeats some of ProcessIngressRules
func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error {
......@@ -309,46 +290,40 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
// run through all the egress rules in the spec and create iptables rules
// in the chain for the network policy
for i, egressRule := range policy.egressRules {
for ruleIdx, egressRule := range policy.egressRules {
if len(egressRule.dstPods) != 0 {
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i)
activePolicyIPSets[dstPodIPSetName] = true
setEntries := make([][]string, 0)
for _, pod := range egressRule.dstPods {
setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries, utils.TypeHashIP)
dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, ruleIdx)
// Create policy based ipset with destination pod IPs
npc.createPolicyIndexedIPSet(activePolicyIPSets, dstPodIPSetName, utils.TypeHashIP,
getIPsFromPods(egressRule.dstPods))
// If the egress policy contains port declarations, we need to make sure that we match on pod IP and port
if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule
// so match on specified source and destination ip's and specified port (if any) and protocol
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil {
return err
}
if err := npc.createPodWithPortPolicyRule(egressRule.ports, policy, policyChainName,
targetSourcePodIPSetName, dstPodIPSetName); err != nil {
return err
}
}
// If the egress policy contains named port declarations, we need to make sure that we match on pod IP and
// the resolved port number
if len(egressRule.namedPorts) != 0 {
for j, endPoints := range egressRule.namedPorts {
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j)
activePolicyIPSets[namedPortIPSetName] = true
setEntries := make([][]string, 0)
for _, ip := range endPoints.ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP)
for portIdx, eps := range egressRule.namedPorts {
namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx,
portIdx)
npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips)
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil {
if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil {
return err
}
}
}
// If the egress policy contains no ports at all create the policy based only on IP
if len(egressRule.ports) == 0 && len(egressRule.namedPorts) == 0 {
// case where no 'ports' details specified in the ingress rule but 'from' details specified
// so match on specified source and destination ip with all port and protocol
......@@ -360,8 +335,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
}
}
// case where only 'ports' details specified but no 'to' details in the egress rule
// so match on all sources, with specified port (if any) and protocol
// case where only 'ports' details specified but no 'to' details in the egress rule so match on all sources,
// with specified port (if any) and protocol
if egressRule.matchAllDestinations && !egressRule.matchAllPorts {
for _, portProtocol := range egressRule.ports {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
......@@ -379,8 +354,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
}
}
// case where nether ports nor from details are speified in the egress rule
// so match on all ports, protocol, source IP's
// case where neither ports nor from details are specified in the egress rule so match on all ports, protocol,
// source IP's
if egressRule.matchAllDestinations && egressRule.matchAllPorts {
comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " +
policy.name + " namespace " + policy.namespace
......@@ -388,8 +363,9 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
return err
}
}
if len(egressRule.dstIPBlocks) != 0 {
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i)
dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, ruleIdx)
activePolicyIPSets[dstIPBlockIPSetName] = true
npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet)
if !egressRule.matchAllPorts {
......
......@@ -6,6 +6,7 @@ import (
"regexp"
"strconv"
"github.com/cloudnativelabs/kube-router/pkg/utils"
api "k8s.io/api/core/v1"
)
......@@ -61,3 +62,41 @@ func validateNodePortRange(nodePortOption string) (string, error) {
}
return fmt.Sprintf("%d:%d", port1, port2), nil
}
func getIPsFromPods(pods []podInfo) []string {
ips := make([]string, len(pods))
for idx, pod := range pods {
ips[idx] = pod.ip
}
return ips
}
func (npc *NetworkPolicyController) createGenericHashIPSet(ipsetName, hashType string, ips []string) {
setEntries := make([][]string, 0)
for _, ip := range ips {
setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"})
}
npc.ipSetHandler.RefreshSet(ipsetName, setEntries, hashType)
}
// createPolicyIndexedIPSet creates a policy based ipset and indexes it as an active ipset
func (npc *NetworkPolicyController) createPolicyIndexedIPSet(
activePolicyIPSets map[string]bool, ipsetName, hashType string, ips []string) {
activePolicyIPSets[ipsetName] = true
npc.createGenericHashIPSet(ipsetName, hashType, ips)
}
// createPodWithPortPolicyRule handles the case where port details are provided by the ingress/egress rule and creates
// an iptables rule that matches on both the source/dest IPs and the port
func (npc *NetworkPolicyController) createPodWithPortPolicyRule(
ports []protocolAndPort, policy networkPolicyInfo, policyName string, srcSetName string, dstSetName string) error {
for _, portProtocol := range ports {
comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " +
policy.name + " namespace " + policy.namespace
if err := npc.appendRuleToPolicyChain(policyName, comment, srcSetName, dstSetName, portProtocol.protocol,
portProtocol.port, portProtocol.endport); err != nil {
return err
}
}
return nil
}
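The other remediation path is extracting the repeated ipset-building loops into the shared helpers shown in the hunk above. The sketch below is a self-contained approximation of that refactor, not kube-router code: the podInfo type, buildSetEntries, and the literal "timeout" option stand in for the project's podInfo, createGenericHashIPSet, and utils.OptionTimeout, and the real helper hands the entries to npc.ipSetHandler.RefreshSet instead of printing them.

package main

import "fmt"

// podInfo stands in for kube-router's internal pod type; only the ip field matters here.
type podInfo struct{ ip string }

// getIPsFromPods mirrors the helper added in this commit: it flattens a pod slice into its IPs
// so callers no longer repeat the same append loop.
func getIPsFromPods(pods []podInfo) []string {
	ips := make([]string, len(pods))
	for idx, pod := range pods {
		ips[idx] = pod.ip
	}
	return ips
}

// buildSetEntries shows roughly what createGenericHashIPSet builds before registering the set:
// each IP becomes an ipset entry carrying a timeout option of "0".
func buildSetEntries(ips []string) [][]string {
	entries := make([][]string, 0, len(ips))
	for _, ip := range ips {
		entries = append(entries, []string{ip, "timeout", "0"})
	}
	return entries
}

func main() {
	pods := []podInfo{{ip: "10.0.0.1"}, {ip: "10.0.0.2"}}
	fmt.Println(buildSetEntries(getIPsFromPods(pods)))
}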
......@@ -86,9 +86,11 @@ type ipvsCalls interface {
type netlinkCalls interface {
ipAddrAdd(iface netlink.Link, ip string, addRoute bool) error
ipAddrDel(iface netlink.Link, ip string) error
prepareEndpointForDsr(containerID string, endpointIP string, vip string) error
prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error
getKubeDummyInterface() (netlink.Link, error)
setupRoutesForExternalIPForDSR(serviceInfoMap) error
prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error
configureContainerForDSR(vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error
setupPolicyRoutingForDSR() error
cleanupMangleTableRule(ip string, protocol string, port string, fwmark string, tcpMSS int) error
}
......@@ -1018,20 +1020,6 @@ func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string)
return nil, errors.New("Failed to find pod with ip " + endpointIP)
}
func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) {
err := netns.Set(hostNSHandle)
if err != nil {
klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to " + err.Error())
}
activeNetworkNamespaceHandle, err := netns.Get()
if err != nil {
klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after a previous error due to " + err.Error())
return
}
klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
}
// This function does the following
// - get the pod corresponding to the endpoint ip
// - get the container id from pod spec
......@@ -1040,9 +1028,9 @@ func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) {
// - add VIP to the tunnel interface
// - disable rp_filter
// WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet.
func (ln *linuxNetworking) prepareEndpointForDsr(containerID string, endpointIP string, vip string) error {
func (ln *linuxNetworking) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error {
// FIXME: its possible switch namespaces may never work safely in GO without hacks.
// Its possible switch namespaces may never work safely in GO without hacks.
// https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion
// https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix
// Dont know if same issue, but seen namespace issue, so adding
......@@ -1055,150 +1043,43 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerID string, endpointIP
hostNetworkNamespaceHandle, err := netns.Get()
if err != nil {
return errors.New("Failed to get namespace due to " + err.Error())
return fmt.Errorf("failed to get namespace due to %v", err)
}
defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle)
activeNetworkNamespaceHandle, err = netns.Get()
if err != nil {
return errors.New("Failed to get namespace due to " + err.Error())
return fmt.Errorf("failed to get namespace due to %v", err)
}
klog.V(1).Infof("Current network namespace before netns.Set: " + activeNetworkNamespaceHandle.String())
klog.V(1).Infof("Current network namespace before netns.Set: %s", activeNetworkNamespaceHandle.String())
defer utils.CloseCloserDisregardError(&activeNetworkNamespaceHandle)
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return errors.New("Failed to get docker client due to " + err.Error())
return fmt.Errorf("failed to get docker client due to %v", err)
}
defer utils.CloseCloserDisregardError(dockerClient)
containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerID)
if err != nil {
return errors.New("Failed to get docker container spec due to " + err.Error())
return fmt.Errorf("failed to get docker container spec due to %v", err)
}
pid := containerSpec.State.Pid
endpointNamespaceHandle, err := netns.GetFromPid(pid)
if err != nil {
return errors.New("Failed to get endpoint namespace due to " + err.Error())
}
defer utils.CloseCloserDisregardError(&endpointNamespaceHandle)
err = netns.Set(endpointNamespaceHandle)
if err != nil {
return errors.New("Failed to enter to endpoint namespace due to " + err.Error())
}
activeNetworkNamespaceHandle, err = netns.Get()
if err != nil {
return errors.New("Failed to get activeNetworkNamespace due to " + err.Error())
}
klog.V(2).Infof("Current network namespace after netns. Set to container network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
// create a ipip tunnel interface inside the endpoint container
tunIf, err := netlink.LinkByName(KubeTunnelIf)
if err != nil {
if err.Error() != IfaceNotFound {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err.Error())
}
klog.V(2).Infof("Could not find tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + " so creating one.")
ipTunLink := netlink.Iptun{
LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf},
Local: net.ParseIP(endpointIP),
}
err = netlink.LinkAdd(&ipTunLink)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to add ipip tunnel interface in endpoint namespace due to " + err.Error())
}
// TODO: this is ugly, but ran into issue multiple times where interface did not come up quickly.
// need to find the root cause
for retry := 0; retry < 60; retry++ {
time.Sleep(100 * time.Millisecond)
tunIf, err = netlink.LinkByName(KubeTunnelIf)
if err == nil {
break
}
if err.Error() == IfaceNotFound {
klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf)
continue
} else {
break
}
}
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("Failed to get " + KubeTunnelIf + " tunnel interface handle due to " + err.Error())
}
klog.V(2).Infof("Successfully created tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + ".")
}
// bring the tunnel interface up
err = netlink.LinkSetUp(tunIf)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to bring up ipip tunnel interface in endpoint namespace due to " + err.Error())
}
// assign VIP to the KUBE_TUNNEL_IF interface
err = ln.ipAddrAdd(tunIf, vip, false)
if err != nil && err.Error() != IfaceHasAddr {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to assign vip " + vip + " to kube-tunnel-if interface ")
}
klog.Infof("Successfully assigned VIP: " + vip + " in endpoint " + endpointIP + ".")
// disable rp_filter on all interface
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on kube-tunnel-if in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on eth0 in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on `all` in the endpoint container")
}
klog.Infof("Successfully disabled rp_filter in endpoint " + endpointIP + ".")
err = netns.Set(hostNetworkNamespaceHandle)
if err != nil {
return errors.New("Failed to set hostNetworkNamespace handle due to " + err.Error())
}
activeNetworkNamespaceHandle, err = netns.Get()
if err != nil {
return errors.New("Failed to get activeNetworkNamespace handle due to " + err.Error())
}
klog.Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
return nil
return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle)
}
// The same as prepareEndpointForDsr but using CRI instead of docker.
func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) (err error) {
func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error {
// FIXME: its possible switch namespaces may never work safely in GO without hacks.
// Its possible switch namespaces may never work safely in GO without hacks.
// https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion
// https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix
// Dont know if same issue, but seen namespace issue, so adding
// logs and boilerplate code and verbose logs for diagnosis
if runtimeEndpoint == "" {
return errors.New("runtimeEndpoint is not specified")
return fmt.Errorf("runtimeEndpoint is not specified")
}
runtime.LockOSThread()
......@@ -1206,18 +1087,16 @@ func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, contain
hostNetworkNamespaceHandle, err := netns.Get()
if err != nil {
return errors.New("failed to get host namespace due to " + err.Error())
return fmt.Errorf("failed to get host namespace due to %v", err)
}
klog.V(1).Infof("current network namespace before netns.Set: " + hostNetworkNamespaceHandle.String())
klog.V(1).Infof("current network namespace before netns.Set: %s", hostNetworkNamespaceHandle.String())
defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle)
rs, err := cri.NewRemoteRuntimeService(runtimeEndpoint, cri.DefaultConnectionTimeout)
if err != nil {
return err
}
defer func() {
err = rs.Close()
}()
defer utils.CloseCloserDisregardError(rs)
info, err := rs.ContainerInfo(containerID)
if err != nil {
......@@ -1225,117 +1104,7 @@ func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, contain
}
pid := info.Pid
endpointNamespaceHandle, err := netns.GetFromPid(pid)
if err != nil {
return fmt.Errorf("failed to get endpoint namespace (containerID=%s, pid=%d, error=%s)", containerID, pid, err)
}
defer utils.CloseCloserDisregardError(&endpointNamespaceHandle)
err = netns.Set(endpointNamespaceHandle)
if err != nil {
return fmt.Errorf("failed to enter endpoint namespace (containerID=%s, pid=%d, error=%s)", containerID, pid, err)
}
activeNetworkNamespaceHandle, err := netns.Get()
if err != nil {
return errors.New("failed to get activeNetworkNamespace due to " + err.Error())
}
klog.V(2).Infof("Current network namespace after netns. Set to container network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
// TODO: fix boilerplate `netns.Set(hostNetworkNamespaceHandle)` code. Need a robust
// way to switch back to old namespace, pretty much all things will go wrong if we dont switch back
// create a ipip tunnel interface inside the endpoint container
tunIf, err := netlink.LinkByName(KubeTunnelIf)
if err != nil {
if err.Error() != IfaceNotFound {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err.Error())
}
klog.V(2).Infof("Could not find tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + " so creating one.")
ipTunLink := netlink.Iptun{
LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf},
Local: net.ParseIP(endpointIP),
}
err = netlink.LinkAdd(&ipTunLink)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to add ipip tunnel interface in endpoint namespace due to " + err.Error())
}
// TODO: this is ugly, but ran into issue multiple times where interface did not come up quickly.
// need to find the root cause
for retry := 0; retry < 60; retry++ {
time.Sleep(100 * time.Millisecond)
tunIf, err = netlink.LinkByName(KubeTunnelIf)
if err == nil {
break
}
if err.Error() == IfaceNotFound {
klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf)
continue
} else {
break
}
}
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to get " + KubeTunnelIf + " tunnel interface handle due to " + err.Error())
}
klog.V(2).Infof("Successfully created tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + ".")
}
// bring the tunnel interface up
err = netlink.LinkSetUp(tunIf)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to bring up ipip tunnel interface in endpoint namespace due to " + err.Error())
}
// assign VIP to the KUBE_TUNNEL_IF interface
err = ln.ipAddrAdd(tunIf, vip, false)
if err != nil && err.Error() != IfaceHasAddr {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to assign vip " + vip + " to kube-tunnel-if interface ")
}
klog.Infof("Successfully assigned VIP: " + vip + " in endpoint " + endpointIP + ".")
// disable rp_filter on all interface
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on kube-tunnel-if in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on eth0 in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return errors.New("failed to disable rp_filter on `all` in the endpoint container")
}
klog.Infof("Successfully disabled rp_filter in endpoint " + endpointIP + ".")
err = netns.Set(hostNetworkNamespaceHandle)
if err != nil {
return errors.New("Failed to set hostNetworkNamespace handle due to " + err.Error())
}
activeNetworkNamespaceHandle, err = netns.Get()
if err != nil {
return errors.New("Failed to get activeNetworkNamespace handle due to " + err.Error())
}
klog.Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
return nil
return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle)
}
func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
......
......@@ -6,6 +6,7 @@ package proxy
import (
"github.com/moby/ipvs"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"net"
"sync"
)
......@@ -20,9 +21,12 @@ var _ LinuxNetworking = &LinuxNetworkingMock{}
//
// // make and configure a mocked LinuxNetworking
// mockedLinuxNetworking := &LinuxNetworkingMock{
// cleanupMangleTableRuleFunc: func(ip string, protocol string, port string, fwmark string) error {
// cleanupMangleTableRuleFunc: func(ip string, protocol string, port string, fwmark string, tcpMSS int) error {
// panic("mock out the cleanupMangleTableRule method")
// },
// configureContainerForDSRFunc: func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error {
// panic("mock out the configureContainerForDSR method")
// },
// getKubeDummyInterfaceFunc: func() (netlink.Link, error) {
// panic("mock out the getKubeDummyInterface method")
// },
......@@ -65,8 +69,11 @@ var _ LinuxNetworking = &LinuxNetworkingMock{}
// ipvsUpdateServiceFunc: func(ipvsSvc *ipvs.Service) error {
// panic("mock out the ipvsUpdateService method")
// },
// prepareEndpointForDsrFunc: func(containerID string, endpointIP string, vip string) error {
// panic("mock out the prepareEndpointForDsr method")
// prepareEndpointForDsrWithCRIFunc: func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error {
// panic("mock out the prepareEndpointForDsrWithCRI method")
// },
// prepareEndpointForDsrWithDockerFunc: func(containerID string, endpointIP string, vip string) error {
// panic("mock out the prepareEndpointForDsrWithDocker method")
// },
// setupPolicyRoutingForDSRFunc: func() error {
// panic("mock out the setupPolicyRoutingForDSR method")
......@@ -84,6 +91,9 @@ type LinuxNetworkingMock struct {
// cleanupMangleTableRuleFunc mocks the cleanupMangleTableRule method.
cleanupMangleTableRuleFunc func(ip string, protocol string, port string, fwmark string, tcpMSS int) error
// configureContainerForDSRFunc mocks the configureContainerForDSR method.
configureContainerForDSRFunc func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error
// getKubeDummyInterfaceFunc mocks the getKubeDummyInterface method.
getKubeDummyInterfaceFunc func() (netlink.Link, error)
......@@ -126,8 +136,11 @@ type LinuxNetworkingMock struct {
// ipvsUpdateServiceFunc mocks the ipvsUpdateService method.
ipvsUpdateServiceFunc func(ipvsSvc *ipvs.Service) error
// prepareEndpointForDsrFunc mocks the prepareEndpointForDsr method.
prepareEndpointForDsrFunc func(containerID string, endpointIP string, vip string) error
// prepareEndpointForDsrWithCRIFunc mocks the prepareEndpointForDsrWithCRI method.
prepareEndpointForDsrWithCRIFunc func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error
// prepareEndpointForDsrWithDockerFunc mocks the prepareEndpointForDsrWithDocker method.
prepareEndpointForDsrWithDockerFunc func(containerID string, endpointIP string, vip string) error
// setupPolicyRoutingForDSRFunc mocks the setupPolicyRoutingForDSR method.
setupPolicyRoutingForDSRFunc func() error
......@@ -147,6 +160,21 @@ type LinuxNetworkingMock struct {
Port string
// Fwmark is the fwmark argument value.
Fwmark string
// TcpMSS is the tcpMSS argument value.
TcpMSS int
}
// configureContainerForDSR holds details about calls to the configureContainerForDSR method.
configureContainerForDSR []struct {
// Vip is the vip argument value.
Vip string
// EndpointIP is the endpointIP argument value.
EndpointIP string
// ContainerID is the containerID argument value.
ContainerID string
// Pid is the pid argument value.
Pid int
// HostNetworkNamespaceHandle is the hostNetworkNamespaceHandle argument value.
HostNetworkNamespaceHandle netns.NsHandle
}
// getKubeDummyInterface holds details about calls to the getKubeDummyInterface method.
getKubeDummyInterface []struct {
......@@ -254,8 +282,19 @@ type LinuxNetworkingMock struct {
// IpvsSvc is the ipvsSvc argument value.
IpvsSvc *ipvs.Service
}
// prepareEndpointForDsr holds details about calls to the prepareEndpointForDsr method.
prepareEndpointForDsr []struct {
// prepareEndpointForDsrWithCRI holds details about calls to the prepareEndpointForDsrWithCRI method.
prepareEndpointForDsrWithCRI []struct {
// RuntimeEndpoint is the runtimeEndpoint argument value.
RuntimeEndpoint string
// ContainerID is the containerID argument value.
ContainerID string
// EndpointIP is the endpointIP argument value.
EndpointIP string
// Vip is the vip argument value.
Vip string
}
// prepareEndpointForDsrWithDocker holds details about calls to the prepareEndpointForDsrWithDocker method.
prepareEndpointForDsrWithDocker []struct {
// ContainerID is the containerID argument value.
ContainerID string
// EndpointIP is the endpointIP argument value.
......@@ -272,24 +311,26 @@ type LinuxNetworkingMock struct {
ServiceInfoMapMoqParam serviceInfoMap
}
}
lockcleanupMangleTableRule sync.RWMutex
lockgetKubeDummyInterface sync.RWMutex
lockipAddrAdd sync.RWMutex
lockipAddrDel sync.RWMutex
lockipvsAddFWMarkService sync.RWMutex
lockipvsAddServer sync.RWMutex
lockipvsAddService sync.RWMutex
lockipvsDelDestination sync.RWMutex
lockipvsDelService sync.RWMutex
lockipvsGetDestinations sync.RWMutex
lockipvsGetServices sync.RWMutex
lockipvsNewDestination sync.RWMutex
lockipvsNewService sync.RWMutex
lockipvsUpdateDestination sync.RWMutex
lockipvsUpdateService sync.RWMutex
lockprepareEndpointForDsr sync.RWMutex
locksetupPolicyRoutingForDSR sync.RWMutex
locksetupRoutesForExternalIPForDSR sync.RWMutex
lockcleanupMangleTableRule sync.RWMutex
lockconfigureContainerForDSR sync.RWMutex
lockgetKubeDummyInterface sync.RWMutex
lockipAddrAdd sync.RWMutex
lockipAddrDel sync.RWMutex
lockipvsAddFWMarkService sync.RWMutex
lockipvsAddServer sync.RWMutex
lockipvsAddService sync.RWMutex
lockipvsDelDestination sync.RWMutex
lockipvsDelService sync.RWMutex
lockipvsGetDestinations sync.RWMutex
lockipvsGetServices sync.RWMutex
lockipvsNewDestination sync.RWMutex
lockipvsNewService sync.RWMutex
lockipvsUpdateDestination sync.RWMutex
lockipvsUpdateService sync.RWMutex
lockprepareEndpointForDsrWithCRI sync.RWMutex
lockprepareEndpointForDsrWithDocker sync.RWMutex
locksetupPolicyRoutingForDSR sync.RWMutex
locksetupRoutesForExternalIPForDSR sync.RWMutex
}
// cleanupMangleTableRule calls cleanupMangleTableRuleFunc.
......@@ -302,11 +343,13 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRule(ip string, protocol stri
Protocol string
Port string
Fwmark string
TcpMSS int
}{
IP: ip,
Protocol: protocol,
Port: port,
Fwmark: fwmark,
TcpMSS: tcpMSS,
}
mock.lockcleanupMangleTableRule.Lock()
mock.calls.cleanupMangleTableRule = append(mock.calls.cleanupMangleTableRule, callInfo)
......@@ -322,12 +365,14 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRuleCalls() []struct {
Protocol string
Port string
Fwmark string
TcpMSS int
} {
var calls []struct {
IP string
Protocol string
Port string
Fwmark string
TcpMSS int
}
mock.lockcleanupMangleTableRule.RLock()
calls = mock.calls.cleanupMangleTableRule
......@@ -335,6 +380,53 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRuleCalls() []struct {
return calls
}
// configureContainerForDSR calls configureContainerForDSRFunc.
func (mock *LinuxNetworkingMock) configureContainerForDSR(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error {
if mock.configureContainerForDSRFunc == nil {
panic("LinuxNetworkingMock.configureContainerForDSRFunc: method is nil but LinuxNetworking.configureContainerForDSR was just called")
}
callInfo := struct {
Vip string
EndpointIP string
ContainerID string
Pid int
HostNetworkNamespaceHandle netns.NsHandle
}{
Vip: vip,
EndpointIP: endpointIP,
ContainerID: containerID,
Pid: pid,
HostNetworkNamespaceHandle: hostNetworkNamespaceHandle,
}
mock.lockconfigureContainerForDSR.Lock()
mock.calls.configureContainerForDSR = append(mock.calls.configureContainerForDSR, callInfo)
mock.lockconfigureContainerForDSR.Unlock()
return mock.configureContainerForDSRFunc(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle)
}
// configureContainerForDSRCalls gets all the calls that were made to configureContainerForDSR.
// Check the length with:
// len(mockedLinuxNetworking.configureContainerForDSRCalls())
func (mock *LinuxNetworkingMock) configureContainerForDSRCalls() []struct {
Vip string
EndpointIP string
ContainerID string
Pid int
HostNetworkNamespaceHandle netns.NsHandle
} {
var calls []struct {
Vip string
EndpointIP string
ContainerID string
Pid int
HostNetworkNamespaceHandle netns.NsHandle
}
mock.lockconfigureContainerForDSR.RLock()
calls = mock.calls.configureContainerForDSR
mock.lockconfigureContainerForDSR.RUnlock()
return calls
}
// getKubeDummyInterface calls getKubeDummyInterfaceFunc.
func (mock *LinuxNetworkingMock) getKubeDummyInterface() (netlink.Link, error) {
if mock.getKubeDummyInterfaceFunc == nil {
......@@ -839,10 +931,53 @@ func (mock *LinuxNetworkingMock) ipvsUpdateServiceCalls() []struct {
return calls
}
// prepareEndpointForDsr calls prepareEndpointForDsrFunc.
func (mock *LinuxNetworkingMock) prepareEndpointForDsr(containerID string, endpointIP string, vip string) error {
if mock.prepareEndpointForDsrFunc == nil {
panic("LinuxNetworkingMock.prepareEndpointForDsrFunc: method is nil but LinuxNetworking.prepareEndpointForDsr was just called")
// prepareEndpointForDsrWithCRI calls prepareEndpointForDsrWithCRIFunc.
func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRI(runtimeEndpoint string, containerID string, endpointIP string, vip string) error {
if mock.prepareEndpointForDsrWithCRIFunc == nil {
panic("LinuxNetworkingMock.prepareEndpointForDsrWithCRIFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithCRI was just called")
}
callInfo := struct {
RuntimeEndpoint string
ContainerID string
EndpointIP string
Vip string
}{
RuntimeEndpoint: runtimeEndpoint,
ContainerID: containerID,
EndpointIP: endpointIP,
Vip: vip,
}
mock.lockprepareEndpointForDsrWithCRI.Lock()
mock.calls.prepareEndpointForDsrWithCRI = append(mock.calls.prepareEndpointForDsrWithCRI, callInfo)
mock.lockprepareEndpointForDsrWithCRI.Unlock()
return mock.prepareEndpointForDsrWithCRIFunc(runtimeEndpoint, containerID, endpointIP, vip)
}
// prepareEndpointForDsrWithCRICalls gets all the calls that were made to prepareEndpointForDsrWithCRI.
// Check the length with:
// len(mockedLinuxNetworking.prepareEndpointForDsrWithCRICalls())
func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRICalls() []struct {
RuntimeEndpoint string
ContainerID string
EndpointIP string
Vip string
} {
var calls []struct {
RuntimeEndpoint string
ContainerID string
EndpointIP string
Vip string
}
mock.lockprepareEndpointForDsrWithCRI.RLock()
calls = mock.calls.prepareEndpointForDsrWithCRI
mock.lockprepareEndpointForDsrWithCRI.RUnlock()
return calls
}
// prepareEndpointForDsrWithDocker calls prepareEndpointForDsrWithDockerFunc.
func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error {
if mock.prepareEndpointForDsrWithDockerFunc == nil {
panic("LinuxNetworkingMock.prepareEndpointForDsrWithDockerFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithDocker was just called")
}
callInfo := struct {
ContainerID string
......@@ -853,16 +988,16 @@ func (mock *LinuxNetworkingMock) prepareEndpointForDsr(containerID string, endpo
EndpointIP: endpointIP,
Vip: vip,
}
mock.lockprepareEndpointForDsr.Lock()
mock.calls.prepareEndpointForDsr = append(mock.calls.prepareEndpointForDsr, callInfo)
mock.lockprepareEndpointForDsr.Unlock()
return mock.prepareEndpointForDsrFunc(containerID, endpointIP, vip)
mock.lockprepareEndpointForDsrWithDocker.Lock()
mock.calls.prepareEndpointForDsrWithDocker = append(mock.calls.prepareEndpointForDsrWithDocker, callInfo)
mock.lockprepareEndpointForDsrWithDocker.Unlock()
return mock.prepareEndpointForDsrWithDockerFunc(containerID, endpointIP, vip)
}
// prepareEndpointForDsrCalls gets all the calls that were made to prepareEndpointForDsr.
// prepareEndpointForDsrWithDockerCalls gets all the calls that were made to prepareEndpointForDsrWithDocker.
// Check the length with:
// len(mockedLinuxNetworking.prepareEndpointForDsrCalls())
func (mock *LinuxNetworkingMock) prepareEndpointForDsrCalls() []struct {
// len(mockedLinuxNetworking.prepareEndpointForDsrWithDockerCalls())
func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDockerCalls() []struct {
ContainerID string
EndpointIP string
Vip string
......@@ -872,9 +1007,9 @@ func (mock *LinuxNetworkingMock) prepareEndpointForDsrCalls() []struct {
EndpointIP string
Vip string
}
mock.lockprepareEndpointForDsr.RLock()
calls = mock.calls.prepareEndpointForDsr
mock.lockprepareEndpointForDsr.RUnlock()
mock.lockprepareEndpointForDsrWithDocker.RLock()
calls = mock.calls.prepareEndpointForDsrWithDocker
mock.lockprepareEndpointForDsrWithDocker.RUnlock()
return calls
}
......
......@@ -427,7 +427,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser
if runtime == "docker" {
// WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet.
err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIPService.externalIP)
err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpoint.ip, externalIPService.externalIP)
if err != nil {
klog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error())
}
......
package proxy
import (
"fmt"
"io/ioutil"
"net"
"strconv"
"time"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"k8s.io/klog/v2"
)
func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) {
err := netns.Set(hostNSHandle)
if err != nil {
klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to " + err.Error())
}
activeNetworkNamespaceHandle, err := netns.Get()
if err != nil {
klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after a previous error due to " + err.Error())
return
}
klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
}
func (ln *linuxNetworking) configureContainerForDSR(
vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error {
endpointNamespaceHandle, err := netns.GetFromPid(pid)
if err != nil {
return fmt.Errorf("failed to get endpoint namespace (containerID=%s, pid=%d, error=%v)",
containerID, pid, err)
}
defer utils.CloseCloserDisregardError(&endpointNamespaceHandle)
err = netns.Set(endpointNamespaceHandle)
if err != nil {
return fmt.Errorf("failed to enter endpoint namespace (containerID=%s, pid=%d, error=%v)",
containerID, pid, err)
}
activeNetworkNamespaceHandle, err := netns.Get()
if err != nil {
return fmt.Errorf("failed to get activeNetworkNamespace due to %v", err)
}
klog.V(2).Infof("Current network namespace after netns. Set to container network namespace: %s",
activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
// TODO: fix boilerplate `netns.Set(hostNetworkNamespaceHandle)` code. Need a robust
// way to switch back to old namespace, pretty much all things will go wrong if we dont switch back
// create an ipip tunnel interface inside the endpoint container
tunIf, err := netlink.LinkByName(KubeTunnelIf)
if err != nil {
if err.Error() != IfaceNotFound {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to verify if ipip tunnel interface exists in endpoint %s namespace due "+
"to %v", endpointIP, err)
}
klog.V(2).Infof("Could not find tunnel interface %s in endpoint %s so creating one.",
KubeTunnelIf, endpointIP)
ipTunLink := netlink.Iptun{
LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf},
Local: net.ParseIP(endpointIP),
}
err = netlink.LinkAdd(&ipTunLink)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to add ipip tunnel interface in endpoint namespace due to %v", err)
}
// this is ugly, but ran into issue multiple times where interface did not come up quickly.
for retry := 0; retry < 60; retry++ {
time.Sleep(100 * time.Millisecond)
tunIf, err = netlink.LinkByName(KubeTunnelIf)
if err == nil {
break
}
if err.Error() == IfaceNotFound {
klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf)
continue
} else {
break
}
}
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to get %s tunnel interface handle due to %v", KubeTunnelIf, err)
}
klog.V(2).Infof("Successfully created tunnel interface %s in endpoint %s.", KubeTunnelIf, endpointIP)
}
// bring the tunnel interface up
err = netlink.LinkSetUp(tunIf)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to bring up ipip tunnel interface in endpoint namespace due to %v", err)
}
// assign VIP to the KUBE_TUNNEL_IF interface
err = ln.ipAddrAdd(tunIf, vip, false)
if err != nil && err.Error() != IfaceHasAddr {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to assign vip %s to kube-tunnel-if interface", vip)
}
klog.Infof("Successfully assigned VIP: %s in endpoint %s.", vip, endpointIP)
// disable rp_filter on all interface
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to disable rp_filter on kube-tunnel-if in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to disable rp_filter on eth0 in the endpoint container")
}
err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640)
if err != nil {
attemptNamespaceResetAfterError(hostNetworkNamespaceHandle)
return fmt.Errorf("failed to disable rp_filter on `all` in the endpoint container")
}
klog.Infof("Successfully disabled rp_filter in endpoint %s.", endpointIP)
err = netns.Set(hostNetworkNamespaceHandle)
if err != nil {
return fmt.Errorf("failed to set hostNetworkNamespace handle due to %v", err)
}
activeNetworkNamespaceHandle, err = netns.Get()
if err != nil {
return fmt.Errorf("failed to get activeNetworkNamespace handle due to %v", err)
}
klog.Infof("Current network namespace after revert namespace to host network namespace: %s",
activeNetworkNamespaceHandle.String())
_ = activeNetworkNamespaceHandle.Close()
return nil
}
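Both runtime-specific DSR preparers now resolve the container PID their own way and hand the namespace and tunnel work to the shared configureContainerForDSR above. The self-contained sketch below illustrates only that delegation shape; the function names, stubbed PIDs, and addresses are hypothetical stand-ins, and the real methods call the Docker and CRI APIs rather than hard-coding values.

package main

import "fmt"

// configureForDSR stands in for the shared configureContainerForDSR helper: both runtime-specific
// entry points delegate the per-namespace configuration here once they know the container PID.
func configureForDSR(vip, endpointIP, containerID string, pid int) error {
	fmt.Printf("configuring %s (pid %d) with vip %s for endpoint %s\n", containerID, pid, vip, endpointIP)
	return nil
}

// prepareWithDocker mimics prepareEndpointForDsrWithDocker: the PID would come from the Docker API.
func prepareWithDocker(containerID, endpointIP, vip string) error {
	pid := 1234 // stand-in for dockerClient.ContainerInspect(...).State.Pid
	return configureForDSR(vip, endpointIP, containerID, pid)
}

// prepareWithCRI mimics prepareEndpointForDsrWithCRI: the PID would come from the CRI runtime service.
func prepareWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error {
	pid := 5678 // stand-in for rs.ContainerInfo(containerID).Pid
	return configureForDSR(vip, endpointIP, containerID, pid)
}

func main() {
	_ = prepareWithDocker("abc123", "10.0.0.5", "192.0.2.10")
	_ = prepareWithCRI("unix:///run/containerd/containerd.sock", "abc123", "10.0.0.5", "192.0.2.10")
}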
......@@ -515,6 +515,7 @@ func Test_advertiseExternalIPs(t *testing.T) {
},
}
//nolint:dupl // There is no need to spend a lot of time de-duplicating test code
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
go testcase.nrc.bgpServer.Serve()
......@@ -704,6 +705,7 @@ func Test_advertiseAnnotationOptOut(t *testing.T) {
},
}
//nolint:dupl // There is no need to spend a lot of time de-duplicating test code
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
go testcase.nrc.bgpServer.Serve()
......@@ -926,6 +928,7 @@ func Test_advertiseAnnotationOptIn(t *testing.T) {
},
}
//nolint:dupl // There is no need to spend a lot of time de-duplicating test code
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
go testcase.nrc.bgpServer.Serve()
......