
changefeedccl: pubsub sink refactor to batching sink#100188

Merged
craig[bot] merged 1 commit into cockroachdb:master from samiskin:pubsub-refactor
Apr 7, 2023

Conversation

@samiskin
Contributor

@samiskin samiskin commented Mar 30, 2023

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a follow-up to #99086, which moves the Pubsub sink to the batching sink framework.

The changes involve:

  1. Moving the Pubsub code to match the `SinkClient` interface, using the lower-level v1 pubsub API that lets us publish batches manually
  2. Removing the extra call to `json.Marshal`
  3. Moving to the `pstest` package for validating results in unit tests
  4. Adding topic handling to the batching sink, where batches are created per-topic
  5. Adding a `pubsub_sink_config`, since the sink can now handle Retry and Flush config settings
  6. Adding metrics to the old pubsub sink as well, for the purpose of comparing the two versions
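The per-topic batching in point 4 can be sketched roughly as follows. This is an illustrative stand-in, not the actual `changefeedccl` code: the `message` type and `batchByTopic` helper are hypothetical, but they show the grouping step that lets each batch go out in a single publish call per topic.

```go
package main

import "fmt"

// message is an illustrative stand-in for an emitted changefeed row.
type message struct {
	topic string
	key   string
	value string
}

// batchByTopic groups incoming messages into per-topic batches so each
// batch can be published to its topic with a single publish request.
func batchByTopic(msgs []message) map[string][]message {
	batches := make(map[string][]message)
	for _, m := range msgs {
		batches[m.topic] = append(batches[m.topic], m)
	}
	return batches
}

func main() {
	msgs := []message{
		{topic: "orders", key: "k1", value: "v1"},
		{topic: "orders", key: "k2", value: "v2"},
		{topic: "users", key: "k3", value: "v3"},
	}
	batches := batchByTopic(msgs)
	fmt.Println(len(batches["orders"]), len(batches["users"])) // prints "2 1"
}
```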

At default settings, this resulted in a peak throughput of 90k messages per second on a single node at 27.6% CPU usage, putting it at a similar level to Kafka.

Running pubsub v2 across all of TPCC (nodes ran out of ranges at different speeds):
[screenshot]

Running pubsub v1 (barely visible, 2k messages per second) followed by v2 on tpcc.order_line (in v2 only 2 nodes ended up having ranges assigned to them):
[screenshot]

In the following graphs from the cloud console, where v1 was run followed by v2, you can see that the main reason v1 was slow was that it wasn't able to batch different keys together.
[screenshot]

Publish request counts remained the same despite far more messages in v2
[screenshot]

Release note (performance improvement): pubsub sink changefeeds can now support higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster setting.

@cockroach-teamcity
Member

This change is Reviewable

@samiskin samiskin force-pushed the pubsub-refactor branch 8 times, most recently from bc928f7 to f2039fe Compare March 31, 2023 03:43
@samiskin samiskin marked this pull request as ready for review March 31, 2023 03:43
@samiskin samiskin requested review from a team as code owners March 31, 2023 03:43
@samiskin samiskin requested review from herkolategan, miretskiy and smg260 and removed request for a team March 31, 2023 03:43
@miretskiy
Contributor

Do you have any graphs on the perf comparison?

@samiskin samiskin force-pushed the pubsub-refactor branch 3 times, most recently from cb86c9d to a951ec0 Compare April 4, 2023 17:48
@samiskin
Contributor Author

samiskin commented Apr 4, 2023

@miretskiy Added the comparison graphs.

Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 8 of 11 files at r1, 1 of 2 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)


pkg/ccl/changefeedccl/sink.go line 171 at r1 (raw file):

// NewPubsubSinkEnabled determines whether or not the refactored Pubsub sink
// or the deprecated sink should be used.
var NewPubsubSinkEnabled = settings.RegisterBoolSetting(

Almost read it as a function NewXXX() ...
Maybe rename to PubsubV2Enabled or some such?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 203 at r1 (raw file):

		var buffer bytes.Buffer
		keyPrefix := "{\"Key\":"
		valInfix := ",\"Value\":"

consi


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):

		keyPrefix := "{\"Key\":"
		valInfix := ",\"Value\":"
		topicSuffix := fmt.Sprintf(",\"Topic\":\"%s\"}", psb.topic)

if you care about performance, you'd avoid Sprintf...
Probably costs you more than whatever buffer.Grow will save you.

buffer.WriteString(`"Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"`)

Also, it's a bit unfortunate that you have to declare those keyPrefix/Infix... I just don't think it's necessary.
Something like the following would do well enough, imo:

var scratch [128]byte // we have 30 bytes of metadata... so, kinda guess key/value/topic lengths.
buffer := bytes.NewBuffer(scratch[:0])
buffer.WriteString(`{"Key": "`)
buffer.WriteString(key)
buffer.WriteString(`, "Value": `)
buffer.WriteString(value)
buffer.WriteString(`, "Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"}`)

Note, you can probably create the bytes.Buffer once -- if you're still worried about allocations (you really shouldn't be in this case).

Also... what are the guarantees around the topic containing double quotes?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):

		buffer.Grow(len(keyPrefix) + len(key) + len(valInfix) + len(value) + len(topicSuffix))
		buffer.WriteString(keyPrefix)
		buffer.Write(key)

are you missing closing quote after key?

@samiskin samiskin force-pushed the pubsub-refactor branch 4 times, most recently from 961cc3b to c228f04 Compare April 4, 2023 19:29
Contributor Author

@samiskin samiskin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @miretskiy, and @smg260)


pkg/ccl/changefeedccl/sink.go line 171 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Almost read it as a function NewXXX() ...
Maybe rename to PubsubV2Enabled or some such?

Done.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 203 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

consi

Removed the variables entirely


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

if you care about performance, you'd avoid Sprintf...
Probably costs you more than whatever buffer.Grow will save you.

buffer.WriteString(`"Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"`)

Also, it's a bit unfortunate that you have to declare those keyPrefix/Infix... I just don't think it's necessary.
Something like the following would do well enough, imo:

var scratch [128]byte // we have 30 bytes of metadata... so, kinda guess key/value/topic lengths.
buffer := bytes.NewBuffer(scratch[:0])
buffer.WriteString(`{"Key": "`)
buffer.WriteString(key)
buffer.WriteString(`, "Value": `)
buffer.WriteString(value)
buffer.WriteString(`, "Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"}`)

Note, you can probably create the bytes.Buffer once -- if you're still worried about allocations (you really shouldn't be in this case).

Also... what are the guarantees around the topic containing double quotes?

I added the buffer.Grow after noticing the flamegraph showed the underlying growths having visible CPU cost (I don't remember exactly how much it was).

I switched it to buffer.Grow(26 /* Key/Value/Topic keys */ + len(key) + len(value) + len(psb.topicEncoded)) just now. I don't think reusing a scratch buffer works, since we can't have successive Appends writing over previous ones being stored in the pb.PubsubMessage array being built up.

I also made the topic value go through a per-batch json.FromString(topic).Format


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

are you missing closing quote after key?

The key bytes are already encoded with quotes

Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

I added the buffer.Grow after noticing the flamegraph showed the underlying growths having visible CPU cost (I don't remember exactly how much it was).

I switched it to buffer.Grow(26 /* Key/Value/Topic keys */ + len(key) + len(value) + len(psb.topicEncoded)) just now. I don't think reusing a scratch buffer works, since we can't have successive Appends writing over previous ones being stored in the pb.PubsubMessage array being built up.

I also made the topic value go through a per-batch json.FromString(topic).Format

well, a local scratch buffer definitely works because successive Appends will have their own buffer. The goal is to avoid
allocation at all -- in the common case.
You won't be able to share the buffer though -- for the reason you mention above.

Can we add some specific tests around this function -- to verify that the encoding is good (i.e. to test that your json.FromString stuff works)...
I don't doubt that it does -- but I want to make sure we don't have silly mistakes.
You might find it easier to pull the guts of this method (the buffer.Write calls) into some helper and write specific tests around that.
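The kind of encoding test being asked for here might look like the sketch below. It is illustrative only: `escapeTopic` is a hypothetical helper standing in for the PR's per-batch topic encoding (`json.FromString(topic).Format`), implemented with the standard library, and the tricky names mirror what a name randomizer with quotes, backslashes, and emoji would produce.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// escapeTopic is a stand-in for the per-batch topic encoding step: it
// returns the topic as a JSON string literal, escaping quotes and other
// special characters so the assembled payload stays valid JSON.
func escapeTopic(topic string) (string, error) {
	b, err := json.Marshal(topic)
	return string(b), err
}

func main() {
	// Names like those from a test name randomizer: quotes, backslashes,
	// and emoji must all round-trip cleanly through the encoding.
	for _, topic := range []string{`plain`, `with"quote`, `back\slash`, "emoji-🚀"} {
		escaped, err := escapeTopic(topic)
		if err != nil {
			panic(err)
		}
		var roundTripped string
		if err := json.Unmarshal([]byte(escaped), &roundTripped); err != nil {
			panic(err)
		}
		if roundTripped != topic {
			panic("round trip mismatch: " + topic)
		}
		fmt.Println(escaped)
	}
}
```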


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

The key bytes are already encoded with quotes

Comment?

Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)

@samiskin samiskin force-pushed the pubsub-refactor branch 3 times, most recently from 4d3d097 to 060053d Compare April 5, 2023 21:11
@samiskin samiskin added the backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only label Apr 6, 2023
@samiskin
Contributor Author

samiskin commented Apr 6, 2023

Added a test that uses the name randomizer for the pubsub sink's table names, which includes characters like quotes, slashes, and emoji

@samiskin
Contributor Author

samiskin commented Apr 6, 2023

bors r+

@samiskin
Contributor Author

samiskin commented Apr 6, 2023

bors r+

@samiskin samiskin force-pushed the pubsub-refactor branch 2 times, most recently from 5c299a5 to ed9a082 Compare April 6, 2023 18:37
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to
cockroachdb#99086 which moves the Pubsub sink
to the batching sink framework.

The changes involve:
1. Moving the Pubsub code to match the `SinkClient` interface, using the
lower-level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to `json.Marshal`
3. Moving to the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created
per-topic
5. Adding a `pubsub_sink_config`, since the sink can now handle Retry and
Flush config settings

Release note (performance improvement): pubsub sink changefeeds can now support
higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster
setting.
@samiskin
Contributor Author

samiskin commented Apr 7, 2023

bors r+

@rickystewart
Collaborator

bors r+

@craig
Contributor

craig bot commented Apr 7, 2023

Build succeeded:


Labels

backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only


4 participants