Commit 54eedcd0 authored by Joakim Karlsson's avatar Joakim Karlsson Committed by Murali Reddy
Browse files

Issue 572 - Graceful termination + Update to go-1.10.8, alpine-3.9 (#706)

* update netlink

* update libnetwork to get ipvs stats

* update gopkg.lock for libnetwork update

* update libnetwork

* add cli options

* make endpoints delete gracefully

* move conntrack flusher

* get some order in the mainloop

* update to alpine 3.9 & go 1.11.1

* revert to 1.10.3 just update alpine

* and revert travis.yml

* lock version

* test 1.12

* test
parent 736757d9
base 0.3 PR902 apply-both-ingress-egress-pol bgppolicies ci-fix closed-channel consolidate_network_policy_chains dependabot/go_modules/github.com/aws/aws-sdk-go-1.44.211 dependabot/go_modules/github.com/osrg/gobgp/v3-3.11.0 dependabot/go_modules/k8s.io/api-0.26.2 dependabot/go_modules/k8s.io/apimachinery-0.26.2 dependabot/go_modules/k8s.io/cri-api-0.26.2 disable-policy-routing drop-flag egress-netpol enix exttrafficpolicy fix-Test_syncInternalPeers fix-build-break fix-ci fix-test-failures-due-to-pr-813 fix_network_policy_cleanup_code gobgp-update gomod goreleaser goreleaser-ldflags handle-null health_check_give_more_grace issue-609 issue-712 issue-828 issue-841 issue-862 issue-905 issue-templates linters master mrueg-patch-1 nflog npc-refactor policytypes pr914-feedback proxy-healtchecks rel-v0.0.1 release-test/22.02.3 release/21.06-BETA.1 release/21.08-BETA.1 release/21.08-BETA.2 release/22.02 release/22.02-RC.1 release/22.02-RC.2 release/22.02-test release/22.02.1 release/22.02.2 release/22.02.3 release/22.02.4 release/22.12 release/22.12-BETA.1 release/22.12-BETA.2 release/22.12-RC.1 release/22.12.1 release/22.12.2 release/22.12.3 release/22.12.4 release/22.2-RC.1 release/23.10-BETA.1 release/23.10-RC.1 release/23.10.0 release/23.10.1 release/23.10.1.2 release/23.10.1.3 release/23.10.2 release/24.04-BETA.1 release/24.04-RC.1 release/24.04.0 remove_deprecated_cluster-cidr_option revert-819-exttrafficpolicy stable/angelfish stable/angelfish-backup-06-04-22 stable/angelfish-backup-28-05-22 stable/bluefin stable/cobia stable/dragonfish testing-refine-branchout-process testing-refine-branchout-process2 tmprelease/test-21.08 tmprelease/test-21.09 tmprelease/test2-21.09 tmprelease/test3-21.09 tmprelease/test4-21.09 truenas/master truenas/master-backup-03-4-22 truenas/master-backup-10-7-21 truenas/master-backup-2-7-23 truenas/master-backup-21-08-22 truenas/master-backup-28-7-21 truenas/master-backup-29-05-22 truenas/master-backup-29-4-21 v1.0 v1.1 xtables v1.0.1 v1.0.0 
v1.0.0-rc6 v1.0.0-rc5 v1.0.0-rc4 v1.0.0-rc3 v1.0.0-rc2 v1.0.0-rc1 v0.4.0 v0.4.0-rc3 v0.4.0-rc2 v0.4.0-rc1 v0.3.2 v0.3.1 v0.3.0 TS-24.04-RC.1 TS-24.04-BETA.1 TS-23.10.2 TS-23.10.1.3 TS-23.10.1.2 TS-23.10.1.1 TS-23.10.1 TS-23.10.0.1 TS-23.10.0 TS-23.10-RC.1 TS-23.10-BETA.1 TS-22.12.4.2 TS-22.12.4.1 TS-22.12.4 TS-22.12.3.3 TS-22.12.3.2 TS-22.12.3.1 TS-22.12.3 TS-22.12.2 TS-22.12.1 TS-22.12.0 TS-22.12-RC.1 TS-22.12-BETA.2 TS-22.12-BETA.1 TS-22.12-ALPHA.1 TS-22.02.4 TS-22.02.3 TS-22.02.2.1 TS-22.02.2 TS-22.02.1 TS-22.02.0.1 TS-22.02.0 TS-22.2.0 TS-22.02.RELEASE.1 TS-22.02-RC.2 TS-22.02-RC.1 TS-22.02-RC.1-2 TS-22.02-RC.1-1 TS-21.08-BETA.2 TS-21.08-BETA.1 TS-21.06-BETA.1 TS-21.04-ALPHA.1 TS-21.02-ALPHA.1 TS-20.12-ALPHA TS-20.10-ALPHA TS-12.12.3 DN110M-CS-v2.0
No related merge requests found
Showing with 970 additions and 142 deletions
+970 -142
......@@ -3,7 +3,7 @@ services:
language: go
go:
- 1.10.x
- 1.10.8
branches:
only:
......
FROM alpine:3.7
FROM alpine:3.9
RUN apk add --no-cache \
iptables \
......
......@@ -178,11 +178,11 @@
version = "v0.3.3"
[[projects]]
digest = "1:6b20706ea25f78700bfd6eda620a95233c6340acb53d295553e290c626c809c7"
digest = "1:ac5ec8249d02dc101ed000889d1890304e3da1b5180547ec4a19e335e6550f03"
name = "github.com/docker/libnetwork"
packages = ["ipvs"]
pruneopts = "UT"
revision = "14aa49f99093e1a22e65155b641103762911db8d"
revision = "48f846327bbe6a0dce0c556e8dc9f5bb939d5c16"
[[projects]]
digest = "1:975a4480c40f2d0b95e1f83d3ec1aa29a2774e80179e08a9a4ba2aab86721b23"
......@@ -719,15 +719,14 @@
revision = "8dc2790b029dc41e2b8ff772c63c26adbb1db70d"
[[projects]]
digest = "1:2d9d06cb9d46dacfdbb45f8575b39fc0126d083841a29d4fbf8d97708f43107e"
digest = "1:7a2aba84d53fb903d985b8a1ae09fb3fa4aca20e305828c18b678e576a0005bf"
name = "github.com/vishvananda/netlink"
packages = [
".",
"nl",
]
pruneopts = "UT"
revision = "a2ad57a690f3caf3015351d2d6e1c0b95c349752"
version = "v1.0.0"
revision = "f504738125a57f35f87fc30fb69b8df75237ccde"
[[projects]]
branch = "master"
......@@ -1130,6 +1129,7 @@
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/spf13/pflag",
"github.com/vishvananda/netlink",
"github.com/vishvananda/netlink/nl",
"github.com/vishvananda/netns",
"golang.org/x/net/context",
"k8s.io/api/core/v1",
......@@ -1137,6 +1137,7 @@
"k8s.io/api/networking/v1",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/apimachinery/pkg/labels",
"k8s.io/apimachinery/pkg/util/intstr",
"k8s.io/apimachinery/pkg/util/sets",
"k8s.io/client-go/informers",
"k8s.io/client-go/kubernetes",
......
......@@ -69,7 +69,7 @@ required = ["github.com/osrg/gobgp/gobgp"]
[[override]]
name = "github.com/vishvananda/netlink"
version = "1.0.0"
revision = "f504738125a57f35f87fc30fb69b8df75237ccde"
[[constraint]]
branch = "master"
......@@ -89,7 +89,7 @@ required = ["github.com/osrg/gobgp/gobgp"]
[[constraint]]
name = "github.com/docker/libnetwork"
revision = "14aa49f99093e1a22e65155b641103762911db8d"
revision = "48f846327bbe6a0dce0c556e8dc9f5bb939d5c16"
[[override]]
revision = "1e2f10eb65743fed02f573d31a4587de09afb20e"
......
......@@ -17,7 +17,7 @@ DOCKER=$(if $(or $(IN_DOCKER_GROUP),$(IS_ROOT),$(OSX)),docker,sudo docker)
MAKEFILE_DIR=$(dir $(realpath $(firstword $(MAKEFILE_LIST))))
UPSTREAM_IMPORT_PATH=$(GOPATH)/src/github.com/cloudnativelabs/kube-router/
BUILD_IN_DOCKER?=false
DOCKER_BUILD_IMAGE?=golang:1.10.3-alpine
DOCKER_BUILD_IMAGE?=golang:1.10.8-alpine3.9
ifeq ($(GOARCH), arm)
ARCH_TAG_PREFIX=$(GOARCH)
FILE_ARCH=ARM
......@@ -53,9 +53,9 @@ endif
test: gofmt gomoqs ## Runs code quality pipelines (gofmt, tests, coverage, lint, etc)
ifeq "$(BUILD_IN_DOCKER)" "true"
$(DOCKER) run -v $(PWD):/go/src/github.com/cloudnativelabs/kube-router -w /go/src/github.com/cloudnativelabs/kube-router $(DOCKER_BUILD_IMAGE) \
sh -c 'go test github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...'
sh -c 'go test -v -timeout 30s github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...'
else
go test github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...
go test -v -timeout 30s github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...
endif
vagrant-up: export docker=$(DOCKER)
......
......@@ -57,6 +57,8 @@ Usage of kube-router:
-h, --help Print usage information.
--hostname-override string Overrides the NodeName of the node. Set this if kube-router is unable to determine your NodeName automatically.
--iptables-sync-period duration The delay between iptables rule synchronizations (e.g. '5s', '1m'). Must be greater than 0. (default 5m0s)
--ipvs-graceful-period duration The graceful period before removing destinations from IPVS services (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 30s)
--ipvs-graceful-termination Enables the experimental IPVS graceful terminaton capability
--ipvs-sync-period duration The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 5m0s)
--kubeconfig string Path to kubeconfig file with authorization information (the master location is set by the master flag).
--masquerade-all SNAT all traffic to cluster IP/node port.
......@@ -285,6 +287,13 @@ If you would like to use `HostPort` functionality below changes are required in
For an e.g manifest please look at [manifest](../daemonset/kubeadm-kuberouter-all-features-hostport.yaml) with necessary changes required for `HostPort` functionality.
## IPVS Graceful termination support
As of 0.2.6 we support experimental graceful termination of IPVS destinations. When possible, the pod's TerminationGracePeriodSeconds is used; if it cannot be retrieved for some reason,
the fallback period is 30 seconds, which can be adjusted with the `--ipvs-graceful-period` CLI option.
Graceful termination works as follows: when kube-router receives a delete endpoint notification for a service, the destination's weight is set to 0, and the destination is deleted once the termination grace period has passed or its Active & Inactive connections have gone down to 0.
## BGP configuration
[Configuring BGP Peers](bgp.md)
......
package proxy
import (
"fmt"
"os/exec"
"regexp"
"strconv"
"sync"
"syscall"
"time"
"github.com/docker/libnetwork/ipvs"
"github.com/golang/glog"
)
// gracefulQueue holds the IPVS destination removals that are waiting to be
// carried out gracefully.
type gracefulQueue struct {
// mu is locked by addToGracefulQueue and gracefulSync before they touch queue.
mu sync.Mutex
// queue is the list of pending removal requests.
queue []gracefulRequest
}
// gracefulQueueItem pairs an IPVS service with a timestamp.
// NOTE(review): the graceful termination code visible alongside this type only
// uses gracefulRequest — confirm this type is still needed.
type gracefulQueueItem struct {
// added is presumably the time the item was queued — TODO confirm.
added time.Time
service *ipvs.Service
}
// gracefulRequest describes one pending graceful removal of an IPVS destination.
type gracefulRequest struct {
// ipvsSvc is the IPVS service the destination belongs to.
ipvsSvc *ipvs.Service
// ipvsDst is the destination that is to be removed.
ipvsDst *ipvs.Destination
// deletionTime is set to time.Now() by ipvsDeleteDestination when the removal is requested.
deletionTime time.Time
// gracefulTerminationPeriod is filled in by addToGracefulQueue; once more than
// this much time has passed since deletionTime the destination is deleted.
gracefulTerminationPeriod time.Duration
}
// ipvsDeleteDestination removes dst from svc.
//
// With graceful termination enabled the destination is not deleted right away:
// its weight is set to 0 and a removal request is queued so that the periodic
// graceful cleanup deletes it later. Without graceful termination the
// destination is deleted immediately. In both cases, when svc is a UDP service
// its conntrack entries are flushed afterwards; a flush failure is only logged.
//
// An error is returned when the IPVS update or delete call fails.
func (nsc *NetworkServicesController) ipvsDeleteDestination(svc *ipvs.Service, dst *ipvs.Destination) error {
	if !nsc.gracefulTermination {
		if delErr := nsc.ln.ipvsDelDestination(svc, dst); delErr != nil {
			return delErr
		}
	} else {
		// Record the request time before touching IPVS so the grace period
		// starts at the moment the deletion was asked for.
		pending := gracefulRequest{
			ipvsSvc:      svc,
			ipvsDst:      dst,
			deletionTime: time.Now(),
		}
		dst.Weight = 0
		if updErr := nsc.ln.ipvsUpdateDestination(svc, dst); updErr != nil {
			return updErr
		}
		nsc.addToGracefulQueue(&pending)
	}

	// Only UDP services need their conntrack records flushed.
	if svc.Protocol != syscall.IPPROTO_UDP {
		return nil
	}
	if flushErr := nsc.flushConntrackUDP(svc); flushErr != nil {
		glog.Errorf("Failed to flush conntrack: %s", flushErr.Error())
	}
	return nil
}
// addToGracefulQueue schedules req for graceful removal unless the same
// service/destination pair (matched on address, port and, for the service,
// protocol) is already queued.
//
// The grace period is taken from the pod found for the destination address.
// When the pod cannot be looked up, or it has no TerminationGracePeriodSeconds
// set, nsc.gracefulPeriod is used instead. req.gracefulTerminationPeriod is
// filled in as a side effect before a copy of req is appended to the queue.
func (nsc *NetworkServicesController) addToGracefulQueue(req *gracefulRequest) {
	nsc.gracefulQueue.mu.Lock()
	defer nsc.gracefulQueue.mu.Unlock()

	for _, queued := range nsc.gracefulQueue.queue {
		sameSvc := queued.ipvsSvc.Address.Equal(req.ipvsSvc.Address) &&
			queued.ipvsSvc.Port == req.ipvsSvc.Port &&
			queued.ipvsSvc.Protocol == req.ipvsSvc.Protocol
		sameDst := queued.ipvsDst.Address.Equal(req.ipvsDst.Address) &&
			queued.ipvsDst.Port == req.ipvsDst.Port
		if sameSvc && sameDst {
			// Report the period of the entry that is actually queued; req's own
			// period has not been determined at this point.
			glog.V(2).Infof("Endpoint already scheduled for removal %+v %+v %s", *req.ipvsSvc, *req.ipvsDst, queued.gracefulTerminationPeriod.String())
			return
		}
	}

	// Try to get the termination grace period from the pod; if unsuccessful use
	// the default timeout.
	podObj, err := nsc.getPodObjectForEndpoint(req.ipvsDst.Address.String())
	switch {
	case err != nil:
		glog.V(1).Infof("Failed to find endpoint with ip: %s err: %s", req.ipvsDst.Address.String(), err.Error())
		req.gracefulTerminationPeriod = nsc.gracefulPeriod
	case podObj.Spec.TerminationGracePeriodSeconds == nil:
		// The field is an optional pointer; dereferencing it blindly would panic.
		glog.V(1).Infof("Pod %s has no termination grace period set, using default %s", podObj.Name, nsc.gracefulPeriod.String())
		req.gracefulTerminationPeriod = nsc.gracefulPeriod
	default:
		graceSeconds := *podObj.Spec.TerminationGracePeriodSeconds
		glog.V(1).Infof("Found pod termination grace period %d for pod %s", graceSeconds, podObj.Name)
		req.gracefulTerminationPeriod = time.Duration(graceSeconds) * time.Second
	}

	nsc.gracefulQueue.queue = append(nsc.gracefulQueue.queue, *req)
}
// gracefulSync makes one pass over the queue of pending destination removals.
// Every entry is handed to gracefulDeleteIpvsDestination; entries it reports as
// removed are dropped, all others are kept for the next pass. The queue mutex
// is held for the whole pass.
func (nsc *NetworkServicesController) gracefulSync() {
	nsc.gracefulQueue.mu.Lock()
	defer nsc.gracefulQueue.mu.Unlock()

	var remaining []gracefulRequest
	for i := range nsc.gracefulQueue.queue {
		pending := nsc.gracefulQueue.queue[i]
		if !nsc.gracefulDeleteIpvsDestination(pending) {
			// Not processed yet: keep it queued.
			remaining = append(remaining, pending)
		}
	}
	nsc.gracefulQueue.queue = remaining
}
// gracefulDeleteIpvsDestination decides whether the destination in req may be
// deleted now and, if so, deletes it.
//
// The destination is deleted when either of these holds:
//   - its connection stats could be read and show no active and no inactive
//     connections, or
//   - more than req.gracefulTerminationPeriod has elapsed since req.deletionTime.
//
// It returns true when a deletion was attempted (a failing delete is only
// logged) and false when the destination has to stay queued.
func (nsc *NetworkServicesController) gracefulDeleteIpvsDestination(req gracefulRequest) bool {
	active, inactive, statsErr := nsc.getIpvsDestinationConnStats(req.ipvsSvc, req.ipvsDst)
	if statsErr != nil {
		glog.V(1).Infof("Could not get connection stats for destination: %s", statsErr.Error())
	}

	// No connections left: the destination can go ahead of the graceful period.
	drained := statsErr == nil && active == 0 && inactive == 0
	// The graceful termination period for this destination has passed.
	expired := time.Since(req.deletionTime) > req.gracefulTerminationPeriod

	if !drained && !expired {
		return false
	}

	glog.V(2).Infof("Deleting IPVS destination: %s", ipvsDestinationString(req.ipvsDst))
	if delErr := nsc.ln.ipvsDelDestination(req.ipvsSvc, req.ipvsDst); delErr != nil {
		glog.Errorf("Failed to delete IPVS destination: %s, %s", ipvsDestinationString(req.ipvsDst), delErr.Error())
	}
	return true
}
// getIpvsDestinationConnStats returns the number of active and inactive
// connections of dest, looked up by address and port among the destinations
// currently listed for ipvsSvc. An error is returned when the destinations
// cannot be listed or dest is not among them.
func (nsc *NetworkServicesController) getIpvsDestinationConnStats(ipvsSvc *ipvs.Service, dest *ipvs.Destination) (int, int, error) {
	listed, err := nsc.ln.ipvsGetDestinations(ipvsSvc)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to get IPVS destinations for service : %s : %s", ipvsServiceString(ipvsSvc), err.Error())
	}

	for i := range listed {
		candidate := listed[i]
		if candidate.Port != dest.Port || !candidate.Address.Equal(dest.Address) {
			continue
		}
		return candidate.ActiveConnections, candidate.InactiveConnections, nil
	}

	return 0, 0, fmt.Errorf("destination %s not found on IPVS service %s ", ipvsDestinationString(dest), ipvsServiceString(ipvsSvc))
}
// conntrackNoFlowsDeletedRe matches the message conntrack prints when a delete
// request matched no flow entries. It is compiled once at package scope instead
// of on every flush.
var conntrackNoFlowsDeletedRe = regexp.MustCompile("([[:space:]]0 flow entries have been deleted.)")

// flushConntrackUDP deletes the UDP conntrack records whose original
// destination is the address and port of svc by shelling out to the conntrack
// command.
//
// conntrack exits with a non-zero exit code when 0 flow entries have been
// deleted; that case is recognised from the command output and is not treated
// as an error. Any other failure of the command is returned as an error.
func (nsc *NetworkServicesController) flushConntrackUDP(svc *ipvs.Service) error {
	// Shell out and flush conntrack records
	out, err := exec.Command("conntrack", "-D", "--orig-dst", svc.Address.String(), "-p", "udp", "--dport", strconv.Itoa(int(svc.Port))).CombinedOutput()
	if err != nil && !conntrackNoFlowsDeletedRe.Match(out) {
		return fmt.Errorf("Failed to delete conntrack entry for endpoint: %s:%d due to %s", svc.Address.String(), svc.Port, err.Error())
	}
	glog.V(1).Infof("Deleted conntrack entry for endpoint: %s:%d", svc.Address.String(), svc.Port)
	return nil
}
......@@ -10,7 +10,6 @@ import (
"os"
"os/exec"
"reflect"
"regexp"
"runtime"
"strconv"
"strings"
......@@ -60,6 +59,8 @@ const (
ipvsServicesIPSetName = "kube-router-ipvs-services"
serviceIPsIPSetName = "kube-router-service-ips"
ipvsFirewallChainName = "KUBE-ROUTER-SERVICES"
synctypeAll = iota
synctypeIpvs
)
var (
......@@ -226,6 +227,11 @@ type NetworkServicesController struct {
ServiceEventHandler cache.ResourceEventHandler
EndpointsEventHandler cache.ResourceEventHandler
gracefulPeriod time.Duration
gracefulQueue gracefulQueue
gracefulTermination bool
syncChan chan int
}
// internal representation of kubernetes service
......@@ -270,10 +276,10 @@ type endpointsInfoMap map[string][]endpointsInfo
// Run periodically sync ipvs configuration to reflect desired state of services and endpoints
func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
t := time.NewTicker(nsc.syncPeriod)
defer t.Stop()
defer wg.Done()
defer close(nsc.syncChan)
glog.Infof("Starting network services controller")
......@@ -330,34 +336,74 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
return errors.New("Error setting up ipvs firewall: " + err.Error())
}
// loop forever unitl notified to stop on stopCh
for {
select {
case <-stopCh:
glog.Info("Shutting down network services controller")
return nil
default:
}
gracefulTicker := time.NewTicker(5 * time.Second)
defer gracefulTicker.Stop()
glog.V(1).Info("Performing periodic sync of ipvs services")
err := nsc.sync()
select {
case <-stopCh:
glog.Info("Shutting down network services controller")
return nil
default:
err := nsc.doSync()
if err != nil {
glog.Errorf("Error during periodic ipvs sync in network service controller. Error: " + err.Error())
glog.Errorf("Skipping sending heartbeat from network service controller as periodic sync failed.")
} else {
healthcheck.SendHeartBeat(healthChan, "NSC")
glog.Fatalf("Failed to perform initial full sync %s", err.Error())
}
nsc.readyForUpdates = true
}
// loop forever until notified to stop on stopCh
for {
select {
case <-stopCh:
glog.Info("Shutting down network services controller")
return nil
case <-gracefulTicker.C:
if nsc.readyForUpdates && nsc.gracefulTermination {
glog.V(3).Info("Performing periodic graceful destination cleanup")
nsc.gracefulSync()
}
case perform := <-nsc.syncChan:
switch perform {
case synctypeAll:
glog.V(1).Info("Performing requested full sync of services")
err := nsc.doSync()
if err != nil {
glog.Errorf("Error during full sync in network service controller. Error: " + err.Error())
}
case synctypeIpvs:
glog.V(1).Info("Performing requested sync of ipvs services")
nsc.mu.Lock()
err := nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
nsc.mu.Unlock()
if err != nil {
glog.Errorf("Error during ipvs sync in network service controller. Error: " + err.Error())
}
}
case <-t.C:
glog.V(1).Info("Performing periodic sync of ipvs services")
err := nsc.doSync()
if err != nil {
glog.Errorf("Error during periodic ipvs sync in network service controller. Error: " + err.Error())
glog.Errorf("Skipping sending heartbeat from network service controller as periodic sync failed.")
} else {
healthcheck.SendHeartBeat(healthChan, "NSC")
}
}
}
}
func (nsc *NetworkServicesController) sync() error {
// sync requests a sync of the given type (the main loop in Run handles
// synctypeAll and synctypeIpvs) with a non-blocking send on nsc.syncChan. When
// the send cannot proceed immediately the request is dropped and only logged,
// so callers never block here.
// NOTE(review): Run closes nsc.syncChan when it returns; a call made after that
// would send on a closed channel — confirm callers are stopped before then.
func (nsc *NetworkServicesController) sync(syncType int) {
select {
case nsc.syncChan <- syncType:
default:
// The channel could not accept the request right now; drop it instead of waiting.
glog.V(2).Infof("Already pending sync, dropping request for type %d", syncType)
}
}
func (nsc *NetworkServicesController) doSync() error {
var err error
nsc.mu.Lock()
defer nsc.mu.Unlock()
......@@ -730,7 +776,7 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
nsc.endpointsMap = newEndpointsMap
nsc.serviceMap = newServiceMap
glog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", ep.Namespace, ep.Name)
nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
nsc.sync(synctypeIpvs)
} else {
glog.V(1).Infof("Skipping IPVS services sync on endpoint: %s/%s update as nothing changed", ep.Namespace, ep.Name)
}
......@@ -760,7 +806,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
nsc.endpointsMap = newEndpointsMap
nsc.serviceMap = newServiceMap
glog.V(1).Infof("Syncing IPVS services sync on update to service: %s/%s", svc.Namespace, svc.Name)
nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
nsc.sync(synctypeIpvs)
} else {
glog.V(1).Infof("Skipping syncing IPVS services for update to service: %s/%s as nothing changed", svc.Namespace, svc.Name)
}
......@@ -783,12 +829,7 @@ func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo) bool {
// sync the ipvs service and server details configured to reflect the desired state of services and endpoint
// as learned from services and endpoints information from the api server
func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
var ipvsSvcs []*ipvs.Service
// Conntrack exits with non zero exit code when exiting if 0 flow entries have been deleted, use regex to check output and don't Error when matching
re := regexp.MustCompile("([[:space:]]0 flow entries have been deleted.)")
start := time.Now()
defer func() {
......@@ -1059,9 +1100,9 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
// cleanup stale IPs on dummy interface
glog.V(1).Info("Cleaning up if any, old service IPs on dummy interface")
addrActive := make(map[string]bool)
for k, endpoints := range activeServiceEndpointMap {
for k := range activeServiceEndpointMap {
// verify active and its a generateIpPortId() type service
if len(endpoints) > 0 && strings.Contains(k, "-") {
if strings.Contains(k, "-") {
parts := strings.SplitN(k, "-", 3)
addrActive[parts[0]] = true
}
......@@ -1109,7 +1150,9 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
}
endpoints, ok := activeServiceEndpointMap[key]
if !ok || len(endpoints) == 0 {
// Only delete the service if it's not there anymore to prevent flapping
// old: if !ok || len(endpoints) == 0 {
if !ok {
glog.V(1).Infof("Found a IPVS service %s which is no longer needed so cleaning up",
ipvsServiceString(ipvsSvc))
err := nsc.ln.ipvsDelService(ipvsSvc)
......@@ -1134,22 +1177,11 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
if !validEp {
glog.V(1).Infof("Found a destination %s in service %s which is no longer needed so cleaning up",
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
err := nsc.ln.ipvsDelDestination(ipvsSvc, dst)
err = nsc.ipvsDeleteDestination(ipvsSvc, dst)
if err != nil {
glog.Errorf("Failed to delete destination %s from ipvs service %s",
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
}
// flush conntrack when endpoint for a UDP service changes
if ipvsSvc.Protocol == syscall.IPPROTO_UDP {
out, err := exec.Command("conntrack", "-D", "--orig-dst", dst.Address.String(), "-p", "udp", "--dport", strconv.Itoa(int(dst.Port))).CombinedOutput()
if err != nil {
if matched := re.MatchString(string(out)); !matched {
glog.Error("Failed to delete conntrack entry for endpoint: " + dst.Address.String() + ":" + strconv.Itoa(int(dst.Port)) + " due to " + err.Error())
}
}
glog.V(1).Infof("Deleted conntrack entry for endpoint: " + dst.Address.String() + ":" + strconv.Itoa(int(dst.Port)))
}
}
}
}
......@@ -1616,7 +1648,7 @@ func (nsc *NetworkServicesController) syncHairpinIptablesRules() error {
if err != nil {
glog.Errorf("Unable to delete hairpin rule \"%s\" from chain %s: %e", ruleFromNode, hairpinChain, err)
} else {
glog.V(1).Info("Deleted invalid/outdated hairpin rule \"%s\" from chain %s", ruleFromNode, hairpinChain)
glog.V(1).Infof("Deleted invalid/outdated hairpin rule \"%s\" from chain %s", ruleFromNode, hairpinChain)
}
} else {
// Ignore the chain creation rule
......@@ -2349,6 +2381,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
if err != nil {
return nil, err
}
nsc := NetworkServicesController{ln: ln}
if config.MetricsEnabled {
......@@ -2369,6 +2402,9 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
}
nsc.syncPeriod = config.IpvsSyncPeriod
nsc.syncChan = make(chan int, 2)
nsc.gracefulPeriod = config.IpvsGracefulPeriod
nsc.gracefulTermination = config.IpvsGracefulTermination
nsc.globalHairpin = config.GlobalHairpinMode
nsc.serviceMap = make(serviceInfoMap)
......
......@@ -4,10 +4,11 @@
package proxy
import (
"github.com/docker/libnetwork/ipvs"
"github.com/vishvananda/netlink"
"net"
"sync"
"github.com/docker/libnetwork/ipvs"
"github.com/vishvananda/netlink"
)
var (
......@@ -31,6 +32,10 @@ var (
lockLinuxNetworkingMocksetupRoutesForExternalIPForDSR sync.RWMutex
)
// Ensure, that LinuxNetworkingMock does implement LinuxNetworking.
// If this is not the case, regenerate this file with moq.
var _ LinuxNetworking = &LinuxNetworkingMock{}
// LinuxNetworkingMock is a mock implementation of LinuxNetworking.
//
// func TestSomethingThatUsesLinuxNetworking(t *testing.T) {
......@@ -38,63 +43,63 @@ var (
// // make and configure a mocked LinuxNetworking
// mockedLinuxNetworking := &LinuxNetworkingMock{
// cleanupMangleTableRuleFunc: func(ip string, protocol string, port string, fwmark string) error {
// panic("TODO: mock out the cleanupMangleTableRule method")
// panic("mock out the cleanupMangleTableRule method")
// },
// getKubeDummyInterfaceFunc: func() (netlink.Link, error) {
// panic("TODO: mock out the getKubeDummyInterface method")
// panic("mock out the getKubeDummyInterface method")
// },
// ipAddrAddFunc: func(iface netlink.Link, ip string, addRoute bool) error {
// panic("TODO: mock out the ipAddrAdd method")
// panic("mock out the ipAddrAdd method")
// },
// ipAddrDelFunc: func(iface netlink.Link, ip string) error {
// panic("TODO: mock out the ipAddrDel method")
// panic("mock out the ipAddrDel method")
// },
// ipvsAddFWMarkServiceFunc: func(vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) {
// panic("TODO: mock out the ipvsAddFWMarkService method")
// panic("mock out the ipvsAddFWMarkService method")
// },
// ipvsAddServerFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error {
// panic("TODO: mock out the ipvsAddServer method")
// panic("mock out the ipvsAddServer method")
// },
// ipvsAddServiceFunc: func(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) {
// panic("TODO: mock out the ipvsAddService method")
// panic("mock out the ipvsAddService method")
// },
// ipvsDelDestinationFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error {
// panic("TODO: mock out the ipvsDelDestination method")
// panic("mock out the ipvsDelDestination method")
// },
// ipvsDelServiceFunc: func(ipvsSvc *ipvs.Service) error {
// panic("TODO: mock out the ipvsDelService method")
// panic("mock out the ipvsDelService method")
// },
// ipvsGetDestinationsFunc: func(ipvsSvc *ipvs.Service) ([]*ipvs.Destination, error) {
// panic("TODO: mock out the ipvsGetDestinations method")
// panic("mock out the ipvsGetDestinations method")
// },
// ipvsGetServicesFunc: func() ([]*ipvs.Service, error) {
// panic("TODO: mock out the ipvsGetServices method")
// panic("mock out the ipvsGetServices method")
// },
// ipvsNewDestinationFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error {
// panic("TODO: mock out the ipvsNewDestination method")
// panic("mock out the ipvsNewDestination method")
// },
// ipvsNewServiceFunc: func(ipvsSvc *ipvs.Service) error {
// panic("TODO: mock out the ipvsNewService method")
// panic("mock out the ipvsNewService method")
// },
// ipvsUpdateDestinationFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error {
// panic("TODO: mock out the ipvsUpdateDestination method")
// panic("mock out the ipvsUpdateDestination method")
// },
// ipvsUpdateServiceFunc: func(ipvsSvc *ipvs.Service) error {
// panic("TODO: mock out the ipvsUpdateService method")
// panic("mock out the ipvsUpdateService method")
// },
// prepareEndpointForDsrFunc: func(containerId string, endpointIP string, vip string) error {
// panic("TODO: mock out the prepareEndpointForDsr method")
// panic("mock out the prepareEndpointForDsr method")
// },
// setupPolicyRoutingForDSRFunc: func() error {
// panic("TODO: mock out the setupPolicyRoutingForDSR method")
// panic("mock out the setupPolicyRoutingForDSR method")
// },
// setupRoutesForExternalIPForDSRFunc: func(in1 serviceInfoMap) error {
// panic("TODO: mock out the setupRoutesForExternalIPForDSR method")
// panic("mock out the setupRoutesForExternalIPForDSR method")
// },
// }
//
// // TODO: use mockedLinuxNetworking in code that requires LinuxNetworking
// // and then make assertions.
// // use mockedLinuxNetworking in code that requires LinuxNetworking
// // and then make assertions.
//
// }
type LinuxNetworkingMock struct {
......
......@@ -36,6 +36,8 @@ type KubeRouterConfig struct {
HostnameOverride string
IPTablesSyncPeriod time.Duration
IpvsSyncPeriod time.Duration
IpvsGracefulPeriod time.Duration
IpvsGracefulTermination bool
Kubeconfig string
MasqueradeAll bool
Master string
......@@ -64,6 +66,7 @@ func NewKubeRouterConfig() *KubeRouterConfig {
CacheSyncTimeout: 1 * time.Minute,
IpvsSyncPeriod: 5 * time.Minute,
IPTablesSyncPeriod: 5 * time.Minute,
IpvsGracefulPeriod: 30 * time.Second,
RoutesSyncPeriod: 5 * time.Minute,
EnableOverlay: true,
OverlayType: "subnet",
......@@ -99,6 +102,10 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) {
"The delay between iptables rule synchronizations (e.g. '5s', '1m'). Must be greater than 0.")
fs.DurationVar(&s.IpvsSyncPeriod, "ipvs-sync-period", s.IpvsSyncPeriod,
"The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&s.IpvsGracefulPeriod, "ipvs-graceful-period", s.IpvsGracefulPeriod,
"The graceful period before removing destinations from IPVS services (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.BoolVar(&s.IpvsGracefulTermination, "ipvs-graceful-termination", false,
"Enables the experimental IPVS graceful terminaton capability")
fs.DurationVar(&s.RoutesSyncPeriod, "routes-sync-period", s.RoutesSyncPeriod,
"The delay between route updates and advertisements (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.BoolVar(&s.AdvertiseClusterIp, "advertise-cluster-ip", false,
......
......@@ -145,3 +145,23 @@ const (
// addresses.
SourceHashing = "sh"
)
const (
// ConnFwdMask is a mask for the fwd methods
ConnFwdMask = 0x0007
// ConnFwdMasq denotes forwarding via masquerading/NAT
ConnFwdMasq = 0x0000
// ConnFwdLocalNode denotes forwarding to a local node
ConnFwdLocalNode = 0x0001
// ConnFwdTunnel denotes forwarding via a tunnel
ConnFwdTunnel = 0x0002
// ConnFwdDirectRoute denotes forwarding via direct routing
ConnFwdDirectRoute = 0x0003
// ConnFwdBypass denotes forwarding while bypassing the cache
ConnFwdBypass = 0x0004
)
......@@ -5,10 +5,18 @@ package ipvs
import (
"net"
"syscall"
"time"
"fmt"
"github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"
)
const (
netlinkRecvSocketsTimeout = 3 * time.Second
netlinkSendSocketTimeout = 30 * time.Second
)
// Service defines an IPVS service in its entirety.
......@@ -46,13 +54,26 @@ type SvcStats struct {
// Destination defines an IPVS destination (real server) in its
// entirety.
type Destination struct {
Address net.IP
Port uint16
Weight int
ConnectionFlags uint32
AddressFamily uint16
UpperThreshold uint32
LowerThreshold uint32
Address net.IP
Port uint16
Weight int
ConnectionFlags uint32
AddressFamily uint16
UpperThreshold uint32
LowerThreshold uint32
ActiveConnections int
InactiveConnections int
Stats DstStats
}
// DstStats defines IPVS destination (real server) statistics
type DstStats SvcStats
// Config defines IPVS timeout configuration
type Config struct {
TimeoutTCP time.Duration
TimeoutTCPFin time.Duration
TimeoutUDP time.Duration
}
// Handle provides a namespace specific ipvs handle to program ipvs
......@@ -82,6 +103,16 @@ func New(path string) (*Handle, error) {
if err != nil {
return nil, err
}
// Add operation timeout to avoid deadlocks
tv := unix.NsecToTimeval(netlinkSendSocketTimeout.Nanoseconds())
if err := sock.SetSendTimeout(&tv); err != nil {
return nil, err
}
tv = unix.NsecToTimeval(netlinkRecvSocketsTimeout.Nanoseconds())
if err := sock.SetReceiveTimeout(&tv); err != nil {
return nil, err
}
return &Handle{sock: sock}, nil
}
......@@ -166,3 +197,13 @@ func (i *Handle) GetService(s *Service) (*Service, error) {
return res[0], nil
}
// GetConfig returns the current IPVS timeout configuration as a Config. It
// delegates to doGetConfigCmd and passes its result and error through unchanged.
func (i *Handle) GetConfig() (*Config, error) {
return i.doGetConfigCmd()
}
// SetConfig sets the current timeout configuration from c; a timeout value of 0
// means no change. It delegates to doSetConfigCmd and returns its error.
func (i *Handle) SetConfig(c *Config) error {
return i.doSetConfigCmd(c)
}
......@@ -12,6 +12,7 @@ import (
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/sirupsen/logrus"
......@@ -100,7 +101,7 @@ func fillService(s *Service) nl.NetlinkRequestData {
return cmdAttr
}
func fillDestinaton(d *Destination) nl.NetlinkRequestData {
func fillDestination(d *Destination) nl.NetlinkRequestData {
cmdAttr := nl.NewRtAttr(ipvsCmdAttrDest, nil)
nl.NewRtAttrChild(cmdAttr, ipvsDestAttrAddress, rawIPData(d.Address))
......@@ -134,7 +135,7 @@ func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]b
}
} else {
req.AddData(fillDestinaton(d))
req.AddData(fillDestination(d))
}
res, err := execute(i.sock, req, 0)
......@@ -203,10 +204,6 @@ func newGenlRequest(familyID int, cmd uint8) *nl.NetlinkRequest {
}
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
var (
err error
)
if err := s.Send(req); err != nil {
return nil, err
}
......@@ -222,6 +219,13 @@ done:
for {
msgs, err := s.Receive()
if err != nil {
if s.GetFd() == -1 {
return nil, fmt.Errorf("Socket got closed on receive")
}
if err == syscall.EAGAIN {
// timeout fired
continue
}
return nil, err
}
for _, m := range msgs {
......@@ -436,6 +440,16 @@ func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error)
d.LowerThreshold = native.Uint32(attr.Value)
case ipvsDestAttrAddressFamily:
d.AddressFamily = native.Uint16(attr.Value)
case ipvsDestAttrActiveConnections:
d.ActiveConnections = int(native.Uint16(attr.Value))
case ipvsDestAttrInactiveConnections:
d.InactiveConnections = int(native.Uint16(attr.Value))
case ipvsSvcAttrStats:
stats, err := assembleStats(attr.Value)
if err != nil {
return nil, err
}
d.Stats = DstStats(stats)
}
}
return &d, nil
......@@ -490,6 +504,60 @@ func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destinatio
return res, nil
}
// parseConfig decodes an IPVS netlink response into a Config. It returns an
// error when the route attributes that follow the generic netlink header
// cannot be parsed. Attributes other than the three timeouts are ignored.
func (i *Handle) parseConfig(msg []byte) (*Config, error) {
	// The attributes start right after the generic netlink header.
	genlHdr := deserializeGenlMsg(msg)
	attrs, err := nl.ParseRouteAttr(msg[genlHdr.Len():])
	if err != nil {
		return nil, err
	}

	// Each timeout is a uint32 number of seconds, decoded with the package's
	// `native` byte order.
	seconds := func(raw []byte) time.Duration {
		return time.Duration(native.Uint32(raw)) * time.Second
	}

	cfg := &Config{}
	for _, attr := range attrs {
		switch int(attr.Attr.Type) {
		case ipvsCmdAttrTimeoutTCP:
			cfg.TimeoutTCP = seconds(attr.Value)
		case ipvsCmdAttrTimeoutTCPFin:
			cfg.TimeoutTCPFin = seconds(attr.Value)
		case ipvsCmdAttrTimeoutUDP:
			cfg.TimeoutUDP = seconds(attr.Value)
		}
	}
	return cfg, nil
}
// doGetConfigCmd issues ipvsCmdGetConfig and parses the first message of the
// reply into a Config. It is the implementation behind GetConfig.
//
// An error is returned when the command fails, when the reply contains no
// message, or when the first message cannot be parsed.
func (i *Handle) doGetConfigCmd() (*Config, error) {
	msgs, err := i.doCmdWithoutAttr(ipvsCmdGetConfig)
	if err != nil {
		return nil, err
	}
	// Indexing an empty reply would panic; report it as an error instead.
	if len(msgs) == 0 {
		return nil, fmt.Errorf("ipvs: empty response to get config command")
	}
	return i.parseConfig(msgs[0])
}
// doSetConfigCmd builds an ipvsCmdSetConfig request that carries the three
// timeouts of c, each converted to a uint32 number of seconds, and executes
// it on the handle's socket. It is the implementation behind SetConfig.
func (i *Handle) doSetConfigCmd(c *Config) error {
	req := newIPVSRequest(ipvsCmdSetConfig)
	req.Seq = atomic.AddUint32(&i.seq, 1)

	// addTimeout appends one timeout attribute to the request.
	addTimeout := func(attrType int, d time.Duration) {
		req.AddData(nl.NewRtAttr(attrType, nl.Uint32Attr(uint32(d.Seconds()))))
	}
	addTimeout(ipvsCmdAttrTimeoutTCP, c.TimeoutTCP)
	addTimeout(ipvsCmdAttrTimeoutTCPFin, c.TimeoutTCPFin)
	addTimeout(ipvsCmdAttrTimeoutUDP, c.TimeoutUDP)

	// The reply payload is not needed, only the error.
	_, err := execute(i.sock, req, 0)
	return err
}
// IPVS related netlink message format explained
/* EACH NETLINK MSG is of the below format, this is what we will receive from execute() api.
......
language: go
go:
- "1.10.x"
- "1.11.x"
- "1.12.x"
before_script:
# make sure we keep path in tact when we sudo
- sudo sed -i -e 's/^Defaults\tsecure_path.*$//' /etc/sudoers
......@@ -9,5 +13,7 @@ before_script:
- sudo modprobe nf_conntrack_netlink
- sudo modprobe nf_conntrack_ipv4
- sudo modprobe nf_conntrack_ipv6
- sudo modprobe sch_hfsc
install:
- go get github.com/vishvananda/netns
go_import_path: github.com/vishvananda/netlink
......@@ -65,7 +65,11 @@ func (h *Handle) addrHandle(link Link, addr *Addr, req *nl.NetlinkRequest) error
msg := nl.NewIfAddrmsg(family)
msg.Index = uint32(base.Index)
msg.Scope = uint8(addr.Scope)
prefixlen, masklen := addr.Mask.Size()
mask := addr.Mask
if addr.Peer != nil {
mask = addr.Peer.Mask
}
prefixlen, masklen := mask.Size()
msg.Prefixlen = uint8(prefixlen)
req.AddData(msg)
......@@ -107,7 +111,7 @@ func (h *Handle) addrHandle(link Link, addr *Addr, req *nl.NetlinkRequest) error
if addr.Broadcast == nil {
calcBroadcast := make(net.IP, masklen/8)
for i := range localAddrData {
calcBroadcast[i] = localAddrData[i] | ^addr.Mask[i]
calcBroadcast[i] = localAddrData[i] | ^mask[i]
}
addr.Broadcast = calcBroadcast
}
......@@ -206,13 +210,17 @@ func parseAddr(m []byte) (addr Addr, family, index int, err error) {
IP: attr.Value,
Mask: net.CIDRMask(int(msg.Prefixlen), 8*len(attr.Value)),
}
addr.Peer = dst
case unix.IFA_LOCAL:
// iproute2 manual:
// If a peer address is specified, the local address
// cannot have a prefix length. The network prefix is
// associated with the peer rather than with the local
// address.
n := 8 * len(attr.Value)
local = &net.IPNet{
IP: attr.Value,
Mask: net.CIDRMask(int(msg.Prefixlen), 8*len(attr.Value)),
Mask: net.CIDRMask(n, n),
}
addr.IPNet = local
case unix.IFA_BROADCAST:
addr.Broadcast = attr.Value
case unix.IFA_LABEL:
......@@ -226,12 +234,24 @@ func parseAddr(m []byte) (addr Addr, family, index int, err error) {
}
}
// IFA_LOCAL should be there but if not, fall back to IFA_ADDRESS
// libnl addr.c comment:
// IPv6 sends the local address as IFA_ADDRESS with no
// IFA_LOCAL, IPv4 sends both IFA_LOCAL and IFA_ADDRESS
// with IFA_ADDRESS being the peer address if they differ
//
// But obviously, as there are IPv6 PtP addresses, too,
// IFA_LOCAL should also be handled for IPv6.
if local != nil {
addr.IPNet = local
if family == FAMILY_V4 && local.IP.Equal(dst.IP) {
addr.IPNet = dst
} else {
addr.IPNet = local
addr.Peer = dst
}
} else {
addr.IPNet = dst
}
addr.Scope = int(msg.Scope)
return
......
......@@ -96,7 +96,7 @@ func (h *Handle) bridgeVlanModify(cmd int, link Link, vid uint16, pvid, untagged
flags |= nl.BRIDGE_FLAGS_MASTER
}
if flags > 0 {
nl.NewRtAttrChild(br, nl.IFLA_BRIDGE_FLAGS, nl.Uint16Attr(flags))
br.AddRtAttr(nl.IFLA_BRIDGE_FLAGS, nl.Uint16Attr(flags))
}
vlanInfo := &nl.BridgeVlanInfo{Vid: vid}
if pvid {
......@@ -105,11 +105,8 @@ func (h *Handle) bridgeVlanModify(cmd int, link Link, vid uint16, pvid, untagged
if untagged {
vlanInfo.Flags |= nl.BRIDGE_VLAN_INFO_UNTAGGED
}
nl.NewRtAttrChild(br, nl.IFLA_BRIDGE_VLAN_INFO, vlanInfo.Serialize())
br.AddRtAttr(nl.IFLA_BRIDGE_VLAN_INFO, vlanInfo.Serialize())
req.AddData(br)
_, err := req.Execute(unix.NETLINK_ROUTE, 0)
if err != nil {
return err
}
return nil
return err
}
......@@ -4,25 +4,76 @@ import (
"fmt"
)
// Class is the interface implemented by all traffic-control class types.
type Class interface {
// Attrs returns a pointer to the class attributes.
Attrs() *ClassAttrs
// Type returns the class kind as a string, for example "htb" or "hfsc".
Type() string
}
// Generic networking statistics for netlink users.
// This file contains "gnet_" prefixed structs and relevant functions.
// See Documentation/networking/gen_stats.txt in Linux source code for more details.
// GnetStatsBasic Ref: struct gnet_stats_basic { ... }
type GnetStatsBasic struct {
Bytes uint64 // number of seen bytes
Packets uint32 // number of seen packets
}
// GnetStatsRateEst Ref: struct gnet_stats_rate_est { ... }
type GnetStatsRateEst struct {
Bps uint32 // current byte rate
Pps uint32 // current packet rate
}
// GnetStatsRateEst64 Ref: struct gnet_stats_rate_est64 { ... }
type GnetStatsRateEst64 struct {
Bps uint64 // current byte rate
Pps uint64 // current packet rate
}
// GnetStatsQueue Ref: struct gnet_stats_queue { ... }
type GnetStatsQueue struct {
Qlen uint32 // queue length
Backlog uint32 // backlog size of queue
Drops uint32 // number of dropped packets
Requeues uint32 // number of requeues
Overlimits uint32 // number of enqueues over the limit
}
// ClassStatistics representation based on generic networking statistics for netlink.
// See Documentation/networking/gen_stats.txt in Linux source code for more details.
type ClassStatistics struct {
Basic *GnetStatsBasic
Queue *GnetStatsQueue
RateEst *GnetStatsRateEst
}
// NewClassStatistics returns a ClassStatistics whose Basic, Queue and RateEst
// members point at freshly allocated, zero-valued structs.
func NewClassStatistics() *ClassStatistics {
	stats := new(ClassStatistics)
	stats.Basic = new(GnetStatsBasic)
	stats.Queue = new(GnetStatsQueue)
	stats.RateEst = new(GnetStatsRateEst)
	return stats
}
// ClassAttrs represents a netlink class. A filter is associated with a link,
// has a handle and a parent. The root filter of a device should have a
// parent == HANDLE_ROOT.
type ClassAttrs struct {
LinkIndex int
Handle uint32
Parent uint32
Leaf uint32
LinkIndex int
Handle uint32
Parent uint32
Leaf uint32
Statistics *ClassStatistics
}
// String renders the link index, handle, parent and leaf of the class. The
// handle and parent are formatted through HandleStr.
func (q ClassAttrs) String() string {
	handle := HandleStr(q.Handle)
	parent := HandleStr(q.Parent)
	return fmt.Sprintf("{LinkIndex: %d, Handle: %s, Parent: %s, Leaf: %d}", q.LinkIndex, handle, parent, q.Leaf)
}
// HtbClassAttrs stores the attributes of HTB class
type HtbClassAttrs struct {
// TODO handle all attributes
Rate uint64
......@@ -54,10 +105,12 @@ func (q HtbClass) String() string {
return fmt.Sprintf("{Rate: %d, Ceil: %d, Buffer: %d, Cbuffer: %d}", q.Rate, q.Ceil, q.Buffer, q.Cbuffer)
}
// Attrs returns a pointer to the ClassAttrs embedded in the HTB class, so
// changes made through it are visible on q.
func (q *HtbClass) Attrs() *ClassAttrs {
return &q.ClassAttrs
}
// Type returns the class type, which is always "htb".
func (q *HtbClass) Type() string {
return "htb"
}
......@@ -69,10 +122,90 @@ type GenericClass struct {
ClassType string
}
// Attrs returns a pointer to the ClassAttrs embedded in the generic class.
func (class *GenericClass) Attrs() *ClassAttrs {
return &class.ClassAttrs
}
// Type returns the class type stored in the ClassType field.
func (class *GenericClass) Type() string {
return class.ClassType
}
// ServiceCurve holds the three parameters (m1, d, m2) that describe one HFSC
// service curve.
type ServiceCurve struct {
	m1, d, m2 uint32
}

// Attrs returns the curve parameters in the order m1, d, m2.
func (c *ServiceCurve) Attrs() (uint32, uint32, uint32) {
	curve := *c
	return curve.m1, curve.d, curve.m2
}
// HfscClass is a representation of the HFSC class
type HfscClass struct {
ClassAttrs
// Rsc is the curve written by SetRsc and SetSC.
Rsc ServiceCurve
// Fsc is the curve written by SetFsc, SetSC and SetLS.
Fsc ServiceCurve
// Usc is the curve written by SetUsc and SetUL.
Usc ServiceCurve
}
// SetUsc sets the Usc curve. m1 and m2 are divided by 8 before being stored
// (presumably a bits-to-bytes conversion); d is stored as given.
func (hfsc *HfscClass) SetUsc(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{d: d}
	curve.m1 = m1 / 8
	curve.m2 = m2 / 8
	hfsc.Usc = curve
}
// SetFsc sets the Fsc curve. m1 and m2 are divided by 8 before being stored
// (presumably a bits-to-bytes conversion); d is stored as given.
func (hfsc *HfscClass) SetFsc(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{d: d}
	curve.m1 = m1 / 8
	curve.m2 = m2 / 8
	hfsc.Fsc = curve
}
// SetRsc sets the Rsc curve. m1 and m2 are divided by 8 before being stored
// (presumably a bits-to-bytes conversion); d is stored as given.
func (hfsc *HfscClass) SetRsc(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{d: d}
	curve.m1 = m1 / 8
	curve.m2 = m2 / 8
	hfsc.Rsc = curve
}
// SetSC implements the SC from the tc CLI: the same curve is stored in both
// Rsc and Fsc. m1 and m2 are divided by 8 before being stored.
func (hfsc *HfscClass) SetSC(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{m1: m1 / 8, d: d, m2: m2 / 8}
	hfsc.Rsc = curve
	hfsc.Fsc = curve
}
// SetUL implements the UL from the tc CLI by storing the curve in Usc.
// m1 and m2 are divided by 8 before being stored.
func (hfsc *HfscClass) SetUL(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{d: d}
	curve.m1 = m1 / 8
	curve.m2 = m2 / 8
	hfsc.Usc = curve
}
// SetLS implements the LS from the tc CLI by storing the curve in Fsc.
// m1 and m2 are divided by 8 before being stored.
func (hfsc *HfscClass) SetLS(m1 uint32, d uint32, m2 uint32) {
	curve := ServiceCurve{d: d}
	curve.m1 = m1 / 8
	curve.m2 = m2 / 8
	hfsc.Fsc = curve
}
// NewHfscClass returns a new HFSC class that carries attrs and three
// zero-valued service curves.
func NewHfscClass(attrs ClassAttrs) *HfscClass {
	// Rsc, Fsc and Usc are left at their zero value.
	class := new(HfscClass)
	class.ClassAttrs = attrs
	return class
}
// String renders the class attributes followed by the RSC, FSC and USC
// curves. m1 and m2 are multiplied by 8, the inverse of the division the
// setters apply.
func (hfsc *HfscClass) String() string {
	rsc, fsc, usc := hfsc.Rsc, hfsc.Fsc, hfsc.Usc
	return fmt.Sprintf(
		"{%s -- {RSC: {m1=%d d=%d m2=%d}} {FSC: {m1=%d d=%d m2=%d}} {USC: {m1=%d d=%d m2=%d}}}",
		hfsc.Attrs(),
		rsc.m1*8, rsc.d, rsc.m2*8,
		fsc.m1*8, fsc.d, fsc.m2*8,
		usc.m1*8, usc.d, usc.m2*8,
	)
}
// Attrs returns a pointer to the ClassAttrs embedded in the HFSC class.
func (hfsc *HfscClass) Attrs() *ClassAttrs {
return &hfsc.ClassAttrs
}
// Type returns the type of the class, which is always "hfsc".
func (hfsc *HfscClass) Type() string {
return "hfsc"
}
package netlink
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"syscall"
"github.com/vishvananda/netlink/nl"
"golang.org/x/sys/unix"
)
// NOTE: function is in here because it uses other linux functions
// Internal tc_stats representation in Go struct.
// This is for internal uses only to deserialize the payload of rtattr.
// After the deserialization, this should be converted into the canonical stats
// struct, ClassStatistics, in case of statistics of a class.
// The field order matters: the struct is filled by a binary read of the raw
// attribute payload.
// Ref: struct tc_stats { ... }
type tcStats struct {
Bytes uint64 // Number of enqueued bytes
Packets uint32 // Number of enqueued packets
Drops uint32 // Packets dropped because of lack of resources
Overlimits uint32 // Number of throttle events when this flow goes out of allocated bandwidth
Bps uint32 // Current flow byte rate
Pps uint32 // Current flow packet rate
Qlen uint32 // Copied to GnetStatsQueue.Qlen (queue length)
Backlog uint32 // Copied to GnetStatsQueue.Backlog (backlog size of queue)
}
// NewHtbClass NOTE: function is in here because it uses other linux functions
func NewHtbClass(attrs ClassAttrs, cattrs HtbClassAttrs) *HtbClass {
mtu := 1600
rate := cattrs.Rate / 8
......@@ -126,7 +146,9 @@ func classPayload(req *nl.NetlinkRequest, class Class) error {
req.AddData(nl.NewRtAttr(nl.TCA_KIND, nl.ZeroTerminated(class.Type())))
options := nl.NewRtAttr(nl.TCA_OPTIONS, nil)
if htb, ok := class.(*HtbClass); ok {
switch class.Type() {
case "htb":
htb := class.(*HtbClass)
opt := nl.TcHtbCopt{}
opt.Buffer = htb.Buffer
opt.Cbuffer = htb.Cbuffer
......@@ -151,9 +173,18 @@ func classPayload(req *nl.NetlinkRequest, class Class) error {
return errors.New("HTB: failed to calculate ceil rate table")
}
opt.Ceil = tcceil
nl.NewRtAttrChild(options, nl.TCA_HTB_PARMS, opt.Serialize())
nl.NewRtAttrChild(options, nl.TCA_HTB_RTAB, SerializeRtab(rtab))
nl.NewRtAttrChild(options, nl.TCA_HTB_CTAB, SerializeRtab(ctab))
options.AddRtAttr(nl.TCA_HTB_PARMS, opt.Serialize())
options.AddRtAttr(nl.TCA_HTB_RTAB, SerializeRtab(rtab))
options.AddRtAttr(nl.TCA_HTB_CTAB, SerializeRtab(ctab))
case "hfsc":
hfsc := class.(*HfscClass)
opt := nl.HfscCopt{}
opt.Rsc.Set(hfsc.Rsc.Attrs())
opt.Fsc.Set(hfsc.Fsc.Attrs())
opt.Usc.Set(hfsc.Usc.Attrs())
options.AddRtAttr(nl.TCA_HFSC_RSC, nl.SerializeHfscCurve(&opt.Rsc))
options.AddRtAttr(nl.TCA_HFSC_FSC, nl.SerializeHfscCurve(&opt.Fsc))
options.AddRtAttr(nl.TCA_HFSC_USC, nl.SerializeHfscCurve(&opt.Usc))
}
req.AddData(options)
return nil
......@@ -197,9 +228,10 @@ func (h *Handle) ClassList(link Link, parent uint32) ([]Class, error) {
}
base := ClassAttrs{
LinkIndex: int(msg.Ifindex),
Handle: msg.Handle,
Parent: msg.Parent,
LinkIndex: int(msg.Ifindex),
Handle: msg.Handle,
Parent: msg.Parent,
Statistics: nil,
}
var class Class
......@@ -211,6 +243,8 @@ func (h *Handle) ClassList(link Link, parent uint32) ([]Class, error) {
switch classType {
case "htb":
class = &HtbClass{}
case "hfsc":
class = &HfscClass{}
default:
class = &GenericClass{ClassType: classType}
}
......@@ -225,6 +259,26 @@ func (h *Handle) ClassList(link Link, parent uint32) ([]Class, error) {
if err != nil {
return nil, err
}
case "hfsc":
data, err := nl.ParseRouteAttr(attr.Value)
if err != nil {
return nil, err
}
_, err = parseHfscClassData(class, data)
if err != nil {
return nil, err
}
}
// For backward compatibility.
case nl.TCA_STATS:
base.Statistics, err = parseTcStats(attr.Value)
if err != nil {
return nil, err
}
case nl.TCA_STATS2:
base.Statistics, err = parseTcStats2(attr.Value)
if err != nil {
return nil, err
}
}
}
......@@ -253,3 +307,78 @@ func parseHtbClassData(class Class, data []syscall.NetlinkRouteAttr) (bool, erro
}
return detailed, nil
}
// parseHfscClassData copies the HFSC service curves found in data into class.
//
// class must be a *HfscClass; any other concrete type yields an error rather
// than a panic. Only the TCA_HFSC_RSC, TCA_HFSC_FSC and TCA_HFSC_USC
// attributes are decoded. Other attributes are skipped without touching
// their payload, whose layout is not known to be a curve.
//
// The boolean result is always false, as in the original; it mirrors the
// signature of parseHtbClassData.
func parseHfscClassData(class Class, data []syscall.NetlinkRouteAttr) (bool, error) {
	hfsc, ok := class.(*HfscClass)
	if !ok {
		return false, fmt.Errorf("hfsc: unexpected class type %T", class)
	}
	for _, datum := range data {
		var target *ServiceCurve
		switch datum.Attr.Type {
		case nl.TCA_HFSC_RSC:
			target = &hfsc.Rsc
		case nl.TCA_HFSC_FSC:
			target = &hfsc.Fsc
		case nl.TCA_HFSC_USC:
			target = &hfsc.Usc
		default:
			// Not a curve attribute: do not decode its payload as one.
			continue
		}
		m1, d, m2 := nl.DeserializeHfscCurve(datum.Value).Attrs()
		*target = ServiceCurve{m1: m1, d: d, m2: m2}
	}
	return false, nil
}
// parseTcStats decodes a raw tc_stats payload (read with the byte order
// returned by nl.NativeEndian) and converts it into a ClassStatistics.
// It returns the read error when data cannot fill the tcStats struct.
func parseTcStats(data []byte) (*ClassStatistics, error) {
	var raw tcStats
	if err := binary.Read(bytes.NewReader(data), nl.NativeEndian(), &raw); err != nil {
		return nil, err
	}
	// Requeues has no counterpart in tc_stats and stays zero.
	return &ClassStatistics{
		Basic: &GnetStatsBasic{
			Bytes:   raw.Bytes,
			Packets: raw.Packets,
		},
		Queue: &GnetStatsQueue{
			Qlen:       raw.Qlen,
			Backlog:    raw.Backlog,
			Drops:      raw.Drops,
			Overlimits: raw.Overlimits,
		},
		RateEst: &GnetStatsRateEst{
			Bps: raw.Bps,
			Pps: raw.Pps,
		},
	}, nil
}
// parseGnetStats fills gnetStats (a pointer to one of the fixed-size Gnet*
// structs) from data, using the byte order returned by nl.NativeEndian.
// The error of the underlying binary read is returned unchanged.
func parseGnetStats(data []byte, gnetStats interface{}) error {
	return binary.Read(bytes.NewReader(data), nl.NativeEndian(), gnetStats)
}
// parseTcStats2 decodes a TCA_STATS2 payload, which is itself a list of
// nested route attributes, into a ClassStatistics. Nested attributes other
// than the basic, queue and rate-estimator statistics are ignored. A decode
// failure is reported together with a hex dump of the offending payload.
func parseTcStats2(data []byte) (*ClassStatistics, error) {
	nested, err := nl.ParseRouteAttr(data)
	if err != nil {
		return nil, err
	}

	result := NewClassStatistics()
	for _, attr := range nested {
		payload := attr.Value
		switch attr.Attr.Type {
		case nl.TCA_STATS_BASIC:
			if parseErr := parseGnetStats(payload, result.Basic); parseErr != nil {
				return nil, fmt.Errorf("Failed to parse ClassStatistics.Basic with: %v\n%s",
					parseErr, hex.Dump(payload))
			}
		case nl.TCA_STATS_QUEUE:
			if parseErr := parseGnetStats(payload, result.Queue); parseErr != nil {
				return nil, fmt.Errorf("Failed to parse ClassStatistics.Queue with: %v\n%s",
					parseErr, hex.Dump(payload))
			}
		case nl.TCA_STATS_RATE_EST:
			if parseErr := parseGnetStats(payload, result.RateEst); parseErr != nil {
				return nil, fmt.Errorf("Failed to parse ClassStatistics.RateEst with: %v\n%s",
					parseErr, hex.Dump(payload))
			}
		}
	}
	return result, nil
}
......@@ -135,11 +135,13 @@ func (h *Handle) dumpConntrackTable(table ConntrackTableType, family InetFamily)
// http://git.netfilter.org/libnetfilter_conntrack/tree/include/internal/object.h
// For the time being, the structure below allows to parse and extract the base information of a flow
type ipTuple struct {
SrcIP net.IP
Bytes uint64
DstIP net.IP
DstPort uint16
Packets uint64
Protocol uint8
SrcIP net.IP
SrcPort uint16
DstPort uint16
}
type ConntrackFlow struct {
......@@ -151,11 +153,12 @@ type ConntrackFlow struct {
func (s *ConntrackFlow) String() string {
// conntrack cmd output:
// udp 17 src=127.0.0.1 dst=127.0.0.1 sport=4001 dport=1234 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=1234 dport=4001 mark=0
return fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d\tsrc=%s dst=%s sport=%d dport=%d mark=%d",
// udp 17 src=127.0.0.1 dst=127.0.0.1 sport=4001 dport=1234 packets=5 bytes=532 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=1234 dport=4001 packets=10 bytes=1078 mark=0
return fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d packets=%d bytes=%d\tsrc=%s dst=%s sport=%d dport=%d packets=%d bytes=%d mark=%d",
nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Mark)
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort, s.Forward.Packets, s.Forward.Bytes,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Reverse.Packets, s.Reverse.Bytes,
s.Mark)
}
// This method parse the ip tuple structure
......@@ -220,6 +223,24 @@ func parseBERaw16(r *bytes.Reader, v *uint16) {
binary.Read(r, binary.BigEndian, v)
}
// parseBERaw64 reads eight bytes from r and stores them in *v as a
// big-endian uint64. When fewer than eight bytes remain they are consumed,
// *v is left untouched and no error is reported.
func parseBERaw64(r *bytes.Reader, v *uint64) {
	var raw [8]byte
	// A bytes.Reader hands over everything it has left (up to len(raw)) in a
	// single Read, so a short count means the value is incomplete.
	if n, _ := r.Read(raw[:]); n < len(raw) {
		return
	}
	*v = binary.BigEndian.Uint64(raw[:])
}
// parseByteAndPacketCounters reads up to two counter attributes from r and
// returns the byte and packet counts found. The two attributes may come in
// either order. Parsing stops at the first attribute that is neither
// CTA_COUNTERS_BYTES nor CTA_COUNTERS_PACKETS. A counter that is not seen is
// returned as zero.
func parseByteAndPacketCounters(r *bytes.Reader) (bytes, packets uint64) {
	for remaining := 2; remaining > 0; remaining-- {
		_, attrType, _ := parseNfAttrTL(r)
		if attrType == nl.CTA_COUNTERS_BYTES {
			parseBERaw64(r, &bytes)
			continue
		}
		if attrType == nl.CTA_COUNTERS_PACKETS {
			parseBERaw64(r, &packets)
			continue
		}
		// Unknown attribute: give up with whatever was collected so far.
		return bytes, packets
	}
	return bytes, packets
}
func parseRawData(data []byte) *ConntrackFlow {
s := &ConntrackFlow{}
var proto uint8
......@@ -238,20 +259,23 @@ func parseRawData(data []byte) *ConntrackFlow {
// <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
// flow information of the reverse flow
for reader.Len() > 0 {
nested, t, l := parseNfAttrTL(reader)
if nested && t == nl.CTA_TUPLE_ORIG {
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
proto = parseIpTuple(reader, &s.Forward)
}
} else if nested && t == nl.CTA_TUPLE_REPLY {
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
parseIpTuple(reader, &s.Reverse)
// Got all the useful information stop parsing
break
} else {
// Header not recognized skip it
reader.Seek(int64(l), seekCurrent)
if nested, t, l := parseNfAttrTL(reader); nested {
switch t {
case nl.CTA_TUPLE_ORIG:
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
proto = parseIpTuple(reader, &s.Forward)
}
case nl.CTA_TUPLE_REPLY:
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
parseIpTuple(reader, &s.Reverse)
} else {
// Header not recognized skip it
reader.Seek(int64(l), seekCurrent)
}
case nl.CTA_COUNTERS_ORIG:
s.Forward.Bytes, s.Forward.Packets = parseByteAndPacketCounters(reader)
case nl.CTA_COUNTERS_REPLY:
s.Reverse.Bytes, s.Reverse.Packets = parseByteAndPacketCounters(reader)
}
}
}
......@@ -285,7 +309,7 @@ func parseRawData(data []byte) *ConntrackFlow {
// Common parameters and options:
// -s, --src, --orig-src ip Source address from original direction
// -d, --dst, --orig-dst ip Destination address from original direction
// -r, --reply-src ip Source addres from reply direction
// -r, --reply-src ip Source address from reply direction
// -q, --reply-dst ip Destination address from reply direction
// -p, --protonum proto Layer 4 Protocol, eg. 'tcp'
// -f, --family proto Layer 3 Protocol, eg. 'ipv6'
......@@ -302,11 +326,14 @@ func parseRawData(data []byte) *ConntrackFlow {
type ConntrackFilterType uint8
const (
ConntrackOrigSrcIP = iota // -orig-src ip Source address from original direction
ConntrackOrigDstIP // -orig-dst ip Destination address from original direction
ConntrackNatSrcIP // -src-nat ip Source NAT ip
ConntrackNatDstIP // -dst-nat ip Destination NAT ip
ConntrackNatAnyIP // -any-nat ip Source or destination NAT ip
ConntrackOrigSrcIP = iota // -orig-src ip Source address from original direction
ConntrackOrigDstIP // -orig-dst ip Destination address from original direction
ConntrackReplySrcIP // --reply-src ip Reply Source IP
ConntrackReplyDstIP // --reply-dst ip Reply Destination IP
ConntrackReplyAnyIP // Match source or destination reply IP
ConntrackNatSrcIP = ConntrackReplySrcIP // deprecated use instead ConntrackReplySrcIP
ConntrackNatDstIP = ConntrackReplyDstIP // deprecated use instead ConntrackReplyDstIP
ConntrackNatAnyIP = ConntrackReplyAnyIP // deprecated use instaed ConntrackReplyAnyIP
)
type CustomConntrackFilter interface {
......@@ -351,17 +378,17 @@ func (f *ConntrackFilter) MatchConntrackFlow(flow *ConntrackFlow) bool {
}
// -src-nat ip Source NAT ip
if elem, found := f.ipFilter[ConntrackNatSrcIP]; match && found {
if elem, found := f.ipFilter[ConntrackReplySrcIP]; match && found {
match = match && elem.Equal(flow.Reverse.SrcIP)
}
// -dst-nat ip Destination NAT ip
if elem, found := f.ipFilter[ConntrackNatDstIP]; match && found {
if elem, found := f.ipFilter[ConntrackReplyDstIP]; match && found {
match = match && elem.Equal(flow.Reverse.DstIP)
}
// -any-nat ip Source or destination NAT ip
if elem, found := f.ipFilter[ConntrackNatAnyIP]; match && found {
// Match source or destination reply IP
if elem, found := f.ipFilter[ConntrackReplyAnyIP]; match && found {
match = match && (elem.Equal(flow.Reverse.SrcIP) || elem.Equal(flow.Reverse.DstIP))
}
......
package netlink
import (
"syscall"
"github.com/vishvananda/netlink/nl"
"golang.org/x/sys/unix"
)
// DevlinkDevEswitchAttr represents device's eswitch attributes
type DevlinkDevEswitchAttr struct {
// Mode is "legacy", "switchdev" or "unknown" (see parseEswitchMode).
Mode string
// InlineMode is "none", "link", "network", "transport" or "unknown"
// (see parseEswitchInlineMode).
InlineMode string
// EncapMode is "disable", "enable" or "unknown" (see parseEswitchEncapMode).
EncapMode string
}
// DevlinkDevAttrs represents device attributes
type DevlinkDevAttrs struct {
// Eswitch holds the eswitch attributes parsed for the device.
Eswitch DevlinkDevEswitchAttr
}
// DevlinkDevice represents device and its attributes
type DevlinkDevice struct {
// BusName is the raw payload of the DEVLINK_ATTR_BUS_NAME attribute.
BusName string
// DeviceName is the raw payload of the DEVLINK_ATTR_DEV_NAME attribute.
DeviceName string
// Attrs holds the remaining parsed device attributes.
Attrs DevlinkDevAttrs
}
// parseDevLinkDeviceList turns each generic netlink message in msgs into a
// DevlinkDevice, in the same order. The first attribute parsing error
// aborts the conversion and is returned with a nil slice.
func parseDevLinkDeviceList(msgs [][]byte) ([]*DevlinkDevice, error) {
	devices := make([]*DevlinkDevice, len(msgs))
	for idx, raw := range msgs {
		// Attributes follow the generic netlink header.
		attrs, err := nl.ParseRouteAttr(raw[nl.SizeofGenlmsg:])
		if err != nil {
			return nil, err
		}
		device := new(DevlinkDevice)
		if err := device.parseAttributes(attrs); err != nil {
			return nil, err
		}
		devices[idx] = device
	}
	return devices, nil
}
// parseEswitchMode translates a devlink eswitch mode value into its textual
// name. Values other than the legacy and switchdev constants yield "unknown".
func parseEswitchMode(mode uint16) string {
	switch mode {
	case nl.DEVLINK_ESWITCH_MODE_LEGACY:
		return "legacy"
	case nl.DEVLINK_ESWITCH_MODE_SWITCHDEV:
		return "switchdev"
	}
	return "unknown"
}
// parseEswitchInlineMode translates a devlink eswitch inline mode value into
// its textual name. Unrecognized values yield "unknown".
func parseEswitchInlineMode(inlinemode uint8) string {
	switch inlinemode {
	case nl.DEVLINK_ESWITCH_INLINE_MODE_NONE:
		return "none"
	case nl.DEVLINK_ESWITCH_INLINE_MODE_LINK:
		return "link"
	case nl.DEVLINK_ESWITCH_INLINE_MODE_NETWORK:
		return "network"
	case nl.DEVLINK_ESWITCH_INLINE_MODE_TRANSPORT:
		return "transport"
	}
	return "unknown"
}
// parseEswitchEncapMode translates a devlink eswitch encap mode value into
// "disable" or "enable". Unrecognized values yield "unknown".
func parseEswitchEncapMode(encapmode uint8) string {
	switch encapmode {
	case nl.DEVLINK_ESWITCH_ENCAP_MODE_NONE:
		return "disable"
	case nl.DEVLINK_ESWITCH_ENCAP_MODE_BASIC:
		return "enable"
	}
	return "unknown"
}
// parseAttributes copies the devlink attributes it recognizes from attrs into
// d: bus name, device name, and the eswitch mode, inline mode and encap mode.
// Unrecognized attributes are ignored. The returned error is always nil.
func (d *DevlinkDevice) parseAttributes(attrs []syscall.NetlinkRouteAttr) error {
	for idx := range attrs {
		value := attrs[idx].Value
		switch attrs[idx].Attr.Type {
		case nl.DEVLINK_ATTR_BUS_NAME:
			// The payload is stored as-is, without trimming.
			d.BusName = string(value)
		case nl.DEVLINK_ATTR_DEV_NAME:
			d.DeviceName = string(value)
		case nl.DEVLINK_ATTR_ESWITCH_MODE:
			d.Attrs.Eswitch.Mode = parseEswitchMode(native.Uint16(value))
		case nl.DEVLINK_ATTR_ESWITCH_INLINE_MODE:
			// Only the first payload byte carries the mode.
			d.Attrs.Eswitch.InlineMode = parseEswitchInlineMode(value[0])
		case nl.DEVLINK_ATTR_ESWITCH_ENCAP_MODE:
			d.Attrs.Eswitch.EncapMode = parseEswitchEncapMode(value[0])
		}
	}
	return nil
}
// parseEswitchAttrs updates dev from the attributes of the first message in
// msgs. It is best effort: the method has no error result, so an empty
// reply, a message shorter than the generic netlink header, or an attribute
// parsing failure leaves dev unchanged.
func (dev *DevlinkDevice) parseEswitchAttrs(msgs [][]byte) {
	// Guard the indexing and slicing below, which would otherwise panic.
	if len(msgs) == 0 || len(msgs[0]) < nl.SizeofGenlmsg {
		return
	}
	attrs, err := nl.ParseRouteAttr(msgs[0][nl.SizeofGenlmsg:])
	if err != nil {
		return
	}
	// The error is deliberately dropped: there is no way to report it here.
	_ = dev.parseAttributes(attrs)
}
// getEswitchAttrs sends DEVLINK_CMD_ESWITCH_GET for the device identified by
// dev's bus and device names and stores the returned eswitch attributes in
// dev. It is best effort: when the request fails, dev is left unchanged and
// the error is dropped.
func (h *Handle) getEswitchAttrs(family *GenlFamily, dev *DevlinkDevice) {
	req := h.newNetlinkRequest(int(family.ID), unix.NLM_F_REQUEST|unix.NLM_F_ACK)
	req.AddData(&nl.Genlmsg{
		Command: nl.DEVLINK_CMD_ESWITCH_GET,
		Version: nl.GENL_DEVLINK_VERSION,
	})

	// The names are sent with exactly the bytes stored on dev.
	req.AddData(nl.NewRtAttr(nl.DEVLINK_ATTR_BUS_NAME, []byte(dev.BusName)))
	req.AddData(nl.NewRtAttr(nl.DEVLINK_ATTR_DEV_NAME, []byte(dev.DeviceName)))

	replies, err := req.Execute(unix.NETLINK_GENERIC, 0)
	if err != nil {
		return
	}
	dev.parseEswitchAttrs(replies)
}
// DevLinkGetDeviceList dumps all devlink devices through the handle and
// returns them with a nil error. An error is returned when the devlink
// generic netlink family cannot be resolved, when the dump request fails, or
// when a reply cannot be parsed. The eswitch attributes of each device are
// then queried one by one; failures of those queries are not reported.
func (h *Handle) DevLinkGetDeviceList() ([]*DevlinkDevice, error) {
	family, err := h.GenlFamilyGet(nl.GENL_DEVLINK_NAME)
	if err != nil {
		return nil, err
	}

	req := h.newNetlinkRequest(int(family.ID),
		unix.NLM_F_REQUEST|unix.NLM_F_ACK|unix.NLM_F_DUMP)
	req.AddData(&nl.Genlmsg{
		Command: nl.DEVLINK_CMD_GET,
		Version: nl.GENL_DEVLINK_VERSION,
	})

	replies, err := req.Execute(unix.NETLINK_GENERIC, 0)
	if err != nil {
		return nil, err
	}
	devices, err := parseDevLinkDeviceList(replies)
	if err != nil {
		return nil, err
	}
	for idx := range devices {
		h.getEswitchAttrs(family, devices[idx])
	}
	return devices, nil
}
// DevLinkGetDeviceList provides a pointer to devlink devices and nil error,
// otherwise returns an error code.
// It is the package-level variant and delegates to pkgHandle.
func DevLinkGetDeviceList() ([]*DevlinkDevice, error) {
return pkgHandle.DevLinkGetDeviceList()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment