kgctl connect (#269)

* kgctl connect

Use kgctl connect to connect your laptop to a cluster.

Signed-off-by: leonnicolas <leonloechner@gmx.de>

* cmd/kgctl: finish connect command

This commit fixes some bugs and finishes the implementation of the
`kgctl connect` command.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>

* e2e: add tests for kgctl connect

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>

* docs: add documentation for `kgctl connect`

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>

* pkg/mesh: move peer route generation to mesh

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>

Co-authored-by: Lucas Servén Marín <lserven@gmail.com>
This commit is contained in:
leonnicolas 2022-04-08 13:42:13 +02:00 committed by GitHub
parent d95e590f5c
commit 0dfb744630
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 569 additions and 34 deletions

View File

@ -209,7 +209,7 @@ $(BASH_UNIT):
chmod +x $@ chmod +x $@
e2e: container $(KIND_BINARY) $(KUBECTL_BINARY) $(BASH_UNIT) bin/$(OS)/$(ARCH)/kgctl e2e: container $(KIND_BINARY) $(KUBECTL_BINARY) $(BASH_UNIT) bin/$(OS)/$(ARCH)/kgctl
KILO_IMAGE=$(IMAGE):$(ARCH)-$(VERSION) KIND_BINARY=$(KIND_BINARY) KUBECTL_BINARY=$(KUBECTL_BINARY) KGCTL_BINARY=$(shell pwd)/bin/$(OS)/$(ARCH)/kgctl $(BASH_UNIT) $(BASH_UNIT_FLAGS) ./e2e/setup.sh ./e2e/full-mesh.sh ./e2e/location-mesh.sh ./e2e/multi-cluster.sh ./e2e/handlers.sh ./e2e/teardown.sh KILO_IMAGE=$(IMAGE):$(ARCH)-$(VERSION) KIND_BINARY=$(KIND_BINARY) KUBECTL_BINARY=$(KUBECTL_BINARY) KGCTL_BINARY=$(shell pwd)/bin/$(OS)/$(ARCH)/kgctl $(BASH_UNIT) $(BASH_UNIT_FLAGS) ./e2e/setup.sh ./e2e/full-mesh.sh ./e2e/location-mesh.sh ./e2e/multi-cluster.sh ./e2e/handlers.sh ./e2e/kgctl.sh ./e2e/teardown.sh
header: .header header: .header
@HEADER=$$(cat .header); \ @HEADER=$$(cat .header); \

374
cmd/kgctl/connect_linux.go Normal file
View File

@ -0,0 +1,374 @@
// Copyright 2022 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux
// +build linux
package main
import (
"context"
"errors"
"fmt"
"net"
"os"
"sort"
"strings"
"syscall"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/spf13/cobra"
"golang.zx2c4.com/wireguard/wgctrl"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/squat/kilo/pkg/iproute"
"github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1"
"github.com/squat/kilo/pkg/mesh"
"github.com/squat/kilo/pkg/route"
"github.com/squat/kilo/pkg/wireguard"
)
var (
logLevel string
connectOpts struct {
allowedIP net.IPNet
allowedIPs []net.IPNet
privateKey string
cleanUp bool
mtu uint
resyncPeriod time.Duration
interfaceName string
persistentKeepalive int
}
)
func takeIPNet(_ net.IP, i *net.IPNet, err error) *net.IPNet {
if err != nil {
panic(err)
}
return i
}
func connect() *cobra.Command {
cmd := &cobra.Command{
Use: "connect",
Args: cobra.ExactArgs(1),
RunE: runConnect,
Short: "connect to a Kilo cluster as a peer over WireGuard",
SilenceUsage: true,
}
cmd.Flags().IPNetVarP(&connectOpts.allowedIP, "allowed-ip", "a", *takeIPNet(net.ParseCIDR("10.10.10.10/32")), "Allowed IP of the peer.")
cmd.Flags().StringSliceVar(&allowedIPs, "allowed-ips", []string{}, "Additional allowed IPs of the cluster, e.g. the service CIDR.")
cmd.Flags().StringVar(&logLevel, "log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
cmd.Flags().StringVar(&connectOpts.privateKey, "private-key", "", "Path to an existing WireGuard private key file.")
cmd.Flags().BoolVar(&connectOpts.cleanUp, "clean-up", true, "Should Kilo clean up the routes and interface when it shuts down?")
cmd.Flags().UintVar(&connectOpts.mtu, "mtu", uint(1420), "The MTU for the WireGuard interface.")
cmd.Flags().DurationVar(&connectOpts.resyncPeriod, "resync-period", 30*time.Second, "How often should Kilo reconcile?")
cmd.Flags().StringVarP(&connectOpts.interfaceName, "interface", "i", mesh.DefaultKiloInterface, "Name of the Kilo interface to use; if it does not exist, it will be created.")
cmd.Flags().IntVar(&connectOpts.persistentKeepalive, "persistent-keepalive", 10, "How often should WireGuard send keepalives? Setting to 0 will disable sending keepalives.")
availableLogLevels = strings.Join([]string{
logLevelAll,
logLevelDebug,
logLevelInfo,
logLevelWarn,
logLevelError,
logLevelNone,
}, ", ")
return cmd
}
func runConnect(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
switch logLevel {
case logLevelAll:
logger = level.NewFilter(logger, level.AllowAll())
case logLevelDebug:
logger = level.NewFilter(logger, level.AllowDebug())
case logLevelInfo:
logger = level.NewFilter(logger, level.AllowInfo())
case logLevelWarn:
logger = level.NewFilter(logger, level.AllowWarn())
case logLevelError:
logger = level.NewFilter(logger, level.AllowError())
case logLevelNone:
logger = level.NewFilter(logger, level.AllowNone())
default:
return fmt.Errorf("log level %s unknown; possible values are: %s", logLevel, availableLogLevels)
}
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
peerName := args[0]
for i := range allowedIPs {
_, aip, err := net.ParseCIDR(allowedIPs[i])
if err != nil {
return err
}
connectOpts.allowedIPs = append(connectOpts.allowedIPs, *aip)
}
var privateKey wgtypes.Key
var err error
if connectOpts.privateKey == "" {
privateKey, err = wgtypes.GeneratePrivateKey()
if err != nil {
return fmt.Errorf("failed to generate private key: %w", err)
}
} else {
raw, err := os.ReadFile(connectOpts.privateKey)
if err != nil {
return fmt.Errorf("failed to read private key: %w", err)
}
privateKey, err = wgtypes.ParseKey(string(raw))
if err != nil {
return fmt.Errorf("failed to parse private key: %w", err)
}
}
publicKey := privateKey.PublicKey()
level.Info(logger).Log("msg", "generated public key", "key", publicKey)
if _, err := opts.kc.KiloV1alpha1().Peers().Get(ctx, peerName, metav1.GetOptions{}); apierrors.IsNotFound(err) {
peer := &v1alpha1.Peer{
ObjectMeta: metav1.ObjectMeta{
Name: peerName,
},
Spec: v1alpha1.PeerSpec{
AllowedIPs: []string{connectOpts.allowedIP.String()},
PersistentKeepalive: connectOpts.persistentKeepalive,
PublicKey: publicKey.String(),
},
}
if _, err := opts.kc.KiloV1alpha1().Peers().Create(ctx, peer, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create peer: %w", err)
}
level.Info(logger).Log("msg", "created peer", "peer", peerName)
if connectOpts.cleanUp {
defer func() {
ctxWithTimeout, cancelWithTimeout := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelWithTimeout()
if err := opts.kc.KiloV1alpha1().Peers().Delete(ctxWithTimeout, peerName, metav1.DeleteOptions{}); err != nil {
level.Error(logger).Log("err", fmt.Sprintf("failed to delete peer: %v", err))
} else {
level.Info(logger).Log("msg", "deleted peer", "peer", peerName)
}
}()
}
} else if err != nil {
return fmt.Errorf("failed to get peer: %w", err)
}
iface, _, err := wireguard.New(connectOpts.interfaceName, connectOpts.mtu)
if err != nil {
return fmt.Errorf("failed to create wg interface: %w", err)
}
level.Info(logger).Log("msg", "created WireGuard interface", "name", connectOpts.interfaceName, "index", iface)
table := route.NewTable()
if connectOpts.cleanUp {
defer cleanUp(iface, table, logger)
}
if err := iproute.SetAddress(iface, &connectOpts.allowedIP); err != nil {
return err
}
level.Info(logger).Log("msg", "set IP address of WireGuard interface", "IP", connectOpts.allowedIP.String())
if err := iproute.Set(iface, true); err != nil {
return err
}
var g run.Group
g.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM))
{
g.Add(
func() error {
errCh, err := table.Run(ctx.Done())
if err != nil {
return fmt.Errorf("failed to watch for route table updates: %w", err)
}
for {
select {
case err, ok := <-errCh:
if ok {
level.Error(logger).Log("err", err.Error())
} else {
return nil
}
case <-ctx.Done():
return nil
}
}
},
func(err error) {
cancel()
var serr run.SignalError
if ok := errors.As(err, &serr); ok {
level.Debug(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error())
} else {
level.Error(logger).Log("msg", "received error", "err", err.Error())
}
},
)
}
{
g.Add(
func() error {
level.Info(logger).Log("msg", "starting syncer")
for {
if err := sync(table, peerName, privateKey, iface, logger); err != nil {
level.Error(logger).Log("msg", "failed to sync", "err", err.Error())
}
select {
case <-time.After(connectOpts.resyncPeriod):
case <-ctx.Done():
return nil
}
}
}, func(err error) {
cancel()
var serr run.SignalError
if ok := errors.As(err, &serr); ok {
level.Debug(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error())
} else {
level.Error(logger).Log("msg", "received error", "err", err.Error())
}
})
}
err = g.Run()
var serr run.SignalError
if ok := errors.As(err, &serr); ok {
return nil
}
return err
}
func cleanUp(iface int, t *route.Table, logger log.Logger) {
if err := iproute.Set(iface, false); err != nil {
level.Error(logger).Log("err", fmt.Sprintf("failed to set WireGuard interface down: %v", err))
}
if err := iproute.RemoveInterface(iface); err != nil {
level.Error(logger).Log("err", fmt.Sprintf("failed to remove WireGuard interface: %v", err))
}
if err := t.CleanUp(); err != nil {
level.Error(logger).Log("failed to clean up routes: %v", err)
}
return
}
func sync(table *route.Table, peerName string, privateKey wgtypes.Key, iface int, logger log.Logger) error {
ns, err := opts.backend.Nodes().List()
if err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
}
for _, n := range ns {
_, err := n.Endpoint.UDPAddr(true)
if err != nil {
return err
}
}
ps, err := opts.backend.Peers().List()
if err != nil {
return fmt.Errorf("failed to list peers: %w", err)
}
// Obtain the Granularity by looking at the annotation of the first node.
if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil {
return fmt.Errorf("failed to determine granularity: %w", err)
}
var hostname string
var subnet *net.IPNet
nodes := make(map[string]*mesh.Node)
var nodeNames []string
for _, n := range ns {
if n.Ready() {
nodes[n.Name] = n
hostname = n.Name
nodeNames = append(nodeNames, n.Name)
}
if n.WireGuardIP != nil && subnet == nil {
subnet = n.WireGuardIP
}
}
if len(nodes) == 0 {
return errors.New("did not find any valid Kilo nodes in the cluster")
}
if subnet == nil {
return errors.New("did not find a valid Kilo subnet on any node")
}
subnet.IP = subnet.IP.Mask(subnet.Mask)
sort.Strings(nodeNames)
nodes[nodeNames[0]].AllowedLocationIPs = append(nodes[nodeNames[0]].AllowedLocationIPs, connectOpts.allowedIPs...)
peers := make(map[string]*mesh.Peer)
for _, p := range ps {
if p.Ready() {
peers[p.Name] = p
}
}
if _, ok := peers[peerName]; !ok {
return fmt.Errorf("did not find any peer named %q in the cluster", peerName)
}
t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, opts.port, wgtypes.Key{}, subnet, *peers[peerName].PersistentKeepaliveInterval, logger)
if err != nil {
return fmt.Errorf("failed to create topology: %w", err)
}
conf := t.PeerConf(peerName)
conf.PrivateKey = &privateKey
conf.ListenPort = &opts.port
wgClient, err := wgctrl.New()
if err != nil {
return err
}
defer wgClient.Close()
current, err := wgClient.Device(connectOpts.interfaceName)
if err != nil {
return err
}
var equal bool
var diff string
equal, diff = conf.Equal(current)
if !equal {
// If the key is empty, then it's the first time we are running
// so don't bother printing a diff.
if current.PrivateKey != [wgtypes.KeyLen]byte{} {
level.Info(logger).Log("msg", "WireGuard configurations are different", "diff", diff)
}
level.Debug(logger).Log("msg", "setting WireGuard config", "config", conf.WGConfig())
if err := wgClient.ConfigureDevice(connectOpts.interfaceName, conf.WGConfig()); err != nil {
return err
}
}
if err := table.Set(t.PeerRoutes(peerName, iface, connectOpts.allowedIPs)); err != nil {
return fmt.Errorf("failed to update route table: %w", err)
}
return nil
}

