KAFKA-9113, P3: ProcessorStateManager and ChangelogReader #3

Merged
guozhangwang merged 32 commits into k9113-base from K9113-state-manager-p3 on Jan 9, 2020

@guozhangwang guozhangwang commented Dec 31, 2019

Owner

This PR mainly refactors ProcessorStateManager and ChangelogReader.

  1. ProcessorStateManager is the source of truth for the current offset of each state store, which is either the read offset during restoration or the written offset during normal processing. When initializing, the offset may be read from the checkpoint file; if there is none, it is initialized as null. When checkpointing, that offset is written to the file.

The written offsets are passed in from the record collector (the checkpoint APIs are slightly modified).

The state manager is responsible for restoring state stores, which is used both for updating standby tasks and for restoring active tasks (see 2) below), and it also exposes some internal functions like changelogAsSource to ChangelogReader only.

  2. ChangelogReader is created one-per-thread and used for both standby tasks and restoring active tasks. The restore consumer is moved into the reader and should be abstracted away from the task manager -- only the reader manages the restore consumer for its assigned partitions, decides when to complete restoring a changelog partition etc -- it maintains the source of truth for restoration state. Also, the standby task updating logic in the thread is largely reduced and moved into the changelog reader as well.

Since it is shared for multiple tasks and hence multiple state managers, the register(store) would take its corresponding state manager as well for restoring.

The reader keeps a map of the changelog metadata containing its current restored offset (read from the state manager above), end offsets and limit offsets, and most importantly its state:

  • registered: after the register call, before the end/limit offsets and current offsets are initialized (current offsets are kept in the state manager, so we do not duplicate the source of truth in the store changelog reader).
  • restoring: after the end/limit offsets and current offsets are initialized, the changelog is added to the restore consumer so that the next poll call can fetch it.
  • completed: when the end offset has been reached. Only restoring (active) tasks can transit to this state; standby tasks always stay in "restoring" after transiting from "registered".

The StateRestorer is then removed as it is now replaced with the bookkeeping ChangelogMetadata.
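
As a rough illustration of the three states listed above, here is a minimal Java sketch. The names SketchChangelogState and SketchChangelogMetadata are hypothetical and are not the classes in this PR; the sketch only encodes the transitions described in the list (registered, then restoring, then completed, with standby changelogs never completing).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names for illustration only; not the PR's actual classes.
enum SketchChangelogState {
    REGISTERED, RESTORING, COMPLETED;

    // Legal successors: REGISTERED -> RESTORING -> COMPLETED.
    Set<SketchChangelogState> successors() {
        switch (this) {
            case REGISTERED:
                return EnumSet.of(RESTORING);
            case RESTORING:
                return EnumSet.of(COMPLETED);
            default:
                return EnumSet.noneOf(SketchChangelogState.class);
        }
    }
}

final class SketchChangelogMetadata {
    final boolean standby;
    SketchChangelogState state = SketchChangelogState.REGISTERED;

    SketchChangelogMetadata(final boolean standby) {
        this.standby = standby;
    }

    void transitTo(final SketchChangelogState target) {
        if (!state.successors().contains(target)) {
            throw new IllegalStateException("Cannot transit from " + state + " to " + target);
        }
        // Standby changelogs stay in RESTORING and never complete.
        if (standby && target == SketchChangelogState.COMPLETED) {
            throw new IllegalStateException("Standby changelogs never transit to COMPLETED");
        }
        state = target;
    }
}
```

Keeping the legal transitions in one place means an unexpected transition fails loudly with the current state in the message, rather than being inferred from null checks on offsets.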

One critical change is that the reader no longer unsubscribes / reassigns partitions; instead it is assigned the partitions for both standby tasks and active tasks, and it pauses / resumes them. More specifically, the reader is only ever in one of two states at a given time, updating standby tasks or restoring active tasks, and while it is in one state the partitions of the other kind are paused.

The following is the interaction between the thread and its changelog reader (cc @vvcephei, this part is left to you for the thread / task manager :)

  • after a rebalance, the thread may register new tasks and their state store changelogs with its changelog reader, which then adds them as registered. The thread then transits to the restoring state, and it should call changelogReader.transitToRestoreActive, which pauses all partitions of standby tasks so that the reader can move on to restoring active tasks.

  • during the restoration we may use the limit offset for source changelog topics, and internally we pause / resume when the limit offsets are reached / updated.

  • the changelog reader exposes completedChangelogs to the caller thread so that it can determine when all the active tasks have completed; when the thread decides to transit to running, it calls changelogReader.transitToUpdateStandby, which resumes the standby partitions.
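
The interaction above can be sketched roughly as follows. This is a simplified stand-in, not the PR's StoreChangelogReader: SketchChangelogReader, markCompleted, allActiveCompleted and pausedChangelogs are hypothetical names, plain strings stand in for TopicPartition, no real restore consumer is involved, and the initial mode is an assumption. Only transitToRestoreActive, transitToUpdateStandby and completedChangelogs are named in the description.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the changelog reader; strings stand in for TopicPartition.
final class SketchChangelogReader {
    enum Mode { ACTIVE_RESTORING, STANDBY_UPDATING }

    private final Map<String, Boolean> standbyByChangelog = new HashMap<>();
    private final Set<String> paused = new HashSet<>();
    private final Set<String> completed = new HashSet<>();
    private Mode mode = Mode.STANDBY_UPDATING; // assumed initial mode

    void register(final String changelog, final boolean standby) {
        if (standbyByChangelog.putIfAbsent(changelog, standby) != null) {
            throw new IllegalStateException("There is already a changelog registered for " + changelog);
        }
        // A standby registered while actives are restoring starts out paused.
        if (standby && mode == Mode.ACTIVE_RESTORING) {
            paused.add(changelog);
        }
    }

    // Pause every standby changelog so that only active changelogs are fetched.
    void transitToRestoreActive() {
        mode = Mode.ACTIVE_RESTORING;
        standbyByChangelog.forEach((changelog, standby) -> {
            if (standby) {
                paused.add(changelog);
            }
        });
    }

    // Resume the standby changelogs once the thread decides to transit to running.
    void transitToUpdateStandby() {
        mode = Mode.STANDBY_UPDATING;
        paused.removeIf(changelog -> standbyByChangelog.getOrDefault(changelog, false));
    }

    void markCompleted(final String changelog) {
        if (!Boolean.FALSE.equals(standbyByChangelog.get(changelog))) {
            throw new IllegalStateException("Only registered active changelogs can complete: " + changelog);
        }
        completed.add(changelog);
    }

    Set<String> completedChangelogs() {
        return Collections.unmodifiableSet(completed);
    }

    boolean allActiveCompleted() {
        return standbyByChangelog.entrySet().stream()
            .filter(entry -> !entry.getValue())
            .allMatch(entry -> completed.contains(entry.getKey()));
    }

    Set<String> pausedChangelogs() {
        return Collections.unmodifiableSet(paused);
    }
}
```

In this sketch both transitions are safe to call repeatedly, which is a choice made here for simplicity; the caller would poll allActiveCompleted() (or inspect completedChangelogs()) before calling transitToUpdateStandby().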

This PR also makes extracting the restoration logic out of the main stream thread much simpler since all we need to do is to move the ChangelogReader into the other restoring thread(s). cc @cadonna @bbejeck

Other minor changes would be pointed out in code as comments.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang changed the base branch from trunk to k9113-base January 3, 2020 05:52

@guozhangwang guozhangwang left a comment

Owner Author

This is a first pass for the third PR; it only compiles (tests would fail), but the main logic change is ready for review.

@vvcephei @ableegoldman @cadonna @abbccdda please take a look, as it relates to some bug fixes and also to some further work like KIP-447 / restoration optimization.

}
}

void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions) {

Owner Author

The re-init logic inside the task / state manager runs when a restoring process hits an OutOfRangeException from the consumer; this happens when e.g. the changelog was truncated at the time, and we just want to re-init the restoration inside the task at the lower level.

In the past we have seen quite a lot of messiness and bugs because of this logic, so I've decided to remove it and instead push the decision up to the higher-level thread: e.g. it can close the task as unclean and re-create it, in which case the initialization / restoration would be executed again. I think it is much cleaner this way.

The thread-level change would not be in this PR so we just remove it here.

}
}
}
if (allTasksRunning()) {

Owner Author

As we push the management of changelogs to ChangelogReader, the task manager should be totally agnostic to this metadata, and the bookkeeping of restoredPartitions etc. could be removed. I just made a tiny change here so I can leave a comment for @vvcephei.

sounds good.

* @param stateManager the state manager used for restoring (one per task)
*/
void register(final StateRestorer restorer);
void register(final TopicPartition partition, final ProcessorStateManager stateManager);

Owner Author

As mentioned in the description, we need to pass in the state manager as well since changelog reader is per-thread and hence could take on multiple tasks / state managers.

* @return all topic partitions that have been restored
*/
Collection<TopicPartition> restore(final RestoringTasks active);
void restore();

Owner Author

We do not return the partitions that have been restored, instead it is queryable from completedChangelogs below, which is cleaner.

* Update offset limit of a given changelog partition
*/
Map<TopicPartition, Long> restoredOffsets();
void updateLimitOffsets();

Owner Author

This is to be called by the task manager when it wants to update the limit offsets. Some optimization can be considered here: e.g. the limit offset may have advanced around the time this thread is about to commit, because the other threads hosting the active tasks of its standbys may be committing then as well.

The javadocs imply this should accept a changelog partition, is it missing an argument or are the docs incorrect?

Owner Author

The docs are incorrect now: the partitions are managed inside the changelog reader, and when this is called the changelog reader knows which of the changelogs it maintains have a limit offset that can be updated -- i.e. the caller only needs to consider the calling frequency, so to speak.

Thanks for the catch.

}
}

// TODO: we always try to restore as a batch when some records are accumulated, which may result in

Owner Author

This is the part related to our optimization for bulk loading: e.g. instead of always writing whatever we've polled from the restore consumer, we can buffer more data and then write it in larger batches. cc @cadonna
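
To make the bulk-loading idea concrete, here is a minimal sketch of buffering polled records until a size threshold is met. SketchRestoreBuffer, minBatchSize and the method names are hypothetical; strings stand in for changelog records, and "writing" a batch is just recorded in a list rather than applied to a real state store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accumulate polled records and write them in larger batches.
final class SketchRestoreBuffer {
    private final int minBatchSize;
    private final List<String> buffered = new ArrayList<>();
    private final List<List<String>> writtenBatches = new ArrayList<>();

    SketchRestoreBuffer(final int minBatchSize) {
        this.minBatchSize = minBatchSize;
    }

    // Called with whatever a single poll returned; may or may not trigger a write.
    void onPolled(final List<String> records) {
        buffered.addAll(records);
        if (buffered.size() >= minBatchSize) {
            flush();
        }
    }

    // Write out everything buffered so far as one batch (e.g. when restoration ends).
    void flush() {
        if (!buffered.isEmpty()) {
            writtenBatches.add(new ArrayList<>(buffered));
            buffered.clear();
        }
    }

    List<List<String>> writtenBatches() {
        return writtenBatches;
    }
}
```

A final flush() would still be needed when the changelog completes, so that a partial batch below the threshold is not left behind.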

}

@Override
public void remove(final List<TopicPartition> revokedChangelogs) {

Owner Author

When a task is removed either gracefully or due to errors (unclean close), this function should be called cc @vvcephei

q: should this also be triggered in onPartitionsLost? Then revokedChangelogs is probably not a good name.

}

@Override
public void clear() {

Owner Author

When the thread is being closed this should be called (already included in this PR, just FYI).

}
}

private void initializeCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {

Owner Author

Moved into the ChangelogReader.

}

standbyRecords = remainingStandbyRecords;
changelogReader.restore();

Owner Author

Reduced since it is now in changelog reader.

@ableegoldman ableegoldman left a comment

Haven't finished going over the entire StoreChangelogReader, but in general this looks good!

void allowUpdateOfOffsetLimit() {
updateOffsetLimits = true;
}
} No newline at end of file

nit: add newline to end of file

}
public void update() {
// we use the changelog reader to do the actual restoration work,
// and here we only need to update the offset limits when necessary

Is this supposed to be implemented or is that planned for a later PR?

Owner Author

It is going to be in a later PR when we touch on StandbyTask.

@@ -1038,85 +1044,11 @@ boolean maybeCommit() {
private void maybeUpdateStandbyTasks() {
if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) {
if (processStandbyRecords) {

I brought this up once before, but it seems odd to me that we only process standby records after an active task commit (ie processStandbyRecords is set to true). If there's a good reason for it then we should make it clear in the code/comments. Otherwise I think we should aim to eliminate special handling of standbys like this, especially with KIP-441 and KIP-535 coming up (not necessarily in this PR, cc/ @vvcephei )

Owner Author

The original motivation, I think, is a very "weird" heuristic: the current thread hosts a standby task A' while its corresponding active task A is likely hosted by another thread. If that thread does not process A and hence does not write more records to the changelog, then we would not get anything for A' to update. Furthermore, if the changelog is a source topic and the other thread hosting A does not commit, then we cannot update A' any further because of the limit offset.

And when this thread is committing for another active task, say B, then it is "likely" that the other thread has committed for its active task A too, so it is a good time to try to update A'.

Like I said, in hindsight this is not a very good heuristic; when we (@vvcephei) work on the thread class itself, we can discuss whether to keep it or not.

But processStandbyRecords is a thread-level variable, and different threads aren't really synchronized on when they commit within an instance much less across instances (and the threads would have to be from different instances since we would never host standby A and active A together). Granted the commit interval should be the same, but won't the actual commit times drift out of sync?
Also, even if this heuristic does "work" and saves us from trying to process a standby when it's already caught up and the active hasn't yet committed new records, if a standby is at all behind the active task it seems like this "optimization" will only make it fall further behind. But, we can revisit this during the thread class refactoring

Owner Author

Yes, I agree with you. I feel this heuristic is not really worth it, and there may be better ones that we can use to replace it.

Woah. That's a pretty subtle heuristic. It would definitely not hurt to clean it up, too.


// source of the truth of the current registered changelogs;
// NOTE a changelog would only be removed when its corresponding task
// is being removed from the task; otherwise it would stay in this map even after completed

"its corresponding task is being removed from the task" --> "from the thread"?

Owner Author

ack

return false;
} else {
log.debug("Skip re-adding restorer for changelog {}", restorer.partition());
// note the end offset returned of the consumer is actually the last offset + 1

Is this still true with transaction markers? Also, the condition below seems backwards to me; if currentOffset + 1 is less than the endOffset doesn't that mean it hasn't finished restoring to the end?

Owner Author

The txn marker needs to be handled carefully indeed -- there were bugs incurred because of this.

For end-offsets returned from the consumer, I think it covers the txn markers already, but I'll double check to make sure.


needsInitializing.add(restorer.partition());
// Once some new tasks are created, we transit to restore them and pause on the existing standby tasks.
// NOTE: even if the newly created tasks do not need any restoring, we still first transit to this state and then

This is a great simplification: the Streams state, which is based on the task state transitions, should not depend or need to be handled differently based on characteristics of the tasks (eg stateful vs stateless, revoked while still restoring vs running, etc)

// immediately transit back -- there's no overhead of transiting back and forth but simplifies the logic a lot.
// TODO K9113: this function should be called by stream thread
public void transitToRestoreActive() {
if (state != ChangelogReaderState.STANDBY_UPDATING) {

Hm. It's hard to say without the full context of when/where/how this will be called, but why not make this idempotent and safe for the stream thread to simply call any time it is assigned new tasks? Otherwise the stream thread has to be very careful eg not to call this method if it happened to still have some tasks in restoration before the rebalance gave it new ones.

Owner Author

Ah great point.

I thought that the new stream tasks should only be created / registered all together within the rebalance callback, and today they are indeed registered together in initializeNewTasks; that's why I decided to make the contract with the caller more strict.

But the scenario is that when a new rebalance happens, some tasks may still be restoring, i.e. the thread / changelog reader is still in restoring-active mode, so this would break. I will update accordingly. Thanks!

@guozhangwang

Owner Author

@ableegoldman I've addressed your comments in the latest commits: 1) I mark each changelog as active or standby, and make the transitToActive call idempotent by only pausing all partitions of standbys; and 2) I fixed the previous issue so that active tasks with source changelogs use the committed offset as the end offset, and all changelogs now transit to RESTORING first even if there's nothing to restore, and are then checked and transited to COMPLETED in a later call.

// we would start restoring from beginning and it does not end yet
return false;
} else {
// NOTE there are several corner cases that we need to consider:

Owner Author

@ableegoldman @vvcephei @cadonna This is a tricky thing that has bitten us several times due to limit offsets / txn markers etc. I hope my current proposal is clean and elegant enough to cover those cases, but please pay extra attention and help me think about whether there are corner cases that can still break it.

FWIW, this seems good to me. Certainly better than trying to keep all those arithmetic operations straight. As I read it, we're basically just waiting until we either fetch a record past the limit, or would fetch a record past the limit to know that we're done.

Seems also good to me. However, tests are a better way to verify correctness than review.
A side question: a standby task does not have an endOffset because it has a limitOffset, right? I am wondering if we could simplify the code by using endOffset for limitOffset in standby tasks. It seems to me that for active tasks endOffset is not null and limitOffset is null, while for standby tasks it is the inverse. But maybe I am missing something.

Owner Author

For standby tasks the end-offset may be not-null if the changelog topic is piggy-backed on the source topic. It is admittedly a one-time thing (in most cases the end-offset > committed-offset, and we only take care of the log truncation case), but I'm a bit concerned that consolidating them into a single field may be less readable than keeping two fields. I agree it is a bit verbose, but I feel it is worth it.

"For standby tasks the end-offset may be not-null"

In the comments above ChangelogMetadata.restoreEndOffset it says "only for active restoring tasks (for standby changelog it is null)"
I think it's going to get quite confusing to use null/not-null of two fields to encode other information about the ChangelogMetadata's state. It seems like we've already potentially mixed up what it means, and when which one can/will be null.
Instead of consolidating them into a single field, could we just always initialize both and just store a flag isSource and maybe isActive/Standby? I think it will make the code a lot more clear, and help prevent future bugs. Someone who needs to make a change here should not need to know/remember to null check offsets, and correctly interpret what it means when one is/isn't null

@ableegoldman ableegoldman Jan 16, 2020

Another argument for initializing both is that we will likely need both for KIP-535 and/or KIP-441. I'm not sure how 535 is implementing the offset tracking, but I feel pretty strongly that we should maintain a localized single source of truth for all the offsets. Pinning one of them to be null just to encode some orthogonal information about what kind of task it is seems pretty inflexible cc @guozhangwang @vvcephei

Owner Author

Regarding the consolidation, I've thought about that but there's one tricky thing (not a blocker, just bringing this up):

  1. end-offset is only set once and otherwise kept as null: for standby it is never set, for active it is set as min(end-offset, optional(committed-offset)) where committed-offset is required for source-changelog.

  2. limit-offset is set multiple times and otherwise kept as null: for source-changelog standby it is initialized to 0 to forbid the standby tasks to be updated at all before we get a first update on the value, for non-source-changelog and active it is always null.

So the key difference here is that for source-changelog standby tasks the limit-offset is initialized as 0, while for active tasks the end-offset is initialized as null. Of course I could also initialize the end-offset for active tasks as 0, but that's not necessary and is a regression.

Regarding the offset tracking, it should be computed as end-offset minus current-offset for active and min(end-offset, optional(limit-offset)) minus current-offset for standby, and the current-offset source of truth is ProcessorStateManager#StoreMetadata#offset.
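
As a small worked example of the lag arithmetic just described, assuming all offsets are plain longs and the optional limit offset is modeled with OptionalLong (SketchOffsetLag and its method names are hypothetical):

```java
import java.util.OptionalLong;

// Hypothetical helper mirroring the arithmetic in the comment above.
final class SketchOffsetLag {
    private SketchOffsetLag() { }

    // Active: end-offset minus current-offset.
    static long activeLag(final long endOffset, final long currentOffset) {
        return endOffset - currentOffset;
    }

    // Standby: min(end-offset, optional(limit-offset)) minus current-offset.
    static long standbyLag(final long endOffset, final OptionalLong limitOffset, final long currentOffset) {
        final long bound = limitOffset.isPresent()
            ? Math.min(endOffset, limitOffset.getAsLong())
            : endOffset;
        return bound - currentOffset;
    }
}
```

For instance, with an end offset of 100, a limit offset of 70 and a current offset of 40, the standby lag under this formula is 30, whereas without a limit offset it is 60.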

If we want to consolidate them, then one idea would be:

  1. for source-changelogs, always initialize limit-offset as 0, otherwise always initialize as INF; and then periodically update the limit-offset for source-changelogs.

  2. periodically update the log-end-offset for all changelogs: for active restoring we know the end-offset should not be updated at all.

  3. define end-offset = min(log-end-offset, limit-offset). For active and non-source-changelogs it is reduced to log-end-offset; however only check for completion for active.

  4. then for KIP-535, where we report a tuple of (current-offset, end-offset), current-offset is always read from ProcessorStateManager, and end-offset is read as:

    a. for active processing, just the last offset from RecordCollector.
    b. for active restoring, the log-end-offset.
    c. for standby, the end-offset calculated above.

To me it seems equally complex compared with the current proposal.

@ableegoldman ableegoldman Jan 17, 2020

Maybe it was a mistake to drag the offset lag computation in to this. We definitely don't need to add any more complexity here :)
But let me put out an alternative proposal. Note that for both active and standby tasks,
a) regular changelog --> limitOffset == endOffset and we restore up to limitOffset.
b) source changelog --> limitOffset <= endOffset and we restore up to limitOffset.
In both cases we actually don't care at all about the current endOffset and the only difference between the two cases is that endOffset happens to be increasing for source changelogs. But we don't care, we just need to know where to stop.
Now note, for either kind of changelog we just restore up to the limitOffset, where
a) active --> limitOffset is constant, we must be careful not to go past it and once curOffset == limitOffset we are done (permanently)
b) standby --> limitOffset increases, we must be careful not to go past it and once curOffset == limitOffset
So again, there are more similarities than differences in how we handle them. Here's what I propose then:

  1. At no point is anything with "offset" in the name allowed to be null :) They also get initialized immediately and always represent the "correct" value, eg do not use a mock value "0L" to signal we should not restore standbys yet.
  2. There is no endOffset field, just limitOffset. Inside register we just initialize
    limitOffset = exists(committedOffset) ? min(endOffset, committedOffset) : endOffset;
    which is correct for both regular and source changelogs, and we never need to know which it is.
  3. During restoration:
    a. first if (ChangelogMetadata.stateManager.isStandby) ; updateLimitOffset()
    b. calculate and restore as many buffered records s.t. curOffset <= limitOffset holds
    c. if we reach the point where curOffset == limitOffset^, then for
    standbys: keep remaining buffered records,
    actives: toss any remaining buffered records and mark as completed
    ^ we would still need all the ugly handling around tx-marker edge cases unfortunately
  4. ???
  5. profit
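
A rough sketch of steps 2 and 3 of this proposal follows. It is only an illustration under simplifying assumptions: SketchLimitRestore and its members are hypothetical, buffered records are represented by their offsets alone, offsets are assumed dense, limitOffset is treated as exclusive (the first offset that must not be applied), and the transaction-marker edge cases mentioned above are ignored entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

// Hypothetical sketch of the single-limitOffset proposal; not code from this PR.
final class SketchLimitRestore {
    // Step 2: limitOffset = exists(committedOffset) ? min(endOffset, committedOffset) : endOffset
    static long initialLimit(final long endOffset, final OptionalLong committedOffset) {
        return committedOffset.isPresent()
            ? Math.min(endOffset, committedOffset.getAsLong())
            : endOffset;
    }

    final boolean standby;
    long limitOffset;              // exclusive bound in this sketch
    long curOffset;                // next offset to apply
    boolean completed = false;
    final Deque<Long> buffered = new ArrayDeque<>(); // offsets of buffered records

    SketchLimitRestore(final boolean standby, final long curOffset, final long limitOffset) {
        this.standby = standby;
        this.curOffset = curOffset;
        this.limitOffset = limitOffset;
    }

    // Step 3b/3c: apply buffered records while staying within the limit.
    int restoreBuffered() {
        int applied = 0;
        while (!buffered.isEmpty() && buffered.peekFirst() < limitOffset) {
            curOffset = buffered.pollFirst() + 1;
            applied++;
        }
        if (!standby && curOffset >= limitOffset) {
            // Actives toss any remaining buffered records and are done for good.
            buffered.clear();
            completed = true;
        }
        // Standbys keep the remaining records until the limit is advanced.
        return applied;
    }
}
```

For a standby, the caller would advance limitOffset (step 3a) before calling restoreBuffered() again, at which point the records kept from the previous round become eligible.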

Just to clarify, I don't think we need to block any of your PRs on this refactoring. I would be totally happy to continue merging as is, and go back to clean this up later. Of course who knows what this class will look like once John takes over the branch 😜

Owner Author

Thanks @ableegoldman, I like your suggestion here. I just want to point out that some of the complexity I mentioned above does not reside in the logical workflow itself but in timeout exception handling etc.: e.g. when querying the committed offset / end offsets, if we cannot get them within the timeout, then in your step 2) the endOffset / committedOffset would not be complete enough to use reliably. Of course this can be addressed by not transiting the corresponding changelog from registered to initialized (and hence we do not need the sentinel null or 0 values), but at the same time we would also potentially be trying to get their positions to complete the initialization later. Another minor thing worth mentioning is that currently for regular standbys we do not need to update the limitOffset == endOffset regularly, since we never complete and would never read beyond the end-offset anyway; following your idea, we could maybe still set limitOffset to Long.MAX_VALUE to bypass the unnecessary end-offset requests.

I will try out your ideas in follow-up PRs.

// TODO K9113: this function should be called by stream thread
public void transitToUpdateStandby() {
if (state != ChangelogReaderState.ACTIVE_RESTORING) {
throw new IllegalStateException("The changelog reader is not restoring actove tasks while trying to " +

Suggested change
throw new IllegalStateException("The changelog reader is not restoring actove tasks while trying to " +
throw new IllegalStateException("The changelog reader is not restoring active tasks while trying to " +

prop: also log an error here, maybe even include changelogs (or at least changelogs.keyset?)

Owner Author

I will include the changelogs in the exception message.

Since this is a fatal error, the illegal state exception would never be swallowed and will always be printed to std. So I feel logging another error would just duplicate entries in the log files.


needsRestoring.removeAll(completedRestorers);
if (changelogs.putIfAbsent(partition, changelogMetadata) != null) {
throw new IllegalStateException("There is already a changelog registered for " + partition +

prop: log an error with the changelogState before throwing exception?

Owner Author

Ack, see above. Will add changelogState.

// if it is in the active restoring mode, we immediately pause those standby changelogs
// here we just blindly pause all (including the existing and newly added)
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());

Why is this necessary? Don't we pause all standby changelogs when we transit to ACTIVE_RESTORING?

Owner Author

Looking at the current call trace from the rebalance callback, we create new standby / active tasks at the end of a rebalance and then register the stores in the state manager (see AssignedTasks#initializeNewTasks -> Task#initializeStateStores). So even when we are in the ACTIVE_RESTORING state we can still register new standby changelogs.

I see. Can we just immediately pause new standby changelogs as they are added, since we will also go through the ACTIVE_RESTORING state after a rebalance IIUC? This keeps that logic encapsulated, so we don't have to consider when/where standbys might be added, or think about the standbys at all, while initializing the active changelogs.

}
if (changelogMetadata.changelogState != ChangelogState.RESTORING) {
throw new IllegalStateException("The corresponding changelog restorer for " + partition +
" has already transited to completed state, this should not happen.");

Should we check and/or log the actual state rather than assume it would be COMPLETED (not REGISTERED)?

Owner Author

We are looping through restoringChangelogs() which should be those RESTORING ones only -- but note that even those paused changelogs are still in RESTORING.

Well if we assume these will all be RESTORING then why do the check at all -- if for some reason something went wrong and they are not RESTORING as expected, it could be because the state was either COMPLETED or REGISTERED, but we don't know which

Suggested change:
- " has already transited to completed state, this should not happen.");
+ " has already transited to " + changelogMetadata.changelogState + " state, this should not happen.");

Owner Author

Yeah I think this check is more for discovering bugs and fail fast; I can update the error message as you suggested.
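
The fail-fast check discussed in this thread can be sketched in isolation. This is a minimal illustration, not the PR's code: the class `ChangelogStateCheck`, the string partition keys, and the method `verifyRestoring` are hypothetical stand-ins, and only the three state names and the error message come from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogStateCheck {
    // Hypothetical stand-in for the reader's per-changelog state.
    enum ChangelogState { REGISTERED, RESTORING, COMPLETED }

    // Partition names are plain strings here; the real code keys by TopicPartition.
    private final Map<String, ChangelogState> changelogs = new HashMap<>();

    void register(final String partition, final ChangelogState state) {
        changelogs.put(partition, state);
    }

    // Fails fast unless the changelog is RESTORING, and reports the state actually found
    // instead of assuming it must be COMPLETED.
    void verifyRestoring(final String partition) {
        final ChangelogState state = changelogs.get(partition);
        if (state == null) {
            throw new IllegalStateException("The changelog " + partition +
                " is not registered, this should not happen.");
        }
        if (state != ChangelogState.RESTORING) {
            throw new IllegalStateException("The corresponding changelog restorer for " + partition +
                " has already transited to " + state + " state, this should not happen.");
        }
    }

    public static void main(final String[] args) {
        final ChangelogStateCheck reader = new ChangelogStateCheck();
        reader.register("store-changelog-0", ChangelogState.REGISTERED);
        try {
            reader.verifyRestoring("store-changelog-0");
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the state interpolated into the message, a REGISTERED changelog and a COMPLETED one produce distinguishable errors, which is the point raised by the reviewer.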

}

/**
* returns a flag whether offset limit caused not all records restored

This method doesn't return anything, need to update docs

Owner Author

Ack, good catch.

*/
private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
final TopicPartition partition = changelogMetadata.changelogPartition;
final int numRecords = IntStream.range(0, changelogMetadata.bufferedRecords.size())

I feel like a simple for loop would be much easier to read, but if others don't agree I'm fine with this if we can maybe improve the variable name. eg numRecords -> numRecordsBeforeLastOffset (or something better but to that effect)

Owner Author

The tricky part is that the underlying restoreCallback.restoreBatch inside stateManager#restore would try to restore all the passed in records in a list, so I have to pass in a sub-list if the buffered records exceed the end / limit offset. And also after that we want to clear the sub-list from the buffered records in a most efficient way (that's why I used an ArrayList).

Owner Author

@ableegoldman After some thought I refactored the code to record the "limit index" in the buffered records while checking null-keys and then use that limit to get the sublist directly.

Also did some minor optimizations to save array-shift / copy from the fetched records.

I think what Sophie meant was this logic:

  1. generate an int range containing all the indices of buffered records
  2. get the record for each index, then get its offset, then keep that index only if the offset is after the limit or end
  3. then get the first index in the resulting collection (or if it's empty, then use the size of the collection)

Honestly, I agree that it's a bit... "what?"

the for loop seems easier to read; eg:

restoreRecords = new List...
for (record: bufferedRecords) {
  if (record.offset < end && record.offset < limit) {
    restoreRecords.add(record); 
  } else {
    break;
  }
}
...

Owner Author

I tried to avoid array-copy from buffered records to the records to restore list, but I agree it did look a bit weird with that optimization. Does the current refactored code look clearer to you folks?
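
To make the trade-off in this thread concrete, here is a minimal sketch of the "limit index plus sublist" approach, under simplifying assumptions: buffered records are reduced to their offsets (`List<Long>`), and `RestorePrefix`, `limitIndex`, `restorePrefix`, and the `restoreBatch` callback are hypothetical names rather than the PR's API. A plain loop finds the first offset at or beyond the bound, the prefix is handed over as a `subList` view so nothing is copied, and the view is then cleared to drop the prefix from the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RestorePrefix {
    // Index of the first buffered offset at or beyond the bound,
    // or the buffer size if every buffered offset is below it.
    static int limitIndex(final List<Long> bufferedOffsets, final long bound) {
        int index = 0;
        for (final long offset : bufferedOffsets) {
            if (offset >= bound) {
                break;
            }
            index++;
        }
        return index;
    }

    // Passes the restorable prefix to the callback as a view (no copy),
    // then removes that prefix from the buffer. Returns how many were restored.
    static int restorePrefix(final List<Long> bufferedOffsets,
                             final long end,
                             final long limit,
                             final Consumer<List<Long>> restoreBatch) {
        final int index = limitIndex(bufferedOffsets, Math.min(end, limit));
        if (index > 0) {
            final List<Long> prefix = bufferedOffsets.subList(0, index);
            restoreBatch.accept(prefix);
            prefix.clear(); // clearing the view removes the prefix from the backing list
        }
        return index;
    }

    public static void main(final String[] args) {
        final List<Long> buffered = new ArrayList<>(List.of(5L, 6L, 7L, 8L));
        final int restored = restorePrefix(buffered, 7L, Long.MAX_VALUE,
            batch -> System.out.println("restoring " + batch));
        System.out.println(restored + " restored, remaining " + buffered);
    }
}
```

The condition matches the reviewer's loop (`offset < end && offset < limit`), but the records to restore are never copied into a second list.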

"Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
// for active changelogs, we need to find their end offset before transit to restoring
// if the changelog is on source topic, then its end offset should be the minimum of
// its committed offset and its end offset; for standby tasks that use source topics

Can't we just use the committed offset for both? When would the end offset ever be less than the committed?

Owner Author

If the source topic is truncated from the tail after the position is committed, then it is possible that the committed offset > end offset. Since it is an external topic, we cannot assume such a scenario would never happen.
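
A tiny sketch of why the minimum matters, assuming both offsets are already known as plain `long` values (the class and method names are hypothetical):

```java
final class SourceChangelogEndOffset {
    // For a changelog that doubles as a source topic, restore only up to the
    // smaller of the committed offset and the current log end offset.
    static long restoreEndOffset(final long committedOffset, final long logEndOffset) {
        return Math.min(committedOffset, logEndOffset);
    }

    public static void main(final String[] args) {
        // Normal case: the committed position is behind the log end.
        System.out.println(restoreEndOffset(80L, 100L));
        // Truncated topic: the committed position now points past the log end.
        System.out.println(restoreEndOffset(80L, 50L));
    }
}
```

Using only the committed offset in the truncated case would make the reader wait for records that no longer exist.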

Oof.

@vvcephei vvcephei left a comment

Hey @guozhangwang , sorry it took me a while to work through this. IIRC, I only had one substantial cluster of comments.

Thanks!

}
}
}
if (allTasksRunning()) {

sounds good.

@Override
public StateStore getStateStore(final String name) {
final StateStore store = stateManager.getGlobalStore(name);
final StateStore store = stateManager.getStore(name);

\o/

public class ProcessorStateManager implements StateManager {

public class StateStoreMetadata {
public StateStore stateStore;

should this be final? It seems not to make sense to change this association once the object is constructed.

Owner Author

Ack, also made other fields to be final except offset.

*/
public class ProcessorStateManager implements StateManager {

public class StateStoreMetadata {

should be static

// corresponding changelog partition of the store
private TopicPartition changelogPartition;

private StateStoreMetadata(final StateStore stateStore) {

Sorry if this seems like golfing, but this utility class could be clearer about its intentions. There are two cases: either it's backed by a changelog or not. If not, all three of (changelog, restoreCallback, and recordConverter) will be null; if so, they'll all be non-null. Either way, they never change after construction. Likewise, the stateStore field also never changes after construction. Only the offset is mutable.

Can we clarify all this by creating two different constructors and making the 4 non-mutable fields final? If it helps clarify the usages, we could add a boolean method indicating whether it is logged, so that callers don't have to do null checks.

Owner Author

Sounds good.
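
The shape proposed in this thread could look roughly like the sketch below. It is an illustration only: `StoreMetadataSketch` is a hypothetical name, the collaborators are typed as `Object` placeholders for StateStore, TopicPartition, the restore callback, and the record converter, and `isLogged` is the suggested boolean method rather than something confirmed by the PR.

```java
import java.util.Objects;

final class StoreMetadataSketch {
    private final Object stateStore;          // placeholder for StateStore
    private final Object changelogPartition;  // placeholder for TopicPartition; null when not logged
    private final Object restoreCallback;     // null when not logged
    private final Object recordConverter;     // null when not logged
    private Long offset;                      // the only mutable field; null until known

    // Store without a changelog: all three changelog-related fields stay null.
    StoreMetadataSketch(final Object stateStore) {
        this.stateStore = Objects.requireNonNull(stateStore, "stateStore");
        this.changelogPartition = null;
        this.restoreCallback = null;
        this.recordConverter = null;
    }

    // Store backed by a changelog: all three changelog-related fields must be non-null.
    StoreMetadataSketch(final Object stateStore,
                        final Object changelogPartition,
                        final Object restoreCallback,
                        final Object recordConverter) {
        this.stateStore = Objects.requireNonNull(stateStore, "stateStore");
        this.changelogPartition = Objects.requireNonNull(changelogPartition, "changelogPartition");
        this.restoreCallback = Objects.requireNonNull(restoreCallback, "restoreCallback");
        this.recordConverter = Objects.requireNonNull(recordConverter, "recordConverter");
    }

    // Lets callers branch on logging without null checks on individual fields.
    boolean isLogged() {
        return changelogPartition != null;
    }

    Long offset() {
        return offset;
    }

    void setOffset(final Long offset) {
        this.offset = offset;
    }

    public static void main(final String[] args) {
        final StoreMetadataSketch logged =
            new StoreMetadataSketch("store", "store-changelog-0", "callback", "converter");
        logged.setOffset(42L);
        System.out.println(logged.isLogged() + " " + logged.offset());
    }
}
```

The two constructors make the "all null or all non-null" invariant impossible to violate after construction, which is what the final fields are meant to guarantee.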

RuntimeException firstException = null;
// attempting to flush the stores
if (!registeredStores.isEmpty()) {
if (!stores.isEmpty()) {

sweet!

// we would start restoring from beginning and it does not end yet
return false;
} else {
// NOTE there are several corner cases that we need to consider:

FWIW, this seems good to me. Certainly better than trying to keep all those arithmetic operations straight. As I read it, we're basically just waiting until we either fetch a record past the limit, or would fetch a record past the limit to know that we're done.

@@ -1038,85 +1044,11 @@ boolean maybeCommit() {
private void maybeUpdateStandbyTasks() {
if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) {
if (processStandbyRecords) {

Woah. That's a pretty subtle heuristic. It would definitely not hurt to clean it up, too.

@guozhangwang

Owner Author

@vvcephei @ableegoldman The PR has been updated per your comments. It compiles (unit tests do not though) now.

@guozhangwang guozhangwang changed the base branch from k9113-base to K9113-record-collector January 8, 2020 19:37
@guozhangwang guozhangwang changed the base branch from K9113-record-collector to k9113-base January 8, 2020 19:37

@abbccdda abbccdda left a comment

Thanks for the hard work! The overall change is much simplified, with much more clear ownership of each class.

);
}
log.trace("Initializing state stores");
log.debug("Acquired state directory lock");

q: what's the gain for changing this to debug level?

Owner Author

We used to have some issues around lock ordering with multi-threading within a JVM in the past, and even now I'm not sure if we've cleared all the bugs, so I'm promoting it for debuggability just in case.

}

void clearCheckpoints() throws ProcessorStateException {
public void loadCheckpoint() {

req: should be package-private

Owner Author

This may be called in different classes in John's PR so I leave it public. After we've done that part and realize all classes are still in a single package I will do it.

req: Could you rename this method to initStoresFromCheckpointedOffsets() and delete the comment?

@Override
public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions,
final InternalProcessorContext processorContext) {
StateManagerUtil.reinitializeStateStoresForPartitions(log,

req: we should also cleanup this function in StateManagerUtil

@guozhangwang guozhangwang Jan 8, 2020

Owner Author

Haha you got me (I was hoping no one realized it so I can secretly keep it for a while still)! I left it intentionally to guide me on how to implement the exception handling logic on the thread level, just so that I do not need to check out trunk and compare :P If you insist I can remove it in this PR too.

No I'm not insisting, help yourself :)


return global;
}
final boolean isGlobalStore = task.topology.globalStateStores().stream()

q: is there a risk of NPE for store.name().equals?

Owner Author

store.name() should never be null; if it is, then an NPE is valid to indicate some serious bug.

// TODO: this map does not work with customized grouper where multiple partitions
// of the same topic can be assigned to the same task.
private final Map<String, TopicPartition> partitionForTopic;
// NOTE we assume the partition of the topic can always be inferred from the task id;

req: this comment is a bit awkward as it is not associated with any function or parameter, move it to the appropriate place.

Owner Author

Ack.

for (final PartitionInfo partition : partitions) {
if (partition.partition() == topicPartition.partition()) {
return true;
if (storeIsPersistent) {

q: why do we only care about persistent store?

Owner Author

Actually I'm not sure, it was there all the time and it seems used only in tests. When I add / cleanup unit tests I will revisit this.

}
}

// this is an optimization: if there's no buffered records so far, then we can reuse

q: in terms of the memory saving, we have many lambda expressions that build a new HashMap out of a Map. Do we know if it is a deep copy, or just a streaming iterator for us to use?

@guozhangwang guozhangwang Jan 9, 2020

Owner Author

My assumption from the name stream() is that most impls should allow an in-place iterator.

Nevertheless, my intuition is that compared with the actual record list, any metadata like changelogs should be pretty short (a task with 10+ changelogs is a huge one), so the only thing worth optimizing is the data itself, i.e. here the records list.

// 2) if not all the buffered records have been applied, then it means we are restricted by the end offset,
// and the consumer's position is likely already ahead of that end offset. Then we just need to check
// the first record in the remaining buffer and see if that record is no smaller than the end offset.
if (metadata.bufferedRecords.isEmpty()) {

nit: the internal if-else block could be merged with external one.

}

log.trace("Added restorer for changelog {}", restorer.partition());
private boolean restoredToEnd(final ChangelogMetadata metadata) {

nit: change to hasRestoredToEnd is more explicit


needsRestoring.removeAll(completedRestorers);
private ChangelogMetadata restoringChangelogByPartition(final TopicPartition partition) {
final ChangelogMetadata changelogMetadata = changelogs.get(partition);

nit: I'm in favor of doing the null check in first time register

Owner Author

I think it is a bit orthogonal: here we poll a bunch of records organized by partitions, and we check that each partition is indeed in the registered changelogs, if the code is bug-free it should always be true.

@guozhangwang

Owner Author

@abbccdda addressed your comments.

records.clear();
} else {
restorer.setEndingOffset(restoreOffset);
changelogMetadata.bufferedRecords.clear();

Is it really true that Java won't optimize list.subList(0,list.size()).clear() to list.clear()?

Owner Author

From my read that's the case. For list.subList.clear() the subList#removeRange would be triggered, which does the shifting; for list.clear() we just set all values to null without copying.
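
The branching under discussion can be sketched as follows. `ClearBuffered` and `dropRestored` are hypothetical names, and the sketch only shows that both paths leave the buffer in the same state; whether the full `clear()` is measurably cheaper than clearing a full-range sublist is the author's reading above and is not verified here.

```java
import java.util.ArrayList;
import java.util.List;

final class ClearBuffered {
    // Drops the first `count` buffered records. When every buffered record was
    // restored, clear the list directly instead of going through a sublist view.
    static <T> void dropRestored(final List<T> buffered, final int count) {
        if (count == buffered.size()) {
            buffered.clear();
        } else {
            buffered.subList(0, count).clear();
        }
    }

    public static void main(final String[] args) {
        final List<String> buffered = new ArrayList<>(List.of("r0", "r1", "r2", "r3"));
        dropRestored(buffered, 2);
        System.out.println(buffered);
        dropRestored(buffered, 2);
        System.out.println(buffered);
    }
}
```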

@vvcephei vvcephei left a comment

LGTM; left a minor remark a little while ago about clearing the sublist vs. the list, but it's not a big deal.

}

final Consumer<byte[], byte[]> consumer = clientSupplier.getConsumer(consumerConfigs);
changelogReader.setMainConsumer(consumer);

we can take a look at this later.

@cadonna cadonna left a comment

Thank you for starting this huge refactoring, @guozhangwang

Here is my feedback.

*/
public interface ChangelogReader {
/**
* Register a state store and it's partition for later restoration.

req: Could you correct this javadoc? You register a state manager and not a state store. Furthermore, there is a typo. It should be "its" and not "it's".

storeMetadata.recordConverter = converterForStore(store);

final RecordConverter recordConverter = converterForStore(store);
changelogReader.register(storePartition, this);

Q: I am wondering, if it would be cleaner to first register all stores with the ProcessorStateManager and then to register the ProcessorStateManager with the ChangelogReader instead of registering each state with both. The advantage would be that ProcessorStateManager would not depend on ChangelogReader anymore.

* The manager is also responsible for restoring state stores via their registered restore callback,
* which is used for both updating standby tasks as well as restoring active tasks.
*/
public class ProcessorStateManager implements StateManager {

req: Could you rename this class to TaskStateManager or similar? As far as I understand, this class manages all state stores of a task.

Owner Author

Re: "Q: I am wondering, if it would be cleaner": we do not register all state stores within a state manager to the changelog reader, but only the log-enabled ones with a given changelog. We could still register a state manager with the changelog reader and let the state manager return the subset of stores that are log-enabled; I just felt that is a bit larger in scope and did not do this refactoring here.

Yes I can, how about doing the renaming in a separate PR? GitHub sometimes gets confused by file renames and translates them into a delete-file plus create-new-file, which could make reviewing much harder. I will do a renaming with no class changes at all just to ease reviews.

I think reducing dependencies is always a good thing to do, because it simplifies refactoring and testing. I like the idea that the state manager returns only the log-enabled state stores. See my comment below regarding using two maps, one for log-enabled and one for log-disabled state stores.

// 3. when checkpointing with the given written offsets from record collector,
// update blindly with the given offset
//
// will be used by the changelog reader to determine if restoration is completed, hence public

req: Remove (or correct) this comment because it lies.

Owner Author

Good catch!

private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();

private final List<TopicPartition> changelogPartitions = new ArrayList<>();
private final FixedOrderMap<String, StateStoreMetadata> stores = new FixedOrderMap<>();

Q: Wouldn't it make sense to use two maps, one for log-enabled stores and one for log-disabled stores? If I look into the code, most loops over this map contain a filter or if-statement that checks whether the store is log-enabled. Having a dedicated map for log-enabled stores would also facilitate my comment on line 238.

// we would start restoring from beginning and it does not end yet
return false;
} else {
// NOTE there are several corner cases that we need to consider:

Seems also good to me. However, tests are a better way to verify correctness than reviewers.
A side question: a standby task does not have an endOffset because it has a limitOffset, right? I am wondering if we could simplify the code by using endOffset for limitOffset in standby tasks. It seems to me that for active tasks endOffset is not null and limitOffset is null, while for standby tasks it is the inverse. But maybe I am missing something.

* corresponding state manager as well for restoring.
*/
@Override
public void register(final TopicPartition partition, final ProcessorStateManager stateManager) {

See my comment where this method is called in ProcessorStateManager.

public Map<TopicPartition, Long> checkpointed() {
updateCheckpointFileCache(emptyMap());
final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
// used by the changelog reader only

Q: If this code is only used by the ChangelogReader, wouldn't it make sense to move this code over to the ChangelogReader? See also my comment on line 111 in ProcessorStateManager.

}

// used by the changelog reader only
void restore(final TopicPartition changelogPartition, final List<ConsumerRecord<byte[], byte[]>> restoreRecords) {

Q: Also here the question whether this code could be moved to StoreChangelogReader?

* The reader also maintains the source of truth for restoration state: only active tasks restoring changelog could
* be completed, while standby tasks updating changelog would always be in restoring state after being initialized.
*/
public class StoreChangelogReader implements ChangelogReader {

prop: I would unify the use of Store and State. Either you can rename StoreChangelogReader to StateChangelogReader or rename ProcessorStateManager to ProcessorStoreManager (see also my comment about renaming for ProcessorStoreManager). Alternatively, you could use StateStore for both.

…nals/ProcessorStateManager.java

Co-Authored-By: Bruno Cadonna <bruno@confluent.io>
@guozhangwang

Owner Author

@cadonna I'm going to merge this PR to the base branch so that @vvcephei can take it from the base. Some of your comments have been replied to but not incorporated yet, so feel free to continue the discussion here. If you feel strongly about the code refactoring suggestions and I'm convinced, we can do that in the next PR.

final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
// 1) we keep adding partitions to restore consumer whenever new tasks are registered with the state manager;
// 2) we do not unassign partitions when we switch between standbys and actives, we just pause / resume them;

Can you elaborate why pausing/resuming is better than assigning/unassigning of partitions? I don't see a big difference?

Owner Author

Assigning / unassigning would clean up the internal bookkeeping data as well as the fetched records, while pausing / resuming would not, so it is a lighter approach with better performance. The logic is also clearer: with this approach the assigned partitions always correspond to the currently assigned tasks (both active and standby). Previously we had to keep track of the assigned partitions inside AssignedTasks as well, since this correspondence was not there; now there is always an exact mapping, so we no longer need to track it.

@mjsax mjsax left a comment

This PR adds a lot of internal comments -- this might be an indicator that the code is still not structured perfectly -- IMHO, if code is structured well, comments are not necessary at all. We can of course still merge it, as it is for sure an improvement, but we might want to consider doing more refactoring on top that makes the comments unnecessary?

@guozhangwang

Owner Author

Many comments may be unnecessary indeed -- I just put them there to help the reader. Let me know if there are any concrete points about the current structure that can be improved, and I can further improve it.

@guozhangwang guozhangwang deleted the K9113-state-manager-p3 branch April 24, 2020 23:43