Skip to content

KAFKA-10810: Replace stream threads#9697

Merged
vvcephei merged 10 commits into
apache:trunkfrom
wcarlson5:wcarlson-replace-thread
Dec 11, 2020
Merged

KAFKA-10810: Replace stream threads#9697
vvcephei merged 10 commits into
apache:trunkfrom
wcarlson5:wcarlson-replace-thread

Conversation

@wcarlson5

Copy link
Copy Markdown
Contributor

StreamThreads can now be replaced in the streams uncaught exception handler

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@wcarlson5 wcarlson5 left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna part 6 is ready for review

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to throw the error or the task gets lost and we drop records

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing from the thread list does 2 things

  1. keeps DEAD threads out of the list as kip-663 dictates
  2. ensures the next thread has the same name

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure #2 holds, do we need to have a mutex on thread creation? Or does it not matter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't matter. Ensures is too strong of a word. IF there has been other threads removed before this it may have a different name. However this ensures that the replacement thread will never have a thread index larger than the number of threads

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
" The streams client is going to shut down now. ", throwable);
close(Duration.ZERO);
}
final StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this equivalent to Thread.currentThead()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it is now :)

produceMessages(0L, inputTopic, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

assertThat(processorValueCollector.size(), equalTo(3));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious to me how this verifies that the thread actually got replaced. Maybe an explanatory comment is in order?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

@vvcephei vvcephei left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @wcarlson5 ! I had a few thoughts above.

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @wcarlson5 !

Here my feedback!

final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
addStreamThread();
deadThread.shutdown();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to shutdown the dead stream thread? completeShutDown() will be called anyways.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters, it just set the thread state earlier, but we can delete it

Comment on lines +447 to +465
case REPLACE_THREAD:
if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
log.error("Encountered the following exception during processing " +
"and the registered exception handler opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
close(Duration.ZERO);
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
addStreamThread();
deadThread.shutdown();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be cleaner to extract this code to a separate method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do that

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

assertThat(processorValueCollector.size(), equalTo(3));
//because we only have 2 threads at the start and each record kills a thread we must have replaced threads

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please be a bit clearer in the explanatory comment? BTW, we execute this test also once with just one stream thread so the 2 stream threads in the comment are not correct. Also, wouldn't it be better to explain the verification in the call to assertThat() instead of in a comment? You can pass a reason to the method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I edited the test to be based on the number of thread instead of hard coding. And gave a reason

Comment on lines +224 to +229
count.getAndIncrement();
if (count.get() > 2) {
return SHUTDOWN_CLIENT;
}
return REPLACE_THREAD;
});

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a test that shows that a new thread that replaced a failed one, actually is able to process records. So, I would let the new thread process some records and then shutdown the client with a normal close.

Maybe similar applies to the shutdown tests. First let the client/application process some records and then throw an exception that shuts down the client/application. I guess, this last paragraph is something for a separate PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change the test so that we verify the replaced threads can process records.

I am not sure that is necessary for the shutdown as testing if streams can process some records once started should be tested elsewhere, but in any case I think that the PR is not the place for this discussion

@wcarlson5 wcarlson5 left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna here are some changes

final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
addStreamThread();
deadThread.shutdown();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters, it just set the thread state earlier, but we can delete it

Comment on lines +447 to +465
case REPLACE_THREAD:
if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
log.error("Encountered the following exception during processing " +
"and the registered exception handler opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
close(Duration.ZERO);
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
addStreamThread();
deadThread.shutdown();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do that

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

assertThat(processorValueCollector.size(), equalTo(3));
//because we only have 2 threads at the start and each record kills a thread we must have replaced threads

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I edited the test to be based on the number of thread instead of hard coding. And gave a reason

Comment on lines +224 to +229
count.getAndIncrement();
if (count.get() > 2) {
return SHUTDOWN_CLIENT;
}
return REPLACE_THREAD;
});

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change the test so that we verify the replaced threads can process records.

I am not sure that is necessary for the shutdown as testing if streams can process some records once started should be tested elsewhere, but in any case I think that the PR is not the place for this discussion

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
* Enumeration that describes the response from the exception handler.
*/
enum StreamThreadExceptionResponse {
REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename it to REPLACE_STREAM_THREAD.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good catch.

Just a quick question: did we misname this option in the KIP? A StreamThread is a specific kind of thread. What I mean is that a GlobalStreamThread is not a StreamThread. Perhaps REPLACE_THREAD and SHUTDOWN_THREAD would have been better, more general names, for these. If you agree, I think we can just amend the KIP and fix it in this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be the string is REPLACE_THREAD. Ill just fix that, its just left over from a period of time where we thought of calling it REPLACE_STREAM_THREAD

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually would be in favor of calling the enum value REPLACE_STREAM_THREAD. A stream thread is a stream thread and a global stream thread is a global stream thread. I am aware that the KIP calls the enum value differently, but we also have a config that is called 'NUM_STREAM_THREADS_CONFIG' and we have also 'addStreamThread()' and removeStreamThread(). So I guess, the name to the outside of this is stream thread and not thread. We have also other threads in Kafka Streams like the state directory cleaner thread and the RocksDB metrics recording thread.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we add an option to replace the global thread?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I got it! Sorry! Makes sense! In that case we can reuse REPLACE_THREAD also for the global stream thread. Forgot about that!

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @wcarlson5!

LGTM!

@vvcephei vvcephei left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @wcarlson5 !

@vvcephei vvcephei merged commit d5dc7df into apache:trunk Dec 11, 2020
@wcarlson5 wcarlson5 deleted the wcarlson-replace-thread branch December 11, 2020 21:09
@mjsax mjsax added the kip Requires or implements a KIP label Jan 8, 2021
@zydzjy

zydzjy commented Jan 8, 2021

Copy link
Copy Markdown

what can i do with replace new thread?Can i skip the current offset in exception handler,cause the exception occured by the data with current offset?

@wcarlson5

Copy link
Copy Markdown
Contributor Author

@zydzjy I don't think you can do that yet, at least not with anything I have added here. The idea is to recover from transient expectations without having to restart the client.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants