Merge pull request #97 from castai/add-custom-topology-label
feat: add support for custom topology label
This commit is contained in:
commit
a789003a58
@ -92,6 +92,7 @@ func Main() error {
|
|||||||
local := flag.Bool("local", true, "Should Kilo manage routes within a location?")
|
local := flag.Bool("local", true, "Should Kilo manage routes within a location?")
|
||||||
logLevel := flag.String("log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
|
logLevel := flag.String("log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
|
||||||
master := flag.String("master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
|
master := flag.String("master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
|
||||||
|
topologyLabel := flag.String("topology-label", k8s.RegionLabelKey, "Kubernetes node label used to group nodes into logical locations.")
|
||||||
var port uint
|
var port uint
|
||||||
flag.UintVar(&port, "port", mesh.DefaultKiloPort, "The port over which WireGuard peers should communicate.")
|
flag.UintVar(&port, "port", mesh.DefaultKiloPort, "The port over which WireGuard peers should communicate.")
|
||||||
subnet := flag.String("subnet", mesh.DefaultKiloSubnet.String(), "CIDR from which to allocate addresses for WireGuard interfaces.")
|
subnet := flag.String("subnet", mesh.DefaultKiloSubnet.String(), "CIDR from which to allocate addresses for WireGuard interfaces.")
|
||||||
@ -171,7 +172,7 @@ func Main() error {
|
|||||||
c := kubernetes.NewForConfigOrDie(config)
|
c := kubernetes.NewForConfigOrDie(config)
|
||||||
kc := kiloclient.NewForConfigOrDie(config)
|
kc := kiloclient.NewForConfigOrDie(config)
|
||||||
ec := apiextensions.NewForConfigOrDie(config)
|
ec := apiextensions.NewForConfigOrDie(config)
|
||||||
b = k8s.New(c, kc, ec)
|
b = k8s.New(c, kc, ec, *topologyLabel)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("backend %v unknown; possible values are: %s", *backend, availableBackends)
|
return fmt.Errorf("backend %v unknown; possible values are: %s", *backend, availableBackends)
|
||||||
}
|
}
|
||||||
|
@ -60,9 +60,10 @@ var (
|
|||||||
granularity mesh.Granularity
|
granularity mesh.Granularity
|
||||||
port uint32
|
port uint32
|
||||||
}
|
}
|
||||||
backend string
|
backend string
|
||||||
granularity string
|
granularity string
|
||||||
kubeconfig string
|
kubeconfig string
|
||||||
|
topologyLabel string
|
||||||
)
|
)
|
||||||
|
|
||||||
func runRoot(_ *cobra.Command, _ []string) error {
|
func runRoot(_ *cobra.Command, _ []string) error {
|
||||||
@ -83,7 +84,7 @@ func runRoot(_ *cobra.Command, _ []string) error {
|
|||||||
c := kubernetes.NewForConfigOrDie(config)
|
c := kubernetes.NewForConfigOrDie(config)
|
||||||
kc := kiloclient.NewForConfigOrDie(config)
|
kc := kiloclient.NewForConfigOrDie(config)
|
||||||
ec := apiextensions.NewForConfigOrDie(config)
|
ec := apiextensions.NewForConfigOrDie(config)
|
||||||
opts.backend = k8s.New(c, kc, ec)
|
opts.backend = k8s.New(c, kc, ec, topologyLabel)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("backend %v unknown; posible values are: %s", backend, availableBackends)
|
return fmt.Errorf("backend %v unknown; posible values are: %s", backend, availableBackends)
|
||||||
}
|
}
|
||||||
@ -110,6 +111,7 @@ func main() {
|
|||||||
cmd.PersistentFlags().StringVar(&granularity, "mesh-granularity", string(mesh.LogicalGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
|
cmd.PersistentFlags().StringVar(&granularity, "mesh-granularity", string(mesh.LogicalGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
|
||||||
cmd.PersistentFlags().StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "Path to kubeconfig.")
|
cmd.PersistentFlags().StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "Path to kubeconfig.")
|
||||||
cmd.PersistentFlags().Uint32Var(&opts.port, "port", mesh.DefaultKiloPort, "The WireGuard port over which the nodes communicate.")
|
cmd.PersistentFlags().Uint32Var(&opts.port, "port", mesh.DefaultKiloPort, "The WireGuard port over which the nodes communicate.")
|
||||||
|
cmd.PersistentFlags().StringVar(&topologyLabel, "topology-label", k8s.RegionLabelKey, "Kubernetes node label used to group nodes into logical locations.")
|
||||||
|
|
||||||
for _, subCmd := range []*cobra.Command{
|
for _, subCmd := range []*cobra.Command{
|
||||||
graph(),
|
graph(),
|
||||||
|
@ -10,7 +10,7 @@ This allows the encrypted network to serve several purposes, for example:
|
|||||||
## Logical Groups
|
## Logical Groups
|
||||||
|
|
||||||
By default, Kilo creates a mesh between the different logical locations in the cluster, e.g. data-centers, cloud providers, etc.
|
By default, Kilo creates a mesh between the different logical locations in the cluster, e.g. data-centers, cloud providers, etc.
|
||||||
Kilo will try to infer the location of the node using the [topology.kubernetes.io/region](https://kubernetes.io/docs/reference/kubernetes-api/labels-annotations-taints/#topologykubernetesioregion) node label.
|
Kilo will try to infer the location of the node using the [topology.kubernetes.io/region](https://kubernetes.io/docs/reference/kubernetes-api/labels-annotations-taints/#topologykubernetesioregion) node label. Additionally, Kilo supports custom topology label using command line flag `--topology-label=...`.
|
||||||
If this label is not set, then the [kilo.squat.ai/location](./annotations.md#location) node annotation can be used.
|
If this label is not set, then the [kilo.squat.ai/location](./annotations.md#location) node annotation can be used.
|
||||||
|
|
||||||
For example, in order to join nodes in Google Cloud and AWS into a single cluster, an administrator could use the following snippet could to annotate all nodes with `GCP` in the name:
|
For example, in order to join nodes in Google Cloud and AWS into a single cluster, an administrator could use the following snippet could to annotate all nodes with `GCP` in the name:
|
||||||
|
@ -59,8 +59,8 @@ const (
|
|||||||
locationAnnotationKey = "kilo.squat.ai/location"
|
locationAnnotationKey = "kilo.squat.ai/location"
|
||||||
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
|
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
|
||||||
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
|
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
|
||||||
|
// RegionLabelKey is the key for the well-known Kubernetes topology region label.
|
||||||
regionLabelKey = "topology.kubernetes.io/region"
|
RegionLabelKey = "topology.kubernetes.io/region"
|
||||||
jsonPatchSlash = "~1"
|
jsonPatchSlash = "~1"
|
||||||
jsonRemovePatch = `{"op": "remove", "path": "%s"}`
|
jsonRemovePatch = `{"op": "remove", "path": "%s"}`
|
||||||
)
|
)
|
||||||
@ -81,10 +81,11 @@ func (b *backend) Peers() mesh.PeerBackend {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type nodeBackend struct {
|
type nodeBackend struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
events chan *mesh.NodeEvent
|
events chan *mesh.NodeEvent
|
||||||
informer cache.SharedIndexInformer
|
informer cache.SharedIndexInformer
|
||||||
lister v1listers.NodeLister
|
lister v1listers.NodeLister
|
||||||
|
topologyLabel string
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerBackend struct {
|
type peerBackend struct {
|
||||||
@ -96,16 +97,17 @@ type peerBackend struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new instance of a mesh.Backend.
|
// New creates a new instance of a mesh.Backend.
|
||||||
func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface) mesh.Backend {
|
func New(c kubernetes.Interface, kc kiloclient.Interface, ec apiextensions.Interface, topologyLabel string) mesh.Backend {
|
||||||
ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil)
|
ni := v1informers.NewNodeInformer(c, 5*time.Minute, nil)
|
||||||
pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil)
|
pi := v1alpha1informers.NewPeerInformer(kc, 5*time.Minute, nil)
|
||||||
|
|
||||||
return &backend{
|
return &backend{
|
||||||
&nodeBackend{
|
&nodeBackend{
|
||||||
client: c,
|
client: c,
|
||||||
events: make(chan *mesh.NodeEvent),
|
events: make(chan *mesh.NodeEvent),
|
||||||
informer: ni,
|
informer: ni,
|
||||||
lister: v1listers.NewNodeLister(ni.GetIndexer()),
|
lister: v1listers.NewNodeLister(ni.GetIndexer()),
|
||||||
|
topologyLabel: topologyLabel,
|
||||||
},
|
},
|
||||||
&peerBackend{
|
&peerBackend{
|
||||||
client: kc,
|
client: kc,
|
||||||
@ -138,7 +140,7 @@ func (nb *nodeBackend) Get(name string) (*mesh.Node, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return translateNode(n), nil
|
return translateNode(n, nb.topologyLabel), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes the backend; for this backend that means
|
// Init initializes the backend; for this backend that means
|
||||||
@ -158,7 +160,7 @@ func (nb *nodeBackend) Init(stop <-chan struct{}) error {
|
|||||||
// Failed to decode Node; ignoring...
|
// Failed to decode Node; ignoring...
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n)}
|
nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n, nb.topologyLabel)}
|
||||||
},
|
},
|
||||||
UpdateFunc: func(old, obj interface{}) {
|
UpdateFunc: func(old, obj interface{}) {
|
||||||
n, ok := obj.(*v1.Node)
|
n, ok := obj.(*v1.Node)
|
||||||
@ -171,7 +173,7 @@ func (nb *nodeBackend) Init(stop <-chan struct{}) error {
|
|||||||
// Failed to decode Node; ignoring...
|
// Failed to decode Node; ignoring...
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n), Old: translateNode(o)}
|
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n, nb.topologyLabel), Old: translateNode(o, nb.topologyLabel)}
|
||||||
},
|
},
|
||||||
DeleteFunc: func(obj interface{}) {
|
DeleteFunc: func(obj interface{}) {
|
||||||
n, ok := obj.(*v1.Node)
|
n, ok := obj.(*v1.Node)
|
||||||
@ -179,7 +181,7 @@ func (nb *nodeBackend) Init(stop <-chan struct{}) error {
|
|||||||
// Failed to decode Node; ignoring...
|
// Failed to decode Node; ignoring...
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nb.events <- &mesh.NodeEvent{Type: mesh.DeleteEvent, Node: translateNode(n)}
|
nb.events <- &mesh.NodeEvent{Type: mesh.DeleteEvent, Node: translateNode(n, nb.topologyLabel)}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@ -194,7 +196,7 @@ func (nb *nodeBackend) List() ([]*mesh.Node, error) {
|
|||||||
}
|
}
|
||||||
nodes := make([]*mesh.Node, len(ns))
|
nodes := make([]*mesh.Node, len(ns))
|
||||||
for i := range ns {
|
for i := range ns {
|
||||||
nodes[i] = translateNode(ns[i])
|
nodes[i] = translateNode(ns[i], nb.topologyLabel)
|
||||||
}
|
}
|
||||||
return nodes, nil
|
return nodes, nil
|
||||||
}
|
}
|
||||||
@ -239,7 +241,7 @@ func (nb *nodeBackend) Watch() <-chan *mesh.NodeEvent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// translateNode translates a Kubernetes Node to a mesh.Node.
|
// translateNode translates a Kubernetes Node to a mesh.Node.
|
||||||
func translateNode(node *v1.Node) *mesh.Node {
|
func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -253,7 +255,7 @@ func translateNode(node *v1.Node) *mesh.Node {
|
|||||||
// Allow the region to be overridden by an explicit location.
|
// Allow the region to be overridden by an explicit location.
|
||||||
location, ok := node.ObjectMeta.Annotations[locationAnnotationKey]
|
location, ok := node.ObjectMeta.Annotations[locationAnnotationKey]
|
||||||
if !ok {
|
if !ok {
|
||||||
location = node.ObjectMeta.Labels[regionLabelKey]
|
location = node.ObjectMeta.Labels[topologyLabel]
|
||||||
}
|
}
|
||||||
// Allow the endpoint to be overridden.
|
// Allow the endpoint to be overridden.
|
||||||
endpoint := parseEndpoint(node.ObjectMeta.Annotations[forceEndpointAnnotationKey])
|
endpoint := parseEndpoint(node.ObjectMeta.Annotations[forceEndpointAnnotationKey])
|
||||||
|
@ -83,7 +83,7 @@ func TestTranslateNode(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "region",
|
name: "region",
|
||||||
labels: map[string]string{
|
labels: map[string]string{
|
||||||
regionLabelKey: "a",
|
RegionLabelKey: "a",
|
||||||
},
|
},
|
||||||
out: &mesh.Node{
|
out: &mesh.Node{
|
||||||
Location: "a",
|
Location: "a",
|
||||||
@ -95,7 +95,7 @@ func TestTranslateNode(t *testing.T) {
|
|||||||
locationAnnotationKey: "b",
|
locationAnnotationKey: "b",
|
||||||
},
|
},
|
||||||
labels: map[string]string{
|
labels: map[string]string{
|
||||||
regionLabelKey: "a",
|
RegionLabelKey: "a",
|
||||||
},
|
},
|
||||||
out: &mesh.Node{
|
out: &mesh.Node{
|
||||||
Location: "b",
|
Location: "b",
|
||||||
@ -172,7 +172,7 @@ func TestTranslateNode(t *testing.T) {
|
|||||||
wireGuardIPAnnotationKey: "10.4.0.1/16",
|
wireGuardIPAnnotationKey: "10.4.0.1/16",
|
||||||
},
|
},
|
||||||
labels: map[string]string{
|
labels: map[string]string{
|
||||||
regionLabelKey: "a",
|
RegionLabelKey: "a",
|
||||||
},
|
},
|
||||||
out: &mesh.Node{
|
out: &mesh.Node{
|
||||||
Endpoint: &wireguard.Endpoint{DNSOrIP: wireguard.DNSOrIP{IP: net.ParseIP("10.0.0.2")}, Port: 51821},
|
Endpoint: &wireguard.Endpoint{DNSOrIP: wireguard.DNSOrIP{IP: net.ParseIP("10.0.0.2")}, Port: 51821},
|
||||||
@ -192,7 +192,7 @@ func TestTranslateNode(t *testing.T) {
|
|||||||
n.ObjectMeta.Annotations = tc.annotations
|
n.ObjectMeta.Annotations = tc.annotations
|
||||||
n.ObjectMeta.Labels = tc.labels
|
n.ObjectMeta.Labels = tc.labels
|
||||||
n.Spec.PodCIDR = tc.subnet
|
n.Spec.PodCIDR = tc.subnet
|
||||||
node := translateNode(n)
|
node := translateNode(n, RegionLabelKey)
|
||||||
if diff := pretty.Compare(node, tc.out); diff != "" {
|
if diff := pretty.Compare(node, tc.out); diff != "" {
|
||||||
t.Errorf("test case %q: got diff: %v", tc.name, diff)
|
t.Errorf("test case %q: got diff: %v", tc.name, diff)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user