pkg: fix reconciling peer updates

This commit is contained in:
Lucas Servén Marín 2019-05-08 17:10:33 +02:00
parent 545bc4186f
commit 034c27ab78
No known key found for this signature in database
GPG Key ID: 586FEAF680DA74AD
2 changed files with 43 additions and 21 deletions

View File

@ -154,13 +154,18 @@ func (nb *nodeBackend) Init(stop <-chan struct{}) error {
}
nb.events <- &mesh.NodeEvent{Type: mesh.AddEvent, Node: translateNode(n)}
},
UpdateFunc: func(_, obj interface{}) {
UpdateFunc: func(old, obj interface{}) {
n, ok := obj.(*v1.Node)
if !ok {
// Failed to decode Node; ignoring...
return
}
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n)}
o, ok := old.(*v1.Node)
if !ok {
// Failed to decode Node; ignoring...
return
}
nb.events <- &mesh.NodeEvent{Type: mesh.UpdateEvent, Node: translateNode(n), Old: translateNode(o)}
},
DeleteFunc: func(obj interface{}) {
n, ok := obj.(*v1.Node)
@ -369,13 +374,18 @@ func (pb *peerBackend) Init(stop <-chan struct{}) error {
}
pb.events <- &mesh.PeerEvent{Type: mesh.AddEvent, Peer: translatePeer(p)}
},
UpdateFunc: func(_, obj interface{}) {
UpdateFunc: func(old, obj interface{}) {
p, ok := obj.(*v1alpha1.Peer)
if !ok || p.Validate() != nil {
// Failed to decode Peer; ignoring...
return
}
pb.events <- &mesh.PeerEvent{Type: mesh.UpdateEvent, Peer: translatePeer(p)}
o, ok := old.(*v1alpha1.Peer)
if !ok || o.Validate() != nil {
// Failed to decode Peer; ignoring...
return
}
pb.events <- &mesh.PeerEvent{Type: mesh.UpdateEvent, Peer: translatePeer(p), Old: translatePeer(o)}
},
DeleteFunc: func(obj interface{}) {
p, ok := obj.(*v1alpha1.Peer)

View File

@ -124,12 +124,14 @@ const (
type NodeEvent struct {
Type EventType
Node *Node
Old *Node
}
// PeerEvent represents an event concerning a peer in the cluster.
type PeerEvent struct {
Type EventType
Peer *Peer
Old *Peer
}
// Backend can create clients for all of the
@ -415,8 +417,7 @@ func (m *Mesh) syncNodes(e *NodeEvent) {
// An existing node is no longer valid
// so remove it from the mesh.
if _, ok := m.nodes[e.Node.Name]; ok {
level.Info(logger).Log("msg", "node is no longer in the mesh", "node", e.Node)
delete(m.nodes, e.Node.Name)
level.Info(logger).Log("msg", "node is no longer ready", "node", e.Node)
diff = true
}
} else {
@ -454,8 +455,7 @@ func (m *Mesh) syncPeers(e *PeerEvent) {
// An existing peer is no longer valid
// so remove it from the mesh.
if _, ok := m.peers[key]; ok {
level.Info(logger).Log("msg", "peer is no longer in the mesh", "peer", e.Peer)
delete(m.peers, key)
level.Info(logger).Log("msg", "peer is no longer ready", "peer", e.Peer)
diff = true
}
} else {
@ -463,6 +463,10 @@ func (m *Mesh) syncPeers(e *PeerEvent) {
case AddEvent:
fallthrough
case UpdateEvent:
if e.Old != nil && key != string(e.Old.PublicKey) {
delete(m.peers, string(e.Old.PublicKey))
diff = true
}
if !peersAreEqual(m.peers[key], e.Peer) {
m.peers[key] = e.Peer
diff = true
@ -483,16 +487,19 @@ func (m *Mesh) syncPeers(e *PeerEvent) {
// in the backend.
func (m *Mesh) checkIn() {
m.mu.Lock()
defer m.mu.Unlock()
n := m.nodes[m.hostname]
m.mu.Unlock()
if n == nil {
level.Debug(m.logger).Log("msg", "no local node found in backend")
return
}
oldTime := n.LastSeen
n.LastSeen = time.Now().Unix()
if err := m.Nodes().Set(m.hostname, n); err != nil {
level.Error(m.logger).Log("error", fmt.Sprintf("failed to set local node: %v", err), "node", n)
m.errorCounter.WithLabelValues("checkin").Inc()
// Revert time.
n.LastSeen = oldTime
return
}
level.Debug(m.logger).Log("msg", "successfully checked in local node in backend")
@ -526,6 +533,7 @@ func (m *Mesh) handleLocal(n *Node) {
level.Debug(m.logger).Log("msg", "successfully reconciled local node against backend")
}
m.mu.Lock()
n = m.nodes[m.hostname]
if n == nil {
n = &Node{}
@ -543,31 +551,33 @@ func (m *Mesh) applyTopology() {
m.reconcileCounter.Inc()
m.mu.Lock()
defer m.mu.Unlock()
// Ensure all unready nodes are removed.
// Ensure only ready nodes are considered.
nodes := make(map[string]*Node)
var readyNodes float64
for k := range m.nodes {
if !m.nodes[k].Ready() {
delete(m.nodes, k)
continue
}
nodes[k] = m.nodes[k]
readyNodes++
}
// Ensure all unready peers are removed.
// Ensure only ready nodes are considered.
peers := make(map[string]*Peer)
var readyPeers float64
for k := range m.peers {
if !m.peers[k].Ready() {
delete(m.peers, k)
continue
}
peers[k] = m.peers[k]
readyPeers++
}
m.nodesGuage.Set(readyNodes)
m.peersGuage.Set(readyPeers)
// We cannot do anything with the topology until the local node is available.
if m.nodes[m.hostname] == nil {
if nodes[m.hostname] == nil {
return
}
t, err := NewTopology(m.nodes, m.peers, m.granularity, m.hostname, m.port, m.priv, m.subnet)
t, err := NewTopology(nodes, peers, m.granularity, m.hostname, m.port, m.priv, m.subnet)
if err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
@ -586,17 +596,17 @@ func (m *Mesh) applyTopology() {
}
rules := iptables.ForwardRules(m.subnet)
var peerCIDRs []*net.IPNet
for _, p := range m.peers {
for _, p := range peers {
rules = append(rules, iptables.ForwardRules(p.AllowedIPs...)...)
peerCIDRs = append(peerCIDRs, p.AllowedIPs...)
}
rules = append(rules, iptables.MasqueradeRules(m.subnet, oneAddressCIDR(t.privateIP.IP), m.nodes[m.hostname].Subnet, t.RemoteSubnets(), peerCIDRs)...)
rules = append(rules, iptables.MasqueradeRules(m.subnet, oneAddressCIDR(t.privateIP.IP), nodes[m.hostname].Subnet, t.RemoteSubnets(), peerCIDRs)...)
// If we are handling local routes, ensure the local
// tunnel has an IP address and IPIP traffic is allowed.
if m.encapsulate != NeverEncapsulate && m.local {
var cidrs []*net.IPNet
for _, s := range t.segments {
if s.location == m.nodes[m.hostname].Location {
if s.location == nodes[m.hostname].Location {
for i := range s.privateIPs {
cidrs = append(cidrs, oneAddressCIDR(s.privateIPs[i]))
}
@ -607,7 +617,7 @@ func (m *Mesh) applyTopology() {
// If we are handling local routes, ensure the local
// tunnel has an IP address.
if err := iproute.SetAddress(m.tunlIface, oneAddressCIDR(newAllocator(*m.nodes[m.hostname].Subnet).next().IP)); err != nil {
if err := iproute.SetAddress(m.tunlIface, oneAddressCIDR(newAllocator(*nodes[m.hostname].Subnet).next().IP)); err != nil {
level.Error(m.logger).Log("error", err)
m.errorCounter.WithLabelValues("apply").Inc()
return
@ -727,8 +737,10 @@ func nodesAreEqual(a, b *Node) bool {
if a == b {
return true
}
// Ignore LastSeen when comparing equality.
return ipNetsEqual(a.ExternalIP, b.ExternalIP) && string(a.Key) == string(b.Key) && ipNetsEqual(a.InternalIP, b.InternalIP) && a.Leader == b.Leader && a.Location == b.Location && a.Name == b.Name && subnetsEqual(a.Subnet, b.Subnet)
// Ignore LastSeen when comparing equality we want to check if the nodes are
// equivalent. However, we do want to check if LastSeen has transitioned
// between valid and invalid.
return ipNetsEqual(a.ExternalIP, b.ExternalIP) && string(a.Key) == string(b.Key) && ipNetsEqual(a.InternalIP, b.InternalIP) && a.Leader == b.Leader && a.Location == b.Location && a.Name == b.Name && subnetsEqual(a.Subnet, b.Subnet) && a.Ready() == b.Ready()
}
func peersAreEqual(a, b *Peer) bool {