Skip to content

KAFKA-9113: P4, Refactor Task Lifecycle#6

Merged
guozhangwang merged 67 commits into
guozhangwang:k9113-basefrom
vvcephei:refactor-task-lifecycle
Jan 24, 2020
Merged

KAFKA-9113: P4, Refactor Task Lifecycle#6
guozhangwang merged 67 commits into
guozhangwang:k9113-basefrom
vvcephei:refactor-task-lifecycle

Conversation

@vvcephei

Copy link
Copy Markdown

@guozhangwang : I'm in the middle of this implementation, but here's what I have so far, if you're curious.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @vvcephei ! I continued work on standby task based on your branch, and also briefly went through your PR and left some high-level comments. Most of the changes around StandbyTask are here: https://github.com/guozhangwang/kafka/pull/7/files#diff-7012e86d489a291404d83f961ad40217

CREATED, RESTORING, RUNNING, REVOKED, CLOSED;


static void validateTransition(final State oldState, final State newState) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want running -> running, restoring -> restoring etc? Is there a specific scenario that we'd like to cover?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed them.

* </pre>
*/
@Override
public void suspend() {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since StandbyTask do not have suspension / resumption since it should only have created / restoring / done states, I'd suggest we just remove it from Task interface and only have it in StreamTask.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I was thinking that the only place we would suspend is in listener#onPartitionsRevoked where we only suspend active tasks, and similarly in assignor#onAssignment we only resume active tasks in "suspended" state, but I can buy the argument of easy testing, will make a thorough pass on TaskManager.

* @throws StreamsException if the store's change log does not contain the partition
*/
boolean initializeStateStores();
void initializeStateStores();

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove initializeStateStores / initializeTopology / initializeMetadata with transitionTo (the initializeIfNeeded can also be private):

  1. for for both active and standby transitionTo(Created -> Restoring) always register stores; for active task we also initialize metadata.
  2. for active TransitionTo(Restoring -> Running) we initialize the topology.
  3. for standby there's no Transition to Running / Suspended, if it ever gets call we throw IllegalState.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei I pushed a commit to cleanup the StandbyTaskTest, and left some more comments.


// no topology needs initialized, we can transit to RUNNING
// right after registered the stores
transitionTo(State.RUNNING);

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we CREATED -> RUNNING is not valid as we expect it to always first transit to RESTORING. If we do not want to make task-specific transition rules (I'd prefer that) I suggest we make standby to be in RESTORING as its normal processing state.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but it seems to muddy the waters. It seems easier to think of "restoring" as some up-front setup, while "running" is the steady state. Since standbys don't need any up-front setup, it seems more straightforward to skip over restoring than to skip over running.

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java Outdated
@Override
public void close(final boolean clean) throws ProcessorStateException {
ProcessorStateException firstException = null;
public void close() throws ProcessorStateException {

@guozhangwang guozhangwang Jan 21, 2020

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a comment: the clean flag is not used inside state store anymore so I've removed it.

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java Outdated

void commit();

void suspend();

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about removing suspend / resume from Task since they are only for StreamTask?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it as part of an effort to make the task management in general ignorant of the specific type of task. E.g., when the TaskManager calls this method, it's because it entered a rebalance and therefore wants to suspend all processing work. StandbyTasks know that they don't have any processing work, so they can implement this as a no-op, but the TaskManager doesn't need to worry about whether different kinds of tasks have processing work. It can just tell all tasks to suspend processing. Standby tasks can say (completely truthfully) "Ok, I've suspended all active processing (by doing nothing, since I never had any to begin with".

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, but in TaskManager I saw you still specifically loop over active tasks first then standby tasks :)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha! That was to invoke the active or standby task creator, I think... Of course we have to know what kind of task we need to construct so we can call the right constructor.

Comment thread streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java Outdated

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass on the current PR (non-testing part only).

Comment thread clients/src/main/java/org/apache/kafka/common/utils/Utils.java Outdated

public static class InternalConfig {
public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit on why we need to pass this to consumer as well?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We used to abuse the TaskManager to also be a "container" for the StreamsMetadataState object. But its purpose is unrelated, so I broke the dependency and just inject both classes into the StreamsPartitionAssignor.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

}
}

void addSubscribedTopics(final List<TopicPartition> partitions, final String logPrefix) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this function used?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the end of "handleAssignment" in the TaskManager. This whole function used to be in the TaskManager.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, maybe rename to addSubscribedTopicsFromAssignment, and inline updateSubscribedTopics? Just a nit.

final Exception exception = firstException.get();
if (exception != null) {
throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception);
for (final TaskId taskId : revokedTasks) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add a check that these tasks are actives only? Also do we guarantee that the task-manager would not host both active and standby of the same task-id?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was uneasy about that early on, but it should be impossible for a single instance of KaflaStreams, let alone one of its threads, to host more than one copy of a single task, active or otherwise.

return active.previousRunningTaskIds();
return tasks.values()
.stream()
.filter(t -> t.isActive() && t.state() == Task.State.SUSPENDED)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that right prev-running == current suspended? Just checking.

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a commit that fixed a bunch of bugs I found while debugging integration tests.

@vvcephei could you also add some unit test coverage so that we can capture those cases earlier?

log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
streamThread.shutdown();
} else {
taskManager.handleRebalanceComplete();

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering within consumer coordinator is the following:

  1. assignor#onAssignment
  2. the assignment gets updated internally
  3. listener#onPartitionsAssigned

So we have to call consumer.pause(all) it inside 3) since otherwise the assignment() is still stale.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh boy...

changelogOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset);
// for changelog whose offset is unknown, use 0L indicating earliest offset
// otherwise return the current offset + 1 as the next offset to fetch
changelogOffsets.put(

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note the offset inside state store is the last record's offset, but to return changelogOffsets used for comparing with other end offsets etc we need to return the next offset to process (and yes it is still vulnerable to txn marker cases but that's fine for lag info I think).

} else {
throw new IllegalStateException("Expected only to resume from SUSPENDED state, but was " + state());
switch (state()) {
case RUNNING:

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may call assignor#onAssignment multiple times without onPartitionsRevoked, therefore not all existing active tasks are in SUSPENDED when we are handleAssignmenting. For those that are still in RUNNING / RESTORING we just do nothing.

// if the state is still in PARTITION_ASSIGNED after the poll call
if (state == State.PARTITIONS_ASSIGNED) {
if (taskManager.initializeNewTasksAndCheckForCompletedRestoration()) {
if (taskManager.checkForCompletedRestoration()) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I have to break the changelogReader#restore out of the TaskManager so that 1) we call it every iteration no matter if we are restoring active or updating standby; 2) we only need to initialize tasks once.

if (committed > 0) {
final long commitLatency = advanceNowAndComputeLatency();
commitSensor.record(commitLatency / (double) committed, now);
if (state == State.RUNNING) {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only process active tasks if we are in RUNNING state: note that there maybe some buffered records already even after a rebalance and so if we proceed we could mistakenly process them from the buffer even if we are not in RUNNING.

logPrefix
);

// initialize the created tasks

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment above: we move the initialization logic right after creation so that within the iteration we only check for completion.

@guozhangwang guozhangwang changed the title [WIP] Refactor task lifecycle KAFKA-9113: P4, Refactor Task Lifecycle Jan 24, 2020
@guozhangwang guozhangwang merged commit b3c8148 into guozhangwang:k9113-base Jan 24, 2020
@vvcephei vvcephei deleted the refactor-task-lifecycle branch January 25, 2020 00:52
@vvcephei vvcephei restored the refactor-task-lifecycle branch January 27, 2020 00:23
*/
public class StandbyTask extends AbstractTask {
public class StandbyTask extends AbstractTask implements Task {
private final TaskId id;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we move TaskId from AbstractTask to StandbyTask and also StreamTask -- seem we are duplicating code -- every task has an ID? Similar for other class members? Why not following best practice of the OO model?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can move this to AbstractTask.

private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks,
final Map<TaskId, StandbyTask> standbyTasks) {
private void updateThreadMetadata(final Map<TaskId, Task> activeTasks,
final Map<TaskId, Task> standbyTasks) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we weaken the types here?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the task manager now maintains the active / standby tasks in a single collection so the exposed APIs only return a map of Tasks. Also for the caller there's no need to have the stronger types.

}
};

public abstract boolean hasBeenRunning();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the semantic meaning of this method and why do we need it?

@guozhangwang guozhangwang Feb 3, 2020

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main purpose of this was to avoid calling closeTopology duplicated times if the state has already been transited to CLOSING -- now that we've had this branching logic on the caller (StreamTask) I think we can remove this part.

} else if (oldState == RESTORING && (newState == RUNNING || newState == SUSPENDED || newState == CLOSING)) {
} else if (oldState == RUNNING && (newState == SUSPENDED || newState == CLOSING)) {
} else if (oldState == SUSPENDED && (newState == RUNNING || newState == CLOSING)) {
} else if (oldState == CLOSING && (newState == CLOSING || newState == CLOSED)) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a quite weird way to express a logical condition... Why not just use || ? Or even better, why not encode valid transitions in an enum type similar to existing code -> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L137-L143

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved it to Task following the similar pattern of KafkaStreams, StreamThread. LMK WDYT.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

guozhangwang added a commit to apache/kafka that referenced this pull request Feb 5, 2020
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below:

Extract embedded clients (producer and consumer) into RecordCollector from StreamTask.
guozhangwang#2
guozhangwang#5

Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread.
guozhangwang#3
guozhangwang#4

Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state.
guozhangwang#6
guozhangwang#7

Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)).
guozhangwang#8
guozhangwang#9

Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2).
guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants