changefeedccl: pubsub sink refactor to batching sink #100188
craig[bot] merged 1 commit into cockroachdb:master
Conversation
bc928f7 to
f2039fe
Compare
Do you have any graphs on perf comparison?
cb86c9d to
a951ec0
Compare
@miretskiy Added the comparison graphs.
a951ec0 to
e713202
Compare
miretskiy
left a comment
Reviewed 8 of 11 files at r1, 1 of 2 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)
pkg/ccl/changefeedccl/sink.go line 171 at r1 (raw file):
// NewPubsubSinkEnabled determines whether or not the refactored Webhook sink
// or the deprecated sink should be used.
var NewPubsubSinkEnabled = settings.RegisterBoolSetting(
Almost read it as a function NewXXX() ...
Maybe rename to PubsubV2Enabled or some such?
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 203 at r1 (raw file):
var buffer bytes.Buffer
keyPrefix := "{\"Key\":"
valInfix := ",\"Value\":"
consi
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):
keyPrefix := "{\"Key\":"
valInfix := ",\"Value\":"
topicSuffix := fmt.Sprintf(",\"Topic\":\"%s\"}", psb.topic)
if you care about performance, you'd avoid Sprintf...
Probably costs you more than whatever buffer.Grow will save you.
buffer.WriteString(`"Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"`)
Also, it's a bit unfortunate that you have to declare those keyPrefix/Infix... I just don't think it's necessary.
Something like the following would do well enough, imo:
var scratch [128]byte // we have 30 bytes of metadata... so, kinda guess key/value/topic lengths.
buffer := bytes.NewBuffer(scratch[:0])
buffer.WriteString(`{"Key": "`)
buffer.WriteString(key)
buffer.WriteString(`, "Value": `)
buffer.WriteString(value)
buffer.WriteString(`, "Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"}`)
Note, you can probably create the bytes.Buffer once -- if you're still worried about allocations (you really shouldn't be in this case).
Also.. what are the guarantees around topic containing double quotes?
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):
buffer.Grow(len(keyPrefix) + len(key) + len(valInfix) + len(value) + len(topicSuffix))
buffer.WriteString(keyPrefix)
buffer.Write(key)
are you missing closing quote after key?
961cc3b to
c228f04
Compare
samiskin
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @miretskiy, and @smg260)
pkg/ccl/changefeedccl/sink.go line 171 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Almost read it as a function NewXXX() ...
Maybe rename to PubsubV2Enabled or some such?
Done.
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 203 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
consi
Removed the variables entirely
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
if you care about performance, you'd avoid Sprintf...
Probably costs you more than whatever buffer.Grow will save you.
buffer.WriteString(`"Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"`)
Also, it's a bit unfortunate that you have to declare those keyPrefix/Infix... I just don't think it's necessary.
Something like the following would do well enough, imo:
var scratch [128]byte // we have 30 bytes of metadata... so, kinda guess key/value/topic lengths.
buffer := bytes.NewBuffer(scratch[:0])
buffer.WriteString(`{"Key": "`)
buffer.WriteString(key)
buffer.WriteString(`, "Value": `)
buffer.WriteString(value)
buffer.WriteString(`, "Topic": "`)
buffer.WriteString(psb.topic)
buffer.WriteString(`"}`)
Note, you can probably create the bytes.Buffer once -- if you're still worried about allocations (you really shouldn't be in this case).
Also.. what are the guarantees around topic containing double quotes?
I added the buffer.Grow after noticing the flamegraph showed the underlying growths having visible cpu cost (I don't remember exactly how much it was).
I switched it to buffer.Grow(26 /* Key/Value/Topic keys */ + len(key) + len(value) + len(psb.topicEncoded)) just now. I don't think reusing a scratch works since we can't have successive Appends writing over previous ones being stored in the pb.PubsubMessage array being built up.
I also made the topic value go through a per-batch json.FromString(topic).Format
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
are you missing closing quote after key?
The key bytes are already encoded with quotes
miretskiy
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 204 at r1 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
I added the buffer.Grow after noticing the flamegraph showed the underlying growths having visible cpu cost (I don't remember exactly how much it was).
I switched it to buffer.Grow(26 /* Key/Value/Topic keys */ + len(key) + len(value) + len(psb.topicEncoded)) just now. I don't think reusing a scratch works since we can't have successive Appends writing over previous ones being stored in the pb.PubsubMessage array being built up.
I also made the topic value go through a per-batch json.FromString(topic).Format
well, a local scratch buffer definitely works because successive Appends will have their own buffer. The goal is to avoid
allocation at all -- in the common case.
You won't be able to share the buffer though -- for the reason you mention above.
Can we add some specific tests around this function -- to verify that the encoding is good (i.e. to test that your json.FromString stuff works)...
I don't doubt that it does -- but I want to make sure we don't have silly mistakes.
You might find it easier to pull the guts of this method (buffer.Write) into some helper and write specific tests around that.
pkg/ccl/changefeedccl/sink_pubsub_v2.go line 208 at r1 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
The key bytes are already encoded with quotes
Comment?
miretskiy
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @samiskin, and @smg260)
4d3d097 to
060053d
Compare
Added a test using the name randomizer as table names for the pubsub sink, which includes stuff like quotes and slashes and emojis
bors r+
5c299a5 to
ed9a082
Compare
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to cockroachdb#99086 which moves the Pubsub sink to the batching sink framework. The changes involve:
1. Moving the Pubsub code to match the `SinkClient` interface, moving to using the lower-level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to json.Marshal
3. Moving to using the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created per-topic
5. Adding a pubsub_sink_config since it can now handle Retry and Flush config settings

Release note (performance improvement): pubsub sink changefeeds can now support higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster setting.
ed9a082 to
699b7f4
Compare
bors r+
Build succeeded:
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237
This change is a followup to #99086 which moves the Pubsub sink to the batching sink framework.
The changes involve:
1. Moving the Pubsub code to match the `SinkClient` interface, moving to using the lower-level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to json.Marshal
3. Moving to using the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created per-topic
5. Adding a pubsub_sink_config since it can now handle Retry and Flush config settings

At default settings, this resulted in a peak of 90k messages per second on a single node with throughput at 27.6% cpu usage, putting it at a similar level to kafka.
Running pubsub v2 across all of TPCC (nodes ran out of ranges at different speeds):

Running pubsub v1 (barely visible, 2k messages per second) followed by v2 on tpcc.order_line (in v2 only 2 nodes ended up having ranges assigned to them):

In the following graphs from the cloud console, where v1 was run followed by v2, you can see that the main reason v1 was slow was that it wasn't able to batch different keys together.

Publish requests remained the same despite way more messages in v2.

Release note (performance improvement): pubsub sink changefeeds can now support higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster setting.