View File

@ -0,0 +1,35 @@
// Copyright 2022 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !linux
// +build !linux
package main
import (
"errors"
"github.com/spf13/cobra"
)
func connect() *cobra.Command {
cmd := &cobra.Command{
Use: "connect",
Short: "not supporred on this OS",
RunE: func(_ *cobra.Command, _ []string) error {
return errors.New("this command is not supported on this OS")
},
}
return cmd
}

View File

@ -34,15 +34,15 @@ func graph() *cobra.Command {
func runGraph(_ *cobra.Command, _ []string) error { func runGraph(_ *cobra.Command, _ []string) error {
ns, err := opts.backend.Nodes().List() ns, err := opts.backend.Nodes().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list nodes: %v", err) return fmt.Errorf("failed to list nodes: %w", err)
} }
ps, err := opts.backend.Peers().List() ps, err := opts.backend.Peers().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list peers: %v", err) return fmt.Errorf("failed to list peers: %w", err)
} }
// Obtain the Granularity by looking at the annotation of the first node. // Obtain the Granularity by looking at the annotation of the first node.
if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil {
return fmt.Errorf("failed to obtain granularity: %w", err) return fmt.Errorf("failed to determine granularity: %w", err)
} }
var hostname string var hostname string
@ -69,11 +69,11 @@ func runGraph(_ *cobra.Command, _ []string) error {
} }
t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil) t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to create topology: %v", err) return fmt.Errorf("failed to create topology: %w", err)
} }
g, err := t.Dot() g, err := t.Dot()
if err != nil { if err != nil {
return fmt.Errorf("failed to generate graph: %v", err) return fmt.Errorf("failed to generate graph: %w", err)
} }
fmt.Println(g) fmt.Println(g)
return nil return nil

View File

@ -62,6 +62,7 @@ var (
opts struct { opts struct {
backend mesh.Backend backend mesh.Backend
granularity mesh.Granularity granularity mesh.Granularity
kc kiloclient.Interface
port int port int
} }
backend string backend string
@ -81,29 +82,29 @@ func runRoot(_ *cobra.Command, _ []string) error {
case mesh.FullGranularity: case mesh.FullGranularity:
case mesh.AutoGranularity: case mesh.AutoGranularity:
default: default:
return fmt.Errorf("mesh granularity %v unknown; posible values are: %s", granularity, availableGranularities) return fmt.Errorf("mesh granularity %s unknown; posible values are: %s", granularity, availableGranularities)
} }
switch backend { switch backend {
case k8s.Backend: case k8s.Backend:
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil { if err != nil {
return fmt.Errorf("failed to create Kubernetes config: %v", err) return fmt.Errorf("failed to create Kubernetes config: %w", err)
} }
c := kubernetes.NewForConfigOrDie(config) c := kubernetes.NewForConfigOrDie(config)
kc := kiloclient.NewForConfigOrDie(config) opts.kc = kiloclient.NewForConfigOrDie(config)
ec := apiextensions.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config)
opts.backend = k8s.New(c, kc, ec, topologyLabel, log.NewNopLogger()) opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger())
default: default:
return fmt.Errorf("backend %v unknown; posible values are: %s", backend, availableBackends) return fmt.Errorf("backend %s unknown; posible values are: %s", backend, availableBackends)
} }
if err := opts.backend.Nodes().Init(make(chan struct{})); err != nil { if err := opts.backend.Nodes().Init(make(chan struct{})); err != nil {
return fmt.Errorf("failed to initialize node backend: %v", err) return fmt.Errorf("failed to initialize node backend: %w", err)
} }
if err := opts.backend.Peers().Init(make(chan struct{})); err != nil { if err := opts.backend.Peers().Init(make(chan struct{})); err != nil {
return fmt.Errorf("failed to initialize peer backend: %v", err) return fmt.Errorf("failed to initialize peer backend: %w", err)
} }
return nil return nil
} }
@ -130,6 +131,7 @@ func main() {
for _, subCmd := range []*cobra.Command{ for _, subCmd := range []*cobra.Command{
graph(), graph(),
showConf(), showConf(),
connect(),
} { } {
cmd.AddCommand(subCmd) cmd.AddCommand(subCmd)
} }
@ -140,7 +142,7 @@ func main() {
} }
} }
func optainGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity, error) { func determineGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity, error) {
if gr == mesh.AutoGranularity { if gr == mesh.AutoGranularity {
if len(ns) == 0 { if len(ns) == 0 {
return gr, errors.New("could not get any nodes") return gr, errors.New("could not get any nodes")
@ -150,7 +152,7 @@ func optainGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity,
case mesh.LogicalGranularity: case mesh.LogicalGranularity:
case mesh.FullGranularity: case mesh.FullGranularity:
default: default:
return ret, fmt.Errorf("mesh granularity %v is not supported", opts.granularity) return ret, fmt.Errorf("mesh granularity %s is not supported", opts.granularity)
} }
return ret, nil return ret, nil
} }

View File

@ -83,7 +83,7 @@ func runShowConf(c *cobra.Command, args []string) error {
case outputFormatYAML: case outputFormatYAML:
showConfOpts.serializer = json.NewYAMLSerializer(json.DefaultMetaFactory, peerCreatorTyper{}, peerCreatorTyper{}) showConfOpts.serializer = json.NewYAMLSerializer(json.DefaultMetaFactory, peerCreatorTyper{}, peerCreatorTyper{})
default: default:
return fmt.Errorf("output format %v unknown; posible values are: %s", showConfOpts.output, availableOutputFormats) return fmt.Errorf("output format %s unknown; posible values are: %s", showConfOpts.output, availableOutputFormats)
} }
for i := range allowedIPs { for i := range allowedIPs {
_, aip, err := net.ParseCIDR(allowedIPs[i]) _, aip, err := net.ParseCIDR(allowedIPs[i])
@ -116,15 +116,15 @@ func showConfPeer() *cobra.Command {
func runShowConfNode(_ *cobra.Command, args []string) error { func runShowConfNode(_ *cobra.Command, args []string) error {
ns, err := opts.backend.Nodes().List() ns, err := opts.backend.Nodes().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list nodes: %v", err) return fmt.Errorf("failed to list nodes: %w", err)
} }
ps, err := opts.backend.Peers().List() ps, err := opts.backend.Peers().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list peers: %v", err) return fmt.Errorf("failed to list peers: %w", err)
} }
// Obtain the Granularity by looking at the annotation of the first node. // Obtain the Granularity by looking at the annotation of the first node.
if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil {
return fmt.Errorf("failed to obtain granularity: %w", err) return fmt.Errorf("failed to determine granularity: %w", err)
} }
hostname := args[0] hostname := args[0]
subnet := mesh.DefaultKiloSubnet subnet := mesh.DefaultKiloSubnet
@ -154,7 +154,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error {
t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil) t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to create topology: %v", err) return fmt.Errorf("failed to create topology: %w", err)
} }
var found bool var found bool
@ -172,7 +172,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error {
if !showConfOpts.asPeer { if !showConfOpts.asPeer {
c, err := t.Conf().Bytes() c, err := t.Conf().Bytes()
if err != nil { if err != nil {
return fmt.Errorf("failed to generate configuration: %v", err) return fmt.Errorf("failed to generate configuration: %w", err)
} }
_, err = os.Stdout.Write(c) _, err = os.Stdout.Write(c)
return err return err
@ -202,7 +202,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error {
Peers: []wireguard.Peer{*p}, Peers: []wireguard.Peer{*p},
}).Bytes() }).Bytes()
if err != nil { if err != nil {
return fmt.Errorf("failed to generate configuration: %v", err) return fmt.Errorf("failed to generate configuration: %w", err)
} }
_, err = os.Stdout.Write(c) _, err = os.Stdout.Write(c)
return err return err
@ -213,15 +213,15 @@ func runShowConfNode(_ *cobra.Command, args []string) error {
func runShowConfPeer(_ *cobra.Command, args []string) error { func runShowConfPeer(_ *cobra.Command, args []string) error {
ns, err := opts.backend.Nodes().List() ns, err := opts.backend.Nodes().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list nodes: %v", err) return fmt.Errorf("failed to list nodes: %w", err)
} }
ps, err := opts.backend.Peers().List() ps, err := opts.backend.Peers().List()
if err != nil { if err != nil {
return fmt.Errorf("failed to list peers: %v", err) return fmt.Errorf("failed to list peers: %w", err)
} }
// Obtain the Granularity by looking at the annotation of the first node. // Obtain the Granularity by looking at the annotation of the first node.
if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil {
return fmt.Errorf("failed to obtain granularity: %w", err) return fmt.Errorf("failed to determine granularity: %w", err)
} }
var hostname string var hostname string
subnet := mesh.DefaultKiloSubnet subnet := mesh.DefaultKiloSubnet
@ -257,12 +257,12 @@ func runShowConfPeer(_ *cobra.Command, args []string) error {
} }
t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, pka, nil) t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, pka, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to create topology: %v", err) return fmt.Errorf("failed to create topology: %w", err)
} }
if !showConfOpts.asPeer { if !showConfOpts.asPeer {
c, err := t.PeerConf(peer).Bytes() c, err := t.PeerConf(peer).Bytes()
if err != nil { if err != nil {
return fmt.Errorf("failed to generate configuration: %v", err) return fmt.Errorf("failed to generate configuration: %w", err)
} }
_, err = os.Stdout.Write(c) _, err = os.Stdout.Write(c)
return err return err
@ -286,7 +286,7 @@ func runShowConfPeer(_ *cobra.Command, args []string) error {
Peers: []wireguard.Peer{*p}, Peers: []wireguard.Peer{*p},
}).Bytes() }).Bytes()
if err != nil { if err != nil {
return fmt.Errorf("failed to generate configuration: %v", err) return fmt.Errorf("failed to generate configuration: %w", err)
} }
_, err = os.Stdout.Write(c) _, err = os.Stdout.Write(c)
return err return err

