2019-01-18 01:50:10 +00:00
|
|
|
/*
|
|
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
|
|
|
|
|
|
"k8s.io/klog"
|
|
|
|
)
|
|
|
|
|
|
|
|
// NewDeltaFIFO returns a Store which can be used to process changes to items.
|
|
|
|
//
|
|
|
|
// keyFunc is used to figure out what key an object should have. (It's
|
|
|
|
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
|
|
|
|
//
|
|
|
|
// 'keyLister' is expected to return a list of keys that the consumer of
|
|
|
|
// this queue "knows about". It is used to decide which items are missing
|
|
|
|
// when Replace() is called; 'Deleted' deltas are produced for these items.
|
|
|
|
// It may be nil if you don't need to detect all deletions.
|
|
|
|
// TODO: consider merging keyLister with this object, tracking a list of
|
|
|
|
// "known" keys when Pop() is called. Have to think about how that
|
|
|
|
// affects error retrying.
|
|
|
|
// NOTE: It is possible to misuse this and cause a race when using an
|
|
|
|
// external known object source.
|
|
|
|
// Whether there is a potential race depends on how the consumer
|
|
|
|
// modifies knownObjects. In Pop(), process function is called under
|
|
|
|
// lock, so it is safe to update data structures in it that need to be
|
|
|
|
// in sync with the queue (e.g. knownObjects).
|
|
|
|
//
|
|
|
|
// Example:
|
|
|
|
// In case of sharedIndexInformer being a consumer
|
|
|
|
// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
|
|
|
|
// src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
|
|
|
|
// there is no race as knownObjects (s.indexer) is modified safely
|
|
|
|
// under DeltaFIFO's lock. The only exceptions are GetStore() and
|
|
|
|
// GetIndexer() methods, which expose ways to modify the underlying
|
|
|
|
// storage. Currently these two methods are used for creating Lister
|
|
|
|
// and internal tests.
|
|
|
|
//
|
|
|
|
// Also see the comment on DeltaFIFO.
|
|
|
|
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
|
|
|
|
f := &DeltaFIFO{
|
|
|
|
items: map[string]Deltas{},
|
|
|
|
queue: []string{},
|
|
|
|
keyFunc: keyFunc,
|
|
|
|
knownObjects: knownObjects,
|
|
|
|
}
|
|
|
|
f.cond.L = &f.lock
|
|
|
|
return f
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeltaFIFO is like FIFO, but allows you to process deletes.
|
|
|
|
//
|
|
|
|
// DeltaFIFO is a producer-consumer queue, where a Reflector is
|
|
|
|
// intended to be the producer, and the consumer is whatever calls
|
|
|
|
// the Pop() method.
|
|
|
|
//
|
|
|
|
// DeltaFIFO solves this use case:
|
|
|
|
// * You want to process every object change (delta) at most once.
|
|
|
|
// * When you process an object, you want to see everything
|
|
|
|
// that's happened to it since you last processed it.
|
|
|
|
// * You want to process the deletion of objects.
|
|
|
|
// * You might want to periodically reprocess objects.
|
|
|
|
//
|
|
|
|
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
|
|
|
|
// interface{} to satisfy the Store/Queue interfaces, but it
|
|
|
|
// will always return an object of type Deltas.
|
|
|
|
//
|
|
|
|
// A note on threading: If you call Pop() in parallel from multiple
|
|
|
|
// threads, you could end up with multiple threads processing slightly
|
|
|
|
// different versions of the same object.
|
|
|
|
//
|
|
|
|
// A note on the KeyLister used by the DeltaFIFO: Its main purpose is
|
|
|
|
// to list keys that are "known", for the purpose of figuring out which
|
|
|
|
// items have been deleted when Replace() or Delete() are called. The deleted
|
|
|
|
// object will be included in the DeleteFinalStateUnknown markers. These objects
|
|
|
|
// could be stale.
|
|
|
|
type DeltaFIFO struct {
|
|
|
|
// lock/cond protects access to 'items' and 'queue'.
|
|
|
|
lock sync.RWMutex
|
|
|
|
cond sync.Cond
|
|
|
|
|
|
|
|
// We depend on the property that items in the set are in
|
|
|
|
// the queue and vice versa, and that all Deltas in this
|
|
|
|
// map have at least one Delta.
|
|
|
|
items map[string]Deltas
|
|
|
|
queue []string
|
|
|
|
|
|
|
|
// populated is true if the first batch of items inserted by Replace() has been populated
|
|
|
|
// or Delete/Add/Update was called first.
|
|
|
|
populated bool
|
|
|
|
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
|
|
|
initialPopulationCount int
|
|
|
|
|
|
|
|
// keyFunc is used to make the key used for queued item
|
|
|
|
// insertion and retrieval, and should be deterministic.
|
|
|
|
keyFunc KeyFunc
|
|
|
|
|
|
|
|
// knownObjects list keys that are "known", for the
|
|
|
|
// purpose of figuring out which items have been deleted
|
|
|
|
// when Replace() or Delete() is called.
|
|
|
|
knownObjects KeyListerGetter
|
|
|
|
|
|
|
|
// Indication the queue is closed.
|
|
|
|
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
|
|
|
|
// Currently, not used to gate any of CRED operations.
|
|
|
|
closed bool
|
|
|
|
closedLock sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
|
|
|
|
)
|
|
|
|
|
|
|
|
// ErrZeroLengthDeltasObject is the error carried by a KeyError when a key is
// requested for a Deltas object that contains no deltas. That should never
// happen in practice; the error exists for completeness.
var ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
|
|
|
|
|
|
|
|
// Close the queue.
|
|
|
|
func (f *DeltaFIFO) Close() {
|
|
|
|
f.closedLock.Lock()
|
|
|
|
defer f.closedLock.Unlock()
|
|
|
|
f.closed = true
|
|
|
|
f.cond.Broadcast()
|
|
|
|
}
|
|
|
|
|
|
|
|
// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
|
|
|
|
// DeletedFinalStateUnknown objects.
|
|
|
|
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
|
|
|
|
if d, ok := obj.(Deltas); ok {
|
|
|
|
if len(d) == 0 {
|
|
|
|
return "", KeyError{obj, ErrZeroLengthDeltasObject}
|
|
|
|
}
|
|
|
|
obj = d.Newest().Object
|
|
|
|
}
|
|
|
|
if d, ok := obj.(DeletedFinalStateUnknown); ok {
|
|
|
|
return d.Key, nil
|
|
|
|
}
|
|
|
|
return f.keyFunc(obj)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
|
|
|
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
|
|
|
func (f *DeltaFIFO) HasSynced() bool {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
return f.populated && f.initialPopulationCount == 0
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
|
|
|
// if it doesn't already exist in the set.
|
|
|
|
func (f *DeltaFIFO) Add(obj interface{}) error {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
f.populated = true
|
|
|
|
return f.queueActionLocked(Added, obj)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update is just like Add, but makes an Updated Delta.
|
|
|
|
func (f *DeltaFIFO) Update(obj interface{}) error {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
f.populated = true
|
|
|
|
return f.queueActionLocked(Updated, obj)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete is just like Add, but makes an Deleted Delta. If the item does not
|
|
|
|
// already exist, it will be ignored. (It may have already been deleted by a
|
|
|
|
// Replace (re-list), for example.
|
|
|
|
func (f *DeltaFIFO) Delete(obj interface{}) error {
|
|
|
|
id, err := f.KeyOf(obj)
|
|
|
|
if err != nil {
|
|
|
|
return KeyError{obj, err}
|
|
|
|
}
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
f.populated = true
|
|
|
|
if f.knownObjects == nil {
|
|
|
|
if _, exists := f.items[id]; !exists {
|
|
|
|
// Presumably, this was deleted when a relist happened.
|
|
|
|
// Don't provide a second report of the same deletion.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// We only want to skip the "deletion" action if the object doesn't
|
|
|
|
// exist in knownObjects and it doesn't have corresponding item in items.
|
|
|
|
// Note that even if there is a "deletion" action in items, we can ignore it,
|
|
|
|
// because it will be deduped automatically in "queueActionLocked"
|
|
|
|
_, exists, err := f.knownObjects.GetByKey(id)
|
|
|
|
_, itemsExist := f.items[id]
|
|
|
|
if err == nil && !exists && !itemsExist {
|
|
|
|
// Presumably, this was deleted when a relist happened.
|
|
|
|
// Don't provide a second report of the same deletion.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return f.queueActionLocked(Deleted, obj)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
|
|
|
|
// present in the set, it is neither enqueued nor added to the set.
|
|
|
|
//
|
|
|
|
// This is useful in a single producer/consumer scenario so that the consumer can
|
|
|
|
// safely retry items without contending with the producer and potentially enqueueing
|
|
|
|
// stale items.
|
|
|
|
//
|
|
|
|
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
|
|
|
|
// different from the Add/Update/Delete functions.
|
|
|
|
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
|
|
|
|
deltas, ok := obj.(Deltas)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
|
|
|
|
}
|
|
|
|
id, err := f.KeyOf(deltas.Newest().Object)
|
|
|
|
if err != nil {
|
|
|
|
return KeyError{obj, err}
|
|
|
|
}
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
f.addIfNotPresent(id, deltas)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
|
|
|
|
// already holds the fifo lock.
|
|
|
|
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
|
|
|
|
f.populated = true
|
|
|
|
if _, exists := f.items[id]; exists {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
f.queue = append(f.queue, id)
|
|
|
|
f.items[id] = deltas
|
|
|
|
f.cond.Broadcast()
|
|
|
|
}
|
|
|
|
|
|
|
|
// re-listing and watching can deliver the same update multiple times in any
|
|
|
|
// order. This will combine the most recent two deltas if they are the same.
|
|
|
|
func dedupDeltas(deltas Deltas) Deltas {
|
|
|
|
n := len(deltas)
|
|
|
|
if n < 2 {
|
|
|
|
return deltas
|
|
|
|
}
|
|
|
|
a := &deltas[n-1]
|
|
|
|
b := &deltas[n-2]
|
|
|
|
if out := isDup(a, b); out != nil {
|
|
|
|
d := append(Deltas{}, deltas[:n-2]...)
|
|
|
|
return append(d, *out)
|
|
|
|
}
|
|
|
|
return deltas
|
|
|
|
}
|
|
|
|
|
|
|
|
// If a & b represent the same event, returns the delta that ought to be kept.
|
|
|
|
// Otherwise, returns nil.
|
|
|
|
// TODO: is there anything other than deletions that need deduping?
|
|
|
|
func isDup(a, b *Delta) *Delta {
|
|
|
|
if out := isDeletionDup(a, b); out != nil {
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
// TODO: Detect other duplicate situations? Are there any?
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// keep the one with the most information if both are deletions.
|
|
|
|
func isDeletionDup(a, b *Delta) *Delta {
|
|
|
|
if b.Type != Deleted || a.Type != Deleted {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// Do more sophisticated checks, or is this sufficient?
|
|
|
|
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
// willObjectBeDeletedLocked returns true only if the last delta for the
|
|
|
|
// given object is Delete. Caller must lock first.
|
|
|
|
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
|
|
|
|
deltas := f.items[id]
|
|
|
|
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
|
|
|
|
}
|
|
|
|
|
|
|
|
// queueActionLocked appends to the delta list for the object.
|
|
|
|
// Caller must lock first.
|
|
|
|
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
|
|
|
id, err := f.KeyOf(obj)
|
|
|
|
if err != nil {
|
|
|
|
return KeyError{obj, err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If object is supposed to be deleted (last event is Deleted),
|
|
|
|
// then we should ignore Sync events, because it would result in
|
|
|
|
// recreation of this object.
|
|
|
|
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
newDeltas := append(f.items[id], Delta{actionType, obj})
|
|
|
|
newDeltas = dedupDeltas(newDeltas)
|
|
|
|
|
|
|
|
if len(newDeltas) > 0 {
|
|
|
|
if _, exists := f.items[id]; !exists {
|
|
|
|
f.queue = append(f.queue, id)
|
|
|
|
}
|
|
|
|
f.items[id] = newDeltas
|
|
|
|
f.cond.Broadcast()
|
|
|
|
} else {
|
|
|
|
// We need to remove this from our map (extra items in the queue are
|
|
|
|
// ignored if they are not in the map).
|
|
|
|
delete(f.items, id)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// List returns a list of all the items; it returns the object
|
|
|
|
// from the most recent Delta.
|
|
|
|
// You should treat the items returned inside the deltas as immutable.
|
|
|
|
func (f *DeltaFIFO) List() []interface{} {
|
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
|
|
|
return f.listLocked()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (f *DeltaFIFO) listLocked() []interface{} {
|
|
|
|
list := make([]interface{}, 0, len(f.items))
|
|
|
|
for _, item := range f.items {
|
|
|
|
list = append(list, item.Newest().Object)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListKeys returns a list of all the keys of the objects currently
|
|
|
|
// in the FIFO.
|
|
|
|
func (f *DeltaFIFO) ListKeys() []string {
|
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
|
|
|
list := make([]string, 0, len(f.items))
|
|
|
|
for key := range f.items {
|
|
|
|
list = append(list, key)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get returns the complete list of deltas for the requested item,
|
|
|
|
// or sets exists=false.
|
|
|
|
// You should treat the items returned inside the deltas as immutable.
|
|
|
|
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
|
|
|
key, err := f.KeyOf(obj)
|
|
|
|
if err != nil {
|
|
|
|
return nil, false, KeyError{obj, err}
|
|
|
|
}
|
|
|
|
return f.GetByKey(key)
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetByKey returns the complete list of deltas for the requested item,
|
|
|
|
// setting exists=false if that list is empty.
|
|
|
|
// You should treat the items returned inside the deltas as immutable.
|
|
|
|
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
|
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
|
|
|
d, exists := f.items[key]
|
|
|
|
if exists {
|
|
|
|
// Copy item's slice so operations on this slice
|
|
|
|
// won't interfere with the object we return.
|
|
|
|
d = copyDeltas(d)
|
|
|
|
}
|
|
|
|
return d, exists, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Checks if the queue is closed
|
|
|
|
func (f *DeltaFIFO) IsClosed() bool {
|
|
|
|
f.closedLock.Lock()
|
|
|
|
defer f.closedLock.Unlock()
|
|
|
|
return f.closed
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pop blocks until an item is added to the queue, and then returns it. If
|
|
|
|
// multiple items are ready, they are returned in the order in which they were
|
|
|
|
// added/updated. The item is removed from the queue (and the store) before it
|
|
|
|
// is returned, so if you don't successfully process it, you need to add it back
|
|
|
|
// with AddIfNotPresent().
|
|
|
|
// process function is called under lock, so it is safe update data structures
|
|
|
|
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
|
|
|
|
// may return an instance of ErrRequeue with a nested error to indicate the current
|
|
|
|
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
|
|
|
|
//
|
|
|
|
// Pop returns a 'Deltas', which has a complete list of all the things
|
|
|
|
// that happened to the object (deltas) while it was sitting in the queue.
|
|
|
|
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
for {
|
|
|
|
for len(f.queue) == 0 {
|
|
|
|
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
|
|
|
// When Close() is called, the f.closed is set and the condition is broadcasted.
|
|
|
|
// Which causes this loop to continue and return from the Pop().
|
|
|
|
if f.IsClosed() {
|
|
|
|
return nil, FIFOClosedError
|
|
|
|
}
|
|
|
|
|
|
|
|
f.cond.Wait()
|
|
|
|
}
|
|
|
|
id := f.queue[0]
|
|
|
|
f.queue = f.queue[1:]
|
|
|
|
if f.initialPopulationCount > 0 {
|
|
|
|
f.initialPopulationCount--
|
|
|
|
}
|
|
|
|
item, ok := f.items[id]
|
|
|
|
if !ok {
|
|
|
|
// Item may have been deleted subsequently.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
delete(f.items, id)
|
|
|
|
err := process(item)
|
|
|
|
if e, ok := err.(ErrRequeue); ok {
|
|
|
|
f.addIfNotPresent(id, item)
|
|
|
|
err = e.Err
|
|
|
|
}
|
|
|
|
// Don't need to copyDeltas here, because we're transferring
|
|
|
|
// ownership to the caller.
|
|
|
|
return item, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Replace will delete the contents of 'f', using instead the given map.
|
|
|
|
// 'f' takes ownership of the map, you should not reference the map again
|
|
|
|
// after calling this function. f's queue is reset, too; upon return, it
|
|
|
|
// will contain the items in the map, in no particular order.
|
|
|
|
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
keys := make(sets.String, len(list))
|
|
|
|
|
|
|
|
for _, item := range list {
|
|
|
|
key, err := f.KeyOf(item)
|
|
|
|
if err != nil {
|
|
|
|
return KeyError{item, err}
|
|
|
|
}
|
|
|
|
keys.Insert(key)
|
|
|
|
if err := f.queueActionLocked(Sync, item); err != nil {
|
|
|
|
return fmt.Errorf("couldn't enqueue object: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if f.knownObjects == nil {
|
|
|
|
// Do deletion detection against our own list.
|
2019-05-03 10:50:21 +00:00
|
|
|
queuedDeletions := 0
|
2019-01-18 01:50:10 +00:00
|
|
|
for k, oldItem := range f.items {
|
|
|
|
if keys.Has(k) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
var deletedObj interface{}
|
|
|
|
if n := oldItem.Newest(); n != nil {
|
|
|
|
deletedObj = n.Object
|
|
|
|
}
|
2019-05-03 10:50:21 +00:00
|
|
|
queuedDeletions++
|
2019-01-18 01:50:10 +00:00
|
|
|
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !f.populated {
|
|
|
|
f.populated = true
|
2019-05-03 10:50:21 +00:00
|
|
|
// While there shouldn't be any queued deletions in the initial
|
|
|
|
// population of the queue, it's better to be on the safe side.
|
|
|
|
f.initialPopulationCount = len(list) + queuedDeletions
|
2019-01-18 01:50:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Detect deletions not already in the queue.
|
|
|
|
knownKeys := f.knownObjects.ListKeys()
|
|
|
|
queuedDeletions := 0
|
|
|
|
for _, k := range knownKeys {
|
|
|
|
if keys.Has(k) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
|
|
|
if err != nil {
|
|
|
|
deletedObj = nil
|
|
|
|
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
|
|
|
} else if !exists {
|
|
|
|
deletedObj = nil
|
|
|
|
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
|
|
|
}
|
|
|
|
queuedDeletions++
|
|
|
|
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !f.populated {
|
|
|
|
f.populated = true
|
|
|
|
f.initialPopulationCount = len(list) + queuedDeletions
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Resync will send a sync event for each item
|
|
|
|
func (f *DeltaFIFO) Resync() error {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
|
|
|
|
if f.knownObjects == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
keys := f.knownObjects.ListKeys()
|
|
|
|
for _, k := range keys {
|
|
|
|
if err := f.syncKeyLocked(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (f *DeltaFIFO) syncKey(key string) error {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
|
|
|
|
return f.syncKeyLocked(key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (f *DeltaFIFO) syncKeyLocked(key string) error {
|
|
|
|
obj, exists, err := f.knownObjects.GetByKey(key)
|
|
|
|
if err != nil {
|
|
|
|
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
|
|
|
|
return nil
|
|
|
|
} else if !exists {
|
|
|
|
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we are doing Resync() and there is already an event queued for that object,
|
|
|
|
// we ignore the Resync for it. This is to avoid the race, in which the resync
|
|
|
|
// comes with the previous value of object (since queueing an event for the object
|
|
|
|
// doesn't trigger changing the underlying store <knownObjects>.
|
|
|
|
id, err := f.KeyOf(obj)
|
|
|
|
if err != nil {
|
|
|
|
return KeyError{obj, err}
|
|
|
|
}
|
|
|
|
if len(f.items[id]) > 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := f.queueActionLocked(Sync, obj); err != nil {
|
|
|
|
return fmt.Errorf("couldn't queue object: %v", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
|
|
|
|
type KeyListerGetter interface {
|
|
|
|
KeyLister
|
|
|
|
KeyGetter
|
|
|
|
}
|
|
|
|
|
|
|
|
// A KeyLister is implemented by anything able to enumerate its keys.
type KeyLister interface {
	// ListKeys returns the keys held by the implementation.
	ListKeys() []string
}
|
|
|
|
|
|
|
|
// A KeyGetter is implemented by anything able to look up the value stored
// under a given key.
type KeyGetter interface {
	// GetByKey returns the value stored under key, whether such a value
	// exists, and any error encountered during the lookup.
	GetByKey(key string) (interface{}, bool, error)
}
|
|
|
|
|
|
|
|
// DeltaType names the kind of change a Delta describes (addition, deletion,
// etc).
type DeltaType string

// The kinds of change a Delta can carry.
const (
	// Added, Updated and Deleted mean what their names say.
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"

	// Sync deltas show up when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that triggers DeltaFIFO's Replace() method.)
	Sync DeltaType = "Sync"
)
|
|
|
|
|
|
|
|
// Delta is the type stored by a DeltaFIFO. It tells you what change
|
|
|
|
// happened, and the object's state after* that change.
|
|
|
|
//
|
|
|
|
// [*] Unless the change is a deletion, and then you'll get the final
|
|
|
|
// state of the object before it was deleted.
|
|
|
|
type Delta struct {
|
|
|
|
Type DeltaType
|
|
|
|
Object interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Deltas is a list of one or more 'Delta's to an individual object.
|
|
|
|
// The oldest delta is at index 0, the newest delta is the last one.
|
|
|
|
type Deltas []Delta
|
|
|
|
|
|
|
|
// Oldest is a convenience function that returns the oldest delta, or
|
|
|
|
// nil if there are no deltas.
|
|
|
|
func (d Deltas) Oldest() *Delta {
|
|
|
|
if len(d) > 0 {
|
|
|
|
return &d[0]
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Newest is a convenience function that returns the newest delta, or
|
|
|
|
// nil if there are no deltas.
|
|
|
|
func (d Deltas) Newest() *Delta {
|
|
|
|
if n := len(d); n > 0 {
|
|
|
|
return &d[n-1]
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
|
|
|
|
// the objects in the slice. This allows Get/List to return an object that we
|
|
|
|
// know won't be clobbered by a subsequent modifications.
|
|
|
|
func copyDeltas(d Deltas) Deltas {
|
|
|
|
d2 := make(Deltas, len(d))
|
|
|
|
copy(d2, d)
|
|
|
|
return d2
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeletedFinalStateUnknown is the marker put into a DeltaFIFO when an object
// was deleted but the watch deletion event was missed. The final "resting"
// state of the object is unknown in that case, so the included `Obj` may be
// stale.
type DeletedFinalStateUnknown struct {
	// Key is the key under which the deleted object was known.
	Key string
	// Obj is the last known state of the object; it may be nil.
	Obj interface{}
}
|