
streamingccl: allow stream ingestion processors to keep running on GenerationEvent#68195

Merged
craig[bot] merged 2 commits into cockroachdb:master from annezhu98:hang-processors-on-losing-client on Aug 10, 2021

Conversation


@annezhu98 annezhu98 commented Jul 28, 2021

Previously, a stream ingestion processor would shut down whenever it lost its connection to the stream client. With generation support, the processor should not immediately move to the draining state; instead, it should remain in StateRunning and poll for the cutover signal sent by the coordinator. Generation support will be implemented in the following PR: #67189

The first commit adds GenerationEvent as a valid event type that can be emitted over a cluster stream.
The second commit implements the mechanism that keeps processors running when the connection to the client is lost.

Add `GenerationEvent` as a possible event type to be emitted over a cluster stream. When a `GenerationEvent` is emitted, we should be able to get its topology as well as the start time of the new generation.

Release note: None
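The commit message above describes a GenerationEvent that exposes the new generation's topology and start time. A minimal sketch of that shape follows; every name here (`generationEvent`, `topology`, `GetTopology`, `GetStartTime`, `makeGenerationEvent`, the `int64` stand-in for an hlc.Timestamp) is invented for illustration and is not the actual streamingccl API:

```go
package main

import "fmt"

// topology is a hypothetical stand-in for the stream topology type.
type topology struct {
	Partitions []string
}

// generationEvent carries the new generation's topology and its start time.
// All names here are illustrative, not the real streamingccl API.
type generationEvent struct {
	topology  topology
	startTime int64 // stand-in for an hlc.Timestamp
}

func (e generationEvent) GetTopology() topology { return e.topology }
func (e generationEvent) GetStartTime() int64   { return e.startTime }

func makeGenerationEvent(t topology, start int64) generationEvent {
	return generationEvent{topology: t, startTime: start}
}

func main() {
	ev := makeGenerationEvent(topology{Partitions: []string{"p1", "p2"}}, 42)
	fmt.Println(len(ev.GetTopology().Partitions), ev.GetStartTime()) // 2 42
}
```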
@annezhu98 annezhu98 requested review from a team, adityamaru and pbardea July 28, 2021 19:07
@cockroach-teamcity
Member

This change is Reviewable

@annezhu98 annezhu98 changed the title from "Hang processors on losing client" to "streamingccl: allow stream ingestion processors to keep running on GenerationEvent" Jul 28, 2021
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch 2 times, most recently from 8950deb to d6adbe7 on July 28, 2021 20:54
case driver.ErrBadConn:
	select {
	case <-eventCh:
		eventCh <- streamingccl.MakeGenerationEvent()
Contributor

I wonder if it makes sense for the ingestion processor to be aware of the concept of generations at all. Would it be simpler if we just swallowed ErrBadConn from the driver in the client, which would hang the processor until its context was cancelled?

Also does errors.Is(err, driver.ErrBadConn) work here? I think that's the more standard way of checking for errors (see https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20190318_error_handling.md for more info).

Author

The processor would go to a different state if we swallowed ErrBadConn on the client side: since there would be no more values to read from eventCh, we'd consequently stop reading from cutoverCh. I'm not sure if there's a better way of differentiating the error returned by the client.

Contributor

@pbardea If I'm understanding correctly, the issue with swallowing the error is that we return from the sinkless client. The return then triggers the eventCh for the partition to be closed, which in turn causes the merged channel in the processor to be closed. Once the merged channel is closed, the proc drains and we're in a bit of a soup.

Thinking ahead to multi-partition streaming, where we probably want to do more than just wait for a cutover (ingest up until some time), a generation event would give us a place to trigger this custom logic? There might be a better way, though.

Contributor

Gotcha, my original thinking was that we wouldn't close the eventCh and the processors would be unaware that the client had disconnected, it would just hang waiting for its context to be canceled. (When the client sees the error it just reads from <-ctx.Done().)

The tl;dr of below is that given that the coordinator doesn't yet have a way to send processors messages, this approach should be okay for now, but I think it's worth keeping an eye on potentially moving this to the coordinator down the line.


A bit more consideration to why it may be good to move to a scheme where the coordinator is the only one aware of "cutovers times" and "generation events":

Such a scheme would allow each ingestion processor to not need to worry about global state like generations and cutover time, but only to ingest data from the stream, and to watch for a "ingest until at least " signal from the coordinator. (We don't have a way for the coordinator to provide a signal like that today).

The underlying concern is that on large clusters, we'll develop these many to 1 relationships that could put unnecessary pressure on some nodes. (We have this today with cutover time polling since all 100 nodes would poll the job record on avg 3 times a second and that would only increase as the polling interval is reduced).

Contributor

100% agree, filed #68475.

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from d6adbe7 to 729a504 on July 30, 2021 00:45
	return sip.flush()
case streamingccl.GenerationEvent:
	log.Info(sip.Ctx, "client disconnected")
	waitingForCutover = true
Contributor

I'd prefer if instead of this bool we just:

<-sip.cutoverCh
sip.internalDrained = true
return nil, nil

from below. This is also most likely going to change in the future when in multi-partition streaming we want to continue reading events up until some time in the future (as indicated by the GenerationEvent)?

Contributor

We'd probably also want to wait on ctx.Done() in addition to cutover channel here.

Author
@annezhu98 annezhu98 Aug 4, 2021

I updated the logic here, hope it makes sense now. When a generation event is received, we wait on sip.cutoverCh and sip.Ctx.Done(), whichever comes first. If we receive a value from sip.cutoverCh, the function returns. If the context gets cancelled before a cutover signal is sent, the function returns as well.


	return sip.flush()
case streamingccl.GenerationEvent:
	log.Info(sip.Ctx, "client disconnected")
Contributor

I like the idea of logging a generation event. This will be more useful when GE actually contains a generation time or something for multi partition. For the time being let's change the verbiage to "GenerationEvent received" since "client disconnected" is specific to the sinkless client.

return sip.flush()
}

// If we lost connection with the client, wait for cutover signal instead of
Contributor

I don't think we need any of this once we address the comment below?

}
}
if err := rows.Err(); err != nil {
if err := rows.Err(); errors.Is(err, driver.ErrBadConn) {
Contributor

mmm this seems a little wrong, it should probably be:

if err := rows.Err(); err != nil {
	// Maybe add a comment about why we are special casing a client disconnect.
	if errors.Is(err, driver.ErrBadConn) {
		select {
		case eventCh <- streamingccl.MakeGenerationEvent():
		case <-ctx.Done():
			errCh <- ctx.Err()
		}
	} else {
		errCh <- err
	}
}

Let's chat offline about how select blocks and channels work, just to make sure we're on the same page 🙂

testutils.IsError(meta.Err, "this client always returns an error")
})

t.Run("stream ingestion processor hangs on losing client connection", func(t *testing.T) {
Contributor
@adityamaru adityamaru Jul 30, 2021

I haven't reviewed the test yet, and I'm going to think about whether there is any other way we can structure this. I don't particularly like sleeps in tests 😋; time-dependent tests often flake and get skipped.

Contributor

Perhaps instead of testing if the client hangs, we could test if the processor returns an error or not when the client disconnects?

@pbardea pbardea self-requested a review August 3, 2021 12:37
@adityamaru adityamaru self-requested a review August 3, 2021 13:59
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch 2 times, most recently from e1455d8 to 2b63a09 on August 3, 2021 20:24
Contributor
@pbardea pbardea left a comment

Generally LGTM -- it would be nice if we could simplify the processor to not use waitingForCutover, like Aditya suggested.

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch 4 times, most recently from 990ec84 to f8b0365 on August 4, 2021 20:20
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch 2 times, most recently from eba6dfc to d645add on August 5, 2021 22:00
// Send a cutover signal to shut down the processor
sip.cutoverCh <- struct{}{}

// The processor should have been moved to draining state with a nil error
Contributor
@adityamaru adityamaru Aug 5, 2021

We need to move the wg.Wait() above this so we are guaranteed that `sip.Run()` has returned.

Can we then use `out` returned by getStreamIngestionProcessor to check:

if !out.ProducerClosed() {
	t.Fatalf("output RowReceiver not closed")
}

for {
	row := out.NextNoMeta(t)
	if row == nil {
		break
	}
	// We don't expect any rows so...
	t.Fatal(...)
}


<-interceptCh

// Send a cutover signal to shut down the processor
Contributor

Update comment to:
// The sip processor has received a gen event and is thus waiting for a cutover signal, so let's send one!

}
}

func markGenerationEventAsReceived(
Contributor

Can we pass a func() as a parameter here instead of a channel? This will give us flexibility in the future to add stuff to that func.

Maybe also rename the function to makeGenerationEventReceived
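The suggestion amounts to injecting a callback rather than a channel. A hypothetical sketch of the shape (the `mockClient` type and the `onGenerationEvent` hook name are invented for illustration; the real mock lives in the test file):

```go
package main

import "fmt"

// mockClient is a stand-in for the test stream client. Instead of exposing a
// channel, it accepts a callback invoked when a GenerationEvent is emitted,
// which leaves room to grow the hook's behavior later without changing its type.
type mockClient struct {
	onGenerationEvent func() error // hypothetical hook name
}

func (m *mockClient) emitGeneration() error {
	if m.onGenerationEvent != nil {
		return m.onGenerationEvent()
	}
	return nil
}

func main() {
	received := 0
	c := &mockClient{onGenerationEvent: func() error {
		received++
		return nil
	}}
	_ = c.emitGeneration()
	fmt.Println(received) // 1
}
```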

@adityamaru
Contributor

@annezhu98 I'm happy with the PR once the above comments are addressed and you've `make stressrace`'d the tests 🙂 Nice work on this!

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from d645add to 271de0a on August 6, 2021 16:33
}

// The processor should have been moved to draining state with a nil error
_, meta := sip.Next()
Contributor

I don't think we need this any longer. out is a row buffer that is filled by sip.Next(). The out.NextNoMeta checks that none of the output was a meta.

@adityamaru adityamaru self-requested a review August 6, 2021 17:13
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from 271de0a to 25cbeb7 on August 6, 2021 18:10
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from 25cbeb7 to f57ba74 on August 6, 2021 20:28
@adityamaru
Contributor

adityamaru commented Aug 9, 2021

@annezhu98 I don't think we should check that the eventCh is closed. Instead can you try applying this patch and seeing if it works. We can change the mock client to not return a closed, fixed-size channel but instead return a channel that needs to be read from for more events to be sent on it. This way, when the interceptor is hit we are guaranteed the sip has read from it:

diff --git a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go
index e294c9b383..23bfb5c36c 100644
--- a/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go
+++ b/pkg/ccl/streamingccl/streamclient/cockroach_sinkless_replication_client.go
@@ -134,6 +134,7 @@ func (m *sinklessReplicationClient) ConsumePartition(
 			} else {
 				errCh <- err
 			}
+			return
 		}
 	}()
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index 757a98def6..1ce4bd067d 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -44,7 +44,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-// mockStreamClient will return the slice of events associated to the stream
+// mockStreamClient will return a channel of events associated to the stream
 // partition being consumed. Stream partitions are identified by unique
 // partition addresses.
 type mockStreamClient struct {
@@ -73,7 +73,7 @@ func (m *mockStreamClient) GetTopology(
 
 // ConsumePartition implements the Client interface.
 func (m *mockStreamClient) ConsumePartition(
-	_ context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp,
+	ctx context.Context, address streamingccl.PartitionAddress, _ hlc.Timestamp,
 ) (chan streamingccl.Event, chan error, error) {
 	var events []streamingccl.Event
 	var ok bool
@@ -81,25 +81,32 @@ func (m *mockStreamClient) ConsumePartition(
 		return nil, nil, errors.Newf("no events found for paritition %s", address)
 	}
 
-	eventCh := make(chan streamingccl.Event, len(events))
+	errCh := make(chan error)
+	eventCh := make(chan streamingccl.Event)
 
-	for _, event := range events {
-		eventCh <- event
+	go func() {
+		defer close(eventCh)
+		for _, event := range events {
+			select {
+			case eventCh <- event:
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+			}
 
-		func() {
-			m.mu.Lock()
-			defer m.mu.Unlock()
+			func() {
+				m.mu.Lock()
+				defer m.mu.Unlock()
 
-			if len(m.mu.interceptors) > 0 {
-				for _, interceptor := range m.mu.interceptors {
-					if interceptor != nil {
-						interceptor(event, address)
+				if len(m.mu.interceptors) > 0 {
+					for _, interceptor := range m.mu.interceptors {
+						if interceptor != nil {
+							interceptor(event, address)
+						}
 					}
 				}
-			}
-		}()
-	}
-	close(eventCh)
+			}()
+		}
+	}()
 
 	return eventCh, nil, nil
 }
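The core of the patch above is turning a pre-filled, closed channel into an unbuffered one fed by a goroutine, so that each send is a rendezvous with the consumer and the interceptor only fires after the processor has actually read the event. A stripped-down, self-contained sketch of that idea (`serveEvents` and the `string` event type are illustrative simplifications of the mock client):

```go
package main

import "fmt"

// serveEvents sends each event on an unbuffered channel and invokes the
// interceptor only after the consumer has received it, which is the
// guarantee the patch above is after.
func serveEvents(events []string, interceptor func(string)) <-chan string {
	eventCh := make(chan string) // unbuffered: each send blocks until received
	go func() {
		defer close(eventCh)
		for _, ev := range events {
			eventCh <- ev
			if interceptor != nil {
				interceptor(ev)
			}
		}
	}()
	return eventCh
}

func main() {
	var seen []string
	ch := serveEvents([]string{"a", "b"}, func(ev string) { seen = append(seen, ev) })
	for ev := range ch {
		fmt.Println(ev)
	}
	// The channel close happens-after the last interceptor call, so this
	// read of seen is safe.
	fmt.Println(seen)
}
```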

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch 2 times, most recently from 6805b11 to 7999186 on August 9, 2021 22:05
err := sip.checkForCutoverSignal(ctx, sip.closePoller)
if err != nil {
sip.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
sip.mu.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
Contributor

we need to sip.mu.Lock() and sip.mu.Unlock() the mutex around this.


if sip.pollingErr != nil {
sip.MoveToDraining(sip.pollingErr)
if sip.mu.pollingErr != nil {
Contributor

ditto comment as above

// mu is used to provide thread-safe read-write operations to ingestionErr
// and pollingErr.
mu struct {
sync.Mutex
Contributor

we usually use syncutil.Mutex

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from 7999186 to ff7b0eb on August 9, 2021 22:20
}
close(eventCh)
go func() {
defer close(eventCh)
Contributor

@annezhu98 one nit, though you can pick it up in your next PR to prevent another push since this is only testing code. We should close(errCh) too.

@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from ff7b0eb to 11e9a38 on August 10, 2021 14:35
…am client

Previously, if the sinkless client lost its connection, the processor would receive an error and move to draining. With the concept of generations, the sinkless client should send over a `GenerationEvent` once it has lost its connection. On receiving a `GenerationEvent`, the processor should wait for a cutover signal to be sent (the mechanism for issuing cutover signals on a new generation will be implemented in a following PR).

Release note: None
@annezhu98 annezhu98 force-pushed the hang-processors-on-losing-client branch from 11e9a38 to f5244f4 on August 10, 2021 14:41
@annezhu98
Author

bors r+

@craig
Contributor

craig bot commented Aug 10, 2021

Build succeeded:

@craig craig bot merged commit f28f98f into cockroachdb:master Aug 10, 2021
@annezhu98 annezhu98 deleted the hang-processors-on-losing-client branch August 10, 2021 20:05
@annezhu98 annezhu98 restored the hang-processors-on-losing-client branch August 17, 2021 18:23
@annezhu98 annezhu98 deleted the hang-processors-on-losing-client branch August 17, 2021 19:19