// Copyright 2019 the Kilo authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package calico import ( "errors" "net" "time" "k8s.io/apimachinery/pkg/labels" v1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "github.com/squat/kilo/pkg/ipset" "github.com/squat/kilo/pkg/mesh" ) type Compatibility interface { Apply(*mesh.Topology, mesh.Encapsulate) error Backend(mesh.Backend) mesh.Backend CleanUp() error Run(stop <-chan struct{}) (<-chan error, error) } // calico is a Calico compatibility layer. type calico struct { client kubernetes.Interface errors chan error ipset *ipset.Set } // New generates a new ipset. func New(c kubernetes.Interface) Compatibility { return &calico{ client: c, errors: make(chan error), // This is a patch until Calico supports // other hosts adding IPIP iptables rules. ipset: ipset.New("cali40all-hosts-net"), } } // Run implements the mesh.Compatibility interface. // It runs the ipset controller and forwards errors along. func (c *calico) Run(stop <-chan struct{}) (<-chan error, error) { return c.ipset.Run(stop) } // CleanUp stops the compatibility layer's controllers. func (c *calico) CleanUp() error { return c.ipset.CleanUp() } type backend struct { backend mesh.Backend client kubernetes.Interface events chan *mesh.NodeEvent informer cache.SharedIndexInformer lister v1listers.NodeLister } func (c *calico) Apply(t *mesh.Topology, encapsulate mesh.Encapsulate, location string) error { if encapsulate == mesh.NeverEncapsulate { return nil } var peers []net.IP for _, s := range t.segments { if s.location == location { peers = s.privateIPs break } } return c.ipset.Set(peers) } func (c *calico) Backend(b mesh.Backend) mesh.Backend { ni := v1informers.NewNodeInformer(c.client, 5*time.Minute, nil) return &backend{ backend: b, events: make(chan *mesh.NodeEvent), informer: ni, lister: v1listers.NewNodeLister(ni.GetIndexer()), } } // Nodes implements the mesh.Backend interface. func (b *backend) Nodes() mesh.NodeBackend { return b } // Peers implements the mesh.Backend interface. func (b *backend) Peers() mesh.PeerBackend { // The Calico compatibility backend only wraps the node backend. return b.backend.Peers() } // CleanUp removes configuration applied to the backend. func (b *backend) CleanUp(name string) error { return b.backend.Nodes().CleanUp(name) } // Get gets a single Node by name. func (b *backend) Get(name string) (*mesh.Node, error) { n, err := b.lister.Get(name) if err != nil { return nil, err } m, err := b.backend.Nodes().Get(name) if err != nil { return nil, err } return translateNode(n, m), nil } // Init initializes the backend; for this backend that means // syncing the informer cache and the wrapped backend. func (b *backend) Init(stop <-chan struct{}) error { if err := b.backend.Nodes().Init(stop); err != nil { return err } go b.informer.Run(stop) if ok := cache.WaitForCacheSync(stop, func() bool { return b.informer.HasSynced() }); !ok { return errors.New("failed to sync node cache") } go func() { w := b.backend.Nodes().Watch() var ne *mesh.NodeEvent for { select { case ne = <-w: b.events <- &mesh.NodeEvent{Type: ne.Type, Node: translateNode(n, ne.Node)} case <-stop: return } } }() return nil } // List gets all the Nodes in the cluster. func (b *backend) List() ([]*mesh.Node, error) { ns, err := b.lister.List(labels.Everything()) if err != nil { return nil, err } nodes := make([]*mesh.Node, len(ns)) for i := range ns { nodes[i] = translateNode(ns[i]) } return nodes, nil } // Set sets the fields of a node. func (b *backend) Set(name string, node *mesh.Node) error { // The Calico compatibility backend is read-only. // Proxy all writes to the underlying backend. return b.backend.Nodes().Set(name, node) } // Watch returns a chan of node events. func (b *backend) Watch() <-chan *mesh.NodeEvent { return b.events }