Skip to content

changefeedccl: Rework webhook sink flushing implementation.#69223

Merged
craig[bot] merged 1 commit intocockroachdb:masterfrom
miretskiy:webhook
Aug 23, 2021
Merged

changefeedccl: Rework webhook sink flushing implementation.#69223
craig[bot] merged 1 commit intocockroachdb:masterfrom
miretskiy:webhook

Conversation

@miretskiy
Copy link
Copy Markdown
Contributor

Stop relying on wait group to implement flush logic in webhook sink.
The wait group does not respect context cancellation. Because of that,
it is possible that the caller blocks, waiting for Flush to complete,
while immediately after blocking, the context is cancelled.
When this happens the go routines running responsible for processing
the messages may terminate prior to decrementing wait group counts.

Instead of relying on waitgroup, use synchronization provided by the
channels themselves, and introduce a new type of worker request (flush)
which correctly flushes and waits for flush to complete, while respecting
context cancellation.

Fixes #69175

Release Notes: None

@cockroach-teamcity
Copy link
Copy Markdown
Member

This change is Reviewable

Copy link
Copy Markdown
Collaborator

@stevendanna stevendanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy and @spiffyyeng)

a discussion (no related file):
:lgtm: Nice how the addition of the batcher created a nice place from which to coordinate this.



pkg/ccl/changefeedccl/sink_webhook.go, line 509 at r1 (raw file):

				return
			}
			alloc.Release(s.workerCtx)

Do we not need to release this in the error case? I suppose not since we'll be relying on the whole changefeed restarting and all pending allocations being released.


pkg/ccl/changefeedccl/sink_webhook.go, line 564 at r1 (raw file):

// drainWithError saves the first error message encounted by webhook workers,
// and requests all workers to terminate.
func (s *webhookSink) drainWithError(err error) {

[nit] drain to me implies that we would process in-flight messages (i.e. drain the queue), but exitWorkers() is going to abandon those messages (which is fine, but perhaps exitWorkersWithError(err error) would be more accurate.


pkg/ccl/changefeedccl/sink_webhook.go, line 574 at r1 (raw file):

}

// sinkError checks if inflightTracker has an error on the buffer and returns

[nit] inflightTracker is no more

@miretskiy miretskiy requested a review from stevendanna August 23, 2021 14:59
Copy link
Copy Markdown
Contributor Author

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @spiffyyeng and @stevendanna)

a discussion (no related file):

Previously, stevendanna (Steven Danna) wrote…

:lgtm: Nice how the addition of the batcher created a nice place from which to coordinate this.

Ack.



pkg/ccl/changefeedccl/sink_webhook.go, line 509 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Do we not need to release this in the error case? I suppose not since we'll be relying on the whole changefeed restarting and all pending allocations being released.

no; we will return an error to the caller either during close or next call to flush/emit. Either way, all memory gets released when we restart.


pkg/ccl/changefeedccl/sink_webhook.go, line 564 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

[nit] drain to me implies that we would process in-flight messages (i.e. drain the queue), but exitWorkers() is going to abandon those messages (which is fine, but perhaps exitWorkersWithError(err error) would be more accurate.

ack.


pkg/ccl/changefeedccl/sink_webhook.go, line 574 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

[nit] inflightTracker is no more

Done.

@miretskiy miretskiy force-pushed the webhook branch 3 times, most recently from bc13fbd to e7045bb Compare August 23, 2021 15:04
Stop relying on wait group to implement flush logic in webhook sink.
The wait group does not respect context cancellation.  Because of that,
it is possible that the caller blocks, waiting for Flush to complete,
while immediately after blocking, the context is cancelled.
When this happens the go routines running responsible for processing
the messages may terminate prior to decrementing wait group counts.

Instead of relying on waitgroup, use synchronization provided by the
channels themselves, and introduce a new type of worker request (flush)
which correctly flushes and waits for flush to complete, while respecting
context cancellation.

Fixes cockroachdb#69175

Release Notes: None
@miretskiy
Copy link
Copy Markdown
Contributor Author

bors r+

@craig
Copy link
Copy Markdown
Contributor

craig bot commented Aug 23, 2021

Build succeeded:

@craig craig bot merged commit afaa938 into cockroachdb:master Aug 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

changefeedccl: TestChangefeedNemeses/webhook - 887.84s

4 participants