*: add complete CNI support

This commit enables Kilo to work as an independent networking provider.
This is done by leveraging CNI. Kilo brings the necessary CNI plugins to
operate and takes care of all networking.

Add-on compatibility for Calico, Flannel, etc, will be re-introduced
shortly.
This commit is contained in:
Lucas Servén Marín
2019-05-07 01:49:55 +02:00
parent 72991949ac
commit b3a3c37e0a
12 changed files with 439 additions and 77 deletions

View File

@@ -29,7 +29,6 @@ import (
"github.com/vishvananda/netlink"
"github.com/squat/kilo/pkg/iproute"
"github.com/squat/kilo/pkg/ipset"
"github.com/squat/kilo/pkg/iptables"
"github.com/squat/kilo/pkg/route"
"github.com/squat/kilo/pkg/wireguard"
@@ -46,6 +45,8 @@ const (
ConfPath = KiloPath + "/conf"
// DefaultKiloPort is the default UDP port Kilo uses.
DefaultKiloPort = 51820
// DefaultCNIPath is the default path to the CNI config file.
DefaultCNIPath = "/etc/cni/net.d/10-kilo.conflist"
)
// Granularity represents the abstraction level at which the network
@@ -171,12 +172,13 @@ type PeerBackend interface {
// Mesh is able to create Kilo network meshes.
type Mesh struct {
Backend
cni bool
cniPath string
encapsulate Encapsulate
externalIP *net.IPNet
granularity Granularity
hostname string
internalIP *net.IPNet
ipset *ipset.Set
ipTables *iptables.Controller
kiloIface int
key []byte
@@ -205,7 +207,7 @@ type Mesh struct {
}
// New returns a new Mesh instance.
func New(backend Backend, encapsulate Encapsulate, granularity Granularity, hostname string, port uint32, subnet *net.IPNet, local bool, logger log.Logger) (*Mesh, error) {
func New(backend Backend, encapsulate Encapsulate, granularity Granularity, hostname string, port uint32, subnet *net.IPNet, local, cni bool, cniPath string, logger log.Logger) (*Mesh, error) {
if err := os.MkdirAll(KiloPath, 0700); err != nil {
return nil, fmt.Errorf("failed to create directory to store configuration: %v", err)
}
@@ -259,28 +261,27 @@ func New(backend Backend, encapsulate Encapsulate, granularity Granularity, host
}
return &Mesh{
Backend: backend,
cni: cni,
cniPath: cniPath,
encapsulate: encapsulate,
externalIP: publicIP,
granularity: granularity,
hostname: hostname,
internalIP: privateIP,
// This is a patch until Calico supports
// other hosts adding IPIP iptables rules.
ipset: ipset.New("cali40all-hosts-net"),
ipTables: ipTables,
kiloIface: kiloIface,
nodes: make(map[string]*Node),
peers: make(map[string]*Peer),
port: port,
priv: private,
privIface: privIface,
pub: public,
pubIface: pubIface,
local: local,
stop: make(chan struct{}),
subnet: subnet,
table: route.NewTable(),
tunlIface: tunlIface,
ipTables: ipTables,
kiloIface: kiloIface,
nodes: make(map[string]*Node),
peers: make(map[string]*Peer),
port: port,
priv: private,
privIface: privIface,
pub: public,
pubIface: pubIface,
local: local,
stop: make(chan struct{}),
subnet: subnet,
table: route.NewTable(),
tunlIface: tunlIface,
errorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "kilo_errors_total",
Help: "Number of errors that occurred while administering the mesh.",
@@ -309,10 +310,6 @@ func (m *Mesh) Run() error {
if err := m.Peers().Init(m.stop); err != nil {
return fmt.Errorf("failed to initialize peer backend: %v", err)
}
ipsetErrors, err := m.ipset.Run(m.stop)
if err != nil {
return fmt.Errorf("failed to watch for ipset updates: %v", err)
}
ipTablesErrors, err := m.ipTables.Run(m.stop)
if err != nil {
return fmt.Errorf("failed to watch for IP tables updates: %v", err)
@@ -325,7 +322,6 @@ func (m *Mesh) Run() error {
for {
var err error
select {
case err = <-ipsetErrors:
case err = <-ipTablesErrors:
case err = <-routeErrors:
case <-m.stop:
@@ -351,6 +347,9 @@ func (m *Mesh) Run() error {
m.syncPeers(pe)
case <-t.C:
m.checkIn()
if m.cni {
m.updateCNIConfig()
}
m.syncEndpoints()
m.applyTopology()
t.Reset(resyncPeriod)
@@ -585,45 +584,40 @@ func (m *Mesh) applyTopology() {
m.errorCounter.WithLabelValues("apply").Inc()
return
}
var private *net.IPNet
// If we are not encapsulating packets to the local private network,
// then pass the private IP to add an exception to the NAT rule.
if m.encapsulate != AlwaysEncapsulate {
private = t.privateIP
}
rules := iptables.MasqueradeRules(private, m.nodes[m.hostname].Subnet, t.RemoteSubnets())
rules = append(rules, iptables.ForwardRules(m.subnet)...)
rules := iptables.ForwardRules(m.subnet)
var peerCIDRs []*net.IPNet
for _, p := range m.peers {
rules = append(rules, iptables.ForwardRules(p.AllowedIPs...)...)
peerCIDRs = append(peerCIDRs, p.AllowedIPs...)
}
rules = append(rules, iptables.MasqueradeRules(m.subnet, oneAddressCIDR(t.privateIP.IP), m.nodes[m.hostname].Subnet, t.RemoteSubnets(), peerCIDRs)...)
// If we are handling local routes, ensure the local
// tunnel has an IP address and IPIP traffic is allowed.
if m.encapsulate != NeverEncapsulate && m.local {
var cidrs []*net.IPNet
for _, s := range t.segments {
if s.location == m.nodes[m.hostname].Location {
for i := range s.privateIPs {
cidrs = append(cidrs, oneAddressCIDR(s.privateIPs[i]))
}
break
}
}
rules = append(rules, iptables.EncapsulateRules(cidrs)...)
// If we are handling local routes, ensure the local
// tunnel has an IP address.
if err := iproute.SetAddress(m.tunlIface, oneAddressCIDR(newAllocator(*m.nodes[m.hostname].Subnet).next().IP)); err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
return
}
}
if err := m.ipTables.Set(rules); err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
return
}
if m.encapsulate != NeverEncapsulate {
var peers []net.IP
for _, s := range t.segments {
if s.location == m.nodes[m.hostname].Location {
peers = s.privateIPs
break
}
}
if err := m.ipset.Set(peers); err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
return
}
// If we are handling local routes, ensure the local
// tunnel has an IP address.
if m.local {
if err := iproute.SetAddress(m.tunlIface, oneAddressCIDR(newAllocator(*m.nodes[m.hostname].Subnet).next().IP)); err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
return
}
}
}
if t.leader {
if err := iproute.SetAddress(m.kiloIface, t.wireGuardCIDR); err != nil {
level.Error(m.logger).Log("error", err)
@@ -720,10 +714,6 @@ func (m *Mesh) cleanUp() {
level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up peer backend: %v", err))
m.errorCounter.WithLabelValues("cleanUp").Inc()
}
if err := m.ipset.CleanUp(); err != nil {
level.Error(m.logger).Log("error", fmt.Sprintf("failed to clean up ipset: %v", err))
m.errorCounter.WithLabelValues("cleanUp").Inc()
}
}
func isSelf(hostname string, node *Node) bool {
@@ -781,6 +771,12 @@ func ipNetsEqual(a, b *net.IPNet) bool {
}
func subnetsEqual(a, b *net.IPNet) bool {
if a == nil && b == nil {
return true
}
if (a != nil) != (b != nil) {
return false
}
if a.Mask.String() != b.Mask.String() {
return false
}