KAFKA-9113: Unit test for ProcessorStateManager and ChangelogReader #4
| checkpointFile.delete(); | ||
| } catch (final IOException e) { | ||
| } catch (final IOException | RuntimeException e) { |
I realized that some parsing exceptions are just runtime exceptions so catching both here.
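For illustration, a minimal sketch of why the multi-catch is needed. The `CheckpointParseSketch` class and its line format are hypothetical stand-ins, not the Kafka Streams checkpoint code, and returning an empty map on failure is an assumption made only to keep the example small.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a checkpoint reader; not the Kafka Streams implementation.
class CheckpointParseSketch {

    // Parses lines of the assumed form "<partition> <offset>".
    static Map<String, Long> parse(final List<String> lines) throws IOException {
        if (lines.isEmpty()) {
            throw new IOException("checkpoint file is empty");
        }
        final Map<String, Long> offsets = new HashMap<>();
        for (final String line : lines) {
            final String[] parts = line.split(" ");
            // Long.parseLong throws NumberFormatException (a RuntimeException) on malformed
            // input, and parts[1] throws ArrayIndexOutOfBoundsException if the offset is missing.
            offsets.put(parts[0], Long.parseLong(parts[1]));
        }
        return offsets;
    }

    // Returns the parsed offsets, or an empty map if reading or parsing fails.
    static Map<String, Long> loadOrEmpty(final List<String> lines) {
        try {
            return parse(lines);
        } catch (final IOException | RuntimeException e) {
            // Catching only IOException would let the parsing failures above escape.
            return new HashMap<>();
        }
    }
}
```

With only `catch (final IOException e)`, a malformed line such as `topic-0 notANumber` would propagate a `NumberFormatException` to the caller.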
| final Map<TopicPartition, Long> changelogOffsets = new HashMap<>(); | ||
| for (final StateStoreMetadata storeMetadata : stores.values()) { | ||
| if (storeMetadata.changelogPartition != null && storeMetadata.offset != null) { | ||
| if (storeMetadata.changelogPartition != null) { |
Here we still want to return the changelog -> offset entry even if it is null (indicating the current position unknown).
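A small sketch of the intended behavior, using simplified hypothetical types (`StoreMeta` with a `String` partition) rather than the real `StateStoreMetadata` and `TopicPartition`:

```java
import java.util.HashMap;
import java.util.Map;

class ChangelogOffsetsSketch {

    // Hypothetical, simplified metadata holder.
    static final class StoreMeta {
        final String changelogPartition; // null for stores without a changelog
        final Long offset;               // null when the current position is unknown

        StoreMeta(final String changelogPartition, final Long offset) {
            this.changelogPartition = changelogPartition;
            this.offset = offset;
        }
    }

    // Keeps an entry for every changelog partition, even when its offset is null.
    static Map<String, Long> changelogOffsets(final Map<String, StoreMeta> stores) {
        final Map<String, Long> offsets = new HashMap<>();
        for (final StoreMeta meta : stores.values()) {
            if (meta.changelogPartition != null) {
                offsets.put(meta.changelogPartition, meta.offset);
            }
        }
        return offsets;
    }
}
```

Since `HashMap` permits null values, callers of such a map would need `containsKey` rather than `get(...) != null` to tell "unknown position" apart from "no such changelog".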
| } | ||
| public void resetRestoredBatch() { |
Some minor cleanup on mock functions that are not used, ditto elsewhere.
@cadonna @vvcephei @ableegoldman for reviews. Note that a large portion of the change is the renaming; the main part to review is TaskStateManager/Test, which is around 450 LOC.
force-pushed from 436efb6 to ec531d7
@cadonna Git turned the renaming into file deletions and creations, so to help reviews I've reverted the renaming and will do it in a later PR. Now the diff is less scary :)
abbccdda left a comment
Thanks for the PR. Left a few comments, will read again once StoreChangelogReaderTest is fixed
| * Register a state store for restoration. | ||
| * | ||
| * @param partition the changelog topic partition to restore | ||
| * @param partition the state store's shcangelog partition for restoring |
| final StateStoreMetadata store = mustFindStore(changelogPartition); | ||
| StateStoreMetadata storeMetadata(final TopicPartition partition) { | ||
| for (final StateStoreMetadata storeMetadata : stores.values()) { | ||
| if (storeMetadata.changelogPartition != null && storeMetadata.changelogPartition.equals(partition)) { |
q: Could we just do partition.equals(storeMetadata.changelogPartition)?
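To make the trade-off concrete, here is a hedged sketch comparing the three forms on plain `String` values (the real code compares `TopicPartition` objects; the class and method names below are illustrative only):

```java
import java.util.Objects;

class NullSafeEqualsSketch {

    // Suggested form: safe when `candidate` is null, but throws
    // NullPointerException when the passed-in `partition` is null.
    static boolean matchesArgFirst(final String partition, final String candidate) {
        return partition.equals(candidate);
    }

    // Form in the diff: safe for a null `candidate`, and for a null `partition` too,
    // because String.equals(null) simply returns false.
    static boolean matchesGuarded(final String partition, final String candidate) {
        return candidate != null && candidate.equals(partition);
    }

    // Objects.equals tolerates null on both sides, but returns true when both are null,
    // which may or may not be the desired semantics.
    static boolean matchesObjects(final String partition, final String candidate) {
        return Objects.equals(partition, candidate);
    }
}
```

Which variant is appropriate depends on whether a null `partition` argument is considered a caller bug that should fail loudly.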
| private StateStoreMetadata findStore(final TopicPartition changelogPartition) { | ||
| final List<StateStoreMetadata> found = stores.values().stream() | ||
| .filter(metadata -> metadata.changelogPartition != null && |
Same here: are we also concerned that the passed-in changelogPartition could be null?
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.Assert.fail; | ||
| // TODO K9113: fix tests |
| } | ||
| private ProcessorStateManager getStandByStateManager(final TaskId taskId) { | ||
| private ProcessorStateManager getStandByStateManager() { |
nit: this could be merged with getActiveStateManager as getStateManager(AbstractTask.TaskType type)
| assertThat(store.keys.size(), is(1)); | ||
| assertTrue(store.keys.contains(key)); | ||
| assertEquals(17, store.values.get(0).length); |
nit: similarly, add a comment to explain 17
| ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); | ||
| ackedOffsets.put(persistentStorePartition, 123L); | ||
| ackedOffsets.put(nonPersistentStorePartition, 456L); | ||
| ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L); |
| try { | ||
| stateMgr.initStoresFromCheckpointedOffsets(); | ||
| fail("should have thrown procesor state exception when IO exception happens"); |
| try { | ||
| stateMgr.restore(storeMetadata, singletonList(consumerRecord)); | ||
| fail("should have thrown procesor state exception when IO exception happens"); |
| } | ||
| @Test | ||
| public void shouldFlushAllStoresEvenIfStoreThrowsException() { |
nit: s/shouldFlushAllStoresEvenIfStoreThrowsException/shouldFlushGoodStoresEvenSomeThrowsException
guozhangwang left a comment
Added unit test coverage for StoreChangelogReader
| subscriptions.position(entry.getKey(), newPosition); | ||
| } | ||
| } | ||
| entry.getValue().clear(); |
Found this bug while working on the unit tests: when a partition is paused, we should not clear its records since they would not be returned; we should only clear the corresponding records which are included in the results.
@guozhangwang There is already a PR for this issue (apache#7505). Can you maybe review and merge that PR? Thanks to @ableegoldman for pointing out the overlap.
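The bug described in this thread can be sketched with a toy buffer. `PausedPollSketch` is a hypothetical simplification with `String` partitions and records, not the actual mock consumer from the diff:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a mock consumer's record buffer.
class PausedPollSketch {
    private final Map<String, List<String>> buffered = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void add(final String partition, final String record) {
        buffered.computeIfAbsent(partition, k -> new ArrayList<>()).add(record);
    }

    void pause(final String partition) {
        paused.add(partition);
    }

    void resume(final String partition) {
        paused.remove(partition);
    }

    // Returns buffered records for non-paused partitions and clears only those.
    Map<String, List<String>> poll() {
        final Map<String, List<String>> results = new LinkedHashMap<>();
        for (final Map.Entry<String, List<String>> entry : buffered.entrySet()) {
            if (paused.contains(entry.getKey()) || entry.getValue().isEmpty()) {
                continue; // keep records of paused partitions for a later poll
            }
            results.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            entry.getValue().clear();
        }
        return results;
    }
}
```

Clearing every buffer unconditionally, as the removed line did, would silently drop the records of a paused partition before they were ever returned.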
| * Thus, the task raising this exception can be cleaned up and closed as "zombie". | ||
| * Indicates that one or more tasks got migrated to another thread. | ||
| * | ||
| * 1) if the task field is specified, then that single task should be cleaned up and closed as "zombie" while the |
This is the proposed exception semantics.
Can you elaborate on when each version of TaskMigratedException should be used? I.e., when is it possible for just one task to be a zombie while all the others can/should continue as normal?
It is summarized in https://confluentinc.atlassian.net/browse/KSTREAMS-3302. Note it is just one proposal and we can debate whether we agree or disagree about that :)
I see, that makes sense. I have some thoughts on the handling in that case, but I'll save them for the sync.
We can do this in a follow-up PR, but just to be clear we should always treat a TaskMigratedException as applying to all tasks, and close them all as zombies. There's no possible way for one task to be a zombie while the others are able to continue processing
| } | ||
| } | ||
| // this is an optimization: if there's no buffered records so far, then we can reuse |
We have to give up this optimization since the records returned from the consumer are an unmodifiable view, so we cannot call clear() on them directly.
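A standalone illustration of the constraint, using `Collections.unmodifiableList` as an assumed stand-in for whatever view the consumer returns (the helper names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

class UnmodifiableViewSketch {

    // Returns true if clear() is rejected by the list, false if it succeeded.
    static boolean clearIsRejected(final List<String> records) {
        try {
            records.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    // Copies the polled records into a buffer we own and are free to mutate.
    static List<String> bufferCopy(final List<String> polled) {
        return new ArrayList<>(polled);
    }
}
```

The copy costs an allocation per poll, which is the price of dropping the "reuse the returned list" optimization.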
| public void init(final ProcessorContext context, | ||
| final StateStore root) { | ||
| context.register(root, stateRestoreCallback); | ||
| if (simulateForwardOnFlush) { |
Piggybacked a minor cleanup, as simulateForwardOnFlush is no longer used.
ping @abbccdda @ableegoldman @cadonna again for final reviews.
abbccdda left a comment
Took a more thorough look at StoreChangelogReaderTest. Overall I think we are still missing unit test coverage for exception states. It would also be good to either cover both standby and active changelogs in the general tests, or state in the meta comment that these are common tests.
| if (endOffset != null && committedOffset != null) { | ||
| if (changelogMetadata.restoreEndOffset != null) | ||
| throw new IllegalStateException("End offset for " + partition + |
req: this is not covered in unit test.
Most places where IllegalStateException is thrown are assertions that something should never happen unless there is a bug (i.e. it does not depend on other modules), in which case we should always shout out and fail fast. So I think we do not need unit tests for such cases.
The other places where IllegalStateException is thrown are when the caller (e.g. ProcessorStateManager) has a bug, such as trying to register the same store twice; only those cases, which depend on bugs in other modules, would have a test case.
| final ChangelogMetadata changelogMetadata = new ChangelogMetadata(partition, stateManager); | ||
| final StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); | ||
| if (storeMetadata == null) { | ||
| throw new IllegalStateException("Cannot find the corresponding state store metadata for changelog " + |
req: this is not covered in the unit test
| final Long currentOffset = metadata.storeMetadata.offset(); | ||
| if (endOffset == null) { | ||
| // end offset is not initialized meaning that it is from a standby task, | ||
| // this should never happen since we only call this function for active task in restoring phase |
req: endOffset == null is not covered
Same as above, this should never happen (indicating a bug).
| assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state()); | ||
| assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); | ||
| assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); | ||
| assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); |
prop: I have seen multiple tests for completedChangelogs, but none of them adds multiple topic partitions. Could we add a test where some partitions are completed while others are not?
Yes, there's shouldRestoreMultipleChangelogs.
| // TODO K9113: we need to consider how to handle InvalidOffsetException for consumer#poll / position | ||
| public class StoreChangelogReader implements ChangelogReader { | ||
| enum ChangelogState { |
prop: just noticed that although this enum only has 3 states, do we still want to build a state transition check for it?
The state transition is simply registered -> restoring -?-> completed, so I did not build one, but just to be safe I will add a transition check for it.
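One possible shape for such a check, sketched under the assumption that the only legal transitions are the linear ones named above. `ChangelogStateSketch`, `canTransitionTo`, and `transitTo` are illustrative names, not the code that was eventually committed:

```java
class ChangelogStateSketch {

    enum State {
        REGISTERED, RESTORING, COMPLETED;

        // Assumed linear life cycle: REGISTERED -> RESTORING -> COMPLETED.
        boolean canTransitionTo(final State next) {
            switch (this) {
                case REGISTERED:
                    return next == RESTORING;
                case RESTORING:
                    return next == COMPLETED;
                default:
                    return false; // COMPLETED is terminal in this sketch
            }
        }
    }

    // Returns the new state, or fails fast on an illegal transition.
    static State transitTo(final State current, final State next) {
        if (!current.canTransitionTo(next)) {
            throw new IllegalStateException("Invalid transition from " + current + " to " + next);
        }
        return next;
    }
}
```

Failing fast with `IllegalStateException` here matches the convention discussed elsewhere in this review for "should never happen" conditions.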
| @@ -364,22 +423,14 @@ public void restore() { | |||
| final ConsumerRecord<byte[], byte[]> record = iterator.next(); | |||
nit: this iterator interface could be replaced with a for each loop
| for (final ChangelogMetadata changelogMetadata: newPartitionsToRestore) { | ||
| final TopicPartition partition = changelogMetadata.changelogPartition; | ||
| final StateStoreMetadata storeMetadata = changelogMetadata.stateManager.storeMetadata(partition); | ||
| final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata; |
req: comment here for L651 for no unit test coverage
| try { | ||
| restoreConsumer.unsubscribe(); | ||
| } catch (KafkaException e) { | ||
| throw new StreamsException("Restore consumer get unexpected error unsubscribing", e); |
| assertEquals(mkSet(tp1, tp2), consumer.paused()); | ||
| changelogReader.register(topicPartition, stateManager); | ||
| // transition to restore active is idempotent |
q: what does this comment suggest? We are not calling transitToRestoreActive multiple times here
When the changelog reader is created it is always in restoreActive state already. I can add a few lines to make it more explicit.
| expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); | ||
| replay(active, task); | ||
| changelogReader.restore(); | ||
| public void shouldThrowIfRestoreCallbackThrows() { |
nit: for any task type neutral tests, I would feel more comfortable if we could allow testing for both active and standby state managers in two separate tests, or share the same skeleton somehow
I've made the test parameterized.
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below:
1. Extract embedded clients (producer and consumer) into RecordCollector from StreamTask. guozhangwang#2 guozhangwang#5
2. Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread. guozhangwang#3 guozhangwang#4
3. Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7
4. Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)). guozhangwang#8 guozhangwang#9
5. Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2). guozhangwang#10
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Further refactoring on APIs between state manager and changelog reader per @cadonna's comments. Also renamed ProcessorStateManager to TaskStateManager per comments.
Complete unit test for ProcessorStateManager.