go.mod: bump client-go and api machinerie

I had to run `make generate`.
Some API functions got additional parameters `Options` and `Context`.
I used empty options and `context.TODO()` for now.

Signed-off-by: leonnicolas <leonloechner@gmx.de>
This commit is contained in:
leonnicolas
2021-05-15 12:08:31 +02:00
parent f2c37b9de6
commit a3bf13711c
2386 changed files with 419055 additions and 183398 deletions

View File

@@ -40,15 +40,12 @@ const incomingQueueLength = 25
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
// TODO: see if this lock is needed now that new watchers go through
// the incoming channel.
lock sync.Mutex
watchers map[int64]*broadcasterWatcher
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
stopped chan struct{}
// How large to make watcher's channel.
watchQueueLength int
@@ -68,6 +65,23 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength),
stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
@@ -96,10 +110,15 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
// The purpose of this terrible hack is so that watchers added after an event
// won't ever see that event, and will always see any event after they are
// added.
func (b *Broadcaster) blockQueue(f func()) {
func (m *Broadcaster) blockQueue(f func()) {
select {
case <-m.stopped:
return
default:
}
var wg sync.WaitGroup
wg.Add(1)
b.incoming <- Event{
m.incoming <- Event{
Type: internalRunFunctionMarker,
Object: functionFakeRuntimeObject(func() {
defer wg.Done()
@@ -111,12 +130,11 @@ func (b *Broadcaster) blockQueue(f func()) {
// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
// of previous events. It will block until the watcher is actually added to the
// broadcaster.
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
@@ -127,18 +145,22 @@ func (m *Broadcaster) Watch() Interface {
}
m.watchers[id] = w
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
}
return w
}
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
// The returned watch will have a queue length that is at least large enough to accommodate
// all of the items in queuedEvents.
// all of the items in queuedEvents. It will block until the watcher is actually added to
// the broadcaster.
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
length := m.watchQueueLength
@@ -156,26 +178,29 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
w.result <- e
}
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
}
return w
}
// stopWatching stops the given watcher and removes it from the list.
func (m *Broadcaster) stopWatching(id int64) {
m.lock.Lock()
defer m.lock.Unlock()
w, ok := m.watchers[id]
if !ok {
// No need to do anything, it's already been removed from the list.
return
}
delete(m.watchers, id)
close(w.result)
m.blockQueue(func() {
w, ok := m.watchers[id]
if !ok {
// No need to do anything, it's already been removed from the list.
return
}
delete(m.watchers, id)
close(w.result)
})
}
// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Broadcaster) closeAll() {
m.lock.Lock()
defer m.lock.Unlock()
for _, w := range m.watchers {
close(w.result)
}
@@ -189,14 +214,29 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}
// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up. Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
select {
case m.incoming <- Event{action, obj}:
return true
default:
return false
}
}
// Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
// that since they can be buffered, this means that the watchers might not
// have received the data yet as it can remain sitting in the buffered
// channel.
// channel. It will block until the broadcaster stop request is actually executed
func (m *Broadcaster) Shutdown() {
close(m.incoming)
m.blockQueue(func() {
close(m.stopped)
close(m.incoming)
})
m.distributing.Wait()
}
@@ -217,8 +257,6 @@ func (m *Broadcaster) loop() {
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
@@ -252,6 +290,7 @@ func (mw *broadcasterWatcher) ResultChan() <-chan Event {
}
// Stop stops watching and removes mw from its list.
// It will block until the watcher stop request is actually executed
func (mw *broadcasterWatcher) Stop() {
mw.stop.Do(func() {
close(mw.stopped)

View File

@@ -17,13 +17,15 @@ limitations under the License.
package watch
import (
"fmt"
"io"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog"
)
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
@@ -39,23 +41,37 @@ type Decoder interface {
Close()
}
// Reporter hides the details of how an error is turned into a runtime.Object for
// reporting on a watch stream since this package may not import a higher level report.
type Reporter interface {
// AsObject must convert err into a valid runtime.Object for the watch stream.
AsObject(err error) runtime.Object
}
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
sync.Mutex
source Decoder
result chan Event
stopped bool
source Decoder
reporter Reporter
result chan Event
done chan struct{}
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder) *StreamWatcher {
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
source: d,
source: d,
reporter: r,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
// If the watcher is externally stopped there is no receiver anymore
// and the send operations on the result channel, especially the
// error reporting might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
}
go sw.receive()
return sw
@@ -71,49 +87,50 @@ func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
sw.Lock()
defer sw.Unlock()
if !sw.stopped {
sw.stopped = true
// closing a closed channel always panics, therefore check before closing
select {
case <-sw.done:
default:
close(sw.done)
sw.source.Close()
}
}
// stopping returns true if Stop() was called previously.
func (sw *StreamWatcher) stopping() bool {
sw.Lock()
defer sw.Unlock()
return sw.stopped
}
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash()
defer close(sw.result)
defer sw.Stop()
defer utilruntime.HandleCrash()
for {
action, obj, err := sw.source.Decode()
if err != nil {
// Ignore expected error.
if sw.stopping() {
return
}
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
msg := "Unable to decode an event from the watch stream: %v"
if net.IsProbableEOF(err) {
klog.V(5).Infof(msg, err)
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
klog.Errorf(msg, err)
select {
case <-sw.done:
case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}:
}
}
}
return
}
sw.result <- Event{
select {
case <-sw.done:
return
case sw.result <- Event{
Type: action,
Object: obj,
}:
}
}
}

View File

@@ -20,7 +20,7 @@ import (
"fmt"
"sync"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -32,8 +32,8 @@ type Interface interface {
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
// or Stop() is called, the implementation will close this channel and
// release any resources used by the watch.
ResultChan() <-chan Event
}
@@ -44,8 +44,11 @@ const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
)
var (
DefaultChanSize int32 = 100
)
@@ -57,6 +60,10 @@ type Event struct {
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
@@ -85,7 +92,7 @@ func (w emptyWatch) ResultChan() <-chan Event {
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type FakeWatcher struct {
result chan Event
Stopped bool
stopped bool
sync.Mutex
}
@@ -105,24 +112,24 @@ func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
if !f.stopped {
klog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
f.stopped = true
}
}
func (f *FakeWatcher) IsStopped() bool {
f.Lock()
defer f.Unlock()
return f.Stopped
return f.stopped
}
// Reset prepares the watcher to be reused.
func (f *FakeWatcher) Reset() {
f.Lock()
defer f.Unlock()
f.Stopped = false
f.stopped = false
f.result = make(chan Event)
}
@@ -269,7 +276,7 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
// ProxyWatcher lets you wrap your channel in watch Interface. threadsafe.
type ProxyWatcher struct {
result chan Event
stopCh chan struct{}