streamingccl: allow stream ingestion processors to keep running on GenerationEvent #68195
Conversation
Add `GenerationEvent` as a possible event type to be emitted over a cluster stream. When a `GenerationEvent` is emitted, we should be able to get its topology as well as the start time of the new generation. Release note: None
Force-pushed 8950deb to d6adbe7.
```go
case driver.ErrBadConn:
	select {
	case <-eventCh:
		eventCh <- streamingccl.MakeGenerationEvent()
```
I wonder if it makes sense for the ingestion processor to be aware of the concept of generations at all. Would it be simpler if we just swallowed ErrBadConn in the client's driver, which would hang the processor until its context was cancelled?
Also, does errors.Is(err, driver.ErrBadConn) work here? I think that's the more standard way of checking for errors (see https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20190318_error_handling.md for more info).
If we swallow ErrBadConn on the client side, the processor would go to a different state: since there are no more values to be read from eventCh, we'd consequently stop reading from cutoverCh. I'm not sure if there's a better way of differentiating the error returned by the client.
@pbardea If I'm understanding correctly, the issue with swallowing the error is that we return from the sinkless client. The return then triggers the eventCh for the partition to be closed, which in turn causes the merged channel in the processor to be closed. Once the merged channel is closed, the proc drains and we're in a bit of a soup.
Thinking ahead to multi-partition where we probably want to do more than just wait for a cutover (ingest up until some time), a generation event would give us a place to trigger this custom logic? There might be a better way though
Gotcha, my original thinking was that we wouldn't close the eventCh and the processors would be unaware that the client had disconnected, it would just hang waiting for its context to be canceled. (When the client sees the error it just reads from <-ctx.Done().)
The tl;dr of below is that given that the coordinator doesn't yet have a way to send processors messages, this approach should be okay for now, but I think it's worth keeping an eye on potentially moving this to the coordinator down the line.
A bit more consideration to why it may be good to move to a scheme where the coordinator is the only one aware of "cutovers times" and "generation events":
Such a scheme would allow each ingestion processor to not need to worry about global state like generations and cutover time, but only to ingest data from the stream, and to watch for an "ingest until at least" signal from the coordinator. (We don't have a way for the coordinator to provide a signal like that today.)
The underlying concern is that on large clusters, we'll develop these many to 1 relationships that could put unnecessary pressure on some nodes. (We have this today with cutover time polling since all 100 nodes would poll the job record on avg 3 times a second and that would only increase as the polling interval is reduced).
Force-pushed d6adbe7 to 729a504.
```go
	return sip.flush()
case streamingccl.GenerationEvent:
	log.Info(sip.Ctx, "client disconnected")
	waitingForCutover = true
```
I'd prefer if instead of this bool we just:

```go
<-sip.cutoverCh
sip.internalDrained = true
return nil, nil
```

from below. This is also most likely going to change in the future when in multi-partition streaming we want to continue reading events up until some time in the future (as indicated by the GenerationEvent)?
We'd probably also want to wait on ctx.Done() in addition to cutover channel here.
I updated the logic here; hope it makes sense now. When a generation event is received, we wait on sip.cutoverCh and sip.Ctx.Done(), whichever comes first. If we receive a value from sip.cutoverCh, the function returns. If the context gets cancelled before a cutover signal is sent, the function returns as well.
```go
	return sip.flush()
case streamingccl.GenerationEvent:
	log.Info(sip.Ctx, "client disconnected")
```
I like the idea of logging a generation event. This will be more useful when GenerationEvent actually contains a generation time or something for multi-partition. For the time being, let's change the verbiage to "GenerationEvent received", since "client disconnected" is specific to the sinkless client.
```go
	return sip.flush()
}
```

```go
// If we lost connection with the client, wait for cutover signal instead of
```
I don't think we need any of this once we address the comment below?
```diff
 	}
 	}
-	if err := rows.Err(); err != nil {
+	if err := rows.Err(); errors.Is(err, driver.ErrBadConn) {
```
mmm, this seems a little wrong, it should probably be:

```go
if err := rows.Err(); err != nil {
	// Maybe add a comment about why we are special-casing a client disconnect.
	if errors.Is(err, driver.ErrBadConn) {
		select {
		case eventCh <- streamingccl.MakeGenerationEvent():
		case <-ctx.Done():
			errCh <- ctx.Err()
		}
	} else {
		errCh <- err
	}
}
```
Let's chat offline about how select blocks and channels work, just to make sure we're on the same page 🙂
```go
	testutils.IsError(meta.Err, "this client always returns an error")
})

t.Run("stream ingestion processor hangs on losing client connection", func(t *testing.T) {
```
I haven't reviewed the test yet, and I'm going to think about whether there is any other way we can structure this. I don't particularly like sleeps in tests 😋 time-dependent tests often flake and get skipped.
Perhaps instead of testing if the client hangs, we could test if the processor returns an error or not when the client disconnects?
Force-pushed e1455d8 to 2b63a09.
pbardea
left a comment
Generally LGTM -- it would be nice if we could simplify the processor to not use the waitingForCutover bool, like Aditya suggested.
Force-pushed 990ec84 to f8b0365.
Force-pushed eba6dfc to d645add.
```go
// Send a cutover signal to shut down the processor
sip.cutoverCh <- struct{}{}

// The processor should have been moved to draining state with a nil error
```
We need to move the wg.Wait() above this so we are guaranteed that sip.Run() has returned.
Can we then use out returned by getStreamIngestionProcessor to check:

```go
if !out.ProducerClosed() {
	t.Fatalf("output RowReceiver not closed")
}
for {
	row := out.NextNoMeta(t)
	if row == nil {
		break
	}
	// We don't expect any rows, so fail if we see one.
	t.Fatal(...)
}
```
```go
<-interceptCh

// Send a cutover signal to shut down the processor
```
Update the comment to:

```go
// The sip processor has received a gen event and is thus waiting for a
// cutover signal, so let's send one!
```
```go
	}
}

func markGenerationEventAsReceived(
```
Can we pass a func() as a parameter here instead of a channel? This will give us flexibility in the future to add stuff to that func.
Maybe also rename the function to makeGenerationEventReceived.
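A minimal sketch of the channel-to-callback swap being suggested. The function emitGenerationEvent and its parameter name are illustrative, not the actual test helper; the point is that a func() parameter is easier to extend later (logging, counters, extra bookkeeping) than a bare signaling channel.

```go
package main

import "fmt"

// emitGenerationEvent notifies the caller via a callback instead of sending
// on a channel. A nil callback is tolerated so call sites that don't care
// can pass nothing.
func emitGenerationEvent(onGenerationEvent func()) {
	// (emit the GenerationEvent on the stream here)
	if onGenerationEvent != nil {
		onGenerationEvent()
	}
}

func main() {
	received := false
	emitGenerationEvent(func() { received = true })
	fmt.Println(received) // true
}
```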
@annezhu98 I'm happy with the PR once the above comments are addressed and you've…
Force-pushed d645add to 271de0a.
```go
}

// The processor should have been moved to draining state with a nil error
_, meta := sip.Next()
```
I don't think we need this any longer. out is a row buffer that is filled by sip.Next(). The out.NextNoMeta checks that none of the output was a meta.
pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go (resolved comment thread)
Force-pushed 271de0a to 25cbeb7.
Force-pushed 25cbeb7 to f57ba74.
@annezhu98 I don't think we should check that the eventCh is closed. Instead, can you try applying this patch and seeing if it works? We can change the mock client to not send on a closed, fixed-size channel, but instead return a channel that needs to be read from for more events to be sent on it. This way, when the interceptor is hit we are guaranteed the sip has read from it:
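A minimal sketch of the suggested mock-client shape, assuming an unbuffered channel fed by a goroutine so that each send acts as a synchronization point with the consumer (all names here are illustrative, not the actual test client):

```go
package main

import "fmt"

type event struct{ name string }

// mockEventStream returns an unbuffered channel fed by a goroutine. Because
// the channel is unbuffered, each send only completes once the consumer has
// read it, so reaching a later event guarantees the earlier ones were
// actually consumed by the processor under test.
func mockEventStream(events []event) <-chan event {
	eventCh := make(chan event) // unbuffered: sends block until read
	go func() {
		defer close(eventCh)
		for _, ev := range events {
			eventCh <- ev
		}
	}()
	return eventCh
}

func main() {
	ch := mockEventStream([]event{{"kv"}, {"generation"}})
	for ev := range ch {
		fmt.Println(ev.name)
	}
}
```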
Force-pushed 6805b11 to 7999186.
```diff
 	err := sip.checkForCutoverSignal(ctx, sip.closePoller)
 	if err != nil {
-		sip.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
+		sip.mu.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
```
We need to sip.mu.Lock() and sip.mu.Unlock() around this assignment.
```diff
-	if sip.pollingErr != nil {
-		sip.MoveToDraining(sip.pollingErr)
+	if sip.mu.pollingErr != nil {
```

```go
// mu is used to provide thread-safe read-write operations to ingestionErr
// and pollingErr.
mu struct {
	sync.Mutex
```
We usually use syncutil.Mutex.
Force-pushed 7999186 to ff7b0eb.
```diff
 	}
-	close(eventCh)
 	go func() {
+		defer close(eventCh)
```
@annezhu98 one nit, though you can pick it up in your next PR to prevent another push, since this is only testing code: we should close(errCh) too.
Force-pushed ff7b0eb to 11e9a38.
…am client

Previously, if the sinkless client loses connection, the processor would receive an error and move to draining. With the concept of generations, the sinkless client should send over a `GenerationEvent` once it has lost connection. On receiving a `GenerationEvent`, the processor should wait for a cutover signal to be sent (the mechanism for issuing cutover signals on a new generation will be implemented in a following PR).

Release note: None
Force-pushed 11e9a38 to f5244f4.
bors r+

Build succeeded:
Previously, a stream ingestion processor would shut down if it ever lost connection with its stream client. With generation support, the processor should not immediately move to the draining state; instead, it should stay in StateRunning to poll for the cutover signal sent by the coordinator. Generation support will be implemented in the following PR: #67189

The first commit adds GenerationEvent as a valid event type that can be emitted over a cluster stream. The second commit implements the mechanism that keeps processors running when losing connection with the client.