pkg/{k8s,mesh}: introduce liveness checks

This commit introduces liveness checks to Kilo. This allows the Kilo
daemons to take nodes with inactive or dead Kilo daemons out of the
topology until they are alive again.
This commit is contained in:
Lucas Servén Marín 2019-04-02 18:25:08 +02:00
parent a8467f779e
commit 72bfb762b9
No known key found for this signature in database
GPG Key ID: 586FEAF680DA74AD
4 changed files with 64 additions and 7 deletions

View File

@ -20,10 +20,11 @@ import (
"fmt" "fmt"
"net" "net"
"path" "path"
"strconv"
"strings" "strings"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
@ -42,6 +43,7 @@ const (
forceExternalIPAnnotationKey = "kilo.squat.ai/force-external-ip" forceExternalIPAnnotationKey = "kilo.squat.ai/force-external-ip"
internalIPAnnotationKey = "kilo.squat.ai/internal-ip" internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
keyAnnotationKey = "kilo.squat.ai/key" keyAnnotationKey = "kilo.squat.ai/key"
lastSeenAnnotationKey = "kilo.squat.ai/last-seen"
leaderAnnotationKey = "kilo.squat.ai/leader" leaderAnnotationKey = "kilo.squat.ai/leader"
locationAnnotationKey = "kilo.squat.ai/location" locationAnnotationKey = "kilo.squat.ai/location"
regionLabelKey = "failure-domain.beta.kubernetes.io/region" regionLabelKey = "failure-domain.beta.kubernetes.io/region"
@ -76,6 +78,7 @@ func (b *backend) CleanUp(name string) error {
fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(externalIPAnnotationKey, "/", jsonPatchSlash, 1))), fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(externalIPAnnotationKey, "/", jsonPatchSlash, 1))),
fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(internalIPAnnotationKey, "/", jsonPatchSlash, 1))), fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(internalIPAnnotationKey, "/", jsonPatchSlash, 1))),
fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(keyAnnotationKey, "/", jsonPatchSlash, 1))), fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(keyAnnotationKey, "/", jsonPatchSlash, 1))),
fmt.Sprintf(jsonRemovePatch, path.Join("/metadata", "annotations", strings.Replace(lastSeenAnnotationKey, "/", jsonPatchSlash, 1))),
}, ",") + "]") }, ",") + "]")
if _, err := b.client.CoreV1().Nodes().Patch(name, types.JSONPatchType, patch); err != nil { if _, err := b.client.CoreV1().Nodes().Patch(name, types.JSONPatchType, patch); err != nil {
return fmt.Errorf("failed to patch node: %v", err) return fmt.Errorf("failed to patch node: %v", err)
@ -155,6 +158,7 @@ func (b *backend) Set(name string, node *mesh.Node) error {
n.ObjectMeta.Annotations[externalIPAnnotationKey] = node.ExternalIP.String() n.ObjectMeta.Annotations[externalIPAnnotationKey] = node.ExternalIP.String()
n.ObjectMeta.Annotations[internalIPAnnotationKey] = node.InternalIP.String() n.ObjectMeta.Annotations[internalIPAnnotationKey] = node.InternalIP.String()
n.ObjectMeta.Annotations[keyAnnotationKey] = string(node.Key) n.ObjectMeta.Annotations[keyAnnotationKey] = string(node.Key)
n.ObjectMeta.Annotations[lastSeenAnnotationKey] = strconv.FormatInt(node.LastSeen, 10)
oldData, err := json.Marshal(old) oldData, err := json.Marshal(old)
if err != nil { if err != nil {
return err return err
@ -200,6 +204,14 @@ func translateNode(node *v1.Node) *mesh.Node {
if !ok { if !ok {
externalIP = node.ObjectMeta.Annotations[externalIPAnnotationKey] externalIP = node.ObjectMeta.Annotations[externalIPAnnotationKey]
} }
var lastSeen int64
if ls, ok := node.ObjectMeta.Annotations[lastSeenAnnotationKey]; !ok {
lastSeen = 0
} else {
if lastSeen, err = strconv.ParseInt(ls, 10, 64); err != nil {
lastSeen = 0
}
}
return &mesh.Node{ return &mesh.Node{
// ExternalIP and InternalIP should only ever fail to parse if the // ExternalIP and InternalIP should only ever fail to parse if the
// remote node's mesh has not yet set its IP address; // remote node's mesh has not yet set its IP address;
@ -208,6 +220,7 @@ func translateNode(node *v1.Node) *mesh.Node {
ExternalIP: normalizeIP(externalIP), ExternalIP: normalizeIP(externalIP),
InternalIP: normalizeIP(node.ObjectMeta.Annotations[internalIPAnnotationKey]), InternalIP: normalizeIP(node.ObjectMeta.Annotations[internalIPAnnotationKey]),
Key: []byte(node.ObjectMeta.Annotations[keyAnnotationKey]), Key: []byte(node.ObjectMeta.Annotations[keyAnnotationKey]),
LastSeen: lastSeen,
Leader: leader, Leader: leader,
Location: location, Location: location,
Name: node.Name, Name: node.Name,

View File

@ -19,7 +19,7 @@ import (
"testing" "testing"
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"github.com/squat/kilo/pkg/mesh" "github.com/squat/kilo/pkg/mesh"
) )
@ -38,15 +38,15 @@ func TestTranslateNode(t *testing.T) {
out: &mesh.Node{}, out: &mesh.Node{},
}, },
{ {
name: "invalid ip", name: "invalid ips",
annotations: map[string]string{ annotations: map[string]string{
externalIPAnnotationKey: "10.0.0.1", externalIPAnnotationKey: "10.0.0.1",
internalIPAnnotationKey: "10.0.0.1", internalIPAnnotationKey: "foo",
}, },
out: &mesh.Node{}, out: &mesh.Node{},
}, },
{ {
name: "valid ip", name: "valid ips",
annotations: map[string]string{ annotations: map[string]string{
externalIPAnnotationKey: "10.0.0.1/24", externalIPAnnotationKey: "10.0.0.1/24",
internalIPAnnotationKey: "10.0.0.2/32", internalIPAnnotationKey: "10.0.0.2/32",
@ -109,6 +109,13 @@ func TestTranslateNode(t *testing.T) {
ExternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(24, 32)}, ExternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(24, 32)},
}, },
}, },
{
name: "invalid time",
annotations: map[string]string{
lastSeenAnnotationKey: "foo",
},
out: &mesh.Node{},
},
{ {
name: "complete", name: "complete",
annotations: map[string]string{ annotations: map[string]string{
@ -116,6 +123,7 @@ func TestTranslateNode(t *testing.T) {
forceExternalIPAnnotationKey: "10.0.0.2/24", forceExternalIPAnnotationKey: "10.0.0.2/24",
internalIPAnnotationKey: "10.0.0.2/32", internalIPAnnotationKey: "10.0.0.2/32",
keyAnnotationKey: "foo", keyAnnotationKey: "foo",
lastSeenAnnotationKey: "1000000000",
leaderAnnotationKey: "", leaderAnnotationKey: "",
locationAnnotationKey: "b", locationAnnotationKey: "b",
}, },
@ -126,6 +134,7 @@ func TestTranslateNode(t *testing.T) {
ExternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(24, 32)}, ExternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(24, 32)},
InternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(32, 32)}, InternalIP: &net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.CIDRMask(32, 32)},
Key: []byte("foo"), Key: []byte("foo"),
LastSeen: 1000000000,
Leader: true, Leader: true,
Location: "b", Location: "b",
Subnet: &net.IPNet{IP: net.ParseIP("10.2.1.0"), Mask: net.CIDRMask(24, 32)}, Subnet: &net.IPNet{IP: net.ParseIP("10.2.1.0"), Mask: net.CIDRMask(24, 32)},

View File

@ -77,6 +77,9 @@ type Node struct {
ExternalIP *net.IPNet ExternalIP *net.IPNet
Key []byte Key []byte
InternalIP *net.IPNet InternalIP *net.IPNet
// LastSeen is a Unix time for the last time
// the node confirmed it was live.
LastSeen int64
// Leader is a suggestion to Kilo that // Leader is a suggestion to Kilo that
// the node wants to lead its segment. // the node wants to lead its segment.
Leader bool Leader bool
@ -87,7 +90,7 @@ type Node struct {
// Ready indicates whether or not the node is ready. // Ready indicates whether or not the node is ready.
func (n *Node) Ready() bool { func (n *Node) Ready() bool {
return n != nil && n.ExternalIP != nil && n.Key != nil && n.InternalIP != nil && n.Subnet != nil return n != nil && n.ExternalIP != nil && n.Key != nil && n.InternalIP != nil && n.Subnet != nil && time.Now().Unix()-n.LastSeen < int64(resyncPeriod)*2/int64(time.Second)
} }
// EventType describes what kind of an action an event represents. // EventType describes what kind of an action an event represents.
@ -284,6 +287,7 @@ func (m *Mesh) Run() error {
case e = <-w: case e = <-w:
m.sync(e) m.sync(e)
case <-t.C: case <-t.C:
m.checkIn()
m.applyTopology() m.applyTopology()
t.Reset(resyncPeriod) t.Reset(resyncPeriod)
case <-m.stop: case <-m.stop:
@ -332,6 +336,25 @@ func (m *Mesh) sync(e *Event) {
} }
} }
// checkIn refreshes the local node's LastSeen timestamp and tries to
// persist the node in the backend via Set.
//
// The *Node looked up here is the same pointer that is stored in
// m.nodes, so the timestamp is written while m.mu is still held;
// writing it after the unlock would race with any reader of the map's
// nodes that takes the lock. The lock is released before Set is called.
//
// Nothing is returned: a missing local node is logged at debug level,
// and a failed Set is logged as an error and counted under the
// "checkin" label of the error counter.
func (m *Mesh) checkIn() {
	m.mu.Lock()
	n := m.nodes[m.hostname]
	if n != nil {
		// Stamp the node before unlocking so the write is covered by
		// the same mutex that guards the lookup.
		n.LastSeen = time.Now().Unix()
	}
	m.mu.Unlock()
	if n == nil {
		level.Debug(m.logger).Log("msg", "no local node found in backend")
		return
	}
	if err := m.Set(m.hostname, n); err != nil {
		level.Error(m.logger).Log("error", fmt.Sprintf("failed to set local node: %v", err), "node", n)
		m.errorCounter.WithLabelValues("checkin").Inc()
		return
	}
	level.Debug(m.logger).Log("msg", "successfully checked in local node in backend")
}
func (m *Mesh) handleLocal(n *Node) { func (m *Mesh) handleLocal(n *Node) {
// Allow the external IP to be overridden. // Allow the external IP to be overridden.
if n.ExternalIP == nil { if n.ExternalIP == nil {
@ -340,7 +363,16 @@ func (m *Mesh) handleLocal(n *Node) {
// Compare the given node to the calculated local node. // Compare the given node to the calculated local node.
// Take leader, location, and subnet from the argument, as these // Take leader, location, and subnet from the argument, as these
// are not determined by kilo. // are not determined by kilo.
local := &Node{ExternalIP: n.ExternalIP, Key: m.pub, InternalIP: m.internalIP, Leader: n.Leader, Location: n.Location, Name: m.hostname, Subnet: n.Subnet} local := &Node{
ExternalIP: n.ExternalIP,
Key: m.pub,
InternalIP: m.internalIP,
LastSeen: time.Now().Unix(),
Leader: n.Leader,
Location: n.Location,
Name: m.hostname,
Subnet: n.Subnet,
}
if !nodesAreEqual(n, local) { if !nodesAreEqual(n, local) {
level.Debug(m.logger).Log("msg", "local node differs from backend") level.Debug(m.logger).Log("msg", "local node differs from backend")
if err := m.Set(m.hostname, local); err != nil { if err := m.Set(m.hostname, local); err != nil {
@ -543,6 +575,7 @@ func nodesAreEqual(a, b *Node) bool {
if a == b { if a == b {
return true return true
} }
// Ignore LastSeen when comparing equality.
return ipNetsEqual(a.ExternalIP, b.ExternalIP) && string(a.Key) == string(b.Key) && ipNetsEqual(a.InternalIP, b.InternalIP) && a.Leader == b.Leader && a.Location == b.Location && a.Name == b.Name && subnetsEqual(a.Subnet, b.Subnet) return ipNetsEqual(a.ExternalIP, b.ExternalIP) && string(a.Key) == string(b.Key) && ipNetsEqual(a.InternalIP, b.InternalIP) && a.Leader == b.Leader && a.Location == b.Location && a.Name == b.Name && subnetsEqual(a.Subnet, b.Subnet)
} }

View File

@ -17,6 +17,7 @@ package mesh
import ( import (
"net" "net"
"testing" "testing"
"time"
) )
func TestNewAllocator(t *testing.T) { func TestNewAllocator(t *testing.T) {
@ -133,6 +134,7 @@ func TestReady(t *testing.T) {
ExternalIP: externalIP, ExternalIP: externalIP,
InternalIP: internalIP, InternalIP: internalIP,
Key: []byte{}, Key: []byte{},
LastSeen: time.Now().Unix(),
Subnet: &net.IPNet{IP: net.ParseIP("10.2.0.0"), Mask: net.CIDRMask(16, 32)}, Subnet: &net.IPNet{IP: net.ParseIP("10.2.0.0"), Mask: net.CIDRMask(16, 32)},
}, },
ready: true, ready: true,