KAFKA-9113: P4, Refactor Task Lifecycle #6
guozhangwang
left a comment
Thanks @vvcephei ! I continued work on standby task based on your branch, and also briefly went through your PR and left some high-level comments. Most of the changes around StandbyTask are here: https://github.com/guozhangwang/kafka/pull/7/files#diff-7012e86d489a291404d83f961ad40217
| CREATED, RESTORING, RUNNING, REVOKED, CLOSED; | ||
| static void validateTransition(final State oldState, final State newState) { |
Why do we want running -> running, restoring -> restoring etc? Is there a specific scenario that we'd like to cover?
| * </pre> | ||
| */ | ||
| @Override | ||
| public void suspend() { |
Since StandbyTask has no suspension / resumption (it should only have created / restoring / done states), I'd suggest we just remove suspend() from the Task interface and only have it in StreamTask.
I see. I was thinking that the only place we would suspend is in listener#onPartitionsRevoked where we only suspend active tasks, and similarly in assignor#onAssignment we only resume active tasks in "suspended" state, but I can buy the argument of easy testing, will make a thorough pass on TaskManager.
| * @throws StreamsException if the store's change log does not contain the partition | ||
| */ | ||
| boolean initializeStateStores(); | ||
| void initializeStateStores(); |
I think we can replace initializeStateStores / initializeTopology / initializeMetadata with transitionTo (initializeIfNeeded can also be private):
- for both active and standby, transitionTo(Created -> Restoring) always registers stores; for active tasks we also initialize metadata.
- for active, transitionTo(Restoring -> Running) initializes the topology.
- for standby there's no transition to Running / Suspended; if it ever gets called we throw IllegalState.
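The proposal above can be sketched roughly as follows. This is only an illustration, not the code in the PR: `SketchState`, `SketchTask`, `SketchActiveTask`, `SketchStandbyTask`, and the two hook methods are hypothetical names, and the initialization steps are recorded as strings instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified state set; the real Task.State may differ.
enum SketchState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

abstract class SketchTask {
    private SketchState state = SketchState.CREATED;
    // Records which initialization steps ran, for illustration only.
    final List<String> log = new ArrayList<>();

    SketchState state() {
        return state;
    }

    // Initialization is a side effect of the transition itself, so callers
    // no longer invoke initializeStateStores / initializeTopology directly.
    final void transitionTo(final SketchState newState) {
        if (state == SketchState.CREATED && newState == SketchState.RESTORING) {
            onCreatedToRestoring();
        } else if (state == SketchState.RESTORING && newState == SketchState.RUNNING) {
            onRestoringToRunning();
        } else {
            throw new IllegalStateException("Invalid transition " + state + " -> " + newState);
        }
        state = newState;
    }

    abstract void onCreatedToRestoring();

    abstract void onRestoringToRunning();
}

final class SketchActiveTask extends SketchTask {
    @Override
    void onCreatedToRestoring() {
        log.add("registerStores");
        log.add("initializeMetadata");
    }

    @Override
    void onRestoringToRunning() {
        log.add("initializeTopology");
    }
}

final class SketchStandbyTask extends SketchTask {
    @Override
    void onCreatedToRestoring() {
        log.add("registerStores");
    }

    @Override
    void onRestoringToRunning() {
        // In this proposal a standby never leaves RESTORING for RUNNING.
        throw new IllegalStateException("Standby tasks never transition to RUNNING");
    }
}
```

Under this sketch a failed transition leaves the task in its previous state, because the hook throws before the state field is updated.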
guozhangwang
left a comment
@vvcephei I pushed a commit to cleanup the StandbyTaskTest, and left some more comments.
| // no topology needs initialized, we can transit to RUNNING | ||
| // right after registered the stores | ||
| transitionTo(State.RUNNING); |
Right now CREATED -> RUNNING is not valid, as we expect a task to always transit to RESTORING first. If we do not want to make task-specific transition rules (I'd prefer that), I suggest we make RESTORING the standby's normal processing state.
We could, but it seems to muddy the waters. It seems easier to think of "restoring" as some up-front setup, while "running" is the steady state. Since standbys don't need any up-front setup, it seems more straightforward to skip over restoring than to skip over running.
| @Override | ||
| public void close(final boolean clean) throws ProcessorStateException { | ||
| ProcessorStateException firstException = null; | ||
| public void close() throws ProcessorStateException { |
Not a comment: the clean flag is not used inside state store anymore so I've removed it.
| void commit(); | ||
| void suspend(); |
How about removing suspend / resume from Task since they are only for StreamTask?
I added it as part of an effort to make the task management in general ignorant of the specific type of task. E.g., when the TaskManager calls this method, it's because it entered a rebalance and therefore wants to suspend all processing work. StandbyTasks know that they don't have any processing work, so they can implement this as a no-op, but the TaskManager doesn't need to worry about whether different kinds of tasks have processing work. It can just tell all tasks to suspend processing. Standby tasks can say (completely truthfully) "Ok, I've suspended all active processing (by doing nothing, since I never had any to begin with)".
Okay, but in TaskManager I saw you still specifically loop over active tasks first then standby tasks :)
Aha! That was to invoke the active or standby task creator, I think... Of course we have to know what kind of task we need to construct so we can call the right constructor.
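The argument in this thread (keep suspend() on the shared interface so the manager never checks the task type) can be sketched as below. All names here (`DemoTask`, `DemoActiveTask`, `DemoStandbyTask`, `DemoTaskManager`, `suspendAll`, `isProcessing`) are hypothetical and only stand in for the real Task / StreamTask / StandbyTask / TaskManager.

```java
import java.util.List;

// Hypothetical stand-in for the Task interface discussed above.
interface DemoTask {
    void suspend();

    boolean isProcessing();
}

final class DemoActiveTask implements DemoTask {
    private boolean processing = true;

    @Override
    public void suspend() {
        // An active task actually stops its processing work.
        processing = false;
    }

    @Override
    public boolean isProcessing() {
        return processing;
    }
}

final class DemoStandbyTask implements DemoTask {
    @Override
    public void suspend() {
        // No-op: a standby never had processing work to suspend.
    }

    @Override
    public boolean isProcessing() {
        return false;
    }
}

final class DemoTaskManager {
    private final List<DemoTask> tasks;

    DemoTaskManager(final List<DemoTask> tasks) {
        this.tasks = tasks;
    }

    // The manager tells every task to suspend without checking its type.
    void suspendAll() {
        for (final DemoTask task : tasks) {
            task.suspend();
        }
    }
}
```

The trade-off raised by the reviewer still applies: the interface grows a method that is meaningless for one implementation, in exchange for a manager that needs no type checks.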
guozhangwang
left a comment
Made a pass on the current PR (non-testing part only).
| public static class InternalConfig { | ||
| public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; | ||
| public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__"; |
Could you elaborate a bit on why we need to pass this to consumer as well?
We used to abuse the TaskManager to also be a "container" for the StreamsMetadataState object. But its purpose is unrelated, so I broke the dependency and just inject both classes into the StreamsPartitionAssignor.
| } | ||
| } | ||
| void addSubscribedTopics(final List<TopicPartition> partitions, final String logPrefix) { |
Where is this function used?
at the end of "handleAssignment" in the TaskManager. This whole function used to be in the TaskManager.
Got it, maybe rename to addSubscribedTopicsFromAssignment, and inline updateSubscribedTopics? Just a nit.
| final Exception exception = firstException.get(); | ||
| if (exception != null) { | ||
| throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception); | ||
| for (final TaskId taskId : revokedTasks) { |
nit: maybe add a check that these tasks are active tasks only? Also, do we guarantee that the task manager would not host both the active and the standby of the same task-id?
I was uneasy about that early on, but it should be impossible for a single instance of KafkaStreams, let alone one of its threads, to host more than one copy of a single task, active or otherwise.
| return active.previousRunningTaskIds(); | ||
| return tasks.values() | ||
| .stream() | ||
| .filter(t -> t.isActive() && t.state() == Task.State.SUSPENDED) |
Is that right: prev-running == currently suspended? Just checking.
guozhangwang
left a comment
Pushed a commit that fixed a bunch of bugs I found while debugging integration tests.
@vvcephei could you also add some unit test coverage so that we can capture those cases earlier?
| log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode()); | ||
| streamThread.shutdown(); | ||
| } else { | ||
| taskManager.handleRebalanceComplete(); |
The ordering within the consumer coordinator is the following:
1) assignor#onAssignment
2) the assignment gets updated internally
3) listener#onPartitionsAssigned
So we have to call consumer.pause(all) inside 3), since otherwise the assignment() is still stale.
| changelogOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset); | ||
| // for changelog whose offset is unknown, use 0L indicating earliest offset | ||
| // otherwise return the current offset + 1 as the next offset to fetch | ||
| changelogOffsets.put( |
Note that the offset inside the state store is the last record's offset, but changelogOffsets is used for comparing with other end offsets etc., so we need to return the next offset to process (and yes, it is still vulnerable to txn marker cases, but that's fine for lag info I think).
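A minimal sketch of the conversion described here, under stated assumptions: partitions are plain strings instead of TopicPartition, an unknown offset is represented by `null`, and `ChangelogOffsetSketch.nextOffsetsToFetch` is a hypothetical helper, not a method in the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogOffsetSketch {

    // Converts "offset of the last record seen" into "next offset to fetch".
    // Assumption: a null value means the offset is unknown, which maps to 0L
    // (the earliest offset), as the code comment in the diff describes.
    static Map<String, Long> nextOffsetsToFetch(final Map<String, Long> lastRecordOffsets) {
        final Map<String, Long> result = new HashMap<>();
        for (final Map.Entry<String, Long> entry : lastRecordOffsets.entrySet()) {
            final Long last = entry.getValue();
            result.put(entry.getKey(), last == null ? 0L : last + 1L);
        }
        return result;
    }
}
```

With the "next offset" convention, the result can be subtracted directly from a partition's end offset to estimate lag, which is the use case mentioned in the comment.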
| } else { | ||
| throw new IllegalStateException("Expected only to resume from SUSPENDED state, but was " + state()); | ||
| switch (state()) { | ||
| case RUNNING: |
We may call assignor#onAssignment multiple times without onPartitionsRevoked, therefore not all existing active tasks are in SUSPENDED when we are handling the assignment. For those that are still in RUNNING / RESTORING we just do nothing.
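The tolerant resume behaviour can be sketched like this. `ResumeState` and `ResumableTaskSketch` are hypothetical names; that a suspended task goes straight back to RUNNING, and that every other state throws, are assumptions made for the illustration.

```java
// Hypothetical, simplified state set for this illustration.
enum ResumeState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

final class ResumableTaskSketch {
    private ResumeState state;

    ResumableTaskSketch(final ResumeState initial) {
        this.state = initial;
    }

    ResumeState state() {
        return state;
    }

    void resume() {
        switch (state) {
            case RUNNING:
            case RESTORING:
                // onAssignment may arrive without a preceding onPartitionsRevoked,
                // so the task was never suspended: nothing to do.
                break;
            case SUSPENDED:
                // Assumption: resuming goes directly back to RUNNING.
                state = ResumeState.RUNNING;
                break;
            default:
                throw new IllegalStateException("Cannot resume from state " + state);
        }
    }
}
```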
| // if the state is still in PARTITION_ASSIGNED after the poll call | ||
| if (state == State.PARTITIONS_ASSIGNED) { | ||
| if (taskManager.initializeNewTasksAndCheckForCompletedRestoration()) { | ||
| if (taskManager.checkForCompletedRestoration()) { |
Here I have to break the changelogReader#restore out of the TaskManager so that 1) we call it every iteration no matter if we are restoring active or updating standby; 2) we only need to initialize tasks once.
| if (committed > 0) { | ||
| final long commitLatency = advanceNowAndComputeLatency(); | ||
| commitSensor.record(commitLatency / (double) committed, now); | ||
| if (state == State.RUNNING) { |
We should only process active tasks if we are in RUNNING state: note that there may be some buffered records already even after a rebalance, so if we proceed we could mistakenly process them from the buffer even if we are not in RUNNING.
| logPrefix | ||
| ); | ||
| // initialize the created tasks |
See other comment above: we move the initialization logic right after creation so that within the iteration we only check for completion.
| */ | ||
| public class StandbyTask extends AbstractTask { | ||
| public class StandbyTask extends AbstractTask implements Task { | ||
| private final TaskId id; |
Why do we move TaskId from AbstractTask to StandbyTask and also StreamTask? It seems we are duplicating code, since every task has an ID. The same goes for other class members. Why not follow the best practices of the OO model?
Yeah I can move this to AbstractTask.
| private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, | ||
| final Map<TaskId, StandbyTask> standbyTasks) { | ||
| private void updateThreadMetadata(final Map<TaskId, Task> activeTasks, | ||
| final Map<TaskId, Task> standbyTasks) { |
The task manager now maintains the active / standby tasks in a single collection, so the exposed APIs only return a map of Tasks. Also, the caller has no need for the stronger types.
| } | ||
| }; | ||
| public abstract boolean hasBeenRunning(); |
What is the semantic meaning of this method and why do we need it?
The main purpose of this was to avoid calling closeTopology multiple times if the state has already transitioned to CLOSING. Now that we have this branching logic in the caller (StreamTask), I think we can remove this part.
| } else if (oldState == RESTORING && (newState == RUNNING || newState == SUSPENDED || newState == CLOSING)) { | ||
| } else if (oldState == RUNNING && (newState == SUSPENDED || newState == CLOSING)) { | ||
| } else if (oldState == SUSPENDED && (newState == RUNNING || newState == CLOSING)) { | ||
| } else if (oldState == CLOSING && (newState == CLOSING || newState == CLOSED)) { |
This is a quite weird way to express a logical condition... Why not just use || ? Or even better, why not encode valid transitions in an enum type similar to existing code -> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L137-L143
I've moved it to Task following the similar pattern of KafkaStreams, StreamThread. LMK WDYT.
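For reference, the enum-encoded pattern suggested above could look roughly like the sketch below, modelled on the style used by StreamThread.State (each constant lists the ordinals it may transition to). `LifecycleState` is a hypothetical name; the transitions out of RESTORING, RUNNING, SUSPENDED and CLOSING mirror the conditions quoted in the diff, while the transitions out of CREATED are an assumption because that line is not shown.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

enum LifecycleState {
    CREATED(1, 4),       // ordinal 0: -> RESTORING, CLOSING (assumed, not shown in the diff)
    RESTORING(2, 3, 4),  // ordinal 1: -> RUNNING, SUSPENDED, CLOSING
    RUNNING(3, 4),       // ordinal 2: -> SUSPENDED, CLOSING
    SUSPENDED(2, 4),     // ordinal 3: -> RUNNING, CLOSING
    CLOSING(4, 5),       // ordinal 4: -> CLOSING, CLOSED
    CLOSED();            // ordinal 5: terminal

    private final Set<Integer> validTransitions = new HashSet<>();

    LifecycleState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    // A transition is valid if the target's ordinal is listed for this state.
    boolean isValidTransition(final LifecycleState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}
```

The table of ordinals keeps all rules in one place, at the cost of having to keep the numbers in sync with the declaration order of the constants.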
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech debt cleanup of task management and state management, and is broken down into several sub-tasks listed below:
1. Extract embedded clients (producer and consumer) into RecordCollector from StreamTask. guozhangwang#2 guozhangwang#5
2. Consolidate the standby updating and active restoring logic into ChangelogReader and extract it out of StreamThread. guozhangwang#3 guozhangwang#4
3. Introduce the Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7
4. Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)). guozhangwang#8 guozhangwang#9
5. Also simplify the StreamThread logic a bit, as the embedded clients / changelog restoration logic has been moved in step 1) and 2). guozhangwang#10
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
@guozhangwang : I'm in the middle of this implementation, but here's what I have so far, if you're curious.