kilo/pkg/mesh/topology.go

425 lines
14 KiB
Go
Raw Permalink Normal View History

2019-01-18 01:50:10 +00:00
// Copyright 2019 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesh
import (
"errors"
"net"
"sort"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/kilo-io/kilo/pkg/wireguard"
2019-01-18 01:50:10 +00:00
)
const (
	// logicalLocationPrefix is prepended to a node's Location to form the
	// key of a segment that groups nodes by logical location.
	logicalLocationPrefix = "location:"
	// nodeLocationPrefix is prepended to a node's name to form the key of
	// a segment that is keyed by node rather than by logical location.
	nodeLocationPrefix = "node:"
)
2019-01-18 01:50:10 +00:00
// Topology represents the logical structure of the overlay network.
type Topology struct {
// key is the private key of the node creating the topology.
key []byte
port uint32
2019-01-18 01:50:10 +00:00
// Location is the logical location of the local host.
location string
segments []*segment
peers []*Peer
2019-01-18 01:50:10 +00:00
// hostname is the hostname of the local host.
hostname string
// leader represents whether or not the local host
// is the segment leader.
leader bool
// persistentKeepalive is the interval in seconds of the emission
// of keepalive packets by the local node to its peers.
persistentKeepalive int
// privateIP is the private IP address of the local node.
privateIP *net.IPNet
// subnet is the Pod subnet of the local node.
2019-01-18 01:50:10 +00:00
subnet *net.IPNet
// wireGuardCIDR is the allocated CIDR of the WireGuard
// interface of the local node within the Kilo subnet.
// If the local node is not the leader of a location, then
// the IP is the 0th address in the subnet, i.e. the CIDR
// is equal to the Kilo subnet.
2019-01-18 01:50:10 +00:00
wireGuardCIDR *net.IPNet
// discoveredEndpoints is the updated map of valid discovered Endpoints
discoveredEndpoints map[string]*wireguard.Endpoint
logger log.Logger
2019-01-18 01:50:10 +00:00
}
type segment struct {
allowedIPs []*net.IPNet
endpoint *wireguard.Endpoint
key []byte
persistentKeepalive int
2019-01-18 01:50:10 +00:00
// Location is the logical location of this segment.
location string
2019-01-18 01:50:10 +00:00
// cidrs is a slice of subnets of all peers in the segment.
cidrs []*net.IPNet
// hostnames is a slice of the hostnames of the peers in the segment.
hostnames []string
// leader is the index of the leader of the segment.
leader int
// privateIPs is a slice of private IPs of all peers in the segment.
privateIPs []net.IP
// wireGuardIP is the allocated IP address of the WireGuard
// interface on the leader of the segment.
wireGuardIP net.IP
// allowedLocationIPs are not part of the cluster and are not peers.
// They are directly routable from nodes within the segment.
// A classic example is a printer that ought to be routable from other locations.
allowedLocationIPs []*net.IPNet
2019-01-18 01:50:10 +00:00
}
// NewTopology creates a new Topology struct from a given set of nodes and peers.
func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port uint32, key []byte, subnet *net.IPNet, persistentKeepalive int, logger log.Logger) (*Topology, error) {
if logger == nil {
logger = log.NewNopLogger()
}
2019-01-18 01:50:10 +00:00
topoMap := make(map[string][]*Node)
for _, node := range nodes {
var location string
switch granularity {
case LogicalGranularity:
location = logicalLocationPrefix + node.Location
// Put node in a different location, if no private
// IP was found.
if node.InternalIP == nil {
location = nodeLocationPrefix + node.Name
}
case FullGranularity:
location = nodeLocationPrefix + node.Name
2019-01-18 01:50:10 +00:00
}
topoMap[location] = append(topoMap[location], node)
}
var localLocation string
switch granularity {
case LogicalGranularity:
localLocation = logicalLocationPrefix + nodes[hostname].Location
if nodes[hostname].InternalIP == nil {
localLocation = nodeLocationPrefix + hostname
}
case FullGranularity:
localLocation = nodeLocationPrefix + hostname
2019-01-18 01:50:10 +00:00
}
t := Topology{key: key, port: port, hostname: hostname, location: localLocation, persistentKeepalive: persistentKeepalive, privateIP: nodes[hostname].InternalIP, subnet: nodes[hostname].Subnet, wireGuardCIDR: subnet, discoveredEndpoints: make(map[string]*wireguard.Endpoint), logger: logger}
2019-01-18 01:50:10 +00:00
for location := range topoMap {
// Sort the location so the result is stable.
sort.Slice(topoMap[location], func(i, j int) bool {
return topoMap[location][i].Name < topoMap[location][j].Name
})
leader := findLeader(topoMap[location])
if location == localLocation && topoMap[location][leader].Name == hostname {
t.leader = true
}
var allowedIPs []*net.IPNet
allowedLocationIPsMap := make(map[string]struct{})
var allowedLocationIPs []*net.IPNet
2019-01-18 01:50:10 +00:00
var cidrs []*net.IPNet
var hostnames []string
var privateIPs []net.IP
for _, node := range topoMap[location] {
// Allowed IPs should include:
// - the node's allocated subnet
// - the node's WireGuard IP
// - the node's internal IP
// - IPs that were specified by the allowed-location-ips annotation
allowedIPs = append(allowedIPs, node.Subnet)
for _, ip := range node.AllowedLocationIPs {
if _, ok := allowedLocationIPsMap[ip.String()]; !ok {
allowedLocationIPs = append(allowedLocationIPs, ip)
allowedLocationIPsMap[ip.String()] = struct{}{}
}
}
if node.InternalIP != nil {
allowedIPs = append(allowedIPs, oneAddressCIDR(node.InternalIP.IP))
privateIPs = append(privateIPs, node.InternalIP.IP)
}
2019-01-18 01:50:10 +00:00
cidrs = append(cidrs, node.Subnet)
hostnames = append(hostnames, node.Name)
}
// The sorting has no function, but makes testing easier.
sort.Slice(allowedLocationIPs, func(i, j int) bool {
return allowedLocationIPs[i].String() < allowedLocationIPs[j].String()
})
t.segments = append(t.segments, &segment{
allowedIPs: allowedIPs,
endpoint: topoMap[location][leader].Endpoint,
key: topoMap[location][leader].Key,
persistentKeepalive: topoMap[location][leader].PersistentKeepalive,
location: location,
cidrs: cidrs,
hostnames: hostnames,
leader: leader,
privateIPs: privateIPs,
allowedLocationIPs: allowedLocationIPs,
2019-01-18 01:50:10 +00:00
})
}
// Sort the Topology segments so the result is stable.
sort.Slice(t.segments, func(i, j int) bool {
return t.segments[i].location < t.segments[j].location
})
for _, peer := range peers {
t.peers = append(t.peers, peer)
}
// Sort the Topology peers so the result is stable.
sort.Slice(t.peers, func(i, j int) bool {
return t.peers[i].Name < t.peers[j].Name
2019-01-18 01:50:10 +00:00
})
// We need to defensively deduplicate peer allowed IPs. If two peers claim the same IP,
// the WireGuard configuration could flap, causing the interface to churn.
t.peers = deduplicatePeerIPs(t.peers)
// Copy the host node DiscoveredEndpoints in the topology as a starting point.
for key := range nodes[hostname].DiscoveredEndpoints {
t.discoveredEndpoints[key] = nodes[hostname].DiscoveredEndpoints[key]
}
2019-01-18 01:50:10 +00:00
// Allocate IPs to the segment leaders in a stable, coordination-free manner.
a := newAllocator(*subnet)
for _, segment := range t.segments {
2019-01-18 01:50:10 +00:00
ipNet := a.next()
if ipNet == nil {
return nil, errors.New("failed to allocate an IP address; ran out of IP addresses")
}
segment.wireGuardIP = ipNet.IP
segment.allowedIPs = append(segment.allowedIPs, oneAddressCIDR(ipNet.IP))
if t.leader && segment.location == t.location {
t.wireGuardCIDR = &net.IPNet{IP: ipNet.IP, Mask: subnet.Mask}
2019-01-18 01:50:10 +00:00
}
// Now that the topology is ordered, update the discoveredEndpoints map
// add new ones by going through the ordered topology: segments, nodes
for _, node := range topoMap[segment.location] {
for key := range node.DiscoveredEndpoints {
if _, ok := t.discoveredEndpoints[key]; !ok {
t.discoveredEndpoints[key] = node.DiscoveredEndpoints[key]
}
}
}
// Check for intersecting IPs in allowed location IPs
segment.allowedLocationIPs = t.filterAllowedLocationIPs(segment.allowedLocationIPs, segment.location)
2019-01-18 01:50:10 +00:00
}
return &t, nil
}
// intersect reports whether the two networks overlap, i.e. whether
// the IP of either network is contained in the other network.
func intersect(n1, n2 *net.IPNet) bool {
	if n1.Contains(n2.IP) {
		return true
	}
	return n2.Contains(n1.IP)
}
// filterAllowedLocationIPs returns the entries of ips, in their original
// order, that do not overlap with anything already routed by the topology.
// An entry is dropped, and a warning is logged, if it overlaps with:
//   - an allowed location IP of a segment in a location other than location;
//   - an allowed IP of any segment;
//   - a private IP of any segment; or
//   - an allowed IP of any peer.
func (t *Topology) filterAllowedLocationIPs(ips []*net.IPNet, location string) (ret []*net.IPNet) {
	// overlaps reports whether the candidate collides with an existing
	// network or address; it logs the first collision that it finds.
	overlaps := func(candidate *net.IPNet) bool {
		for _, seg := range t.segments {
			// Allowed location IPs are only compared across locations.
			if seg.location != location {
				for _, other := range seg.allowedLocationIPs {
					if intersect(candidate, other) {
						level.Warn(t.logger).Log("msg", "overlapping allowed location IPnets", "IP", candidate.String(), "IP2", other.String(), "segment-location", seg.location)
						return true
					}
				}
			}
			for _, allowed := range seg.allowedIPs {
				if intersect(candidate, allowed) {
					level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with allowed IPnets", "IP", candidate.String(), "IP2", allowed.String(), "segment-location", seg.location)
					return true
				}
			}
			for _, private := range seg.privateIPs {
				if candidate.Contains(private) {
					level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with privateIP", "IP", candidate.String(), "IP2", private.String(), "segment-location", seg.location)
					return true
				}
			}
		}
		for _, peer := range t.peers {
			for _, allowed := range peer.AllowedIPs {
				if intersect(candidate, allowed) {
					level.Warn(t.logger).Log("msg", "overlapping allowed location IPnet with peer IPnet", "IP", candidate.String(), "IP2", allowed.String(), "peer", peer.Name)
					return true
				}
			}
		}
		return false
	}

	for _, candidate := range ips {
		if !overlaps(candidate) {
			ret = append(ret, candidate)
		}
	}
	return ret
}
// updateEndpoint returns the discovered endpoint recorded for the given key,
// if there is one and persistentKeepalive is non-zero. In every other case
// the given endpoint is returned unchanged.
func (t *Topology) updateEndpoint(endpoint *wireguard.Endpoint, key []byte, persistentKeepalive int) *wireguard.Endpoint {
	// Only peers with a keepalive, i.e. NAT-ed peers, are updated.
	if persistentKeepalive != 0 {
		if discovered, found := t.discoveredEndpoints[string(key)]; found {
			return discovered
		}
	}
	return endpoint
}
2019-01-18 01:50:10 +00:00
// Conf generates a WireGuard configuration file for a given Topology.
//
// The configuration contains the local interface, one peer for every
// segment other than the local one and one peer for every peer of the
// topology. Every generated peer uses the persistent keepalive of the
// topology, and endpoints are replaced by discovered endpoints where
// updateEndpoint finds one.
func (t *Topology) Conf() *wireguard.Conf {
	c := &wireguard.Conf{
		Interface: &wireguard.Interface{
			PrivateKey: t.key,
			ListenPort: t.port,
		},
	}
	for _, s := range t.segments {
		if s.location == t.location {
			continue
		}
		// Cap the capacity of the segment's slice before appending, so that
		// append copies into a fresh array rather than writing into spare
		// capacity of the backing array that is shared with the segment.
		allowedIPs := append(s.allowedIPs[:len(s.allowedIPs):len(s.allowedIPs)], s.allowedLocationIPs...)
		peer := &wireguard.Peer{
			AllowedIPs:          allowedIPs,
			Endpoint:            t.updateEndpoint(s.endpoint, s.key, s.persistentKeepalive),
			PersistentKeepalive: t.persistentKeepalive,
			PublicKey:           s.key,
		}
		c.Peers = append(c.Peers, peer)
	}
	for _, p := range t.peers {
		peer := &wireguard.Peer{
			AllowedIPs:          p.AllowedIPs,
			Endpoint:            t.updateEndpoint(p.Endpoint, p.PublicKey, p.PersistentKeepalive),
			PersistentKeepalive: t.persistentKeepalive,
			PresharedKey:        p.PresharedKey,
			PublicKey:           p.PublicKey,
		}
		c.Peers = append(c.Peers, peer)
	}
	return c
}
// AsPeer generates the WireGuard peer configuration for the local location of the given Topology.
// This configuration can be used to configure this location as a peer of another WireGuard interface.
// It returns nil if no segment matches the local location.
func (t *Topology) AsPeer() *wireguard.Peer {
	for _, seg := range t.segments {
		if seg.location == t.location {
			return &wireguard.Peer{
				AllowedIPs: seg.allowedIPs,
				Endpoint:   seg.endpoint,
				PublicKey:  seg.key,
			}
		}
	}
	return nil
}
// PeerConf generates a WireGuard configuration file for a given peer in a Topology.
func (t *Topology) PeerConf(name string) *wireguard.Conf {
var pka int
var psk []byte
for i := range t.peers {
if t.peers[i].Name == name {
pka = t.peers[i].PersistentKeepalive
psk = t.peers[i].PresharedKey
break
}
}
c := &wireguard.Conf{}
for _, s := range t.segments {
peer := &wireguard.Peer{
AllowedIPs: s.allowedIPs,
Endpoint: s.endpoint,
PersistentKeepalive: pka,
PresharedKey: psk,
PublicKey: s.key,
}
c.Peers = append(c.Peers, peer)
}
for i := range t.peers {
if t.peers[i].Name == name {
continue
}
peer := &wireguard.Peer{
AllowedIPs: t.peers[i].AllowedIPs,
PersistentKeepalive: pka,
PublicKey: t.peers[i].PublicKey,
Endpoint: t.peers[i].Endpoint,
}
c.Peers = append(c.Peers, peer)
2019-01-18 01:50:10 +00:00
}
return c
2019-01-18 01:50:10 +00:00
}
// oneAddressCIDR builds the CIDR that covers exactly the given IP
// address: the mask has as many leading ones as the address has bits.
func oneAddressCIDR(ip net.IP) *net.IPNet {
	bits := 8 * len(ip)
	return &net.IPNet{
		IP:   ip,
		Mask: net.CIDRMask(bits, bits),
	}
}
// findLeader selects a leader for the nodes in a segment;
// it will select the first node that says it should lead
// or the first node in the segment if none have volunteered,
// always preferring those with a public external IP address,
func findLeader(nodes []*Node) int {
var leaders, public []int
for i := range nodes {
if nodes[i].Leader {
if isPublic(nodes[i].Endpoint.IP) {
2019-01-18 01:50:10 +00:00
return i
}
leaders = append(leaders, i)
2019-01-18 01:50:10 +00:00
}
if isPublic(nodes[i].Endpoint.IP) {
2019-01-18 01:50:10 +00:00
public = append(public, i)
}
}
if len(leaders) != 0 {
return leaders[0]
}
if len(public) != 0 {
return public[0]
}
return 0
}
// deduplicatePeerIPs returns copies of the given peers, in the same order,
// in which every allowed IP appears only once across all peers: an allowed
// IP is kept by the first peer that lists it and dropped everywhere else.
// Allowed IPs are compared by their string representation.
func deduplicatePeerIPs(peers []*Peer) []*Peer {
	seen := make(map[string]struct{})
	out := make([]*Peer, 0, len(peers))
	for _, orig := range peers {
		dedup := &Peer{
			Name: orig.Name,
			Peer: wireguard.Peer{
				Endpoint:            orig.Endpoint,
				PersistentKeepalive: orig.PersistentKeepalive,
				PresharedKey:        orig.PresharedKey,
				PublicKey:           orig.PublicKey,
			},
		}
		for _, ipNet := range orig.AllowedIPs {
			cidr := ipNet.String()
			if _, claimed := seen[cidr]; claimed {
				continue
			}
			seen[cidr] = struct{}{}
			dedup.AllowedIPs = append(dedup.AllowedIPs, ipNet)
		}
		out = append(out, dedup)
	}
	return out
}