changefeedccl: Rework webhook sink flushing implementation. #69223
craig[bot] merged 1 commit into cockroachdb:master
stevendanna
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @miretskiy and @spiffyyeng)
a discussion (no related file):
Nice how the addition of the batcher created a nice place from which to coordinate this.
pkg/ccl/changefeedccl/sink_webhook.go, line 509 at r1 (raw file):
        return
    }
    alloc.Release(s.workerCtx)
Do we not need to release this in the error case? I suppose not since we'll be relying on the whole changefeed restarting and all pending allocations being released.
pkg/ccl/changefeedccl/sink_webhook.go, line 564 at r1 (raw file):
// drainWithError saves the first error message encountered by webhook workers,
// and requests all workers to terminate.
func (s *webhookSink) drainWithError(err error) {
[nit] drain, to me, implies that we would process in-flight messages (i.e., drain the queue), but exitWorkers() is going to abandon those messages (which is fine). Perhaps exitWorkersWithError(err error) would be more accurate.
pkg/ccl/changefeedccl/sink_webhook.go, line 574 at r1 (raw file):
}
// sinkError checks if inflightTracker has an error on the buffer and returns
[nit] inflightTracker is no more
miretskiy
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @spiffyyeng and @stevendanna)
a discussion (no related file):
Previously, stevendanna (Steven Danna) wrote…
:lgtm: Nice how the addition of the batcher created a nice place from which to coordinate this.
Ack.
pkg/ccl/changefeedccl/sink_webhook.go, line 509 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Do we not need to release this in the error case? I suppose not since we'll be relying on the whole changefeed restarting and all pending allocations being released.
No; we will return an error to the caller either during close or on the next call to flush/emit. Either way, all memory gets released when we restart.
pkg/ccl/changefeedccl/sink_webhook.go, line 564 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
[nit] drain, to me, implies that we would process in-flight messages (i.e., drain the queue), but exitWorkers() is going to abandon those messages (which is fine). Perhaps exitWorkersWithError(err error) would be more accurate.
ack.
pkg/ccl/changefeedccl/sink_webhook.go, line 574 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
[nit] inflightTracker is no more
Done.
bc13fbd to e7045bb
bors r+
Build succeeded
Stop relying on a wait group to implement flush logic in the webhook sink.
The wait group does not respect context cancellation. Because of that,
the caller can block waiting for Flush to complete and the context can be
cancelled immediately afterwards.
When this happens, the goroutines responsible for processing the messages
may terminate before decrementing the wait group count, so the wait
never returns.
Instead of relying on the wait group, use the synchronization provided by
the channels themselves, and introduce a new type of worker request (flush)
that flushes and waits for the flush to complete while respecting
context cancellation.
Fixes #69175
Release Notes: None