Skip to content

KAFKA-9113: StreamTask unit test#9

Merged
guozhangwang merged 7 commits into
k9113-basefrom
K9113-stream-task-test
Jan 28, 2020
Merged

KAFKA-9113: StreamTask unit test#9
guozhangwang merged 7 commits into
k9113-basefrom
K9113-stream-task-test

Conversation

@guozhangwang

Copy link
Copy Markdown
Owner

Completed task.suspend / resume / close branched on state.

Completed StreamTask unit tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei @mjsax @abbccdda for reviews.

} else if (oldState == SUSPENDED && (newState == RUNNING || newState == CLOSING)) {
} else if (oldState == CLOSING && (newState == CLOSING || newState == CLOSED)) {

if (oldState.isValidTransition(newState)) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further simplified this transition -- suspended must first transit to restoring and then running.

public void close() {
clear();
partitionQueues.clear();
streamTime = RecordQueue.UNKNOWN;

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax this is to skip resetting the partition time so that we do not need to re-initialize metadata upon resuming again.

@@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need this class anymore since it is now merged into TaskMigratedException.

try {
TaskUtils.closeStateManager(log, logPrefix, stateMgr, stateDirectory, id);
} catch (final RuntimeException error) {
if (clean) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is merged into TaskUtils.closeStateManager

}
}

private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make these private since in tests we only call initializeIfNeeded now.

return true;
}

/**

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei the diff looks a bit over since there are some function reordering, but the real changes are only in suspend / resume / commit / close branching on the state.

initializeMetadata();
transitionTo(State.RUNNING);
// just re-initilaize the topology is sufficient
initializeTopology();

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a discovered bug-fix: in resumption we should re-initialize the topology: previously it is called in AssignedTasks but after the consolidation it was not called anymore.

*/
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
final Set<TaskId> revokedTasks = new HashSet<>();
final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to make the check stricter.

assertThat(task00.state(), is(Task.State.RUNNING));

assertThrows(StreamsException.class, () -> taskManager.handleRevocation(taskId00Partitions));
// TODO K9113: is it safe for the task to still be RUNNING, or should we trap it in SUSPENDING or something?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is safe to still be RUNNING, since when this happens the thread would be notified and be closing (and hence closing all tasks).

@guozhangwang guozhangwang merged commit 754275d into k9113-base Jan 28, 2020
guozhangwang added a commit to apache/kafka that referenced this pull request Feb 5, 2020
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below:

Extract embedded clients (producer and consumer) into RecordCollector from StreamTask.
guozhangwang#2
guozhangwang#5

Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread.
guozhangwang#3
guozhangwang#4

Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state.
guozhangwang#6
guozhangwang#7

Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)).
guozhangwang#8
guozhangwang#9

Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2).
guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
@guozhangwang guozhangwang deleted the K9113-stream-task-test branch April 24, 2020 23:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant