Skip to content

KAFKA-8897 Follow-up: Consolidate the global state stores#10646

Merged
guozhangwang merged 5 commits into
apache:trunkfrom
guozhangwang:K8897-follow-up
Jun 4, 2021
Merged

KAFKA-8897 Follow-up: Consolidate the global state stores#10646
guozhangwang merged 5 commits into
apache:trunkfrom
guozhangwang:K8897-follow-up

Conversation

@guozhangwang

Copy link
Copy Markdown
Contributor
  1. When register state stores, add the store to globalStateStores before calling any blocking calls that may throw errors, so that upon closing we would close the stores as well.
  2. Remove the other list as a local field, and call topology.globalStateStores when needed to get the list.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang

Copy link
Copy Markdown
Contributor Author

ping @cadonna @ableegoldman for reviews.

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Thanks for the PR!

I had some comments.
Could you please also check that the unit tests we have, verify that the state store is closed in case of errors and add unit tests for errors we haven't tested?

final Set<String> changelogTopics = new HashSet<>();
for (final StateStore stateStore : globalStateStores) {
globalStoreNames.add(stateStore.name());
for (final StateStore stateStore : topology.globalStateStores()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related to this line. Could you verify that the state store is closed in the unit test that tests line 148? The name of the test is shouldThrowStreamsExceptionForOldTopicPartitions().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I just observed that in this case, the mock context does not actually use the stateManager at all, but created its own StateManagerStub, and hence would not call stateManager.registerStore, therefore the stores set would always be empty.. I think this is okay since in unit test we are only checking each single function's behavior (in this case, initialize) anyways.

Also as I browse through the code (see other comment below), in this unit test when the exception is thrown the store would not be closed yet, and in practice we would rely on the thread.shutdown itself to shutdown the global state manager, and hence close all state stores.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I put

        assertThat(store1.isOpen(), is(false));
        assertThat(store2.isOpen(), is(false));
        assertThat(store3.isOpen(), is(false));
        assertThat(store4.isOpen(), is(false));

on line 202 in shouldThrowStreamsExceptionForOldTopicPartitions() the test fails. Hence, we leak a state store.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the reason is that, in the unit test we do not really follow the trace of stateMgr.initialize() -> store.init() -> context.registerStore() -> stateMgr.registerStore(). That's because the context is a mock, which does not use the stateMgr at all, and hence the stores set is always empty.

If we do want to test this call trace, then we need to make the mock context to get the actual stateMgr.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, I see what you mean. However, I am not sure it is a good idea to rely on the code in GlobalStreamThread that catches the fatal exception to clean up state stores (and all the rest). If we know, we throw a fatal exception, then we should clean up immediately before we throw. That makes the GlobalStateManagerImpl less error-prone, because it does not need to rely on a different class for its clean up , IMO.

for (final StateStore stateStore : topology.globalStateStores()) {
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a a IllegalStateException and a couple of IllegalArgumentExceptions on the path from opening the state store within stateStore.init() to line 182 in this.registerStore(). We do not close the state stores before we throw. I do not think this is relevant for production code, but we could leak state stores in unit tests if we do not explicitly close the state stores in the unit tests.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, it might also be relevant for production code since we now can restart the stream thread after a fatal error. This is not yet possible for a global stream thread, but it might be possible in future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?

Anyways, I think your concern is still valid, that at least for testing code, there's a risk. Since store.init() call is implemented at the state store impl customization, we cannot really enforce closing there. And today, for both global state and local state manager, we would throw the exception from initialize state stores all the way up to the thread.run and to user's exceptional handler. Though we call thread.shutdown eventually we would close all tasks anyways, but in the case you raised, the state store would not be in stateManager.stores set yet and hence would be leaked.

What we can do is, e.g. in both local and global state manager, moving the globalStores.put / stores.put call at the beginning, before making any checks, so that when we throw and eventually thread.shutdown, the stores would already be in the set and would be closed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?

If the user decides to restart a stream thread in its exception handler it is possible.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, our attitude towards IllegalStateException has been pretty cavalier thus far, and it's one of the main things I'm concerned about with the REPLACE thread functionality. We should definitely be on the lookout for possible IllegalStateException occurrences in the codebase and try to triage them so things aren't just completely screwed up if Streams is allowed to continue after hitting one

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna replied your comments; could you take another look?

private final Consumer<byte[], byte[]> globalConsumer;
private final Logger log;
private final File baseDir;
private final Set<String> globalStoreNames = new HashSet<>();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just re-ordering member fields here, no adding/removing.

final Set<String> changelogTopics = new HashSet<>();
for (final StateStore stateStore : globalStateStores) {
globalStoreNames.add(stateStore.name());
for (final StateStore stateStore : topology.globalStateStores()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I just observed that in this case, the mock context does not actually use the stateManager at all, but created its own StateManagerStub, and hence would not call stateManager.registerStore, therefore the stores set would always be empty.. I think this is okay since in unit test we are only checking each single function's behavior (in this case, initialize) anyways.

Also as I browse through the code (see other comment below), in this unit test when the exception is thrown the store would not be closed yet, and in practice we would rely on the thread.shutdown itself to shutdown the global state manager, and hence close all state stores.

for (final StateStore stateStore : topology.globalStateStores()) {
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?

Anyways, I think your concern is still valid, that at least for testing code, there's a risk. Since store.init() call is implemented at the state store impl customization, we cannot really enforce closing there. And today, for both global state and local state manager, we would throw the exception from initialize state stores all the way up to the thread.run and to user's exceptional handler. Though we call thread.shutdown eventually we would close all tasks anyways, but in the case you raised, the state store would not be in stateManager.stores set yet and hence would be leaked.

What we can do is, e.g. in both local and global state manager, moving the globalStores.put / stores.put call at the beginning, before making any checks, so that when we throw and eventually thread.shutdown, the stores would already be in the set and would be closed.

throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
}

// register the store first, so that if later an exception is thrown then eventually while we call `close`

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add the store after the store names check, since if there's already a state store created, then we would not be able to book-keep both of them anyways.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we would not be able to book-keep both, but the state store in store that we just opened is still open in line 172. So we need to close the state store in store before throwing the exception otherwise we will leak it. The same applies to line 176.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think I got what we were discussing now. Originally I'm thinking that since these conditions should never happen --- because in the topology when we add state stores we already check if the store names have existed or not, and hence we should never add two stores with the same name --- if it ever happens we would always treat it as fatal and crash stop immediately.

On the higher level, I think we should NOT allow users to handle illegal-s/a themselves and hence ever possibly to treat them not as fatal, but obviously today we do not enforce that.

So I think we can have two options here: 1) in the lower level hierarchy like state manager here, try to stop the stores when hitting an illegal-s/a; 2) on the higher level hierarchy as in stream thread, we enforce "stop app" on illegal-s/a. I'm a bit leaning towards 2) here but would love to hear other opinions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on disallowing the app to continue after an illegal exception. We need to reserve some kind of exception for actual critical, fatal system errors that a user can't just ignore to spin up a new thread. And that has essentially been the meaning of these illegal exceptions in Streams thus far. As I mentioned in another thread, I've been very concerned about this in the new handler since we haven't been strict in properly cleaning up after an illegal exception

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on stopping the app after a deterministic illegal * exception. I am not sure if all illegal * exception we throw are deterministic, though. I guess most of them are. For now, we could just shutdown the app for all illegal * exception and then consider to use a different exception if we discover that a illegal * exception is transient.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely think we need to triage and maybe clean up the existing Illegal-type exceptions today. Some may not be deterministic, but we still just drop everything and shut down without any further attempts at cleaning up. In those cases it's probably down to the specific situation whether it's appropriate to continue doing so and disallow recovery from this, or just fix the handling so it does clean all resources

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Thank you for the updates!

Here my feedback!

throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
}

// register the store first, so that if later an exception is thrown then eventually while we call `close`

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we would not be able to book-keep both, but the state store in store that we just opened is still open in line 172. So we need to close the state store in store before throwing the exception otherwise we will leak it. The same applies to line 176.

for (final StateStore stateStore : topology.globalStateStores()) {
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?

If the user decides to restart a stream thread in its exception handler it is possible.


// register the store first, so that if later an exception is thrown then eventually while we call `close`
// on the state manager this state store would be closed as well
stores.put(storeName, storeMetadata);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above.

final Set<String> changelogTopics = new HashSet<>();
for (final StateStore stateStore : globalStateStores) {
globalStoreNames.add(stateStore.name());
for (final StateStore stateStore : topology.globalStateStores()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I put

        assertThat(store1.isOpen(), is(false));
        assertThat(store2.isOpen(), is(false));
        assertThat(store3.isOpen(), is(false));
        assertThat(store4.isOpen(), is(false));

on line 202 in shouldThrowStreamsExceptionForOldTopicPartitions() the test fails. Hence, we leak a state store.

@guozhangwang

Copy link
Copy Markdown
Contributor Author

@cadonna

  1. Filed a ticket for KStreams to enforce shutdown on illegal-X exceptions as a future work.
  2. Let the global state mgr to do closing before throwing illegal-X exception as well.
  3. For unit tests though, they would not cover the case 2) as I mentioned in the PR.

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guozhangwang!

I am looking forward to KAFKA-12887 and KAFKA-12812.

@guozhangwang guozhangwang merged commit b2d463a into apache:trunk Jun 4, 2021
@guozhangwang guozhangwang deleted the K8897-follow-up branch June 4, 2021 15:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants