Unverified Commit e6f84188 authored by Darren Shepherd's avatar Darren Shepherd Committed by GitHub
Browse files

Merge pull request #1045 from ibuildthecloud/dqlite4

Add DQLite support but disabled by the build
parents 405f85aa 840c5911
No related merge requests found
Showing with 915 additions and 103 deletions
+915 -103
......@@ -64,6 +64,7 @@ require (
github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect
github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/canonical/go-dqlite v1.1.0
github.com/containerd/cgroups v0.0.0-20190923161937-abd0b19954a6 // indirect
github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect
......@@ -75,30 +76,32 @@ require (
github.com/containernetworking/plugins v0.8.2 // indirect
github.com/coreos/flannel v0.11.0
github.com/coreos/go-iptables v0.4.2
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f
github.com/docker/docker v0.7.3-0.20190731001754-589f1dad8dad
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/libnetwork v0.8.0-dev.2.0.20190624125649-f0e46a78ea34 // indirect
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4 // indirect
github.com/go-bindata/go-bindata v3.1.2+incompatible
github.com/go-sql-driver/mysql v1.4.1
github.com/gofrs/flock v0.7.1 // indirect
github.com/gogo/googleapis v1.3.0 // indirect
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/gorilla/websocket v1.4.1
github.com/juju/errors v0.0.0-20190806202954-0232dcc7464d // indirect
github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2 // indirect
github.com/kubernetes-sigs/cri-tools v0.0.0-00010101000000-000000000000
github.com/lib/pq v1.1.1
github.com/lxc/lxd v0.0.0-20191108214106-60ea15630455
github.com/mattn/go-sqlite3 v1.10.0
github.com/mindprince/gonvml v0.0.0-20190828220739-9ebdce4bb989 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/opencontainers/runc v1.0.0-rc2.0.20190611121236-6cc515888830
github.com/pkg/errors v0.8.1
github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect
github.com/rancher/dynamiclistener v0.1.1-0.20191108205817-245f86cc340a
github.com/rancher/dynamiclistener v0.1.1-0.20191110035254-aaa5bc0d2a07
github.com/rancher/helm-controller v0.2.2
github.com/rancher/kine v0.1.2-0.20191107225357-527576e3452f
github.com/rancher/kine v0.2.1
github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.2.0
github.com/rancher/wrangler-api v0.2.0
......@@ -108,11 +111,12 @@ require (
github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/theckman/go-flock v0.7.1 // indirect
github.com/urfave/cli v1.21.0
go.etcd.io/bbolt v1.3.3 // indirect
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3
google.golang.org/grpc v1.23.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 // indirect
gopkg.in/yaml.v2 v2.2.4
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
......
This diff is collapsed.
......@@ -9,6 +9,8 @@ RUN cd image/bin && \
FROM scratch
COPY --from=base /image /
RUN mkdir -p /etc && \
echo 'hosts: files dns' > /etc/nsswitch.conf
RUN chmod 1777 /tmp
VOLUME /var/lib/kubelet
VOLUME /var/lib/rancher/k3s
......
// +build dqlite
package cmds
const (
hideDqlite = false
)
......@@ -129,38 +129,6 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command {
Destination: &ServerConfig.TokenFile,
EnvVar: "K3S_TOKEN_FILE",
},
cli.StringFlag{
Name: "agent-token",
Usage: "(cluster) Shared secret used to join agents to the cluster, but not agents",
Destination: &ServerConfig.AgentToken,
EnvVar: "K3S_AGENT_TOKEN",
},
cli.StringFlag{
Name: "agent-token-file",
Usage: "(cluster) File containing the agent secret",
Destination: &ServerConfig.AgentTokenFile,
EnvVar: "K3S_AGENT_TOKEN_FILE",
},
cli.StringFlag{
Name: "server,s",
Usage: "(cluster) Server to connect to, used to join a cluster",
EnvVar: "K3S_URL",
Destination: &ServerConfig.ServerURL,
},
cli.BoolFlag{
Name: "cluster-init",
Hidden: hideDqlite,
Usage: "(cluster) Initialize new cluster master",
EnvVar: "K3S_CLUSTER_INIT",
Destination: &ServerConfig.ClusterInit,
},
cli.BoolFlag{
Name: "cluster-reset",
Hidden: hideDqlite,
Usage: "(cluster) Forget all peers and become a single cluster new cluster master",
EnvVar: "K3S_CLUSTER_RESET",
Destination: &ServerConfig.ClusterReset,
},
cli.StringFlag{
Name: "write-kubeconfig,o",
Usage: "(client) Write kubeconfig for admin client to this file",
......@@ -260,6 +228,38 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command {
Usage: "(experimental) Run rootless",
Destination: &ServerConfig.Rootless,
},
cli.StringFlag{
Name: "agent-token",
Usage: "(experimental/cluster) Shared secret used to join agents to the cluster, but not agents",
Destination: &ServerConfig.AgentToken,
EnvVar: "K3S_AGENT_TOKEN",
},
cli.StringFlag{
Name: "agent-token-file",
Usage: "(experimental/cluster) File containing the agent secret",
Destination: &ServerConfig.AgentTokenFile,
EnvVar: "K3S_AGENT_TOKEN_FILE",
},
cli.StringFlag{
Name: "server,s",
Usage: "(experimental/cluster) Server to connect to, used to join a cluster",
EnvVar: "K3S_URL",
Destination: &ServerConfig.ServerURL,
},
cli.BoolFlag{
Name: "cluster-init",
Hidden: hideDqlite,
Usage: "(experimental/cluster) Initialize new cluster master",
EnvVar: "K3S_CLUSTER_INIT",
Destination: &ServerConfig.ClusterInit,
},
cli.BoolFlag{
Name: "cluster-reset",
Hidden: hideDqlite,
Usage: "(experimental/cluster) Forget all peers and become a single cluster new cluster master",
EnvVar: "K3S_CLUSTER_RESET",
Destination: &ServerConfig.ClusterReset,
},
// Hidden/Deprecated flags below
......
......@@ -2,42 +2,73 @@ package cluster
import (
"context"
"strings"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/kine/pkg/client"
"github.com/rancher/kine/pkg/endpoint"
)
type Cluster struct {
token string
clientAccessInfo *clientaccess.Info
config *config.Control
runtime *config.ControlRuntime
db interface{}
runJoin bool
storageStarted bool
etcdConfig endpoint.ETCDConfig
joining bool
saveBootstrap bool
storageClient client.Client
}
func (c *Cluster) Start(ctx context.Context) error {
join, err := c.shouldJoin()
if err != nil {
if err := c.startClusterAndHTTPS(ctx); err != nil {
return err
}
if join {
if err := c.join(); err != nil {
if c.runJoin {
if err := c.postJoin(ctx); err != nil {
return err
}
}
if err := c.startClusterAndHTTPS(ctx); err != nil {
if err := c.testClusterDB(ctx); err != nil {
return err
}
if join {
if err := c.postJoin(ctx); err != nil {
if c.saveBootstrap {
if err := c.save(ctx); err != nil {
return err
}
}
if c.runJoin {
if err := c.joined(); err != nil {
return err
}
}
return c.joined()
return c.startStorage(ctx)
}
func (c *Cluster) startStorage(ctx context.Context) error {
if c.storageStarted {
return nil
}
c.storageStarted = true
etcdConfig, err := endpoint.Listen(ctx, c.config.Storage)
if err != nil {
return err
}
c.etcdConfig = etcdConfig
c.config.Storage.Config = etcdConfig.TLSConfig
c.config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
c.config.NoLeaderElect = !etcdConfig.LeaderElect
return nil
}
func New(config *config.Control) *Cluster {
......
// +build dqlite
package cluster
import (
"context"
"crypto/tls"
"encoding/json"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/canonical/go-dqlite/client"
"github.com/rancher/dynamiclistener/factory"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/dqlite"
"github.com/rancher/kine/pkg/endpoint"
v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
)
func (c *Cluster) testClusterDB(ctx context.Context) error {
if !c.dqliteEnabled() {
return nil
}
dqlite := c.db.(*dqlite.DQLite)
for {
if err := dqlite.Test(ctx); err != nil {
logrus.Infof("Failed to test dqlite connection: %v", err)
} else {
return nil
}
select {
case <-time.After(2 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
}
func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
if !c.dqliteEnabled() {
return l, handler, nil
}
dqlite := dqlite.New(c.config.DataDir, c.config.AdvertiseIP, c.config.AdvertisePort, func() v1.NodeController {
if c.runtime.Core == nil {
return nil
}
return c.runtime.Core.Core().V1().Node()
})
certs, err := toGetCerts(c.runtime)
if err != nil {
return nil, nil, err
}
handler, err = dqlite.Start(ctx, c.config.ClusterInit, c.config.ClusterReset, certs, handler)
if err != nil {
return nil, nil, err
}
if c.config.ClusterReset {
if err := dqlite.Reset(ctx); err == nil {
logrus.Info("Cluster reset successful, now rejoin members")
os.Exit(0)
} else {
logrus.Fatalf("Cluster reset failed: %v", err)
}
}
c.db = dqlite
if !strings.HasPrefix(c.config.Storage.Endpoint, "dqlite://") {
c.config.Storage = endpoint.Config{
Endpoint: dqlite.StorageEndpoint,
}
}
return l, handler, err
}
func (c *Cluster) dqliteEnabled() bool {
stamp := filepath.Join(c.config.DataDir, "db", "state.dqlite")
if _, err := os.Stat(stamp); err == nil {
return true
}
driver, _ := endpoint.ParseStorageEndpoint(c.config.Storage.Endpoint)
if driver == endpoint.DQLiteBackend {
return true
}
return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != ""))
}
func (c *Cluster) postJoin(ctx context.Context) error {
if !c.dqliteEnabled() {
return nil
}
resp, err := clientaccess.Get("/db/info", c.clientAccessInfo)
if err != nil {
return err
}
dqlite := c.db.(*dqlite.DQLite)
var nodes []client.NodeInfo
if err := json.Unmarshal(resp, &nodes); err != nil {
return err
}
return dqlite.Join(ctx, nodes)
}
func toGetCerts(runtime *config.ControlRuntime) (*dqlite.Certs, error) {
clientCA, _, err := factory.LoadCerts(runtime.ClientCA, runtime.ClientCAKey)
if err != nil {
return nil, err
}
ca, _, err := factory.LoadCerts(runtime.ServerCA, runtime.ServerCAKey)
if err != nil {
return nil, err
}
clientCert, err := tls.LoadX509KeyPair(runtime.ClientKubeAPICert, runtime.ClientKubeAPIKey)
if err != nil {
return nil, err
}
return &dqlite.Certs{
ServerTrust: ca,
ClientTrust: clientCA,
ClientCert: clientCert,
}, nil
}
package cluster
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"strings"
"github.com/rancher/k3s/pkg/token"
"golang.org/x/crypto/pbkdf2"
)
func storageKey(passphrase string) string {
d := sha256.New()
d.Write([]byte(passphrase))
return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12]
}
func keyHash(passphrase string) string {
d := sha256.New()
d.Write([]byte(passphrase))
return hex.EncodeToString(d.Sum(nil)[:])[:12]
}
func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
salt, err := token.Random(8)
if err != nil {
return nil, err
}
clearKey := pbkdf2.Key([]byte(passphrase), []byte(salt), 4096, 32, sha1.New)
key, err := aes.NewCipher(clearKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(key)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
_, err = io.ReadFull(rand.Reader, nonce)
if err != nil {
return nil, err
}
sealed := gcm.Seal(nonce, nonce, plaintext, nil)
return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil
}
func decrypt(passphrase string, ciphertext []byte) ([]byte, error) {
parts := strings.SplitN(string(ciphertext), ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid cipher text, not : delimited")
}
clearKey := pbkdf2.Key([]byte(passphrase), []byte(parts[0]), 4096, 32, sha1.New)
key, err := aes.NewCipher(clearKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(key)
if err != nil {
return nil, err
}
data, err := base64.StdEncoding.DecodeString(parts[1])
if err != nil {
return nil, err
}
return gcm.Open(nil, data[:gcm.NonceSize()], data[gcm.NonceSize():], nil)
}
......@@ -2,6 +2,7 @@ package cluster
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
......@@ -11,18 +12,38 @@ import (
"github.com/sirupsen/logrus"
)
func (c *Cluster) Join(ctx context.Context) error {
runJoin, err := c.shouldJoin()
if err != nil {
return err
}
c.runJoin = runJoin
if runJoin {
if err := c.join(ctx); err != nil {
return err
}
}
return nil
}
func (c *Cluster) shouldJoin() (bool, error) {
if c.config.JoinURL == "" {
return false, nil
dqlite := c.dqliteEnabled()
if dqlite {
c.runtime.HTTPBootstrap = true
if c.config.JoinURL == "" {
return false, nil
}
}
stamp := filepath.Join(c.config.DataDir, "db/joined")
stamp := c.joinStamp()
if _, err := os.Stat(stamp); err == nil {
logrus.Info("Already joined to cluster, not rejoining")
logrus.Info("Cluster bootstrap already complete")
return false, nil
}
if c.config.Token == "" {
if dqlite && c.config.Token == "" {
return false, fmt.Errorf("K3S_TOKEN is required to join a cluster")
}
......@@ -46,14 +67,11 @@ func (c *Cluster) joined() error {
return f.Close()
}
func (c *Cluster) join() error {
c.runtime.Cluster.Join = true
func (c *Cluster) httpJoin() error {
token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
if err != nil {
return err
}
c.token = token
info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token)
if err != nil {
......@@ -69,6 +87,20 @@ func (c *Cluster) join() error {
return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
}
func (c *Cluster) join(ctx context.Context) error {
c.joining = true
if c.runtime.HTTPBootstrap {
return c.httpJoin()
}
if err := c.storageJoin(ctx); err != nil {
return err
}
return nil
}
func (c *Cluster) joinStamp() string {
return filepath.Join(c.config.DataDir, "db/joined")
return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
}
......@@ -8,6 +8,10 @@ import (
"net/http"
)
func (c *Cluster) testClusterDB(ctx context.Context) error {
return nil
}
func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
return l, handler, nil
}
......@@ -15,3 +19,7 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt
func (c *Cluster) postJoin(ctx context.Context) error {
return nil
}
func (c *Cluster) dqliteEnabled() bool {
return false
}
package cluster
import (
"bytes"
"context"
"github.com/rancher/k3s/pkg/bootstrap"
"github.com/rancher/kine/pkg/client"
)
func (c *Cluster) save(ctx context.Context) error {
buf := &bytes.Buffer{}
if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil {
return err
}
data, err := encrypt(c.config.Token, buf.Bytes())
if err != nil {
return err
}
return c.storageClient.Create(ctx, storageKey(c.config.Token), data)
}
func (c *Cluster) storageJoin(ctx context.Context) error {
if err := c.startStorage(ctx); err != nil {
return err
}
storageClient, err := client.New(c.etcdConfig)
if err != nil {
return err
}
c.storageClient = storageClient
value, err := storageClient.Get(ctx, storageKey(c.config.Token))
if err == client.ErrNotFound {
c.saveBootstrap = true
return nil
} else if err != nil {
return err
}
data, err := decrypt(c.config.Token, value.Data)
if err != nil {
return err
}
return bootstrap.Read(bytes.NewBuffer(data), &c.runtime.ControlRuntimeBootstrap)
}
......@@ -132,6 +132,8 @@ type ControlRuntimeBootstrap struct {
type ControlRuntime struct {
ControlRuntimeBootstrap
HTTPBootstrap bool
ClientKubeAPICert string
ClientKubeAPIKey string
NodePasswdFile string
......@@ -169,12 +171,7 @@ type ControlRuntime struct {
ClientK3sControllerCert string
ClientK3sControllerKey string
Cluster ClusterConfig
Core *core.Factory
}
type ClusterConfig struct {
Join bool
Core *core.Factory
}
type ArgString []string
......
......@@ -26,14 +26,14 @@ import (
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/passwd"
"github.com/rancher/k3s/pkg/token"
"github.com/rancher/kine/pkg/endpoint"
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
app2 "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
......@@ -316,11 +316,13 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
runtime.ClientAuthProxyCert = path.Join(config.DataDir, "tls", "client-auth-proxy.crt")
runtime.ClientAuthProxyKey = path.Join(config.DataDir, "tls", "client-auth-proxy.key")
if err := genCerts(config, runtime); err != nil {
cluster := cluster.New(config)
if err := cluster.Join(ctx); err != nil {
return err
}
if err := cluster.New(config).Start(ctx); err != nil {
if err := genCerts(config, runtime); err != nil {
return err
}
......@@ -336,23 +338,11 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
return err
}
if err := prepareStorageBackend(ctx, config); err != nil {
return err
}
return readTokens(runtime)
}
func prepareStorageBackend(ctx context.Context, config *config.Control) error {
etcdConfig, err := endpoint.Listen(ctx, config.Storage)
if err != nil {
if err := readTokens(runtime); err != nil {
return err
}
config.Storage.Config = etcdConfig.TLSConfig
config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
config.NoLeaderElect = !etcdConfig.LeaderElect
return nil
return cluster.Start(ctx)
}
func readTokens(runtime *config.ControlRuntime) error {
......@@ -858,25 +848,24 @@ func waitForAPIServer(ctx context.Context, runtime *config.ControlRuntime) error
return err
}
discoveryclient, err := discovery.NewDiscoveryClientForConfig(restConfig)
k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
for i := 0; i < 60; i++ {
info, err := discoveryclient.ServerVersion()
if err == nil {
logrus.Infof("apiserver %s is up and running", info)
return nil
}
logrus.Infof("waiting for apiserver to become available")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
continue
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-promise(func() error { return app2.WaitForAPIServer(k8sClient, 5*time.Minute) }):
return err
}
}
return fmt.Errorf("timeout waiting for apiserver")
func promise(f func() error) <-chan error {
c := make(chan error, 1)
go func() {
c <- f()
close(c)
}()
return c
}
package client
import (
"context"
"fmt"
"strconv"
"github.com/canonical/go-dqlite/client"
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
const (
allKey = "_all_"
nodeID = "cluster.k3s.cattle.io/node-id"
nodeAddress = "cluster.k3s.cattle.io/node-address"
master = "node-role.kubernetes.io/master"
)
func Register(ctx context.Context, nodeName string, nodeInfo client.NodeInfo,
nodeStore client.NodeStore, nodes controllerv1.NodeController, opts []client.Option) {
h := &handler{
nodeStore: nodeStore,
nodeController: nodes,
nodeName: nodeName,
id: strconv.FormatUint(nodeInfo.ID, 10),
address: nodeInfo.Address,
ctx: ctx,
opts: opts,
}
nodes.OnChange(ctx, "dqlite-client", h.sync)
nodes.OnRemove(ctx, "dqlite-client", h.onRemove)
}
type handler struct {
nodeStore client.NodeStore
nodeController controllerv1.NodeController
nodeName string
id string
address string
ctx context.Context
opts []client.Option
}
func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
if key == allKey {
return nil, h.updateNodeStore()
}
if node == nil {
return nil, nil
}
if key == h.nodeName {
return h.handleSelf(node)
}
if node.Labels[master] == "true" {
h.nodeController.Enqueue(allKey)
}
return node, nil
}
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations[nodeID] == h.id && node.Annotations[nodeAddress] == h.address {
return node, nil
}
node = node.DeepCopy()
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[nodeID] = h.id
node.Annotations[nodeAddress] = h.address
return h.nodeController.Update(node)
}
func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
address := node.Annotations[nodeAddress]
if address == "" {
return node, nil
}
return node, h.delete(address)
}
func (h *handler) delete(address string) error {
c, err := client.FindLeader(h.ctx, h.nodeStore, h.opts...)
if err != nil {
return err
}
defer c.Close()
members, err := c.Cluster(h.ctx)
if err != nil {
return err
}
for _, member := range members {
if member.Address == address {
logrus.Infof("Removing %s %d from dqlite", member.Address, member.ID)
return c.Remove(h.ctx, member.ID)
}
}
return nil
}
func (h *handler) updateNodeStore() error {
nodes, err := h.nodeController.Cache().List(labels.SelectorFromSet(labels.Set{
master: "true",
}))
if err != nil {
return err
}
var (
nodeInfos []client.NodeInfo
seen = map[string]bool{}
)
for _, node := range nodes {
address, ok := node.Annotations[nodeAddress]
if !ok {
continue
}
nodeIDStr, ok := node.Annotations[nodeID]
if !ok {
continue
}
id, err := strconv.ParseUint(nodeIDStr, 10, 64)
if err != nil {
logrus.Errorf("invalid %s=%s, must be a number: %v", nodeID, nodeIDStr, err)
continue
}
if !seen[address] {
nodeInfos = append(nodeInfos, client.NodeInfo{
ID: id,
Address: address,
})
seen[address] = true
}
}
if len(nodeInfos) == 0 {
return fmt.Errorf("not setting dqlient NodeStore len to 0")
}
return h.nodeStore.Set(h.ctx, nodeInfos)
}
package dialer
import (
"context"
"crypto/tls"
"fmt"
"net"
"github.com/canonical/go-dqlite/client"
"github.com/rancher/k3s/pkg/dqlite/pipe"
)
func NewHTTPDialer(advertiseAddress, bindAddress string, tls *tls.Config) (client.DialFunc, error) {
d := &dialer{
advertiseAddress: advertiseAddress,
bindAddress: bindAddress,
tls: tls,
}
return d.dial, nil
}
type dialer struct {
advertiseAddress string
bindAddress string
tls *tls.Config
}
func (d *dialer) dial(ctx context.Context, address string) (net.Conn, error) {
if address == d.advertiseAddress {
return net.Dial("unix", d.bindAddress)
}
url := fmt.Sprintf("https://%s/db/connect", address)
return pipe.ToHTTP(ctx, url, d.tls)
}
package dqlite
import (
"context"
"github.com/canonical/go-dqlite/client"
"github.com/sirupsen/logrus"
)
func (d *DQLite) Test(ctx context.Context) error {
var ips []string
peers, err := d.NodeStore.Get(ctx)
if err != nil {
return err
}
for _, peer := range peers {
ips = append(ips, peer.Address)
}
logrus.Infof("Testing connection to peers %v", ips)
return d.Join(ctx, nil)
}
func (d *DQLite) Join(ctx context.Context, nodes []client.NodeInfo) error {
if len(nodes) > 0 {
if err := d.NodeStore.Set(ctx, nodes); err != nil {
return err
}
}
client, err := client.FindLeader(ctx, d.NodeStore, d.clientOpts...)
if err != nil {
return err
}
defer client.Close()
current, err := client.Cluster(ctx)
if err != nil {
return err
}
for _, testNode := range current {
if testNode.Address == d.NodeInfo.Address {
return nil
}
}
logrus.Infof("Joining dqlite cluster as address=%s, id=%d")
return client.Add(ctx, d.NodeInfo)
}
package dqlite
import (
"strings"
"github.com/canonical/go-dqlite/client"
"github.com/sirupsen/logrus"
)
func log() client.LogFunc {
return func(level client.LogLevel, s string, i ...interface{}) {
switch level {
case client.LogDebug:
logrus.Debugf(s, i...)
case client.LogError:
logrus.Errorf(s, i...)
case client.LogInfo:
if strings.HasPrefix(s, "connected") {
logrus.Debugf(s, i...)
} else {
logrus.Infof(s, i...)
}
case client.LogWarn:
logrus.Warnf(s, i...)
}
}
}
package pipe
import (
"bufio"
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"github.com/pkg/errors"
)
func ToHTTP(ctx context.Context, url string, tlsConfig *tls.Config) (net.Conn, error) {
request, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return nil, err
}
request = request.WithContext(ctx)
netDial := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
netDial.Deadline = deadline
}
conn, err := tls.DialWithDialer(netDial, "tcp", request.URL.Host, tlsConfig)
if err != nil {
return nil, errors.Wrap(err, "tls dial")
}
err = request.Write(conn)
if err != nil {
return nil, errors.Wrap(err, "request write")
}
response, err := http.ReadResponse(bufio.NewReader(conn), request)
if err != nil {
return nil, errors.Wrap(err, "read request")
}
if response.StatusCode != http.StatusSwitchingProtocols {
return nil, fmt.Errorf("expected 101 response, got: %d", response.StatusCode)
}
listener, err := net.Listen("unix", "")
if err != nil {
return nil, errors.Wrap(err, "Failed to create unix listener")
}
defer listener.Close()
if err := Unix(conn, listener.Addr().String()); err != nil {
return nil, err
}
return listener.Accept()
}
package pipe
import (
"io"
"net"
"github.com/lxc/lxd/shared/eagain"
"github.com/sirupsen/logrus"
)
func UnixPiper(srcs <-chan net.Conn, bindAddress string) {
for src := range srcs {
go Unix(src, bindAddress)
}
}
func Unix(src net.Conn, target string) error {
dst, err := net.Dial("unix", target)
if err != nil {
src.Close()
return err
}
Connect(src, dst)
return nil
}
func Connect(src net.Conn, dst net.Conn) {
go func() {
_, err := io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src})
if err != nil && err != io.EOF {
logrus.Debugf("copy pipe src->dst closed: %v", err)
}
src.Close()
dst.Close()
}()
go func() {
_, err := io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst})
if err != nil {
logrus.Debugf("copy pipe dst->src closed: %v", err)
}
src.Close()
dst.Close()
}()
}
package dqlite
import (
"context"
"net"
"net/http"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/dqlite/pipe"
)
var (
upgradeResponse = []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: dqlite\r\n\r\n")
)
type proxy struct {
conns chan net.Conn
}
func newProxy(ctx context.Context, bindAddress string) http.Handler {
p := &proxy{
conns: make(chan net.Conn, 100),
}
go func() {
<-ctx.Done()
close(p.conns)
}()
go pipe.UnixPiper(p.conns, bindAddress)
return p
}
func (h *proxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
hijacker, ok := rw.(http.Hijacker)
if !ok {
http.Error(rw, "failed to hijack", http.StatusInternalServerError)
return
}
conn, _, err := hijacker.Hijack()
if err != nil {
err := errors.Wrap(err, "Hijack connection")
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
if n, err := conn.Write(upgradeResponse); err != nil || n != len(upgradeResponse) {
conn.Close()
return
}
h.conns <- conn
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment