KAFKA-9113: StreamTask unit test #9
| } else if (oldState == SUSPENDED && (newState == RUNNING || newState == CLOSING)) { | ||
| } else if (oldState == CLOSING && (newState == CLOSING || newState == CLOSED)) { | ||
| if (oldState.isValidTransition(newState)) { |
Further simplified this transition: a suspended task must first transition to RESTORING and only then to RUNNING.
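To illustrate the simplified check, here is a minimal, self-contained sketch of a state enum with an `isValidTransition` method. Only the SUSPENDED and CLOSING rows follow what is discussed in this thread; the other rows, the class name `TaskStateSketch`, and the `transitionTo` helper are assumptions for illustration, not the code in this PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the task state machine; not the PR's actual class.
final class TaskStateSketch {

    enum State {
        CREATED, RESTORING, RUNNING, SUSPENDED, CLOSING, CLOSED;

        // Assumed transition table. Only the SUSPENDED and CLOSING rows
        // are taken from the review discussion; the rest are guesses.
        Set<State> validNext() {
            switch (this) {
                case CREATED:
                    return EnumSet.of(RESTORING, CLOSING);
                case RESTORING:
                    return EnumSet.of(RUNNING, SUSPENDED, CLOSING);
                case RUNNING:
                    return EnumSet.of(SUSPENDED, CLOSING);
                case SUSPENDED:
                    // A suspended task cannot jump straight to RUNNING.
                    return EnumSet.of(RESTORING, CLOSING);
                case CLOSING:
                    return EnumSet.of(CLOSING, CLOSED);
                default:
                    // CLOSED is terminal.
                    return EnumSet.noneOf(State.class);
            }
        }

        boolean isValidTransition(final State newState) {
            return validNext().contains(newState);
        }
    }

    // Returns the new state, or throws if the transition is not allowed.
    static State transitionTo(final State oldState, final State newState) {
        if (oldState.isValidTransition(newState)) {
            return newState;
        }
        throw new IllegalStateException(
            "Invalid transition from " + oldState + " to " + newState);
    }

    public static void main(final String[] args) {
        State state = State.SUSPENDED;
        state = transitionTo(state, State.RESTORING);
        state = transitionTo(state, State.RUNNING);
        System.out.println("Final state: " + state);
    }
}
```

Keeping the table inside the enum means the caller needs a single `isValidTransition` check instead of a chain of `else if` branches.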
| public void close() { | ||
| clear(); | ||
| partitionQueues.clear(); | ||
| streamTime = RecordQueue.UNKNOWN; |
@mjsax this is to skip resetting the partition time so that we do not need to re-initialize metadata upon resuming again.
| @@ -1,33 +0,0 @@ | |||
| /* | |||
| * Licensed to the Apache Software Foundation (ASF) under one or more | |||
We do not need this class anymore since it is now merged into TaskMigratedException.
| try { | ||
| TaskUtils.closeStateManager(log, logPrefix, stateMgr, stateDirectory, id); | ||
| } catch (final RuntimeException error) { | ||
| if (clean) { |
This is merged into TaskUtils.closeStateManager.
| } | ||
| } | ||
| private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) { |
Made these private, since the tests now call only initializeIfNeeded.
| return true; | ||
| } | ||
| /** |
@vvcephei the diff looks larger than it is because some functions were reordered; the real changes are only in how suspend / resume / commit / close branch on the state.
| initializeMetadata(); | ||
| transitionTo(State.RUNNING); | ||
| // just re-initializing the topology is sufficient | ||
| initializeTopology(); |
This fixes a bug discovered along the way: on resumption we should re-initialize the topology. Previously this was done in AssignedTasks, but after the consolidation it was no longer called.
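A rough sketch of what a state-branching resume could look like, with the topology re-initialized on the SUSPENDED path. The class `ResumeSketch`, its fields, and the `completeRestoration` helper are hypothetical and exist only to make the example runnable; the real StreamTask differs.

```java
// Hypothetical, simplified model of a task; not the PR's StreamTask.
final class ResumeSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSING, CLOSED }

    static final class SketchTask {
        State state = State.RUNNING;
        boolean topologyInitialized = true;
        // Assumed to be retained across suspension, so no metadata
        // re-initialization is needed when resuming.
        long partitionTime = 42L;

        void suspend() {
            if (state == State.RUNNING) {
                topologyInitialized = false; // topology is torn down
                state = State.SUSPENDED;
            }
        }

        void resume() {
            switch (state) {
                case CREATED:
                case RESTORING:
                case RUNNING:
                    // Nothing to do: the task was never suspended.
                    break;
                case SUSPENDED:
                    // Re-initializing the topology is enough here.
                    initializeTopology();
                    // Go through RESTORING first, not straight to RUNNING.
                    state = State.RESTORING;
                    break;
                default:
                    throw new IllegalStateException(
                        "Cannot resume task in state " + state);
            }
        }

        void completeRestoration() {
            if (state == State.RESTORING) {
                state = State.RUNNING;
            }
        }

        private void initializeTopology() {
            topologyInitialized = true;
        }
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();
        task.suspend();
        task.resume();
        task.completeRestoration();
        System.out.println(task.state + ", topology initialized: "
            + task.topologyInitialized);
    }
}
```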
| */ | ||
| void handleRevocation(final Collection<TopicPartition> revokedPartitions) { | ||
| final Set<TaskId> revokedTasks = new HashSet<>(); | ||
| final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions); |
This is to make the check stricter.
| assertThat(task00.state(), is(Task.State.RUNNING)); | ||
| assertThrows(StreamsException.class, () -> taskManager.handleRevocation(taskId00Partitions)); | ||
| // TODO K9113: is it safe for the task to still be RUNNING, or should we trap it in SUSPENDING or something? |
I think it is safe for the task to still be RUNNING: when this happens, the thread is notified and starts closing, and hence closes all of its tasks.
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, broken down into the sub-tasks listed below:

1. Extract embedded clients (producer and consumer) from StreamTask into RecordCollector. guozhangwang#2 guozhangwang#5
2. Consolidate the standby updating and active restoring logic into ChangelogReader, extracting it from StreamThread. guozhangwang#3 guozhangwang#4
3. Introduce a Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7
4. Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they were already moved in steps 2 and 3). guozhangwang#8 guozhangwang#9
5. Also simplify the StreamThread logic a bit, since the embedded clients and changelog restoration logic were moved in steps 1 and 2. guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Completed task.suspend / resume / close, which now branch on state.
Completed StreamTask unit tests.