changefeedccl: webhook and pubsub sink refactor#95369

Open
samiskin wants to merge 5 commits into cockroachdb:master from samiskin:webhook-pubsub-sink-refactor

Conversation

@samiskin
Contributor

@samiskin samiskin commented Jan 17, 2023

Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-19355

This PR implements both the Webhook and Pubsub sinks as instances of a new framework for setting up an event pipeline for a changefeed aggregator.

Reviewing suggestions:

  • Read this full description first
  • For the initial review, use the individual commits as groupings for changes. The last commit contains only generated files, but its commit message is the one the commits will all eventually merge into.
  • For sink_pubsub and sink_webhook changes just view the full file rather than the diff

Sink-specific code is encapsulated in a SinkClient interface

type SinkClient interface {
	EncodeBatch(topic string, batch []messagePayload) (SinkPayload, error)
	EncodeResolvedMessage(resolvedMessagePayload) (SinkPayload, error)
	EmitPayload(SinkPayload) error
	Close() error
}

type SinkPayload interface{}

Each SinkClient implementation describes how to turn a slice of kvs or a resolved message into some sink-specific package that is ready to be emitted to the external sink.
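As a concrete illustration, here is a minimal, hypothetical SinkClient implementation in the spirit of the webhook sink. The field names, JSON envelope, and URL are invented for this sketch and are not the PR's actual code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for the PR's messagePayload and
// resolvedMessagePayload types; the fields are illustrative only.
type messagePayload struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type resolvedMessagePayload struct {
	Resolved string `json:"resolved"`
}

// SinkPayload is an opaque, ready-to-emit package, as in the PR.
type SinkPayload interface{}

// webhookSinkClient sketches a webhook-flavored SinkClient: the Encode*
// methods do the CPU-bound work of building the request body up front so
// that EmitPayload is left as a pure I/O step that can be retried.
type webhookSinkClient struct {
	url string
}

func (c *webhookSinkClient) EncodeBatch(topic string, batch []messagePayload) (SinkPayload, error) {
	// A webhook batch is a single JSON envelope wrapping all the rows.
	return json.Marshal(map[string]interface{}{"payload": batch, "length": len(batch)})
}

func (c *webhookSinkClient) EncodeResolvedMessage(r resolvedMessagePayload) (SinkPayload, error) {
	return json.Marshal(r)
}

func (c *webhookSinkClient) EmitPayload(p SinkPayload) error {
	// A real implementation would POST p.([]byte) to c.url; printed here
	// so the sketch is runnable without a server.
	fmt.Printf("POST %s: %s\n", c.url, p.([]byte))
	return nil
}

func (c *webhookSinkClient) Close() error { return nil }

func main() {
	client := &webhookSinkClient{url: "https://example.invalid/hook"}
	payload, err := client.EncodeBatch("topic-1", []messagePayload{
		{Key: "a", Value: "1"},
		{Key: "b", Value: "2"},
	})
	if err != nil {
		panic(err)
	}
	_ = client.EmitPayload(payload)
}
```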

Ideally a Sink could simply be a buffer with Append() and Flush(); however:

  1. There's work that goes into taking a completed batch and encoding it into a package that can be emitted to the sink. I wanted to keep the emitting work a purely I/O task that can be retried without wasting any effort, so I split things up into an EncodeBatch step and an EmitPayload step.
  2. Resolved messages can't be mixed with batches of kvs, so they need to be treated differently and have their own Encode method.
  3. While the Webhook sink emits the same way regardless of topic, for Pubsub and Kafka endpoints you can't emit a batch of messages across topics at once: each topic is its own endpoint to emit a given batch to. To enforce that batches must be for a single topic, EncodeBatch takes a single separate topic parameter.
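The value of the Encode/Emit split in point 1 can be sketched as follows: the payload is encoded exactly once, and only the I/O step is retried on failure. The flakyEmitter and the fixed backoff below are invented for illustration; the real sink uses retry.Options:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// flakyEmitter simulates an I/O emit that fails twice before succeeding.
type flakyEmitter struct{ attempts int }

func (f *flakyEmitter) EmitPayload(p []byte) error {
	f.attempts++
	if f.attempts < 3 {
		return errors.New("transient network error")
	}
	return nil
}

// emitWithRetry shows the benefit of splitting EncodeBatch from EmitPayload:
// the payload is encoded exactly once, and only the pure-I/O emit step is
// retried, so no encoding effort is wasted.
func emitWithRetry(e *flakyEmitter, payload []byte, maxAttempts int) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = e.EmitPayload(payload); err == nil {
			return nil
		}
		time.Sleep(time.Millisecond) // stand-in for exponential backoff
	}
	return err
}

func main() {
	payload := []byte(`{"payload":["..."]}`) // the result of a single EncodeBatch call
	e := &flakyEmitter{}
	if err := emitWithRetry(e, payload, 5); err != nil {
		panic(err)
	}
	fmt.Printf("emitted after %d attempts\n", e.attempts) // → emitted after 3 attempts
}
```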

The Pubsub sink was moved to use the gcpubsub PublisherClient to allow us to control the batches directly, and as a result the testfeed had to be moved to use a mock pubsub server instead.


The coalescing of individual messages into batches, followed by the encoding and emitting of these batches via a SinkClient, is handled by a batchingSinkEmitter

type SinkEmitter interface {
	Emit(*sinkEvent)
	Close()
}

type batchingSinkEmitter struct {
	client    SinkClient
	topic     string
	batchCfg  sinkBatchConfig
	retryOpts retry.Options

	successCh chan int
	errorCh   chan error
	...
}

func (bs *batchingSinkEmitter) Emit(payload *sinkEvent) { ... }
func (bs *batchingSinkEmitter) startBatchWorker() error { ... }
func (bs *batchingSinkEmitter) startEmitWorker() error { ... }

A given batchingSinkEmitter has a single explicit topic associated with it due to the aforementioned detail of topics being their own endpoints that need their own unique batches.

Messages entering through Emit are sent to the batch worker to be combined into appropriately sized batches. The batching work has to run in its own goroutine since a flush can be triggered by either a size limit or a timer, so the worker must be able to block on a timer without blocking the caller of Emit().

Once a batch is due to be flushed, it's encoded into a SinkPayload and then sent to the emit worker, whose job is solely the I/O of calling EmitPayload. This runs in its own goroutine so that while it's blocked on I/O, the next batch of messages can continue to be built up.

Confirmation of a successful emit is sent via the successCh property of the emitter (successCh <- numFlushed). Failures are communicated through an errorCh, and upon the first failure all future emits are ignored to avoid ordering issues. The reason for using channels here will be covered in the FlushingEmitter section.

Since batchers are topic-specific, any future logic around dynamically altering behaviour such as batch sizes and throughput will be specific to that topic, which I think is a benefit to this setup. Different topics can have different configurations and different limitations, and this can allow for each batcher to be optimal to its specific endpoint.


Emitting to a sink is largely an I/O-bound task and is served well by being done in parallel. A given batchingSinkEmitter cannot emit events in parallel as it must maintain per-key ordering. Initially I had the batching done within each parallelEventConsumer worker, since that stage is already parallel and it would be nice to avoid an extra fan-out step. However, throughput ended up being a lot lower with only 8 goroutines, and I was wary of simply increasing that number since the kvEventToRowConsumer is more of a CPU-bound task that may also require a notable amount of memory. Tying those two together felt like it would hurt us eventually, especially with how many goroutines gcpubsub uses by default (25 * runtime.GOMAXPROCS(0)).

So given that we need sink processing to be its own fan out, we have a parallelSinkEmitter that emits to instances of SinkEmitters across many parallel workers.

type AsyncEmitter interface {
	SinkEmitter
	Successes() chan int
	Errors() chan error
}

type parallelSinkEmitter struct {
	topicEmitterFactory TopicEmitterFactory
	numWorkers          int64
	successCh           chan int
	errorCh             chan error
	...
}
type TopicEmitterFactory = func(topic string, successCh chan int, errorCh chan error) SinkEmitter

func (pse *parallelSinkEmitter) Emit(payload *sinkEvent) {...}
func (pse *parallelSinkEmitter) Successes() chan int {...}
func (pse *parallelSinkEmitter) Errors() chan error {...}
func (pse *parallelSinkEmitter) workerLoop(input chan *sinkEvent) error {...}

This creates numWorkers worker goroutines and passes events to them, sharded by key. Each worker then creates a unique instance of a SinkEmitter for each topic that passes through it (created via a topicEmitterFactory(topic) call). This means the maximum number of active batchingSinkEmitters is numWorkers * the number of topics. It's unfortunate that this also scales with the number of topics, but I couldn't think of a good way around it: at least numWorkers goroutines should be usable by any one topic at a time, since there may be periods where only a single topic has a ton of activity.

Similar to the batching emitter, successes and errors are communicated via channels, which will be explained in the next section.


The sinkEvent struct that's passed through each stage encapsulates the three types of messages that are relevant:

type sinkEvent struct {
	// Set if it's a Flush event
	shouldFlush bool

	// Set if it's a Resolved event
	resolved *resolvedMessagePayload

	// Set if it's a KV Event
	msg   messagePayload
	alloc kvevent.Alloc
	mvcc  hlc.Timestamp
}

Triggering a Flush is done by sending a specific shouldFlush type of sinkEvent message through the entire pipeline. By sending it through the same channels the other messages go through, it is guaranteed that the processing of that Flush message happens after all the messages sent so far. The only stage that cares about a Flush request is the batchingSinkEmitter, which upon receiving it flushes out a batch even if it's not full and its Frequency timer hasn't fired yet.
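A small model of why routing the flush request through the data channel works: FIFO channel ordering guarantees the worker has seen every earlier message before it sees the flush sentinel. The types below are simplified stand-ins for the real pipeline:

```go
package main

import "fmt"

// sinkEvent mirrors the shape of the struct above, simplified to the two
// cases that matter for this sketch.
type sinkEvent struct {
	shouldFlush bool
	msg         string
}

// runWorker drains the events channel, batching messages and emitting the
// current batch whenever a flush event arrives. Because the flush request
// travels through the same FIFO channel as the data, the worker is
// guaranteed to process every earlier message before the flush.
func runWorker(events <-chan sinkEvent, flushed chan<- []string) {
	var batch []string
	for ev := range events {
		if ev.shouldFlush {
			flushed <- batch // contains everything enqueued before the flush
			batch = nil
			continue
		}
		batch = append(batch, ev.msg)
	}
	close(flushed)
}

func main() {
	events := make(chan sinkEvent, 8)
	flushed := make(chan []string, 1)
	go runWorker(events, flushed)
	events <- sinkEvent{msg: "a"}
	events <- sinkEvent{msg: "b"}
	events <- sinkEvent{shouldFlush: true} // enqueued after a and b, so processed after them
	close(events)
	fmt.Println("flushed:", <-flushed) // → flushed: [a b]
}
```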

The purpose of the successCh chan int channel is to be able to block until all emitted messages have been fully sent, which is required by a sink's Flush() call.

It's unfortunate that some parallelization logic had to be duplicated between event_processing and parallel_sink_emitter, but I hoped I could at least share the Flush() inflight tracking of event_processing by having the pipeline use a successCh of "n messages successfully flushed" and watch that channel to decrement event_processing's inFlight counter. That way its Flush() could block until the entire pipeline completed, not just its own stage.

This worked until I hit the fact that Flush() not only needs to block until all inflight messages have completed, but also needs to force all pending messages to flush out without waiting (i.e. force-flushing the unfinished batches that are waiting on a Frequency timer). I need to first ensure that all messages within the event_processing workers have flushed out to the sink pipeline, and only at that point can I send the flush request through the second parallelism stage, since by then it has all the relevant messages in flight. This means that both parallel stages require their own Flush stage.

As a result, there is also a FlushingEmitter stage that wraps an AsyncEmitter, watching its Successes() and Errors() channels to implement a Flush() error function that triggers a flush and returns an error if any messages errored.

type FlushingEmitter interface {
	SinkEmitter
	Flush() error
}

type flushingSinkEmitter struct {
	wrapped AsyncEmitter
	...
}

func (fs *flushingSinkEmitter) Emit(event *sinkEvent) {...}
func (fs *flushingSinkEmitter) Flush() error {...}
func (fs *flushingSinkEmitter) emitConfirmationWorker(ctx context.Context) {...}

Even though we need multiple parallelism and flush stages, one pair for encoding KVs and another for processing those encoded values, it should be easy to share the same code across them both in follow-up work to unify the rest of the code, since the abstractions apply in the same way.


So far we have a FlushingEmitter which allows the flushing of an AsyncEmitter which allows the parallel instantiation of multiple SinkEmitters which emit via a SinkClient. In order to make this an actual Sink implementation though, we need one more wrapper.

type parallelBatchingSink struct {
	emitter FlushingEmitter

	client       SinkClient
	topicNamer   *TopicNamer
	concreteType sinkType
}

func makeParallelBatchingSink(
	client SinkClient,
	batchCfg sinkBatchConfig,
	retryOpts retry.Options,
	...
) Sink {
	batchingEmitterFactory := func(topic string, successCh chan int, errorCh chan error) SinkEmitter {
		return makeBatchingSinkEmitter(..., topic, successCh, errorCh)
	}
	parallelEmitter := makeParallelSinkEmitter(..., batchingEmitterFactory)
	flushingEmitter := makeFlushingEmitter(..., parallelEmitter)

	return &parallelBatchingSink{..., flushingEmitter, client}
}

[Diagram: the full emitter/sink pipeline setup]


Performance

Pubsub Throughput with default config (batches of 100) on 3 nodes, 16 CPUs each, on the tpcc.order_line table, with 32 workers:

[Screenshot: Pubsub throughput graph]

Webhook Throughput with default config (no batching) on 3 nodes, 16 CPUs each, on the tpcc.order_line table, with 32 workers and 16 MaxConnsPerHost:

[Screenshot: Webhook throughput graph, no batching]

Webhook Throughput with batches of 100 on 3 nodes, 16 CPUs each, on the tpcc.order_line table, with 32 workers:

[Screenshot: Webhook throughput graph, batches of 100]

(For context, Kafka with default config would be around 205k messages per second at ~27% CPU)


Tuning Followup

With respect to configuration, there are now a number of new knobs that can take different values:

  • Number of parallel workers in the parallelSinkEmitter
  • Sizes of the buffers for each channel
  • Default batching config for webhook and pubsub
  • MaxConnsPerHost for the Webhook http client

There are concerns around both throughput and CPU usage. The current numbers were selected so that CPU usage stayed around 30% at most.

Performance-wise, due to requiring CPU usage to be that low, these sinks don't match Kafka just yet. Further investigation and tuning will be done during the Stability period. For now I'd rather err on the side of lower CPU usage with solid throughput rather than the highest throughput possible.

Turning on the Pacer resulted in strange behaviour where after a while the throughput would gradually tank to around 1000 messages per second. It is disabled by default and I plan to figure this out in the stability period.


Refactoring Followup

I deliberately made this work require zero changes at and above the event_processing layer; it's just another Sink implementation. Ideally this could encompass all the sinks at once, but I didn't go that far for the following reasons:

  1. There's currently ongoing work such as making the Kafka sink react to rate limits that I didn't want to conflict with
  2. Insights from trying to optimize the Parquet sink may result in desired architectural changes and increasing the surface area of it would make those changes more work.
  3. It didn't end up allowing for fewer goroutines / simplicity, since I needed another parallel stage regardless and batching needed its own goroutines as well.
  4. It de-risks this PR and lowers the time requirements, allowing more time to complete other important work for the release with less chance of bugs to deal with.

I think work to unify the Kafka sink under this structure can be justified in the near future by using the implementation of proper dynamic batching as an excuse. Parquet may come under it as well due to upcoming optimization work, and then Cloudstorage may not be that big a change to include if Parquet is already handled.


Release note (performance improvement): Changefeeds emitting to webhook
and gcpubsub endpoints are now able to emit at significantly higher
throughput.

@cockroach-teamcity
Member

This change is Reviewable

@samiskin samiskin force-pushed the webhook-pubsub-sink-refactor branch 4 times, most recently from be8065a to 4281bc0 Compare January 17, 2023 18:00
@samiskin samiskin marked this pull request as ready for review January 17, 2023 18:06
@samiskin samiskin requested review from a team as code owners January 17, 2023 18:06
@samiskin samiskin requested review from HonoreDB, jayshrivastava, miretskiy, renatolabs and srosenberg and removed request for a team January 17, 2023 18:06
@samiskin samiskin force-pushed the webhook-pubsub-sink-refactor branch from 4281bc0 to 2728234 Compare January 17, 2023 18:18
@samiskin samiskin force-pushed the webhook-pubsub-sink-refactor branch 2 times, most recently from e0fd089 to 065de41 Compare January 18, 2023 20:44
Contributor

@miretskiy miretskiy left a comment


I have not reviewed this code; The PR description alone is massive.

I think the scope and the scale of changes, code wise, but also complexity wise is significant and concerning.

Let's meet/discuss in person.

The webhook sink and pubsub sink both lagged far behind other sinks in
terms of maximum throughput due to their parallelization architecture
(as low as only a few thousand messages per second). This PR creates a
new framework for processing events for a sink.  The pubsub sink was
also lacking metrics that the other sinks emitted.

SinkClients are abstractions over all sink-specific logic where batches
of messages can be Encoded into a payload that is able to be Emitted to
the external resource.

batchingSinkEmitter handles batching events together and flushing them
upon reaching a size or frequency limit.

parallelSinkEmitter instantiates multiple batching emitters across many
worker goroutines to parallelize the load, sharded on the event key.

flushingSinkEmitter allows blocking on all events draining out of the
parallel emitter.

parallelBatchingSink puts them all together into a complete Sink
implementation with the appropriate EmitRow/Flush/... methods.

Both webhook and pubsub sinks end up as instantiations of the general
parallelBatchingSink with their own SinkClient implementations.

With batch sizes of 100 on a three node 16 cpu cluster on
tpcc.order_line the sinks can reach almost 400k messages per second.

Release note (performance improvement): Changefeeds emitting to webhook
and gcpubsub endpoints are now able to emit at significantly higher
throughput.
@samiskin samiskin force-pushed the webhook-pubsub-sink-refactor branch from 065de41 to daacee0 Compare January 19, 2023 18:55
@samiskin samiskin requested a review from a team January 19, 2023 18:55
@dhartunian dhartunian removed the request for review from a team February 6, 2023 15:50


Development

Successfully merging this pull request may close these issues.

changefeedccl: Webhook sink is slow

3 participants