View File

@ -54,9 +54,47 @@ arkade get kgctl
|Command|Syntax|Description| |Command|Syntax|Description|
|----|----|-------| |----|----|-------|
|[connect](#connect)|`kgctl connect <peer-name> [flags]`|Connect the host to the cluster, setting up the required interfaces, routes, and keys.|
|[graph](#graph)|`kgctl graph [flags]`|Produce a graph in GraphViz format representing the topology of the cluster.| |[graph](#graph)|`kgctl graph [flags]`|Produce a graph in GraphViz format representing the topology of the cluster.|
|[showconf](#showconf)|`kgctl showconf ( node \| peer ) NAME [flags]`|Show the WireGuard configuration for a node or peer in the mesh.| |[showconf](#showconf)|`kgctl showconf ( node \| peer ) <name> [flags]`|Show the WireGuard configuration for a node or peer in the mesh.|
### connect
The `connect` command configures the local host as a WireGuard Peer of the cluster and applies all of the necessary networking configuration to connect to the cluster.
As long as the process is running, it will watch the cluster for changes and automatically manage the configuration for new or updated Peers and Nodes.
If the given Peer name does not exist in the cluster, the command will register a new Peer and generate the necessary WireGuard keys.
When the command exits, all of the configuration, including newly registered Peers, is cleaned up.
Example:
```shell
PEER_NAME=laptop
SERVICECIDR=10.43.0.0/16
kgctl connect $PEER_NAME --allowed-ips $SERVICECIDR
```
The local host is now connected to the cluster and all IPs from the cluster and any registered Peers are fully routable.
When combined with the `--clean-up false` flag, the configuration produced by the command is persistent and will remain in effect even after the process is stopped.
With the service CIDR of the cluster routable from the local host, Kubernetes DNS names can now be resolved by the cluster DNS provider.
For example, the following snippet could be used to resolve the clusterIP of the Kubernetes API:
```shell
dig @$(kubectl get service -n kube-system kube-dns -o=jsonpath='{.spec.clusterIP}') kubernetes.default.svc.cluster.local +short
# > 10.43.0.1
```
For convenience, the cluster DNS provider's IP address can be configured as the local host's DNS server, making Kubernetes DNS names easily resolvable.
For example, if using `systemd-resolved`, the following snippet could be used:
```shell
systemd-resolve --interface kilo0 --set-dns $(kubectl get service -n kube-system kube-dns -o=jsonpath='{.spec.clusterIP}') --set-domain cluster.local
# Now all lookups for DNS names ending in `.cluster.local` will be routed over the `kilo0` interface to the cluster DNS provider.
dig kubernetes.default.svc.cluster.local +short
# > 10.43.0.1
```
> **Note**: The `connect` command is currently only supported on Linux.
> **Note**: The `connect` command requires the `CAP_NET_ADMIN` capability in order to configure the host's networking stack; unprivileged users will need to use `sudo` or similar tools.
### graph ### graph

17
e2e/kgctl.sh Normal file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
# shellcheck disable=SC1091
. lib.sh
setup_suite() {
# shellcheck disable=SC2016
block_until_ready_by_name kube-system kilo-userspace
_kubectl wait pod -l app.kubernetes.io/name=adjacency --for=condition=Ready --timeout 3m
}
test_connect() {
local PEER=test
local ALLOWED_IP=10.5.0.1/32
docker run -d --name="$PEER" --rm --network=host --cap-add=NET_ADMIN -v "$KGCTL_BINARY":/kgctl -v "$PWD/$KUBECONFIG":/kubeconfig --entrypoint=/kgctl alpine --kubeconfig /kubeconfig connect "$PEER" --allowed-ip "$ALLOWED_IP"
assert "retry 10 5 '' check_ping --local" "should be able to ping Pods from host"
docker stop "$PEER"
}

View File

@ -235,6 +235,74 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
return routes, rules return routes, rules
} }
// PeerRoutes generates a slice of routes and rules for a given peer in the Topology.
func (t *Topology) PeerRoutes(name string, kiloIface int, additionalAllowedIPs []net.IPNet) ([]*netlink.Route, []*netlink.Rule) {
var routes []*netlink.Route
var rules []*netlink.Rule
for _, segment := range t.segments {
for i := range segment.cidrs {
// Add routes to the Pod CIDRs of nodes in other segments.
routes = append(routes, &netlink.Route{
Dst: segment.cidrs[i],
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.wireGuardIP,
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
for i := range segment.privateIPs {
// Add routes to the private IPs of nodes in other segments.
routes = append(routes, &netlink.Route{
Dst: oneAddressCIDR(segment.privateIPs[i]),
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.wireGuardIP,
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
// Add routes for the allowed location IPs of all segments.
for i := range segment.allowedLocationIPs {
routes = append(routes, &netlink.Route{
Dst: &segment.allowedLocationIPs[i],
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.wireGuardIP,
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
routes = append(routes, &netlink.Route{
Dst: oneAddressCIDR(segment.wireGuardIP),
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
// Add routes for the allowed IPs of peers.
for _, peer := range t.peers {
// Don't add routes to ourselves.
if peer.Name == name {
continue
}
for i := range peer.AllowedIPs {
routes = append(routes, &netlink.Route{
Dst: &peer.AllowedIPs[i],
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
}
for i := range additionalAllowedIPs {
routes = append(routes, &netlink.Route{
Dst: &additionalAllowedIPs[i],
Flags: int(netlink.FLAG_ONLINK),
Gw: t.segments[0].wireGuardIP,
LinkIndex: kiloIface,
Protocol: unix.RTPROT_STATIC,
})
}
return routes, rules
}
func encapsulateRoute(route *netlink.Route, encapsulate encapsulation.Strategy, subnet *net.IPNet, tunlIface int) *netlink.Route { func encapsulateRoute(route *netlink.Route, encapsulate encapsulation.Strategy, subnet *net.IPNet, tunlIface int) *netlink.Route {
if encapsulate == encapsulation.Always || (encapsulate == encapsulation.CrossSubnet && !subnet.Contains(route.Gw)) { if encapsulate == encapsulation.Always || (encapsulate == encapsulation.CrossSubnet && !subnet.Contains(route.Gw)) {
route.LinkIndex = tunlIface route.LinkIndex = tunlIface

View File

@ -65,6 +65,7 @@ type Topology struct {
logger log.Logger logger log.Logger
} }
// segment represents one logical unit in the topology that is united by one common WireGuard IP.
type segment struct { type segment struct {
allowedIPs []net.IPNet allowedIPs []net.IPNet
endpoint *wireguard.Endpoint endpoint *wireguard.Endpoint
@ -376,7 +377,7 @@ func (t *Topology) PeerConf(name string) *wireguard.Conf {
PresharedKey: psk, PresharedKey: psk,
PublicKey: s.key, PublicKey: s.key,
}, },
Endpoint: s.endpoint, Endpoint: t.updateEndpoint(s.endpoint, s.key, &s.persistentKeepalive),
} }
c.Peers = append(c.Peers, peer) c.Peers = append(c.Peers, peer)
} }
@ -390,7 +391,7 @@ func (t *Topology) PeerConf(name string) *wireguard.Conf {
PersistentKeepaliveInterval: pka, PersistentKeepaliveInterval: pka,
PublicKey: t.peers[i].PublicKey, PublicKey: t.peers[i].PublicKey,
}, },
Endpoint: t.peers[i].Endpoint, Endpoint: t.updateEndpoint(t.peers[i].Endpoint, t.peers[i].PublicKey, t.peers[i].PersistentKeepaliveInterval),
} }
c.Peers = append(c.Peers, peer) c.Peers = append(c.Peers, peer)
} }