-
Notifications
You must be signed in to change notification settings - Fork 3k
Expand file tree
/
Copy pathqueue.go
More file actions
370 lines (302 loc) · 9.31 KB
/
queue.go
File metadata and controls
370 lines (302 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"
)
// Interface is the untyped work queue interface: TypedInterface with any as
// the item type.
//
// Deprecated: Interface is deprecated, use TypedInterface instead.
type Interface TypedInterface[any]
// TypedInterface is the set of operations a work queue for items of type T
// offers. The per-method notes below describe what Typed, the implementation
// in this file, does.
type TypedInterface[T comparable] interface {
// Add marks item as needing processing. It is ignored once the queue is
// shutting down.
Add(item T)
// Len returns the current queue length, for informational purposes only.
Len() int
// Get blocks until an item is available or the queue is shutting down with
// nothing left to hand out; in the latter case shutdown is true.
Get() (item T, shutdown bool)
// Done marks item as done processing. It must be called for every item
// returned by Get.
Done(item T)
// ShutDown makes the queue ignore all new items and wakes blocked callers
// of Get.
ShutDown()
// ShutDownWithDrain is like ShutDown but additionally waits until no item
// is being processed any more.
ShutDownWithDrain()
// ShuttingDown reports whether ShutDown or ShutDownWithDrain has been
// called.
ShuttingDown() bool
}
// Queue is the underlying storage for items. The functions below are always
// called from the same goroutine.
//
// Typed calls every one of these functions while holding its own lock
// (cond.L), so an implementation does not have to synchronize on its own
// for that use.
type Queue[T comparable] interface {
// Touch can be hooked when an existing item is added again. This may be
// useful if the implementation allows priority change for the given item.
// Typed.Add calls it only for an item that is already waiting and is not
// currently being processed.
Touch(item T)
// Push adds a new item.
Push(item T)
// Len tells the total number of items.
Len() int
// Pop retrieves an item. Typed.Get calls it only after Len reported a
// non-zero value.
Pop() (item T)
}
// DefaultQueue returns an empty, slice based Queue that hands items out in
// the order they were pushed (FIFO).
func DefaultQueue[T comparable]() Queue[T] {
	var fifo queue[T]
	return &fifo
}
// queue is the slice backed FIFO implementation of Queue. The head of the
// queue is the element at index 0.
type queue[T comparable] []T

// Touch does nothing: a plain FIFO has no per-item state to refresh.
func (q *queue[T]) Touch(item T) {}

// Push appends item at the tail of the queue.
func (q *queue[T]) Push(item T) {
	items := *q
	*q = append(items, item)
}

// Len reports the number of items currently stored.
func (q *queue[T]) Len() int {
	items := *q
	return len(items)
}

// Pop removes the head of the queue and returns it. It panics (index out of
// range) when the queue is empty, so callers have to check Len first.
func (q *queue[T]) Pop() (item T) {
	items := *q
	item = items[0]

	// Re-slicing below keeps the backing array alive. Overwrite the vacated
	// slot with the zero value so the array no longer references the popped
	// item, which would otherwise keep it from being garbage collected.
	var zero T
	items[0] = zero

	*q = items[1:]
	return item
}
// QueueConfig specifies optional configurations to customize an Interface.
// It is an alias for TypedQueueConfig[any].
//
// Deprecated: use TypedQueueConfig instead.
type QueueConfig = TypedQueueConfig[any]
// TypedQueueConfig specifies optional configurations to customize a Typed
// work queue. Fields left at their zero value fall back to the defaults
// described below.
type TypedQueueConfig[T comparable] struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock ability to inject real or fake clock for testing purposes.
// When nil, clock.RealClock{} is used.
Clock clock.WithTicker
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
Queue Queue[T]
}
// New constructs a new, unnamed work queue with default settings (see the
// package comment).
//
// Deprecated: use NewTyped instead.
func New() *Type {
	// The zero value already carries the empty Name an unnamed queue uses.
	var cfg QueueConfig
	return NewWithConfig(cfg)
}
// NewTyped constructs a new, unnamed work queue for items of type T with
// default settings (see the package comment).
func NewTyped[T comparable]() *Typed[T] {
	// The zero value already carries the empty Name an unnamed queue uses.
	var cfg TypedQueueConfig[T]
	return NewTypedWithConfig(cfg)
}
// NewWithConfig constructs a new workqueue with ability to
// customize different properties.
//
// QueueConfig and Type are aliases for TypedQueueConfig[any] and Typed[any],
// so this simply forwards config to NewTypedWithConfig.
//
// Deprecated: use NewTypedWithConfig instead.
func NewWithConfig(config QueueConfig) *Type {
return NewTypedWithConfig(config)
}
// NewTypedWithConfig constructs a new workqueue with ability to
// customize different properties.
//
// Unset fields of config are replaced by defaults: the global metrics
// provider, clock.RealClock{} and DefaultQueue. The period for refreshing the
// unfinished-work metric is defaultUnfinishedWorkUpdatePeriod.
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}
// NewNamed creates a new work queue whose configuration carries the given
// name and is otherwise left at its defaults.
//
// Deprecated: Use NewTypedWithConfig instead (NewWithConfig, which this
// function wraps, is deprecated as well).
func NewNamed(name string) *Type {
	cfg := QueueConfig{Name: name}
	return NewWithConfig(cfg)
}
// newQueueWithConfig builds a work queue from config, filling in a default
// for every unset field. The update period of the unfinished-work metric is
// a separate parameter so that tests can choose their own value.
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
	// Metrics provider: the global one unless the caller supplied its own.
	provider := globalMetricsProvider
	if custom := config.MetricsProvider; custom != nil {
		provider = custom
	}

	// Clock: the real clock unless one was injected.
	clk := config.Clock
	if clk == nil {
		clk = clock.RealClock{}
	}

	// Storage: the slice based FIFO unless a custom Queue was given.
	store := config.Queue
	if store == nil {
		store = DefaultQueue[T]()
	}

	metrics := newQueueMetrics[T](provider, config.Name, clk)
	return newQueue(clk, store, metrics, updatePeriod)
}
// newQueue assembles a Typed work queue from its already resolved parts and,
// unless metrics is the no-op implementation, starts the goroutine that
// periodically refreshes the unfinished-work metric.
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] {
	wq := &Typed[T]{
		queue:                      queue,
		dirty:                      sets.Set[T]{},
		processing:                 sets.Set[T]{},
		cond:                       sync.NewCond(new(sync.Mutex)),
		metrics:                    metrics,
		unfinishedWorkUpdatePeriod: updatePeriod,
		clock:                      c,
		stopCh:                     make(chan struct{}),
	}

	// With noMetrics there is nothing to update, so skip the goroutine and
	// avoid consuming resources unnecessarily.
	if _, disabled := metrics.(noMetrics[T]); disabled {
		return wq
	}

	wq.wg.Go(wq.updateUnfinishedWorkLoop)
	return wq
}
// defaultUnfinishedWorkUpdatePeriod is the update period NewTypedWithConfig
// passes to newQueueWithConfig; it ends up as the ticker interval of
// updateUnfinishedWorkLoop.
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment). It is an alias for
// Typed[any].
//
// Deprecated: Use Typed instead.
type Type = Typed[any]
// Typed is a work queue for items of the comparable type T (see the package
// comment).
type Typed[T comparable] struct {
	// queue holds the items that are waiting to be handed out and defines
	// the order in which they are worked on. Every element of queue is also
	// in dirty and is not in processing.
	queue Queue[T]

	// dirty is the set of all items that still need to be processed.
	dirty sets.Set[T]

	// processing is the set of items that Get has handed out and for which
	// Done has not been called yet. An item may be in dirty at the same
	// time; when Done removes it from processing and finds it in dirty, the
	// item is pushed onto queue again.
	processing sets.Set[T]

	// cond wakes goroutines blocked in Get and ShutDownWithDrain. Its lock,
	// cond.L, is held by the methods in this file whenever they touch
	// queue, dirty, processing, shuttingDown or drain.
	cond *sync.Cond

	// shuttingDown is set by ShutDown and ShutDownWithDrain. Once it is set,
	// Add ignores new items.
	shuttingDown bool

	// drain is set by ShutDownWithDrain and cleared by ShutDown. While it is
	// set, ShutDownWithDrain keeps waiting for processing to become empty.
	drain bool

	// metrics is notified about add, get and done events and about the
	// periodic unfinished-work update.
	metrics queueMetrics[T]

	// unfinishedWorkUpdatePeriod is the ticker interval used by
	// updateUnfinishedWorkLoop.
	unfinishedWorkUpdatePeriod time.Duration

	// clock creates the ticker used by updateUnfinishedWorkLoop.
	clock clock.WithTicker

	// wg manages goroutines started by the queue to allow graceful shutdown
	// ShutDown() will wait for goroutines to exit before returning.
	wg sync.WaitGroup

	// stopCh is closed on shutdown to stop updateUnfinishedWorkLoop.
	stopCh chan struct{}

	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once
}
// Add marks item as needing processing. Once the queue is shutting down, new
// items are silently dropped: they are neither queued nor marked as dirty
// for reprocessing.
func (q *Typed[T]) Add(item T) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	if q.shuttingDown {
		return
	}

	if q.dirty.Has(item) {
		// The item was added again before it got processed. If it is still
		// waiting in the queue, give the queue a chance to react (e.g. to
		// reset the item's priority) through Touch.
		if !q.processing.Has(item) {
			q.queue.Touch(item)
		}
		return
	}

	q.metrics.add(item)
	q.dirty.Insert(item)

	// An item that is being processed right now is not queued here; Done
	// pushes it once processing has finished.
	if !q.processing.Has(item) {
		q.queue.Push(item)
		q.cond.Signal()
	}
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
//
// The value is the length of the underlying queue, i.e. the items waiting to
// be handed out by Get; items currently being processed are not counted.
func (q *Typed[T]) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.queue.Len()
}
// Get blocks until there is an item to process and returns it. When it
// returns shutdown = true instead, the queue is shutting down and has nothing
// left to hand out, and the caller should end its goroutine. Done must be
// called with the returned item once processing has finished.
func (q *Typed[T]) Get() (item T, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for {
		if q.queue.Len() != 0 {
			break
		}
		if q.shuttingDown {
			// Nothing queued and shutting down: report shutdown together
			// with the zero value of T.
			var zero T
			return zero, true
		}
		q.cond.Wait()
	}

	item = q.queue.Pop()
	q.metrics.get(item)

	// Move the item from "needs processing" to "being processed".
	q.processing.Insert(item)
	q.dirty.Delete(item)

	return item, false
}
// Done marks item as done processing. If the item was marked as dirty again
// while it was being processed, it is put back on the queue for
// re-processing.
func (q *Typed[T]) Done(item T) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)
	q.processing.Delete(item)

	switch {
	case q.dirty.Has(item):
		// Re-added during processing: queue it again and wake a waiter.
		q.queue.Push(item)
		q.cond.Signal()
	case q.processing.Len() == 0:
		// Nothing is being processed any more; wake a waiter such as a
		// pending ShutDownWithDrain.
		q.cond.Signal()
	}
}
// ShutDown makes q ignore all new items added to it. Worker goroutines keep
// processing the items still in the queue until it is empty and then receive
// the shutdown signal. Before returning, ShutDown waits for the goroutines
// started by the queue itself to exit.
func (q *Typed[T]) ShutDown() {
	// Registered first so that it runs last, after the lock is released.
	defer q.wg.Wait()

	q.stopOnce.Do(func() {
		close(q.stopCh)
	})

	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.shuttingDown = true
	// Clearing drain releases a ShutDownWithDrain call that is still waiting.
	q.drain = false
	q.cond.Broadcast()
}
// ShutDownWithDrain is equivalent to ShutDown but additionally waits until
// all items in the queue have been processed.
// Calling ShutDown after ShutDownWithDrain forces ShutDownWithDrain to stop
// waiting.
// Workers must call Done on an item after processing it, otherwise
// ShutDownWithDrain will block indefinitely.
func (q *Typed[T]) ShutDownWithDrain() {
	// Registered first so that it runs last, after the lock is released.
	defer q.wg.Wait()

	q.stopOnce.Do(func() {
		close(q.stopCh)
	})

	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.shuttingDown = true
	q.drain = true
	q.cond.Broadcast()

	// Wait until nothing is being processed or until ShutDown clears drain.
	for {
		if q.processing.Len() == 0 || !q.drain {
			return
		}
		q.cond.Wait()
	}
}
// ShuttingDown reports whether ShutDown or ShutDownWithDrain has been called
// on q. The flag is read while holding the queue's lock.
func (q *Typed[T]) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}
// updateUnfinishedWork asks the metrics to refresh their unfinished-work
// figures. It does nothing once the queue is shutting down.
func (q *Typed[T]) updateUnfinishedWork() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	if q.shuttingDown {
		return
	}
	q.metrics.updateUnfinishedWork()
}
// updateUnfinishedWorkLoop calls updateUnfinishedWork on every tick of a
// ticker with period unfinishedWorkUpdatePeriod and returns once stopCh is
// closed.
func (q *Typed[T]) updateUnfinishedWorkLoop() {
	ticker := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
	defer ticker.Stop()

	for {
		select {
		case <-q.stopCh:
			return
		case <-ticker.C():
			q.updateUnfinishedWork()
		}
	}
}