streamclient: add random stream client#59139

Merged
craig[bot] merged 5 commits into cockroachdb:master from
pbardea:random-stream-client
Jan 25, 2021

@pbardea
Contributor

@pbardea pbardea commented Jan 19, 2021

See individual commits, but this PR does some cleanup while introducing a
stream client implementation that randomly generates rows.

I broke down each change into its own commit for reviews, but let me know
if splitting this into separate PRs would be helpful.

@pbardea pbardea requested a review from adityamaru January 19, 2021 14:30
@cockroach-teamcity
Member

This change is Reviewable

Contributor

@adityamaru adityamaru left a comment

Reviewed 18 of 18 files at r1, 7 of 7 files at r2, 5 of 5 files at r3, 10 of 10 files at r4, 3 of 3 files at r5, 1 of 1 files at r6, 1 of 1 files at r7.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @pbardea)


pkg/ccl/streamingccl/streamclient/client.go, line 30 at r3 (raw file):

the partition and close the event channel

nit: last part of the sentence is repeated.


pkg/ccl/streamingccl/streamclient/client_test.go, line 89 at r6 (raw file):

				panic(fmt.Sprintf("unexpected event type %v", event.Type()))
			}
			numReceivedEvents++

can we delete?


pkg/ccl/streamingccl/streamclient/random_stream_client.go, line 186 at r4 (raw file):

			}

			// TODO: Consider keeping an in-memory copy so that tests can verify

+1 this was on my mind too.


pkg/ccl/streamingccl/streamclient/random_stream_client.go, line 199 at r4 (raw file):

}

func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue {

love this new version!


pkg/kv/bulk/sst_batcher.go, line 146 at r4 (raw file):

// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
	if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) && b.disallowShadowing {

nice catch

@pbardea pbardea force-pushed the random-stream-client branch from 0c8c6b8 to 30c839b Compare January 19, 2021 19:21
Contributor Author

@pbardea pbardea left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamclient/client.go, line 30 at r3 (raw file):

Previously, adityamaru (Aditya Maru) wrote…
the partition and close the event channel

nit: last part of the sentence is repeated.

Done.


pkg/ccl/streamingccl/streamclient/client_test.go, line 89 at r6 (raw file):

Previously, adityamaru (Aditya Maru) wrote…

can we delete?

Nice catch. Done.


pkg/ccl/streamingccl/streamclient/stream_client.go, line 36 at r7 (raw file):

Previously, pbardea (Paul Bardea) wrote…

This doesn't need to be a select.

Done.


pkg/kv/bulk/sst_batcher.go, line 146 at r4 (raw file):

Previously, adityamaru (Aditya Maru) wrote…

nice catch

I don't think this was a bug before since we always ingested a snapshot of the data rather than the entire revision history.
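
To illustrate the point about revision history, here is a toy sketch of a duplicate-key check gated on a shadowing flag. It is not the real `SSTBatcher`: `toyBatcher`, `add`, and `errDuplicateKey` are hypothetical names, and only the shape of the condition mirrors the line quoted above. When a full revision history is ingested, the same key arrives several times at different timestamps, so the check should only reject the repeat when shadowing is disallowed.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errDuplicateKey is a hypothetical error for this sketch.
var errDuplicateKey = errors.New("duplicate key in batch")

// toyBatcher is a simplified stand-in for a batcher, not the real SSTBatcher.
type toyBatcher struct {
	disallowShadowing bool
	lastKey           []byte
	added             int
}

// add accepts keys in order. A repeated key is rejected only when
// disallowShadowing is set; otherwise it is treated as another revision.
// The toy ignores wallTime apart from documenting that revisions differ by it.
func (b *toyBatcher) add(key []byte, wallTime int64) error {
	if len(b.lastKey) > 0 && bytes.Equal(b.lastKey, key) && b.disallowShadowing {
		return errDuplicateKey
	}
	b.lastKey = append(b.lastKey[:0], key...)
	b.added++
	return nil
}

func main() {
	for _, disallow := range []bool{false, true} {
		b := &toyBatcher{disallowShadowing: disallow}
		_ = b.add([]byte("a"), 2)
		err := b.add([]byte("a"), 1) // an older revision of the same key
		fmt.Printf("disallowShadowing=%t err=%v\n", disallow, err)
	}
}
```
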

@pbardea pbardea force-pushed the random-stream-client branch 6 times, most recently from d7973a7 to 2871c62 Compare January 20, 2021 01:52
@pbardea
Contributor Author

pbardea commented Jan 20, 2021

The tests against CI were flaking, so I threw together a change that allows the tests to inject "observers" into the client. Curious what you think about this approach. It allows the tests to run deterministically, and we could leverage this by adding another "observer" in the test that keeps the in-memory state like we were talking about.

@adityamaru
Contributor

The tests against CI were flaking, so I threw together a change that allows the tests to inject "observers" into the client. Curious what you think about this approach. It allows the tests to run deterministically, and we could leverage this by adding another "observer" in the test that keeps the in-memory state like we were talking about.

Just looked at it, and it's a good idea. I guess we still rely on being able to get a hold of the client to be able to invoke client.RegisterInterceptor, which will be hard when we write job level tests. We'd probably still need to add a testing knob where we deposit our "interceptors" and then the ingestion processor can register those for us in the NewStreamClient call for test:// URIs?
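
The idea discussed above could look roughly like the sketch below. Only `RegisterInterceptor` and the notion of a testing knob come from this thread; `Event`, `InterceptFn`, `InterceptableClient`, `TestingKnobs`, and `NewTestClient` are hypothetical names, and the real client may be structured differently. The constructor registers whatever interceptors the knob carries, so a job-level test would not need a handle on the client itself.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a stream event.
type Event struct {
	Key   string
	Value string
}

// InterceptFn is called for every event the client emits.
type InterceptFn func(Event)

// InterceptableClient is a hypothetical client that lets tests observe events.
type InterceptableClient struct {
	interceptors []InterceptFn
}

// RegisterInterceptor adds fn to the functions invoked on each event.
func (c *InterceptableClient) RegisterInterceptor(fn InterceptFn) {
	c.interceptors = append(c.interceptors, fn)
}

// emit runs every interceptor in registration order, then forwards the event.
func (c *InterceptableClient) emit(ev Event, out chan<- Event) {
	for _, fn := range c.interceptors {
		fn(ev)
	}
	out <- ev
}

// TestingKnobs is a hypothetical knob struct where a test deposits interceptors.
type TestingKnobs struct {
	Interceptors []InterceptFn
}

// NewTestClient registers the knob-provided interceptors on a new client.
func NewTestClient(knobs *TestingKnobs) *InterceptableClient {
	c := &InterceptableClient{}
	if knobs != nil {
		for _, fn := range knobs.Interceptors {
			c.RegisterInterceptor(fn)
		}
	}
	return c
}

func main() {
	// The interceptor keeps an in-memory copy of everything emitted.
	seen := map[string]string{}
	knobs := &TestingKnobs{Interceptors: []InterceptFn{
		func(ev Event) { seen[ev.Key] = ev.Value },
	}}
	c := NewTestClient(knobs)
	out := make(chan Event, 2)
	c.emit(Event{Key: "a", Value: "1"}, out)
	c.emit(Event{Key: "b", Value: "2"}, out)
	fmt.Println(len(seen), len(out))
}
```
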

@pbardea pbardea force-pushed the random-stream-client branch 4 times, most recently from a86a18b to 8ea8ee1 Compare January 21, 2021 15:58
@pbardea pbardea requested review from a team as code owners January 21, 2021 17:14
@pbardea pbardea force-pushed the random-stream-client branch from c959c06 to a94502b Compare January 21, 2021 17:15
@pbardea pbardea removed request for a team January 21, 2021 19:12

This commit refactors the stream ingestion processor to do all of its
work during the Next() call rather than starting a parallel producer
goroutine. This was not needed since there is no pipeline of stages to
process in this processor.

Release note: None

Stream clients now take in a context when opening an event stream for a
given partition. To close the event stream returned by the client, the
given context should be cancelled.

Release note: None
This commit introduces a new stream client implementation that generates
events of a specific schema for a table ID that is specified by the
stream URI. Properties of the stream, such as the frequency of the
events and the range of the randomly generated KVs can be controlled
with the appropriate parameters specified in the stream address.

To use the new stream client the `NewStreamClient` constructor has been
modified to accept a stream address. The stream address allows the
client to determine which client implementation should be used.

Further, the addition of this client exposed a bug in the SST batcher
which rejects batches that modify the same key more than once, even if
disallowShadowing is set to false.

Release note: None
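
As a rough illustration of how a stream address can select an implementation and carry its parameters, consider the sketch below. The `test` scheme appears earlier in this thread; everything else is an assumption made for the example, including `parseStreamAddress`, `streamConfig`, the `TABLE_ID` and `VALUE_RANGE` parameter names, and the default values. It is not the signature of `NewStreamClient`.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// streamConfig holds hypothetical parameters read from the stream address.
type streamConfig struct {
	Scheme     string
	TableID    int
	ValueRange int
}

// parseStreamAddress picks an implementation by URI scheme and reads its
// settings from the query string. Parameter names and defaults are invented
// for this sketch.
func parseStreamAddress(addr string) (streamConfig, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return streamConfig{}, err
	}
	switch u.Scheme {
	case "test":
		cfg := streamConfig{Scheme: u.Scheme, TableID: 52, ValueRange: 100}
		q := u.Query()
		if v := q.Get("TABLE_ID"); v != "" {
			if cfg.TableID, err = strconv.Atoi(v); err != nil {
				return streamConfig{}, fmt.Errorf("invalid TABLE_ID %q: %w", v, err)
			}
		}
		if v := q.Get("VALUE_RANGE"); v != "" {
			if cfg.ValueRange, err = strconv.Atoi(v); err != nil {
				return streamConfig{}, fmt.Errorf("invalid VALUE_RANGE %q: %w", v, err)
			}
		}
		return cfg, nil
	default:
		return streamConfig{}, fmt.Errorf("unsupported stream address scheme %q", u.Scheme)
	}
}

func main() {
	cfg, err := parseStreamAddress("test://host/?TABLE_ID=53&VALUE_RANGE=10")
	fmt.Println(cfg, err)
	_, err = parseStreamAddress("unknown://host/")
	fmt.Println(err)
}
```

Keeping the dispatch in one place means a test only has to change the address string to swap in the random client.
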
@pbardea pbardea force-pushed the random-stream-client branch from a94502b to be6163f Compare January 22, 2021 20:34
@adityamaru
Contributor

LGTM!

@pbardea
Contributor Author

pbardea commented Jan 25, 2021

TFTR!
bors r+

@craig
Contributor

craig bot commented Jan 25, 2021

Build succeeded:

@craig craig bot merged commit b51d9f1 into cockroachdb:master Jan 25, 2021
@shermanCRL
Contributor

cc @HonoreDB this might be informative, as it's the client of the producer stream


4 participants