KAFKA-10810: Replace stream threads #9697
We need to rethrow the error; otherwise the task gets lost and we drop records.
Removing the thread from the thread list does 2 things:
- keeps DEAD threads out of the list, as KIP-663 dictates
- ensures the next thread has the same name
To ensure #2 holds, do we need to have a mutex on thread creation? Or does it not matter?
It won't matter. "Ensures" is too strong a word. If other threads have been removed before this one, the replacement may have a different name. However, this does ensure that the replacement thread will never have a thread index larger than the number of threads.
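The index guarantee described above can be illustrated with a minimal sketch. It assumes thread names end in a 1-based index and that a new thread takes the lowest index not used by a live thread; `ThreadIndexSketch`, `nextThreadIndex`, and the `StreamThread-` prefix are hypothetical names for illustration, not the code in this PR.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: pick the lowest index not used by any live thread.
final class ThreadIndexSketch {

    // Returns the smallest 1-based index i such that prefix + i is not taken.
    // With n live threads the result is at most n + 1, so a replacement never
    // gets an index larger than the thread count after it has been added.
    static int nextThreadIndex(final List<String> liveThreadNames, final String prefix) {
        final Set<String> taken = new HashSet<>(liveThreadNames);
        for (int i = 1; i <= liveThreadNames.size(); i++) {
            if (!taken.contains(prefix + i)) {
                return i;
            }
        }
        return liveThreadNames.size() + 1;
    }
}
```

Under these assumptions, removing the dead thread before adding the replacement frees its index, so the replacement reuses the name unless a lower index was freed earlier.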
            " The streams client is going to shut down now. ", throwable);
        close(Duration.ZERO);
    }
    final StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];
Is this equivalent to Thread.currentThread()?
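A small sketch of why the two lookups should agree, assuming thread names in the list are unique and the handler runs on the thread being looked up. `CurrentThreadLookupSketch` and its methods are hypothetical and use plain `Thread` objects rather than `StreamThread`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a by-name lookup versus Thread.currentThread().
final class CurrentThreadLookupSketch {

    // Finds the first thread with the given name, or null if there is none.
    static Thread findByName(final List<Thread> threads, final String name) {
        return threads.stream().filter(t -> t.getName().equals(name)).findFirst().orElse(null);
    }

    // Runs the lookup on a worker thread and reports whether the by-name
    // result is the very same object as Thread.currentThread().
    static boolean lookupMatchesCurrentThread() {
        final List<Thread> threads = new CopyOnWriteArrayList<>();
        final AtomicBoolean same = new AtomicBoolean(false);
        final Thread worker = new Thread(
            () -> same.set(findByName(threads, Thread.currentThread().getName()) == Thread.currentThread()),
            "worker-1");
        threads.add(worker);
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return same.get();
    }
}
```

If names were not unique, the by-name lookup could return a different thread, which is one reason the direct call is the simpler choice.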
    produceMessages(0L, inputTopic, "A");
    waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

    assertThat(processorValueCollector.size(), equalTo(3));
It's not obvious to me how this verifies that the thread actually got replaced. Maybe an explanatory comment is in order?
vvcephei left a comment
Thanks, @wcarlson5! I had a few thoughts above.
cadonna left a comment
Thanks for the PR, @wcarlson5!
Here is my feedback!
    final StreamThread deadThread = (StreamThread) Thread.currentThread();
    threads.remove(deadThread);
    addStreamThread();
    deadThread.shutdown();
Do we need to shut down the dead stream thread? completeShutDown() will be called anyway.
I don't think it matters; it just sets the thread state earlier, but we can delete it.
    case REPLACE_THREAD:
        if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
            log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
            log.error("Encountered the following exception during processing " +
                "and the registered exception handler opted to " + action + "." +
                " The streams client is going to shut down now. ", throwable);
            close(Duration.ZERO);
        }
        final StreamThread deadThread = (StreamThread) Thread.currentThread();
        threads.remove(deadThread);
        addStreamThread();
        deadThread.shutdown();
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
        }
I think it would be cleaner to extract this code to a separate method.
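One way the suggested extraction might look, shown only as a sketch: the rethrow logic from the diff is moved into a helper so the `REPLACE_THREAD` branch stays short. The class and method names (`RethrowSketch`, `rethrowUnchecked`) are hypothetical and not taken from the PR.

```java
// Hypothetical sketch: the rethrow logic from the diff, pulled into a helper.
final class RethrowSketch {

    // Rethrows unchecked throwables as-is and wraps checked ones, so the
    // caller does not need to declare a checked exception.
    static void rethrowUnchecked(final Throwable throwable) {
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new RuntimeException(
                "Unexpected checked exception caught in the uncaught exception handler", throwable);
        }
    }
}
```

The helper always throws, so any statement placed after a call to it in the handler would never run.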
    waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

    assertThat(processorValueCollector.size(), equalTo(3));
    //because we only have 2 threads at the start and each record kills a thread we must have replaced threads
Could you please be a bit clearer in the explanatory comment? BTW, we execute this test also once with just one stream thread so the 2 stream threads in the comment are not correct. Also, wouldn't it be better to explain the verification in the call to assertThat() instead of in a comment? You can pass a reason to the method.
I edited the test to be based on the number of threads instead of hard-coding it, and gave a reason.
        count.getAndIncrement();
        if (count.get() > 2) {
            return SHUTDOWN_CLIENT;
        }
        return REPLACE_THREAD;
    });
I think it would be better to have a test that shows that a new thread that replaced a failed one is actually able to process records. So, I would let the new thread process some records and then shut down the client with a normal close.
Maybe something similar applies to the shutdown tests: first let the client/application process some records, and then throw an exception that shuts down the client/application. I guess this last paragraph is something for a separate PR.
We can change the test so that we verify the replaced threads can process records.
I am not sure that is necessary for the shutdown case, as whether streams can process some records once started should be tested elsewhere. In any case, I think this PR is not the place for that discussion.
     * Enumeration that describes the response from the exception handler.
     */
    enum StreamThreadExceptionResponse {
        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),
I would rename it to REPLACE_STREAM_THREAD.
Oh, good catch.
Just a quick question: did we misname this option in the KIP? A StreamThread is a specific kind of thread. What I mean is that a GlobalStreamThread is not a StreamThread. Perhaps REPLACE_THREAD and SHUTDOWN_THREAD would have been better, more general names, for these. If you agree, I think we can just amend the KIP and fix it in this PR.
The string should be REPLACE_THREAD. I'll just fix that; it's left over from a period when we thought of calling it REPLACE_STREAM_THREAD.
I actually would be in favor of calling the enum value REPLACE_STREAM_THREAD. A stream thread is a stream thread and a global stream thread is a global stream thread. I am aware that the KIP names the enum value differently, but we also have a config called 'NUM_STREAM_THREADS_CONFIG', and we also have 'addStreamThread()' and 'removeStreamThread()'. So I guess the name exposed to the outside is "stream thread" and not "thread". We also have other threads in Kafka Streams, like the state directory cleaner thread and the RocksDB metrics recording thread.
What if we add an option to replace the global thread?
Ah, now I got it! Sorry! Makes sense! In that case we can reuse REPLACE_THREAD also for the global stream thread. Forgot about that!
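The naming slip discussed in this thread (constant `REPLACE_THREAD` paired with the label "REPLACE_STREAM_THREAD") is the kind of mismatch a small consistency check can catch. The sketch below is illustrative only: the id for `SHUTDOWN_CLIENT` is an assumption, and `ResponseEnumSketch` and `labelsMatchConstantNames` are hypothetical names.

```java
// Hypothetical sketch: an enum with an id and a label, plus a check that
// every label matches its constant name.
final class ResponseEnumSketch {

    enum StreamThreadExceptionResponse {
        REPLACE_THREAD(0, "REPLACE_THREAD"),
        SHUTDOWN_CLIENT(1, "SHUTDOWN_CLIENT"); // id 1 is assumed for this sketch

        final int id;
        final String name;

        StreamThreadExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Returns true only if each constant's label equals its declared name.
    static boolean labelsMatchConstantNames() {
        for (final StreamThreadExceptionResponse response : StreamThreadExceptionResponse.values()) {
            if (!response.name.equals(response.name())) {
                return false;
            }
        }
        return true;
    }
}
```

A check like this could live in a unit test, so that a later rename of a constant without its label fails fast.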
What can I do with the replacement thread? Can I skip the current offset in the exception handler, given that the exception was caused by the data at the current offset?
@zydzjy I don't think you can do that yet, at least not with anything I have added here. The idea is to recover from transient exceptions without having to restart the client.
StreamThreads can now be replaced in the streams uncaught exception handler