kilo/pkg/calico/calico

180 lines
4.5 KiB
Plaintext

// Copyright 2019 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package calico
import (
"errors"
"net"
"time"
"k8s.io/apimachinery/pkg/labels"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/squat/kilo/pkg/ipset"
"github.com/squat/kilo/pkg/mesh"
)
type Compatibility interface {
Apply(*mesh.Topology, mesh.Encapsulate) error
Backend(mesh.Backend) mesh.Backend
CleanUp() error
Run(stop <-chan struct{}) (<-chan error, error)
}
// calico is a Calico compatibility layer.
type calico struct {
client kubernetes.Interface
errors chan error
ipset *ipset.Set
}
// New generates a new ipset.
func New(c kubernetes.Interface) Compatibility {
return &calico{
client: c,
errors: make(chan error),
// This is a patch until Calico supports
// other hosts adding IPIP iptables rules.
ipset: ipset.New("cali40all-hosts-net"),
}
}
// Run implements the mesh.Compatibility interface.
// It runs the ipset controller and forwards errors along.
func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) {
return c.ipset.Run(stop)
}
// CleanUp stops the compatibility layer's controllers.
func (c *calico) CleanUp() error {
return c.ipset.CleanUp()
}
type backend struct {
backend mesh.Backend
client kubernetes.Interface
events chan *mesh.NodeEvent
informer cache.SharedIndexInformer
lister v1listers.NodeLister
}
func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error {
if encapsulate == mesh.NeverEncapsulate {
return nil
}
var peers []net.IP
for _, s := range t.segments {
if s.location == location {
peers = s.privateIPs
break
}
}
return c.ipset.Set(peers)
}
func (c *calico) Backend(b mesh.Backend) mesh.Backend {
ni := v1informers.NewNodeInformer(c.client, 5*time.Minute, nil)
return &backend{
backend: b,
events: make(chan *mesh.NodeEvent),
informer: ni,
lister: v1listers.NewNodeLister(ni.GetIndexer()),
}
}
// Nodes implements the mesh.Backend interface.
func (b *backend) Nodes() mesh.NodeBackend {
return b
}
// Peers implements the mesh.Backend interface.
func (b *backend) Peers() mesh.PeerBackend {
// The Calico compatibility backend only wraps the node backend.
return b.backend.Peers()
}
// CleanUp removes configuration applied to the backend.
func (b *backend) CleanUp(name string) error {
return b.backend.Nodes().CleanUp(name)
}
// Get gets a single Node by name.
func (b *backend) Get(name string) (*mesh.Node, error) {
n, err := b.lister.Get(name)
if err != nil {
return nil, err
}
m, err := b.backend.Nodes().Get(name)
if err != nil {
return nil, err
}
return translateNode(n, m), nil
}
// Init initializes the backend; for this backend that means
// syncing the informer cache and the wrapped backend.
func (b *backend) Init(stop <-chan struct{}) error {
if err := b.backend.Nodes().Init(stop); err != nil {
return err
}
go b.informer.Run(stop)
if ok := cache.WaitForCacheSync(stop, func() bool {
return b.informer.HasSynced()
}); !ok {
return errors.New("failed to sync node cache")
}
go func() {
w := b.backend.Nodes().Watch()
var ne *mesh.NodeEvent
for {
select {
case ne = <-w:
b.events <- &mesh.NodeEvent{Type: ne.Type, Node: translateNode(n, ne.Node)}
case <-stop:
return
}
}
}()
return nil
}
// List gets all the Nodes in the cluster.
func (b *backend) List() ([]*mesh.Node, error) {
ns, err := b.lister.List(labels.Everything())
if err != nil {
return nil, err
}
nodes := make([]*mesh.Node, len(ns))
for i := range ns {
nodes[i] = translateNode(ns[i])
}
return nodes, nil
}
// Set sets the fields of a node.
func (b *backend) Set(name string, node *mesh.Node) error {
// The Calico compatibility backend is read-only.
// Proxy all writes to the underlying backend.
return b.backend.Nodes().Set(name, node)
}
// Watch returns a chan of node events.
func (b *backend) Watch() <-chan *mesh.NodeEvent {
return b.events
}