KAFKA-8897 Follow-up: Consolidate the global state stores#10646
Conversation
|
ping @cadonna @ableegoldman for reviews. |
cadonna
left a comment
There was a problem hiding this comment.
@guozhangwang Thanks for the PR!
I had some comments.
Could you please also check that the unit tests we have, verify that the state store is closed in case of errors and add unit tests for errors we haven't tested?
| final Set<String> changelogTopics = new HashSet<>(); | ||
| for (final StateStore stateStore : globalStateStores) { | ||
| globalStoreNames.add(stateStore.name()); | ||
| for (final StateStore stateStore : topology.globalStateStores()) { |
There was a problem hiding this comment.
Not really related to this line. Could you verify that the state store is closed in the unit test that tests line 148? The name of the test is shouldThrowStreamsExceptionForOldTopicPartitions().
There was a problem hiding this comment.
Actually I just observed that in this case, the mock context does not actually use the stateManager at all, but created its own StateManagerStub, and hence would not call stateManager.registerStore, therefore the stores set would always be empty.. I think this is okay since in unit test we are only checking each single function's behavior (in this case, initialize) anyways.
Also as I browse through the code (see other comment below), in this unit test when the exception is thrown the store would not be closed yet, and in practice we would rely on the thread.shutdown itself to shutdown the global state manager, and hence close all state stores.
There was a problem hiding this comment.
If I put
assertThat(store1.isOpen(), is(false));
assertThat(store2.isOpen(), is(false));
assertThat(store3.isOpen(), is(false));
assertThat(store4.isOpen(), is(false));
on line 202 in shouldThrowStreamsExceptionForOldTopicPartitions() the test fails. Hence, we leak a state store.
There was a problem hiding this comment.
Yes, but the reason is that, in the unit test we do not really follow the trace of stateMgr.initialize() -> store.init() -> context.registerStore() -> stateMgr.registerStore(). That's because the context is a mock, which does not use the stateMgr at all, and hence the stores set is always empty.
If we do want to test this call trace, then we need to make the mock context to get the actual stateMgr.
There was a problem hiding this comment.
Now, I see what you mean. However, I am not sure it is a good idea to rely on the code in GlobalStreamThread that catches the fatal exception to clean up state stores (and all the rest). If we know, we throw a fatal exception, then we should clean up immediately before we throw. That makes the GlobalStateManagerImpl less error-prone, because it does not need to rely on a different class for its clean up , IMO.
| for (final StateStore stateStore : topology.globalStateStores()) { | ||
| final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); | ||
| changelogTopics.add(sourceTopic); | ||
| stateStore.init((StateStoreContext) globalProcessorContext, stateStore); |
There was a problem hiding this comment.
There are a a IllegalStateException and a couple of IllegalArgumentExceptions on the path from opening the state store within stateStore.init() to line 182 in this.registerStore(). We do not close the state stores before we throw. I do not think this is relevant for production code, but we could leak state stores in unit tests if we do not explicitly close the state stores in the unit tests.
There was a problem hiding this comment.
On a second thought, it might also be relevant for production code since we now can restart the stream thread after a fatal error. This is not yet possible for a global stream thread, but it might be possible in future.
There was a problem hiding this comment.
Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
Anyways, I think your concern is still valid, that at least for testing code, there's a risk. Since store.init() call is implemented at the state store impl customization, we cannot really enforce closing there. And today, for both global state and local state manager, we would throw the exception from initialize state stores all the way up to the thread.run and to user's exceptional handler. Though we call thread.shutdown eventually we would close all tasks anyways, but in the case you raised, the state store would not be in stateManager.stores set yet and hence would be leaked.
What we can do is, e.g. in both local and global state manager, moving the globalStores.put / stores.put call at the beginning, before making any checks, so that when we throw and eventually thread.shutdown, the stores would already be in the set and would be closed.
There was a problem hiding this comment.
Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
If the user decides to restart a stream thread in its exception handler it is possible.
There was a problem hiding this comment.
Yeah, our attitude towards IllegalStateException has been pretty cavalier thus far, and it's one of the main things I'm concerned about with the REPLACE thread functionality. We should definitely be on the lookout for possible IllegalStateException occurrences in the codebase and try to triage them so things aren't just completely screwed up if Streams is allowed to continue after hitting one
guozhangwang
left a comment
There was a problem hiding this comment.
@cadonna replied your comments; could you take another look?
| private final Consumer<byte[], byte[]> globalConsumer; | ||
| private final Logger log; | ||
| private final File baseDir; | ||
| private final Set<String> globalStoreNames = new HashSet<>(); |
There was a problem hiding this comment.
Just re-ordering member fields here, no adding/removing.
| final Set<String> changelogTopics = new HashSet<>(); | ||
| for (final StateStore stateStore : globalStateStores) { | ||
| globalStoreNames.add(stateStore.name()); | ||
| for (final StateStore stateStore : topology.globalStateStores()) { |
There was a problem hiding this comment.
Actually I just observed that in this case, the mock context does not actually use the stateManager at all, but created its own StateManagerStub, and hence would not call stateManager.registerStore, therefore the stores set would always be empty.. I think this is okay since in unit test we are only checking each single function's behavior (in this case, initialize) anyways.
Also as I browse through the code (see other comment below), in this unit test when the exception is thrown the store would not be closed yet, and in practice we would rely on the thread.shutdown itself to shutdown the global state manager, and hence close all state stores.
| for (final StateStore stateStore : topology.globalStateStores()) { | ||
| final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); | ||
| changelogTopics.add(sourceTopic); | ||
| stateStore.init((StateStoreContext) globalProcessorContext, stateStore); |
There was a problem hiding this comment.
Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
Anyways, I think your concern is still valid, that at least for testing code, there's a risk. Since store.init() call is implemented at the state store impl customization, we cannot really enforce closing there. And today, for both global state and local state manager, we would throw the exception from initialize state stores all the way up to the thread.run and to user's exceptional handler. Though we call thread.shutdown eventually we would close all tasks anyways, but in the case you raised, the state store would not be in stateManager.stores set yet and hence would be leaked.
What we can do is, e.g. in both local and global state manager, moving the globalStores.put / stores.put call at the beginning, before making any checks, so that when we throw and eventually thread.shutdown, the stores would already be in the set and would be closed.
| throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); | ||
| } | ||
|
|
||
| // register the store first, so that if later an exception is thrown then eventually while we call `close` |
There was a problem hiding this comment.
I add the store after the store names check, since if there's already a state store created, then we would not be able to book-keep both of them anyways.
There was a problem hiding this comment.
I agree that we would not be able to book-keep both, but the state store in store that we just opened is still open in line 172. So we need to close the state store in store before throwing the exception otherwise we will leak it. The same applies to line 176.
There was a problem hiding this comment.
Okay, I think I got what we were discussing now. Originally I'm thinking that since these conditions should never happen --- because in the topology when we add state stores we already check if the store names have existed or not, and hence we should never add two stores with the same name --- if it ever happens we would always treat it as fatal and crash stop immediately.
On the higher level, I think we should NOT allow users to handle illegal-s/a themselves and hence ever possibly to treat them not as fatal, but obviously today we do not enforce that.
So I think we can have two options here: 1) in the lower level hierarchy like state manager here, try to stop the stores when hitting an illegal-s/a; 2) on the higher level hierarchy as in stream thread, we enforce "stop app" on illegal-s/a. I'm a bit leaning towards 2) here but would love to hear other opinions.
There was a problem hiding this comment.
cc @ableegoldman @wcarlson5 @rodesai too.
There was a problem hiding this comment.
+1 on disallowing the app to continue after an illegal exception. We need to reserve some kind of exception for actual critical, fatal system errors that a user can't just ignore to spin up a new thread. And that has essentially been the meaning of these illegal exceptions in Streams thus far. As I mentioned in another thread, I've been very concerned about this in the new handler since we haven't been strict in properly cleaning up after an illegal exception
There was a problem hiding this comment.
+1 on stopping the app after a deterministic illegal * exception. I am not sure if all illegal * exception we throw are deterministic, though. I guess most of them are. For now, we could just shutdown the app for all illegal * exception and then consider to use a different exception if we discover that a illegal * exception is transient.
There was a problem hiding this comment.
I definitely think we need to triage and maybe clean up the existing Illegal-type exceptions today. Some may not be deterministic, but we still just drop everything and shut down without any further attempts at cleaning up. In those cases it's probably down to the specific situation whether it's appropriate to continue doing so and disallow recovery from this, or just fix the handling so it does clean all resources
cadonna
left a comment
There was a problem hiding this comment.
@guozhangwang Thank you for the updates!
Here my feedback!
| throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); | ||
| } | ||
|
|
||
| // register the store first, so that if later an exception is thrown then eventually while we call `close` |
There was a problem hiding this comment.
I agree that we would not be able to book-keep both, but the state store in store that we just opened is still open in line 172. So we need to close the state store in store before throwing the exception otherwise we will leak it. The same applies to line 176.
| for (final StateStore stateStore : topology.globalStateStores()) { | ||
| final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); | ||
| changelogTopics.add(sourceTopic); | ||
| stateStore.init((StateStoreContext) globalProcessorContext, stateStore); |
There was a problem hiding this comment.
Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
If the user decides to restart a stream thread in its exception handler it is possible.
|
|
||
| // register the store first, so that if later an exception is thrown then eventually while we call `close` | ||
| // on the state manager this state store would be closed as well | ||
| stores.put(storeName, storeMetadata); |
| final Set<String> changelogTopics = new HashSet<>(); | ||
| for (final StateStore stateStore : globalStateStores) { | ||
| globalStoreNames.add(stateStore.name()); | ||
| for (final StateStore stateStore : topology.globalStateStores()) { |
There was a problem hiding this comment.
If I put
assertThat(store1.isOpen(), is(false));
assertThat(store2.isOpen(), is(false));
assertThat(store3.isOpen(), is(false));
assertThat(store4.isOpen(), is(false));
on line 202 in shouldThrowStreamsExceptionForOldTopicPartitions() the test fails. Hence, we leak a state store.
|
cadonna
left a comment
There was a problem hiding this comment.
Thanks @guozhangwang!
I am looking forward to KAFKA-12887 and KAFKA-12812.
Committer Checklist (excluded from commit message)