cmd/kg,pkg: add --resync-period flag

This commit introduces a new `--resync-period` flag to control how often
the Kilo controllers should reconcile.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
This commit is contained in:
Lucas Servén Marín
2021-02-28 18:33:01 +01:00
parent c060bf24e2
commit 8dbbc636b5
8 changed files with 58 additions and 68 deletions

View File

@@ -198,10 +198,11 @@ func chainToString(table, chain string) string {
// Controller is able to reconcile a given set of iptables rules.
type Controller struct {
v4 Client
v6 Client
errors chan error
logger log.Logger
v4 Client
v6 Client
errors chan error
logger log.Logger
resyncPeriod time.Duration
sync.Mutex
rules []Rule
@@ -218,6 +219,13 @@ func WithLogger(logger log.Logger) ControllerOption {
}
}
// WithResyncPeriod modifies how often the controller reconciles.
func WithResyncPeriod(resyncPeriod time.Duration) ControllerOption {
return func(c *Controller) {
c.resyncPeriod = resyncPeriod
}
}
// WithClients adds iptables clients to the controller.
func WithClients(v4, v6 Client) ControllerOption {
return func(c *Controller) {
@@ -266,16 +274,18 @@ func (c *Controller) Run(stop <-chan struct{}) (<-chan error, error) {
c.subscribed = true
c.Unlock()
go func() {
t := time.NewTimer(c.resyncPeriod)
defer close(c.errors)
for {
select {
case <-time.After(30 * time.Second):
case <-t.C:
if err := c.reconcile(); err != nil {
nonBlockingSend(c.errors, fmt.Errorf("failed to reconcile rules: %v", err))
}
t.Reset(c.resyncPeriod)
case <-stop:
return
}
if err := c.reconcile(); err != nil {
nonBlockingSend(c.errors, fmt.Errorf("failed to reconcile rules: %v", err))
}
}
}()
return c.errors, nil

View File

@@ -22,9 +22,9 @@ import (
)
const (
// resyncPeriod is how often the mesh checks state if no events have been received.
resyncPeriod = 30 * time.Second
// DefaultKiloInterface is the default iterface created and used by Kilo.
// checkInPeriod is how often nodes should check-in.
checkInPeriod = 30 * time.Second
// DefaultKiloInterface is the default interface created and used by Kilo.
DefaultKiloInterface = "kilo0"
// DefaultKiloPort is the default UDP port Kilo uses.
DefaultKiloPort = 51820
@@ -70,7 +70,7 @@ type Node struct {
// Ready indicates whether or not the node is ready.
func (n *Node) Ready() bool {
// Nodes that are not leaders will not have WireGuardIPs, so it is not required.
return n != nil && n.Endpoint != nil && !(n.Endpoint.IP == nil && n.Endpoint.DNS == "") && n.Endpoint.Port != 0 && n.Key != nil && n.Subnet != nil && time.Now().Unix()-n.LastSeen < int64(resyncPeriod)*2/int64(time.Second)
return n != nil && n.Endpoint != nil && !(n.Endpoint.IP == nil && n.Endpoint.DNS == "") && n.Endpoint.Port != 0 && n.Key != nil && n.Subnet != nil && time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
}
// Peer represents a peer in the network.

View File

@@ -65,6 +65,7 @@ type Mesh struct {
priv []byte
privIface int
pub []byte
resyncPeriod time.Duration
stop chan struct{}
subnet *net.IPNet
table *route.Table
@@ -85,7 +86,7 @@ type Mesh struct {
}
// New returns a new Mesh instance.
func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port uint32, subnet *net.IPNet, local, cni bool, cniPath, iface string, cleanUpIface bool, createIface bool, logger log.Logger) (*Mesh, error) {
func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port uint32, subnet *net.IPNet, local, cni bool, cniPath, iface string, cleanUpIface bool, createIface bool, resyncPeriod time.Duration, logger log.Logger) (*Mesh, error) {
if err := os.MkdirAll(kiloPath, 0700); err != nil {
return nil, fmt.Errorf("failed to create directory to store configuration: %v", err)
}
@@ -143,7 +144,7 @@ func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularit
level.Debug(logger).Log("msg", "running without a private IP address")
}
level.Debug(logger).Log("msg", fmt.Sprintf("using %s as the public IP address", publicIP.String()))
ipTables, err := iptables.New(iptables.WithLogger(log.With(logger, "component", "iptables")))
ipTables, err := iptables.New(iptables.WithLogger(log.With(logger, "component", "iptables")), iptables.WithResyncPeriod(resyncPeriod))
if err != nil {
return nil, fmt.Errorf("failed to IP tables controller: %v", err)
}
@@ -234,7 +235,8 @@ func (m *Mesh) Run() error {
}
}()
defer m.cleanUp()
t := time.NewTimer(resyncPeriod)
resync := time.NewTimer(m.resyncPeriod)
checkIn := time.NewTimer(checkInPeriod)
nw := m.Nodes().Watch()
pw := m.Peers().Watch()
var ne *NodeEvent
@@ -245,13 +247,15 @@ func (m *Mesh) Run() error {
m.syncNodes(ne)
case pe = <-pw:
m.syncPeers(pe)
case <-t.C:
case <-checkIn.C:
m.checkIn()
checkIn.Reset(checkInPeriod)
case <-resync.C:
if m.cni {
m.updateCNIConfig()
}
m.applyTopology()
t.Reset(resyncPeriod)
resync.Reset(m.resyncPeriod)
case <-m.stop:
return nil
}