Skip to content

KAFKA-9113: Unit test for ProcessorStateManager and ChangelogReader#4

Merged
guozhangwang merged 11 commits into
k9113-basefrom
K9113-unit-tests-store-changelog
Jan 16, 2020
Merged

KAFKA-9113: Unit test for ProcessorStateManager and ChangelogReader#4
guozhangwang merged 11 commits into
k9113-basefrom
K9113-unit-tests-store-changelog

Conversation

@guozhangwang

@guozhangwang guozhangwang commented Jan 10, 2020

Copy link
Copy Markdown
Owner
  1. Further refactoring on APIs between state manager and changelog reader per @cadonna's comments. Also renamed ProcessorStateManager to TaskStateManager per comments.

  2. Complete unit test for ProcessorStateManager.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)


checkpointFile.delete();
} catch (final IOException e) {
} catch (final IOException | RuntimeException e) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that some parsing exceptions are just runtime exceptions so catching both here.

final Map<TopicPartition, Long> changelogOffsets = new HashMap<>();
for (final StateStoreMetadata storeMetadata : stores.values()) {
if (storeMetadata.changelogPartition != null && storeMetadata.offset != null) {
if (storeMetadata.changelogPartition != null) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we still want to return the changelog -> offset entry even if it is null (indicating the current position unknown).


}

public void resetRestoredBatch() {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor cleanup on mock functions that are not used, ditto elsewhere.

@guozhangwang

Copy link
Copy Markdown
Owner Author

@cadonna @vvcephei @ableegoldman for reviews. Note that a large portion of the change is for the renaming part, the major one to review is on TaskStateManager/Test which is around 450 loc..

@guozhangwang guozhangwang force-pushed the K9113-unit-tests-store-changelog branch from 436efb6 to ec531d7 Compare January 11, 2020 02:44
@guozhangwang

Copy link
Copy Markdown
Owner Author

@cadonna Git messed with the renaming to delete / create files, so to help reviews I've reverted the renaming and will do in a later PR. Now the diff is less scary :)

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left a few comments, will read again once StoreChangelogReaderTest is fixed

* Register a state store for restoration.
*
* @param partition the changelog topic partition to restore
* @param partition the state store's shcangelog partition for restoring

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: spelling

final StateStoreMetadata store = mustFindStore(changelogPartition);
StateStoreMetadata storeMetadata(final TopicPartition partition) {
for (final StateStoreMetadata storeMetadata : stores.values()) {
if (storeMetadata.changelogPartition != null && storeMetadata.changelogPartition.equals(partition)) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: Could we just do partition.equals(storeMetadata.changelogPartition)?


private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
final List<StateStoreMetadata> found = stores.values().stream()
.filter(metadata -> metadata.changelogPartition != null &&

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, do we also have a concern that passed in changelogPartition could be null?

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

// TODO K9113: fix tests

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: remove this

}

private ProcessorStateManager getStandByStateManager(final TaskId taskId) {
private ProcessorStateManager getStandByStateManager() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be merged with getActiveStateManager as getStateManager(AbstractTask.TaskType type)


assertThat(store.keys.size(), is(1));
assertTrue(store.keys.contains(key));
assertEquals(17, store.values.get(0).length);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: similarly, add a comment to explain 17

ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
ackedOffsets.put(persistentStorePartition, 123L);
ackedOffsets.put(nonPersistentStorePartition, 456L);
ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/otherTopic/nonRegisteredTopic


try {
stateMgr.initStoresFromCheckpointedOffsets();
fail("should have thrown procesor state exception when IO exception happens");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: spelling


try {
stateMgr.restore(storeMetadata, singletonList(consumerRecord));
fail("should have thrown procesor state exception when IO exception happens");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: same here

}

@Test
public void shouldFlushAllStoresEvenIfStoreThrowsException() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/shouldFlushAllStoresEvenIfStoreThrowsException/shouldFlushGoodStoresEvenSomeThrowsException

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added unit test coverage for StoreChangelogReader

subscriptions.position(entry.getKey(), newPosition);
}
}
entry.getValue().clear();

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found this bug while working on the unit tests: when a partition is paused, we should not clear its records since they would not be returned; we should only clear the corresponding records which are included in the results.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang There is already a PR for this issue (apache#7505) Can you maybe review and merge this PR? Thanks to @ableegoldman for pointing out the overlap.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Thus, the task raising this exception can be cleaned up and closed as "zombie".
* Indicates that one or more tasks got migrated to another thread.
*
* 1) if the task field is specified, then that single task should be cleaned up and closed as "zombie" while the

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the proposed exception semantics.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on when which version of TaskMigratedException should be used? ie, when is it possible for just one task to be a zombie while all the others can/should continue as normal

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is summarized in https://confluentinc.atlassian.net/browse/KSTREAMS-3302. Note it is just one proposal and we can debate whether we agree or disagree about that :)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense. I have some thoughts on the handling in that case but I'll save it for the sync

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this in a follow-up PR, but just to be clear we should always treat a TaskMigratedException as applying to all tasks, and close them all as zombies. There's no possible way for one task to be a zombie while the others are able to continue processing

}
}

// this is an optimization: if there's no buffered records so far, then we can reuse

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to give up this optimization since the records returned from consumer is unmodifiable view, so we cannot call clear on it directly.

public void init(final ProcessorContext context,
final StateStore root) {
context.register(root, stateRestoreCallback);
if (simulateForwardOnFlush) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Piggybacked minor cleanup as the simulateForwardOnFlush is not called anymore.

@guozhangwang guozhangwang changed the title KAFKA-9113: Unit test for ProcessorStateManager KAFKA-9113: Unit test for ProcessorStateManager and ChangelogReader Jan 14, 2020
@guozhangwang

Copy link
Copy Markdown
Owner Author

ping @abbccdda @ableegoldman @cadonna again for final reviews.

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a more thorough look at StoreChangelogReaderTest. Overall I think we are still missing unit test coverage for exception states, and would be good to either do a coverage for standby/active changelog in general tests, or just specify that in the meta comment saying this is a common test.


if (endOffset != null && committedOffset != null) {
if (changelogMetadata.restoreEndOffset != null)
throw new IllegalStateException("End offset for " + partition +

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: this is not covered in unit test.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the places where IllegalStateException is thrown it is a place where we assert this should never happen (i.e. it did not depend on other modules) unless there's a bug, in which case we should always shout out and fail fast. So I think we do not need to have a unit test for such cases.

Other places of IllegalState is when the caller (e.g. ProcessorStateManager) had a bug like trying to register the same store twice, and only those cases that depend on other module bugs would have a test case.

final ChangelogMetadata changelogMetadata = new ChangelogMetadata(partition, stateManager);
final StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
if (storeMetadata == null) {
throw new IllegalStateException("Cannot find the corresponding state store metadata for changelog " +

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: this is not covered in the unit test

final Long currentOffset = metadata.storeMetadata.offset();
if (endOffset == null) {
// end offset is not initialized meaning that it is from a standby task,
// this should never happen since we only call this function for active task in restoring phase

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: endOffset == null is not covered

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, this should never happen (indicating a bug).

assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: I have seen multiple tests for completedChangelogs but no one adds multiple topic partitions. Could we do a test with some partitions completed while some are not.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there's shouldRestoreMultipleChangelogs

// TODO K9113: we need to consider how to handle InvalidOffsetException for consumer#poll / position
public class StoreChangelogReader implements ChangelogReader {

enum ChangelogState {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: just notice that although this enum only has 3 states, do we still want to build some state transition check for it?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state transition is simply as registered -> restoring -?-> completed, so I did not build it, but just to be safe I will add a transition for it.

@@ -364,22 +423,14 @@ public void restore() {
final ConsumerRecord<byte[], byte[]> record = iterator.next();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this iterator interface could be replaced with a for each loop

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

for (final ChangelogMetadata changelogMetadata: newPartitionsToRestore) {
final TopicPartition partition = changelogMetadata.changelogPartition;
final StateStoreMetadata storeMetadata = changelogMetadata.stateManager.storeMetadata(partition);
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: comment here for L651 for no unit test coverage

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

try {
restoreConsumer.unsubscribe();
} catch (KafkaException e) {
throw new StreamsException("Restore consumer get unexpected error unsubscribing", e);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: this case is not tested.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

assertEquals(mkSet(tp1, tp2), consumer.paused());

changelogReader.register(topicPartition, stateManager);
// transition to restore active is idempotent

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: what does this comment suggest? We are not calling transitToRestoreActive multiple times here

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the changelog reader is created it is always in restoreActive state already. I can add a few lines to make it more explicit.

expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore();
public void shouldThrowIfRestoreCallbackThrows() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for any task type neutral tests, I would feel more comfortable if we could allow testing for both active and standby state managers in two separate tests, or share the same skeleton somehow

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made the test parameterized.

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the work!

@guozhangwang guozhangwang merged commit 7053099 into k9113-base Jan 16, 2020
guozhangwang added a commit to apache/kafka that referenced this pull request Feb 5, 2020
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below:

Extract embedded clients (producer and consumer) into RecordCollector from StreamTask.
guozhangwang#2
guozhangwang#5

Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread.
guozhangwang#3
guozhangwang#4

Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state.
guozhangwang#6
guozhangwang#7

Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)).
guozhangwang#8
guozhangwang#9

Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2).
guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
@guozhangwang guozhangwang deleted the K9113-unit-tests-store-changelog branch April 24, 2020 23:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants