changefeedccl: webhook and pubsub sink refactor #95369
Open
samiskin wants to merge 5 commits into cockroachdb:master
miretskiy (Contributor) suggested changes on Jan 19, 2023:
I have not reviewed this code; the PR description alone is massive.
I think the scope and scale of the changes, both code-wise and complexity-wise, is significant and concerning.
Let's meet/discuss in person.
The webhook sink and pubsub sink both lagged far behind the other sinks in terms of maximum throughput (as low as only a few thousand messages per second) due to their parallelization architecture. The pubsub sink was also lacking metrics that the other sinks emitted. This PR creates a new framework for processing events for a sink:

- `SinkClient`s are abstractions over all sink-specific logic, where batches of messages can be encoded into a payload that can be emitted to the external resource.
- `batchingSinkEmitter` handles batching events together and flushing them upon reaching a size or frequency limit.
- `parallelSinkEmitter` instantiates multiple batching emitters across many worker goroutines to parallelize the load, sharded on the event key.
- `flushingSinkEmitter` allows blocking until all events have drained out of the parallel emitter.
- `parallelBatchingSink` puts them all together into a complete `Sink` implementation with the appropriate `EmitRow`/`Flush`/... methods.

Both webhook and pubsub sinks end up as instantiations of the general `parallelBatchingSink` with their own `SinkClient` implementations. With batch sizes of 100 on a three-node, 16-CPU cluster on `tpcc.order_line`, the sinks can reach almost 400k messages per second.

Release note (performance improvement): Changefeeds emitting to webhook and gcpubsub endpoints are now able to emit at significantly higher throughput.
Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-19355
This PR implements both the Webhook and Pubsub sinks as instances of a new framework for setting up an event pipeline for a changefeed aggregator.
Reviewing suggestions:
Sink-specific code is encapsulated in a `SinkClient` interface. Each `SinkClient` implementation describes how to turn a slice of kvs or a resolved message into a sink-specific payload that is ready to be emitted to the external sink.
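To make the shape of this concrete, here is a minimal, self-contained sketch of the `SinkClient` idea. The method names `EncodeBatch` and `EmitPayload` come from the description above, but the exact signatures and the toy `webhookClient` are illustrative assumptions, not the actual CRDB interface:

```go
package main

import (
	"fmt"
	"strings"
)

// SinkPayload is an opaque, ready-to-send package of messages.
type SinkPayload interface{}

// SinkClient hides all sink-specific logic: how to encode a batch of
// messages for one topic, and how to perform the I/O of sending it.
// (Hypothetical sketch; the real interface differs.)
type SinkClient interface {
	EncodeBatch(topic string, messages [][]byte) (SinkPayload, error)
	EmitPayload(payload SinkPayload) error
}

// webhookClient is a toy implementation that "emits" by recording bodies.
type webhookClient struct{ sent []string }

func (c *webhookClient) EncodeBatch(topic string, msgs [][]byte) (SinkPayload, error) {
	parts := make([]string, len(msgs))
	for i, m := range msgs {
		parts[i] = string(m)
	}
	// e.g. a JSON-array-like body addressed to the topic's endpoint.
	return fmt.Sprintf("%s:[%s]", topic, strings.Join(parts, ",")), nil
}

func (c *webhookClient) EmitPayload(p SinkPayload) error {
	c.sent = append(c.sent, p.(string))
	return nil
}

func main() {
	var client SinkClient = &webhookClient{}
	p, _ := client.EncodeBatch("orders", [][]byte{[]byte(`{"id":1}`), []byte(`{"id":2}`)})
	_ = client.EmitPayload(p)
	fmt.Println(p) // orders:[{"id":1},{"id":2}]
}
```

The batching/parallelism machinery below only ever sees this interface, which is what lets both webhook and pubsub share one pipeline.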
It'd be nice if a `Sink` could've just been like a buffer with `Append()` and `Flush()`; however, `EncodeBatch` takes in a single separate `topic` parameter.

The Pubsub sink was moved to use the gcpubsub `PublisherClient` to allow us to control the batches directly, and as a result the testfeed had to be moved to use a mock pubsub server instead.

The coalescing of individual messages into batches, followed by the encoding and emitting of these batches via a `SinkClient`, is handled by a `batchingSinkEmitter`:

- A given `batchingSinkEmitter` has a single explicit topic associated with it, due to the aforementioned detail of topics being their own endpoints that need their own unique batches.
- Messages entering through `Emit` are sent to the BatchWorker to be combined into an appropriately sized list of messages. This batching work has to be in its own goroutine since a flush must be triggerable by either a size limit or a timer, so the worker needs to be able to block on a timer without blocking the caller of `Emit()`.
- Once a batch is to be flushed, it's encoded into a `SinkPayload` and then sent to the EmitWorker, whose job is solely the I/O of calling `EmitPayload`. This is done in its own goroutine so that while it is blocked on I/O, the next batch of messages can continue to be built up.
- Confirmation of a successful emit is sent via the `successCh` property of the emitter (`successCh <- numFlushed`). Failures are communicated through an `errorCh`, and upon the first failure all future emits are ignored to avoid ordering issues. The reason for using channels here is covered in the `FlushingEmitter` section.
- Since batchers are topic-specific, any future logic for dynamically altering behaviour such as batch sizes and throughput will be specific to that topic, which I think is a benefit of this setup. Different topics can have different configurations and limitations, and this allows each batcher to be optimal for its specific endpoint.
Emitting to a sink is largely an I/O-bound task and is served well by being done in parallel. A given `batchingSinkEmitter` cannot emit events in parallel, as it must maintain per-key ordering. Initially I had the batching done within each `parallelEventConsumer` worker, since that stage is already parallel and it'd be nice to not have an extra fan-out step; however, throughput ended up being a lot lower with only 8 goroutines, and I was wary of simply increasing that number since the `kvEventToRowConsumer` is more of a CPU-bound task that may also require a notable amount of memory. Tying these two together felt like it'd hurt us eventually, especially with how many goroutines gcpubsub uses by default (`25 * runtime.GOMAXPROCS(0)`).

So, given that sink processing needs its own fan-out, we have a `parallelSinkEmitter` that emits to instances of `SinkEmitter`s across many parallel workers:

- It creates `numWorkers` worker goroutines and passes events to them sharded based on key. Each worker then creates a unique instance of a `SinkEmitter` for each topic that passes through it (created via a `topicEmitterFactory(topic)` call). This means the maximum number of active `batchingSinkEmitter`s is `numWorkers * number of topics`. It's unfortunate that this also scales with the number of topics, but I couldn't think of a way around it: at least `numWorkers` goroutines should be usable by any one topic at a time, since there may be periods where only a single topic is seeing a ton of activity.
- Similar to the batching emitter, successes and errors are communicated via channels, which will be explained in the next section.
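The key-sharded dispatch can be illustrated with a small hash-based routing function (a sketch under assumed names; the PR's actual sharding code may hash differently). The property that matters is that the same key always lands on the same worker, preserving per-key ordering while distinct keys spread across workers:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerForKey deterministically maps an event key to one of numWorkers
// goroutines. Same key -> same worker, so per-key ordering is preserved.
func workerForKey(key []byte, numWorkers int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % uint32(numWorkers))
}

func main() {
	for _, k := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> worker %d\n", k, workerForKey([]byte(k), 8))
	}
}
```

Each worker would then lazily build its own per-topic `SinkEmitter` on first use, which is where the `numWorkers * number of topics` upper bound on live batchers comes from.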
The `sinkEvent` struct that's passed through each stage encapsulates the three types of messages that are relevant:

- Triggering a flush is done by sending a specific `shouldFlush` type of `sinkEvent` message through the entire pipeline. By sending it through the same channels the other messages go through, it is guaranteed that the processing of that flush message happens after all the messages sent so far. The only step that cares about a flush request is the `batchingSinkEmitter`, which upon receiving it flushes out a batch even if it isn't full and its frequency timer hasn't fired yet.
- The purpose of the `successCh chan int` channel is to be able to block until all emitted messages have been fully sent, which is required by a sink's `Flush()` call.
- It's unfortunate that some parallelization logic had to be duplicated between `event_processing` and `parallel_sink_emitter`, but I hoped I could at least share the `Flush()` in-flight tracking of `event_processing` by having the pipeline use a `successCh` of "n messages successfully flushed" and watch that channel to decrement `event_processing`'s `inFlight` counter. That way its `Flush()` could just block until the entire pipeline completed, not just its own stage.
- This worked right up until `Flush()` needed not only to block until all in-flight messages completed, but also to trigger all messages to flush out without waiting (i.e. force-flushing the unfinished batches waiting on a frequency timer). I need to first ensure that all messages within the `event_processing` workers have flushed out to the sink pipeline, and only then send the flush request through the second parallelism stage, since at that point it has all the relevant messages in flight. This means both parallel stages require their own flush stage.
- As a result, there is also a `FlushingEmitter` stage that wraps an `AsyncEmitter`, watching its `Successes()` and `Errors()` channels to implement a `Flush() error` function that triggers a flush and returns an error if any messages errored.
- Even though we need multiple parallelism and flush stages (one pair for encoding KVs and another for processing those encoded values), it should be easy to share the same code across them both in follow-up work, as the abstractions apply in the same way.
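The in-flight accounting behind `Flush()` boils down to a counter incremented on emit and drained by acknowledgements arriving on `successCh`. A minimal sketch, with hypothetical names and with the emit I/O stubbed out as a goroutine:

```go
package main

import "fmt"

// flushingEmitter tracks how many messages are in flight; downstream
// workers acknowledge completed batches by sending their sizes on
// successCh. (Illustrative sketch, not the PR's actual code.)
type flushingEmitter struct {
	inFlight  int
	successCh chan int
}

func (f *flushingEmitter) emit(n int) {
	f.inFlight += n
	// In the real pipeline the batch is sent downstream here; a worker
	// later acknowledges by sending the batch size on successCh.
	go func() { f.successCh <- n }()
}

// Flush blocks until every in-flight message has been acknowledged.
// A real implementation would first push a shouldFlush sentinel through
// the data channels, forcing partial batches out immediately.
func (f *flushingEmitter) Flush() {
	for f.inFlight > 0 {
		f.inFlight -= <-f.successCh
	}
}

func main() {
	f := &flushingEmitter{successCh: make(chan int)}
	f.emit(3)
	f.emit(2)
	f.Flush()
	fmt.Println(f.inFlight) // 0
}
```

An `Errors()` channel would be consumed in the same loop so that `Flush() error` can surface the first failure instead of blocking forever.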
So far we have a `FlushingEmitter`, which allows the flushing of an `AsyncEmitter`, which allows the parallel instantiation of multiple `SinkEmitter`s, which emit via a `SinkClient`. In order to make this an actual `Sink` implementation, though, we need one more wrapper.

(Diagram of the full setup omitted.)
Performance
- Pubsub throughput with default config (batches of 100) on 3 nodes, 16 CPUs each, on the `tpcc.order_line` table, with 32 workers: (throughput graph omitted)
- Webhook throughput with default config (no batching) on 3 nodes, 16 CPUs each, on the `tpcc.order_line` table, with 32 workers and 16 `MaxConnsPerHost`: (throughput graph omitted)
- Webhook throughput with batches of 100 on 3 nodes, 16 CPUs each, on the `tpcc.order_line` table, with 32 workers: (throughput graph omitted)

(For context, Kafka with default config would be around 205k messages per second at ~27% CPU.)
Tuning Followup
With respect to configuration, there are now a number of new values that can be tuned, such as the number of `parallelSinkEmitter` workers. There are concerns around both throughput and CPU usage to be considered; the current numbers were selected such that CPU usage was at most around 30%.

Performance-wise, due to requiring CPU usage to be that low, these don't match Kafka just yet. Further investigation and tuning will be done during the stability period. For now I'd rather err on the side of lower CPU usage with solid throughput than chase the highest throughput possible.

Turning on the Pacer resulted in strange behaviour where, after a while, throughput would gradually tank to around 1000 messages per second. It is disabled by default and I plan to figure this out during the stability period.
Refactoring Followup
I deliberately made this work require zero changes at and above the `event_processing` layer; it's just another `Sink` implementation. Ideally this could encompass all the sinks at once, but I didn't go that far in this PR. I think work to unify the Kafka sink under this structure can be justified in the near future, using the implementation of proper dynamic batching as an excuse. Parquet may come under it as well due to upcoming optimization work, and then Cloudstorage may not be that big a change to include if Parquet is already handled.
Release note (performance improvement): Changefeeds emitting to webhook and gcpubsub endpoints are now able to emit at significantly higher throughput.