180 lines
4.5 KiB
Plaintext
180 lines
4.5 KiB
Plaintext
|
// Copyright 2019 the Kilo authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package calico
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"net"
|
||
|
"time"
|
||
|
|
||
|
"k8s.io/apimachinery/pkg/labels"
|
||
|
v1informers "k8s.io/client-go/informers/core/v1"
|
||
|
"k8s.io/client-go/kubernetes"
|
||
|
v1listers "k8s.io/client-go/listers/core/v1"
|
||
|
"k8s.io/client-go/tools/cache"
|
||
|
|
||
|
"github.com/squat/kilo/pkg/ipset"
|
||
|
"github.com/squat/kilo/pkg/mesh"
|
||
|
)
|
||
|
|
||
|
type Compatibility interface {
|
||
|
Apply(*mesh.Topology, mesh.Encapsulate) error
|
||
|
Backend(mesh.Backend) mesh.Backend
|
||
|
CleanUp() error
|
||
|
Run(stop <-chan struct{}) (<-chan error, error)
|
||
|
}
|
||
|
|
||
|
// calico is a Calico compatibility layer.
|
||
|
type calico struct {
|
||
|
client kubernetes.Interface
|
||
|
errors chan error
|
||
|
ipset *ipset.Set
|
||
|
}
|
||
|
|
||
|
// New generates a new ipset.
|
||
|
func New(c kubernetes.Interface) Compatibility {
|
||
|
return &calico{
|
||
|
client: c,
|
||
|
errors: make(chan error),
|
||
|
// This is a patch until Calico supports
|
||
|
// other hosts adding IPIP iptables rules.
|
||
|
ipset: ipset.New("cali40all-hosts-net"),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Run implements the mesh.Compatibility interface.
|
||
|
// It runs the ipset controller and forwards errors along.
|
||
|
func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) {
|
||
|
return c.ipset.Run(stop)
|
||
|
}
|
||
|
|
||
|
// CleanUp stops the compatibility layer's controllers.
|
||
|
func (c *calico) CleanUp() error {
|
||
|
return c.ipset.CleanUp()
|
||
|
}
|
||
|
|
||
|
type backend struct {
|
||
|
backend mesh.Backend
|
||
|
client kubernetes.Interface
|
||
|
events chan *mesh.NodeEvent
|
||
|
informer cache.SharedIndexInformer
|
||
|
lister v1listers.NodeLister
|
||
|
}
|
||
|
|
||
|
func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error {
|
||
|
if encapsulate == mesh.NeverEncapsulate {
|
||
|
return nil
|
||
|
}
|
||
|
var peers []net.IP
|
||
|
for _, s := range t.segments {
|
||
|
if s.location == location {
|
||
|
peers = s.privateIPs
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
return c.ipset.Set(peers)
|
||
|
}
|
||
|
|
||
|
func (c *calico) Backend(b mesh.Backend) mesh.Backend {
|
||
|
ni := v1informers.NewNodeInformer(c.client, 5*time.Minute, nil)
|
||
|
return &backend{
|
||
|
backend: b,
|
||
|
events: make(chan *mesh.NodeEvent),
|
||
|
informer: ni,
|
||
|
lister: v1listers.NewNodeLister(ni.GetIndexer()),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Nodes implements the mesh.Backend interface.
|
||
|
func (b *backend) Nodes() mesh.NodeBackend {
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
// Peers implements the mesh.Backend interface.
|
||
|
func (b *backend) Peers() mesh.PeerBackend {
|
||
|
// The Calico compatibility backend only wraps the node backend.
|
||
|
return b.backend.Peers()
|
||
|
}
|
||
|
|
||
|
// CleanUp removes configuration applied to the backend.
|
||
|
func (b *backend) CleanUp(name string) error {
|
||
|
return b.backend.Nodes().CleanUp(name)
|
||
|
}
|
||
|
|
||
|
// Get gets a single Node by name.
|
||
|
func (b *backend) Get(name string) (*mesh.Node, error) {
|
||
|
n, err := b.lister.Get(name)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
m, err := b.backend.Nodes().Get(name)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return translateNode(n, m), nil
|
||
|
}
|
||
|
|
||
|
// Init initializes the backend; for this backend that means
|
||
|
// syncing the informer cache and the wrapped backend.
|
||
|
func (b *backend) Init(stop <-chan struct{}) error {
|
||
|
if err := b.backend.Nodes().Init(stop); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
go b.informer.Run(stop)
|
||
|
if ok := cache.WaitForCacheSync(stop, func() bool {
|
||
|
return b.informer.HasSynced()
|
||
|
}); !ok {
|
||
|
return errors.New("failed to sync node cache")
|
||
|
}
|
||
|
go func() {
|
||
|
w := b.backend.Nodes().Watch()
|
||
|
var ne *mesh.NodeEvent
|
||
|
for {
|
||
|
select {
|
||
|
case ne = <-w:
|
||
|
b.events <- &mesh.NodeEvent{Type: ne.Type, Node: translateNode(n, ne.Node)}
|
||
|
case <-stop:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// List gets all the Nodes in the cluster.
|
||
|
func (b *backend) List() ([]*mesh.Node, error) {
|
||
|
ns, err := b.lister.List(labels.Everything())
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
nodes := make([]*mesh.Node, len(ns))
|
||
|
for i := range ns {
|
||
|
nodes[i] = translateNode(ns[i])
|
||
|
}
|
||
|
return nodes, nil
|
||
|
}
|
||
|
|
||
|
// Set sets the fields of a node.
|
||
|
func (b *backend) Set(name string, node *mesh.Node) error {
|
||
|
// The Calico compatibility backend is read-only.
|
||
|
// Proxy all writes to the underlying backend.
|
||
|
return b.backend.Nodes().Set(name, node)
|
||
|
}
|
||
|
|
||
|
// Watch returns a chan of node events.
|
||
|
func (b *backend) Watch() <-chan *mesh.NodeEvent {
|
||
|
return b.events
|
||
|
}
|