KAFKA-9231: Streams Threads may die from recoverable errors with EOS enabled#7748
Conversation
|
Hey @mjsax , what do you think about this fix? The code flow might be a bit weird. I initially just wrapped the |
|
Note: I'm targeting 2.4 branch so that I can more easily test this fix. I'll re-target to |
|
We should merge this to 2.4 as well? |
|
@bbejeck pointed out a flaw in my prior approach, which I've corrected in the latest commit. |
| try { | ||
| stateMgr.flush(); | ||
| } catch (final ProcessorStateException e) { | ||
| if (e.getCause() instanceof RetriableException) { |
There was a problem hiding this comment.
Is this intentional? ProducerFencedException doesn't inherit from RetriableException, so unless I'm missing something, this will still kill the thread. Same for below as well in StandbyTask.
There was a problem hiding this comment.
Oh, boy. You're right... Really off my game today.
|
Thanks for the review, @bbejeck . I'm testing it now. Tomorrow, I'll look into adding a regression test for this, as you suggested. I'll also run the system tests overnight. |
| stateMgr.flush(); | ||
| try { | ||
| stateMgr.flush(); | ||
| } catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Why do we catch it at this level? Seems we should move it into ProcessorStateManger#flush() ?
Also note, that ProcessorStateManger#flush() only throws the first exception what might not be what we want to detect this case correctly.
There was a problem hiding this comment.
Yeah, I wasn't sure... This way is a bit roundabout. On the other hand, right now, TaskMigratedException is only thrown by the tasks themselves or the threads that run them. It didn't seem within the responsibility of the ProcessorStateManager to reason about what specific exceptions imply about the state of the task.
There was a problem hiding this comment.
Fair enough. It's not ideal that we get a ProcessorStateException and have to call getCause(), but well, it is what it is.
There was a problem hiding this comment.
Reading the code here: we throw TaskMigratedException in both StreamTask as well as StreamThread. For the first case, the exception is thrown all the way to the caller of TaskManager and rethrown (hence in the level of StreamThread); the latter case is only a few exceptional situations where the tasks are closed due to underlying exceptions.
I feel throwing the exception in the level of StreamTask is fine, but we did too many capture / rethrow in different layers on top of it, which could be cleaned up in a separate tech-debt PR.
There was a problem hiding this comment.
I very much agree. It's quite difficult to trace which exceptions would exist in different parts of the stack, and it would be nice to clean it up in the future.
There was a problem hiding this comment.
@vvcephei while working on the cleanup PR I realized that when we call store.flush internally we should not blindly wrap all thrown exceptions from stores, but only those not StreamsException -- i.e. if the error is not from the store itself (like an IOException) but from the streams library, then just throw it out directly.
Will do this small refactoring in my PR.
| try { | ||
| stateMgr.flush(); | ||
| } catch (final ProcessorStateException e) { | ||
| if (e.getCause() instanceof ProducerFencedException) { |
There was a problem hiding this comment.
The same argument as for GlobalStateStore should apply here -- StandbyTask don't write but only read, hence, no ProducerFencedException could ever happen here.
There was a problem hiding this comment.
Oy. Good call. I've fixed it.
| stateMgr.flush(); | ||
| } catch (final ProcessorStateException e) { | ||
| if (e.getCause() instanceof ProducerFencedException) { | ||
| log.warn("Caught a recoverable Kafka exception while flushing state. Initiating a rebalance to attempt recovery.", e); |
There was a problem hiding this comment.
Thanks! I didn't unwrap it in the log, since this preserves the full call stack (which helps track down why the exception happened)
There was a problem hiding this comment.
I don't think we should log anything here -- we log and info level statement when we catch a TaskMigratedException and it's contains the full context as we pass in this and e -- otherwise, we would double log (also WARN does not seem to be appropriate anyway)
There was a problem hiding this comment.
Roger that. I can remove it.
abbccdda
left a comment
There was a problem hiding this comment.
Thanks John for the fix, LGTM. Do we still need to add another unit test to cover the change?
@vvcephei yes, we can merge to 2.4 branch. If it regression, we can plan for another RC. Otherwise it will land in 2.4.1. |
|
Thanks, all, I do still need to add test coverage before we can consider merging this. |
| log.info("Failed to suspend stream task {} since it got migrated to another thread already. " + | ||
| "Closing it as zombie and moving on.", id); | ||
| firstException.compareAndSet(null, closeZombieTask(task)); | ||
| tryCloseZombieTask(task); |
There was a problem hiding this comment.
Sorry, I should have documented why I made another round of changes. When I tested the code prior to this change, we still had threads dying with a different exception...
[2019-11-27 11:28:59,965] ERROR [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] stream-thread [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] failed to suspend stream tasks
at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
at org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
... 1 more
Caused by: org.apache.kafka.streams.errors.TaskMigratedException: Client request for task 1_0 has been fenced due to a rebalance
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:182)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:787)
at org.apache.kafka.streams.processor.internals.AssignedTasks.closeZombieTask(AssignedTasks.java:102)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:151)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
... 7 more
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store logData10MinuteFinalCount-store
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:178)
... 13 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: task [1_0] Abort sending since producer got fenced with a previous record (timestamp 1574853975939) to topic windowed-node-counts due to org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
[2019-11-27 11:28:59,966] INFO [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] stream-thread [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-27 11:28:59,969] INFO [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] stream-thread [stream-soak-test-92665673-8057-4ee8-8619-bffd3670817d-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread)
It turns out that firstException actually winds up being a fatal exception. The usages of this method express an intent to ignore any exceptions, so that's what this change does. I also made sure that any relevant state tracking still happens correctly. I.e., even if we can't cleanly close this zombie task, it's still removed from the "previous running" collection, etc.
mjsax
left a comment
There was a problem hiding this comment.
Overall LGTM -- I would avoid double logging. (And one clarification question.)
|
Hey, all, the two fixes above have revealed a third bug in the code, in which an Here's the exception I've seen: Therefore, I've introduced the latest commit to this PR and re-started my test. |
| } catch (final ProducerFencedException fatal) { | ||
| throw new TaskMigratedException(this, fatal); | ||
| } catch (final ProducerFencedException | UnknownProducerIdException e) { | ||
| throw new TaskMigratedException(this, e); |
There was a problem hiding this comment.
I don't think that an UnknownProducerIdException implies that a task was migrated.
There was a problem hiding this comment.
Agreed, but TaskMigratedException is the mechanism to initiate a rebalance, which is currently the only way that we can re-initialize the producers.
I think we can stand to re-evaluate how we're handling all these different producer and consumer exceptions (with and without eos). Right now, there are a lot of catch blocks for specific exceptions sprinkled throughout the codebase, and we universally either crash the thread or initiate a rebalance to attempt recovery. It would certainly benefit from developing a holistic approach, but right now, I'm just trying to get Streams threads to quit dying within a few hours of running in EOS mode, and I'm trying to restrain myself from broad refactoring so we can hope to merge this in for 2.4.0.
(About that last statement, I've been on holiday this week, so I haven't had time to document the comprehensive analysis in the ticket, but I do plan to make a case that it is a regression. When I ran 2.3 under similar conditions, we still had threads dying, but they were only dying from KIP-360-related-causes. In 2.4, Streams threads are dying from multiple new causes, related to the changes that we made in 2.4... Anyway, more details coming early next week, then we can discuss it)
There was a problem hiding this comment.
Give the fix for the root cause of UnknownProducerIdException (via KIP-360), I am wondering if we should actually treat it as fatal? I also don't see why it would be regression if KS dies if this exception is thrown though? (I understand that we did introduce some regression bugs, but those seem unrelated to UnknownProducerIdException?)
To be fair, 2.4.0 does not contain a full implementation of KIP-360 and thus, this fix might be a workaround... But frankly, I am not very happy about it, even if I understand the desire to just stabilize it somehow without a refactoring that won't make it into 2.4.0 anyway.
There was a problem hiding this comment.
Ok, then it seems worth hashing it out. Can you elaborate what is there to be unhappy about? I figured it would be reasonable to treat it as "recoverable by creating a new Producer" instead of "fatal and you should immediately terminate the thread".
What I was thinking was that is seems like the broker is telling us it doesn't know who we are (anymore). Because of the context, we know that it used to know who we were, so it must have forgotten somehow, and it doesn't really matter how because we can always shrug it off and re-create our producer to try again.
IIUC, KIP-360 would reduce the occurrence of the exception by keeping your producer id cached even after its ttl has expired, but it by no means guarantees that it'll remember you regardless of how long you're silent.
From that perspective it seems reasonable to catch this exception and conclude, "Oops, it looks like we were silent too long and our Producer has effectively expired. We should make a new one and try again." This isn't the same thing as getting fenced, but the resolution is the same (make a new Producer and try again).
Then again, this perspective may be based on a faulty understanding of the system. Is this approach unsafe, and we should actually terminate the application instead?
There was a problem hiding this comment.
For this type or error, before KIP-360, the producer should self-recover and never throw an exception. (Cf "motivation" section of the KIP). If this error occurs, something really bad happened for the transaction and we are in an unknown state -- not sure if rebalancing and retrying is the right approach for this case \cc @guozhangwang @hachikuji @bob-barrett -- also, maybe it's just sufficient to close the task locally and recreate it -- I don't see a need to trigger a rebalance (in case it is ok to recorve from within KS).
This isn't the same thing as getting fenced, but the resolution is the same (make a new Producer and try again).
I disagree. If we got fenced, a new producer with the same transactional.id was created and we know that we don't own the task any longer. For a UnknownProducerIdException we still own the task.
Is this approach unsafe, and we should actually terminate the application instead?
I don't thinks it's unsafe, but it's not the "right" fix IMHO, and this bug is also not a regression.
There was a problem hiding this comment.
Thanks for the clarification, @mjsax .
I agree that it would be better to just close the task and re-open it, but I don't feel comfortable introducing a whole new task lifecycle at this point for the 2.4.0 release. My rationale was that triggering a rebalance would put Kafka Streams through a well-known and well-tested code path that will result in the task getting closed and then opened again.
I do think it makes sense to go ahead and handle this case along with the two regressions. In my testing, this exception was just as fatal for Streams as the other two. Specifically, unless I include all three fixes, my soak test saw StreamThreads start dying within a few hours.
It seems like maybe a good approach right now would be to take the simple and sub-optimal path of just rebalancing when we encounter this exception for the 2.4.0 release, and file a Jira to optimize it in the way you suggest.
It would be good, by the way, to find out the answer to your question of whether it's safe to continue or not. Note that the current behavior, both in 2.4 and 2.3, without this patch is that a thread will get this exception and then shut itself down. This leads to a rebalance (since a member has left the group), after which the task is assigned to another thread, which continues processing it. Thus, we are already responding to this condition by rebalancing. It's just that we permanently lose a thread in the process. With this patch, we still rebalance, but we get to keep the thread running. If it's really unsafe to continue processing, though, we should stop the entire Streams application and force the operator to diagnose the problem with the brokers and start the app back up in a safe state.
There was a problem hiding this comment.
I shared some thoughts on the comment above: before KIP-447, when we re-trigger the rebalance if the task is indeed migrated out the corresponding producer would be closed, otherwise that producer will be retained. But I think after KIP-447 we need to revisit this again.
At the moment I feel okay to treat UnknownProducerId equally as ProducerFenced.
There was a problem hiding this comment.
Thanks. That will be a good opportunity to clean up what is honestly a pretty ham-handed approach here to gracefully deal with this condition without changing too much code.
|
Hi again, all, I've updated the PR description, and also upgraded the Jira ticket to "Blocker". The rationale is detailed here: Please take a look and let me know if you agree. Also, as a status update, with the most recent commit, my soak test cluster has been running and stable for 27 hours. This is significantly longer than any of the iterations since I started this testing effort (previous record is like 6 hours), so I'm cautiously optimistic that I've discovered all the immediately available bugs. All that then remains is to add specific regression tests for my changes, and to resolve the outstanding code review conversation about catching UnknownProducerIdException. |
|
Hello again, all, After quite a bit of wrestling with the tests, I've decided only to implement unit-style regression tests. I tried to figure out some way we could exercise these conditions in integration tests to ensure that the cluster actually does just rebalance and continue working, but there are so many separate, tiny branches that any such approach would have a low probability of really exercising the desired paths, and a high probability of only failing rarely, even if we do introduce a regression. I think the regression tests are a good addition. It turns out that we were actually totally lacking in coverage on most of the "producer fenced" branches. This might have something to do with how these bugs made their way into the code base to begin with. My soak test is still running without losing any more threads, so as far as I'm concerned, it seems like the PR results in a stable Kafka Streams with EOS. At this point, I'm going to run the system tests overnight, and we can try to conclude the code review tomorrow. |
|
I've reported the unrelated test failures: |
|
Retest this, please. |
guozhangwang
left a comment
There was a problem hiding this comment.
Reviewed the non-testing part of the PR which looks good to me, thanks @vvcephei.
Just some more thoughts about how UnknownProducerId can kill a thread since it was not observed (or at least not as common as to catch our eyes before): when producer.send gets the unknown producer id it gets remembered in the sendException field as a StreamsException, which is checked in each 1) send, 2) flush, 3) close calls.
In case 3), the caller (StreamTask#maybeAbortTransactionAndCloseRecordCollector) would capture it and sallow, in case 1) and 2), it would cause the thread die because of the StreamsException. So from the Streams's pov, it could be a pretty common scenario.
However, for most cases (e.g. due to deleteRecords) this should not thrown because in TransactionManager#canRetry we have the following logic:
if (error == Errors.UNKNOWN_PRODUCER_ID) {
if (response.logStartOffset == -1)
// We don't know the log start offset with this response. We should just retry the request until we get it.
// The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the
// logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for
// a broker to not know the logStartOffset at when it is returning the response because the partition
// may have moved away from the broker from the time the error was initially raised to the time the
// response was being constructed. In these cases, we should just retry the request: we are guaranteed
// to eventually get a logStartOffset once things settle down.
return true;
if (batch.sequenceHasBeenReset()) {
// When the first inflight batch fails due to the truncation case, then the sequences of all the other
// in flight batches would have been restarted from the beginning. However, when those responses
// come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
// reset the sequence numbers to the beginning.
return true;
} else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
// The head of the log has been removed, probably due to the retention time elapsing. In this case,
// we expect to lose the producer state. Reset the sequences of all inflight batches to be from the beginning
// and retry them.
startSequencesAtBeginning(batch.topicPartition);
return true;
}
}
However since there's no log4j entries we cannot tell what conditions would cause the above to NOT trigger and hence the onCompletion callback would trigger with the UnknownProducerIdException, but from the code this logic has never been changed since day 1 so I think it is not a regression that sometimes the UnknownProducerId exception would indeed be thrown to the caller --- in this case Stream's record collector --- 's face. So in all I think it is reasonable to ask Streams to better handling it.
Post 2.4 though, with KIP-447 in place we should consider refactoring the code of handling those errors more specifically.
| stateMgr.flush(); | ||
| try { | ||
| stateMgr.flush(); | ||
| } catch (final ProcessorStateException e) { |
There was a problem hiding this comment.
Reading the code here: we throw TaskMigratedException in both StreamTask as well as StreamThread. For the first case, the exception is thrown all the way to the caller of TaskManager and rethrown (hence in the level of StreamThread); the latter case is only a few exceptional situations where the tasks are closed due to underlying exceptions.
I feel throwing the exception in the level of StreamTask is fine, but we did too many capture / rethrow in different layers on top of it, which could be cleaned up in a separate tech-debt PR.
| producer.flush(); | ||
| try { | ||
| producer.flush(); | ||
| } catch (final ProducerFencedException | UnknownProducerIdException e) { |
There was a problem hiding this comment.
For UnknownProducerIdException we actually can still reuse the client and just need to send the next produce request (unlike ProducerFencedException we cannot reuse the client and hence have to create a new client). However when we translate them into TaskMigrationException on higher level it would be treated equally as re-triggering a rebalance.
It's just that in current EOS implementation (before KIP-447) have one producer per task, hence when the task is closed the corresponding producer is closed and discarded as well. In KIP-447 however we may still need to treat them differently: for UnknownProducerIdException we can just log and proceed, for ProducerFencedException we need to close the producer and re-create a new one.
There was a problem hiding this comment.
Thanks. One thing I'm not sure about, but felt at the time of putting this together was that, if we get an UnknownProducerIdException on one call, we'd probably just keep getting it on all subsequent calls. It seemed to me that the exception was originating from a condition where the broker "forgot" about our id, and if it has "forgotten" about us, I'm not sure why it would "remember" us later on.
If that's not true, then this approach certainly may result in more close+reopen operations than is necessary.
| } catch (final ProducerFencedException fatal) { | ||
| throw new TaskMigratedException(this, fatal); | ||
| } catch (final ProducerFencedException | UnknownProducerIdException e) { | ||
| throw new TaskMigratedException(this, e); |
There was a problem hiding this comment.
I shared some thoughts on the comment above: before KIP-447, when we re-trigger the rebalance if the task is indeed migrated out the corresponding producer would be closed, otherwise that producer will be retained. But I think after KIP-447 we need to revisit this again.
At the moment I feel okay to treat UnknownProducerId equally as ProducerFenced.
| consumer.commitSync(consumedOffsetsAndMetadata); | ||
| } | ||
| } catch (final CommitFailedException | ProducerFencedException error) { | ||
| } catch (final CommitFailedException | ProducerFencedException | UnknownProducerIdException error) { |
There was a problem hiding this comment.
For here and producer.beginTransaction above: I think beginTxn / sendOffsetsToTxn / commitTxn would not throw UnknownProducerIdException but it does not harm to be more careful for 2.4 release -- so if you want to keep it as is I'm fine with it too.
There was a problem hiding this comment.
Ok, I wasn't sure, so I just put it everywhere that we already check for ProducerFencedException, on the rationale that, if we can get fenced, then we must have a transactional id, and if we have an id, then it could be "unknown". Clearly, this is more intuitive than analytical, so I find your feedback plausible.
| exception.toString() | ||
| ) | ||
| ), | ||
| exception |
There was a problem hiding this comment.
Minor for log4j succinctness: from looking at the production log files, I think keeping the whole stack trace of the original exception is not necessary since we know exactly it would be thrown from
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:202)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
So may be we can just log the original exception name; similarly in recordSendError we can just keep the original exception's name but not the stack trace since we know exactly it is from the same Sender.failBatch trace.
There was a problem hiding this comment.
In practice, the exceptions we're recording here are ApiExceptions, so they don't contain stack traces anyway.
I actually added the exception to the "cause" on purpose, because it was a pain trying to understand the stack traces we were previously logging while I was digging in to this issue. Even if the log messages are more verbose, they're easy to follow. I.e., it's a nice chain of "E1 caused by E2 caused by E3", instead of "E1-with-E3-in-the-message caused by E2".
I actually wanted to take it farther by removing the exception.toString from the message, but I didn't want to be too bold in this patch, and cause even more controversy.
|
Thanks for the thorough review, @guozhangwang ! I agree that this change is probably more agressive than it needs to be, and would therefore result in more rebalances than we'd strictly need to recover from the error condition. I think I'll leave it the way it is, for now, though. My fear is that we might take out some of the catches, but overlook some edge case and wind up losing threads again after a few more days of soak testing. Just to be sure, I'll make a follow-on ticket to revisit this exception handling, which would naturally happen as part of KIP-447, but could also be done independently up front. |
…enabled (#7748) Fix three independent causes of threads dying: 1. `ProducerFencedException` isn't properly handed while suspending a task, and leads to the thread dying. 2. `IllegalStateException`: an internal assertion is violated because a store can get orphaned when an exception is thrown during initialization, again leading to the thread dying. 3. `UnknownProducerIdException`: This exception isn't expected by the Streams code, so when we get it, the relevant thread dies. It's not clear whether we always need to catch this, and in the future, we won't expect it at all, but we are catching it now to be sure we're resilient if/when it happens. Important note: this might actually harm performance if the errors turn out to be ignorable, and we will now rebalance instead of ignoring them. Also, add missing test coverage for the exception handling code. Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>
We missed a branch in which we might catch a ProducerFencedException. It should always be converted to a TaskMigratedException.
Committer Checklist (excluded from commit message)