Extend watch queue with a timeout and size limit #2285
aaronlehmann merged 1 commit into moby:master from alexmavr:watch-extensions
Conversation
813ed1a to 14f3120

The vendoring error is something weird that came up through colliding merges that both vendored different things. We fixed it earlier today on master, so if you rebase, it should resolve that problem. I'm uncertain whether
watch/queue/queue.go
Outdated

	"github.com/docker/go-events"
)

func ErrQueueFullCloseFailed(err error) error {

This seems to be unused. If it's meant for external users, it seems like a strange function to expose.
watch/queue/queue.go
Outdated

	return fmt.Errorf("The queue size reached its limit but couldn't be closed: %s", err)
}

var ErrQueueFull = fmt.Errorf("The queue size reached its limit and was closed")

How about "queue closed due to size limit"?
watch/sinks.go
Outdated

	events "github.com/docker/go-events"
)

var ErrSinkTimeout = fmt.Errorf("Timeout exceeded, tearing down sink")

Lowercase "timeout" in the message.
watch/sinks.go
Outdated

	select {
	case err := <-errChan:
		return err
	case <-time.After(s.timeout):

I'd suggest using time.NewTimer explicitly here instead of time.After, so that the timer can be stopped in the common case that the write succeeds before the timer fires. There's no functional difference, but it's a little more optimal from a resource management perspective.
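A rough sketch of that change, reusing the s.timeout and errChan names from the quoted diff (not the final code):

	timer := time.NewTimer(s.timeout)
	// Stop the timer once the write result arrives, so its resources are
	// released right away instead of lingering until the timer fires.
	defer timer.Stop()

	select {
	case err := <-errChan:
		return err
	case <-timer.C:
		return ErrSinkTimeout
	}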
watch/watch.go
Outdated

}

// NewQueueWithOpts creates a new Queue using the full set of options.
func NewQueueWithOpts(opts QueueOpts) *Queue {

To me this is begging for functional arguments: https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
watch/watch.go
Outdated

// WatchWithCtx returns a channel where all items published to the queue will
// be received. The channel will be closed when the provided context is
// cancelled.
func (q *Queue) WatchWithCtx(ctx context.Context) (eventq chan events.Event) {

grpc has a variant of Dial which takes a context that they call DialContext, so WatchContext?
I think we could have something like this in go-events. We may want to back this with a buffered channel, depending on whether or not the runtime does lazy allocation for buffered channels.
}

outChan := make(chan events.Event)
go func() {

I'd rather avoid an extra goroutine for every watch. This will be a large number for swarmkit, and it's already quite hard to read stack dumps. Either it should be opt-in with a special variant of CallbackWatch, or we should just return the extra channels and let the caller implement this select if it wants to.
	return nil
}

// Full returns a channel that is closed when the queue becomes full for the

What happens when it is no longer full?

Typically, "full" and "empty" act pretty racy for concurrent queues. When would this be used?

If we want to act on full, might want a clamping function.

The channel is closed when a Write causes the queue to reach its limit. The queue can stop being full afterwards, and this channel is not meant as a mechanism for viewing the current full-ness state of the queue.

The main use case for this is to notify that at least one Event has been dropped, and then it's up to the listener to determine if any action should be taken.

In the case of docker events, all API server implementations can receive a /events?since parameter to backfill past events, and the events stream is expected to be reliable in some versions of the CLI (1.13 to 17.03). Therefore, when a slow listener fills up its queue, it's preferred to close their event stream entirely and have them re-establish it with an appropriate since parameter, rather than silently dropping events.

Couldn't this be handled with a callback on each dropped message? This seems very fragile.
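For illustration, a listener acting on the proposed channel might look roughly like this; q and stream are hypothetical placeholders, not part of this PR:

	go func() {
		// Closed once a Write pushes the queue past its size limit, i.e.
		// at least one event has been dropped.
		<-q.Full()
		// Tear down this client's event stream so it re-establishes it
		// with an appropriate /events?since parameter.
		stream.CloseWithError(queue.ErrQueueFull)
	}()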
14f3120 to 1bba22a

Rebased off of master and added a commit which addresses all review comments up to this point.
// debug log messages that may be confusing. It is possible that the queue
// will try to write an event to its destination channel while the queue is
// being removed from the broadcaster. Since the channel is closed before the
// queue, there is a narrow window when this is possible. In some event-based

"some event-based systems"?
	for {
		<-ch.C
	}
}()

Looks like this test will leak a goroutine; is ch.C ever closed? If not, can you add another channel for this goroutine to select on so it will terminate when the test is finished?
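A small sketch of the suggested fix, assuming the test creates a doneChan that it closes when it finishes:

	doneChan := make(chan struct{})
	defer close(doneChan)

	go func() {
		for {
			select {
			case <-ch.C: // keep draining events as before
			case <-doneChan: // terminates the goroutine when the test ends
				return
			}
		}
	}()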
watch/sinks_test.go
Outdated

// Make sure that closing a sink closes the channel
var errClose error
go func() {
	errClose = sink.Close()

Why is this in a separate goroutine?
watch/watch.go
Outdated

	cancelFuncs map[events.Sink]func()

	// closeOutChan indicates whether the watchers' channels should be closed
	// when a watcher queue reaches its limit or when
watch/watch.go
Outdated

	for _, option := range options {
		err := option(q)
		if err != nil {
			logrus.Warnf("Failed to apply options to queue: %s", err)

I guess you don't want to change the signature of NewQueue, so how about a panic here instead?
watch/watch.go
Outdated

	}

	if q.closeOutChan && q.limit == 0 {
		logrus.Warnf("Unable to create queue with zero size limit and closeOutChan")

Hmm, I know I suggested this, but looking at the code I'm not sure there's any reason it can't be supported. It would just inhibit the optimization of returning ch.C (which is fine). Am I missing anything?
@alexmavr Any plans to PR this to go-events? This really belongs there as a primitive.

@stevvooe
watch/watch.go
Outdated

// NewTimeoutLimitQueue creates a queue with a size limit and a request write
// timeout.
func NewTimeoutLimitQueue(timeout time.Duration, limit uint64) *Queue {

Let's instead define functions like

func WithTimeout(timeout time.Duration) func(*Queue) error
func WithLimit(limit uint64) func(*Queue) error
func WithCloseOutChan(closeOutChan bool) func(*Queue) error

Then you can create a queue with something like:

NewQueue(WithTimeout(30*time.Second), WithCloseOutChan(true))

It's a bit more flexible that way.
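A minimal sketch of how these options could be applied by the constructor; the Queue fields referenced (timeout, limit) are assumptions for the example, and the real struct carries more state:

	func WithTimeout(timeout time.Duration) func(*Queue) error {
		return func(q *Queue) error {
			q.timeout = timeout
			return nil
		}
	}

	func WithLimit(limit uint64) func(*Queue) error {
		return func(q *Queue) error {
			q.limit = limit
			return nil
		}
	}

	// NewQueue applies each option in order; per the later discussion it
	// panics instead of returning an error so existing zero-argument call
	// sites keep working.
	func NewQueue(options ...func(*Queue) error) *Queue {
		q := &Queue{}
		for _, option := range options {
			if err := option(q); err != nil {
				panic(err)
			}
		}
		return q
	}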
Please sign your commits following these rules:

$ git clone -b "watch-extensions" git@github.com:alexmavr/swarmkit.git somewhere
$ cd somewhere
$ git rebase -i HEAD~842353884240
# editor opens: change each 'pick' to 'edit', save the file and quit
$ git commit --amend -s --no-edit
$ git rebase --continue # and repeat the amend for each commit
$ git push -f

Amending updates the existing PR. You DO NOT need to open a new one.
Codecov Report
@@            Coverage Diff             @@
##           master    #2285      +/-   ##
==========================================
+ Coverage   60.96%   61.04%   +0.08%
==========================================
  Files         126      128       +2
  Lines       20391    20531     +140
==========================================
+ Hits        12431    12534     +103
- Misses       6589     6632      +43
+ Partials     1371     1365       -6
f043ff7 to 9ce0554

So, the bounded queue model makes a lot of sense in go-events and I think we can do it with less code (maybe). I understand the time pressure, but I think if we are making a commitment to quality, we should try to land this in the right place, rather than defer to later. Once it is tested and in the code base, there will be little incentive to move it over.
	go func() {
		errClose = sink.Close()
	}()
	errClose = sink.Close()

Minor: errClose := sink.Close() instead of predeclaring.
	require.NoError(errClose)

	// Close the leaking goroutine
	close(doneChan)

Minor: defer close(doneChan) right after the channel is created.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
	dst    events.Sink
	events *list.List

I'm not sure how much time you have to make this work, but if you can instrument the entrance and exit of a regular queue, you can avoid having to replicate all of this logic.
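One reading of that suggestion, sketched below: wrap the sinks at a regular go-events queue's entrance and exit with a small counting sink instead of re-implementing the list-backed queue. The countingSink type is illustrative only and not part of this PR:

	// countingSink counts events passing through Write before handing
	// them to the wrapped destination sink.
	type countingSink struct {
		dst   events.Sink
		count uint64 // updated atomically
	}

	func (c *countingSink) Write(event events.Event) error {
		atomic.AddUint64(&c.count, 1)
		return c.dst.Write(event)
	}

	func (c *countingSink) Close() error {
		return c.dst.Close()
	}

Instrumenting both sides of a regular events.Queue this way would give an approximate depth (entrance count minus exit count) without duplicating the queue logic, though enforcing the limit itself would still need a hook.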
	if !eq.fullClosed {
		eq.fullClosed = true
		close(eq.full)
	}

I think we can do a callback here. That would avoid spilling the internal channel manipulation outside of the internals.

Make sure to release the locks, then return ErrQueueFull. That will allow writer and out-of-band notification. It also doesn't poison the queue.

I agree that a callback would be a better pattern here. Let's do it this way for the go-events followup.
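A rough sketch of that callback shape, with assumed field names (onFull, fullFired, mu); this is not the code merged here:

	func (eq *LimitQueue) markFull() error {
		eq.mu.Lock()
		fire := !eq.fullFired
		eq.fullFired = true
		eq.mu.Unlock()

		// Fire the caller-supplied callback at most once, after releasing
		// the lock, so the notification happens out of band and the queue
		// itself is not poisoned.
		if fire && eq.onFull != nil {
			eq.onFull()
		}
		return ErrQueueFull
	}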
}

if err := eq.dst.Write(event); err != nil {
	// TODO(aaronl): Dropping events could be bad depending

@aaronlehmann Did we not remove this error message?

It's suppressed by a wrapper sink in swarmkit. Discussion here: docker/go-events#11
watch/queue/queue_test.go
Outdated

	closed   bool
	holdChan chan struct{}
	data     []events.Event
	mutex    *sync.Mutex

Just do mutex sync.Mutex; then it doesn't need to be initialized.
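That is, declare the field by value so its zero value is ready to use:

	type mockSink struct {
		closed   bool
		holdChan chan struct{}
		data     []events.Event
		mutex    sync.Mutex // zero value is an unlocked mutex; no init needed
	}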
}

func TestLimitQueueNoLimit(t *testing.T) {
	require := require.New(t)

That's cool; I didn't know about this feature.
watch/watch_test.go
Outdated

go func() {
	closed := false
	for range events {
		// After receiving the first event, block indefinitely
watch/watch_test.go
Outdated

	doneChan = make(chan struct{})
	go func() {
		for !eventsClosed {

I think the race detector will consider this a data race. If we make it a channel that gets closed, it avoids the problem. I'd kind of like to unify this goroutine with the select below anyway. How about something like this:

timeoutTimer := time.NewTimer(time.Minute)
defer timeoutTimer.Stop()

selectLoop:
for {
	select {
	case <-eventsClosed:
		break selectLoop
	case <-time.After(writerSleepDuration):
		q.Publish("new event")
	case <-timeoutTimer.C:
		require.Fail("Timeout exceeded")
	}
}

That's a great way to restructure this, thanks for the insight.
Didn't look into details, but sync.Once may help in certain areas...
	return len(s.data)
}

func (s *mockSink) String() string {

Do you have to take the lock here to make the race detector happy?
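Probably yes if String reads s.data while other goroutines are writing; a hedged sketch (the formatted output is illustrative):

	func (s *mockSink) String() string {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		// Reading len(s.data) and s.closed under the lock keeps the race
		// detector quiet while other goroutines call Write concurrently.
		return fmt.Sprintf("mockSink(%d events, closed=%v)", len(s.data), s.closed)
	}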
Signed-off-by: Alex Mavrogiannis <alex.mavrogiannis@docker.com>
8dbfddb to 2d31d73

Addressed final comments and squashed.
LGTM

1 similar comment

LGTM
- moby/swarmkit#2266 (support for templating Node.Hostname in docker executor)
- moby/swarmkit#2281 (change restore action on objects to be update, not delete/create)
- moby/swarmkit#2285 (extend watch queue with timeout and size limit)
- moby/swarmkit#2253 (version-aware failure tracking in the scheduler)
- moby/swarmkit#2275 (update containerd and port executor to container client library)
- moby/swarmkit#2292 (rename some generic resources)
- moby/swarmkit#2300 (limit the size of the external CA response)
- moby/swarmkit#2301 (delete global tasks when the node running them is deleted)

Minor cleanups, dependency bumps, and vendoring:

- moby/swarmkit#2271
- moby/swarmkit#2279
- moby/swarmkit#2283
- moby/swarmkit#2282
- moby/swarmkit#2274
- moby/swarmkit#2296 (dependency bump of etcd, go-winio)

Signed-off-by: Ying Li <ying.li@docker.com>
Upstream-commit: 4509a00
Component: engine
This PR extends the watch package with the following items:

- LimitQueue: near-identical to the implementation of a Queue from https://github.com/docker/go-events/blob/master/queue.go, with the difference that the queue has an upper size limit. When that limit is reached, a channel is closed and it's up to the user to determine the desired behavior from there.
- TimeoutSink, which wraps another sink with a timeout. If the timeout is reached, the wrapped sink is closed and an error is returned to the writer.
- A ChannelSinkGenerator interface which can be used to configure the sink-chain that watch.Queue creates for each watcher.
- New watch.Queue constructors: NewTimeoutLimitQueue, or the more generic NewQueueWithOpts. Edit: this feature was replaced with a functional options constructor.

To avoid disruption to existing users, the existing NewQueue constructor will return the previous configuration of sinks.

Unit test times:
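For illustration, creating and watching a queue with the final options-based constructor might look roughly like this; the option names follow the review suggestion above and may not match the merged code exactly:

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/docker/swarmkit/watch"
	)

	func main() {
		// Queue with a 30s per-sink write timeout and a 1000-event limit;
		// watcher channels are closed when the limit is hit.
		q := watch.NewQueue(
			watch.WithTimeout(30*time.Second),
			watch.WithLimit(1000),
			watch.WithCloseOutChan(true),
		)
		defer q.Close()

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// The returned channel is closed when ctx is cancelled.
		eventq := q.WatchContext(ctx)

		go q.Publish("hello")
		fmt.Println(<-eventq)
	}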
cc @aaronlehmann @nishanttotla
Signed-off-by: Alex Mavrogiannis alex.mavrogiannis@docker.com