KAFKA-9113: Extract Producer to RecordCollector#7846
Conversation
…cord-collector
…ang/kafka into K9113-record-collector-p2
guozhangwang
left a comment
There was a problem hiding this comment.
@vvcephei @ableegoldman @cadonna for a look.
| * RecordBatchTooLargeException | ||
| * RecordTooLargeException | ||
| * UnknownServerException | ||
| * UnknownProducerIdException |
There was a problem hiding this comment.
After KIP-360 this would no longer be thrown, but callers such as the stream thread should still try to catch it, since they cannot be guaranteed to always talk to a 2.5+ broker.
| * | ||
| * @throws InterruptException If the thread is interrupted while blocked | ||
| * @throws InterruptException If the thread is interrupted while blocked. | ||
| * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. |
There was a problem hiding this comment.
Just a reminder of the possible exception thrown.
There was a problem hiding this comment.
Makes sense, but at the same time, it's a little confusing because InterruptException is a KafkaException. Maybe this is no big deal, though, if the intent of the doc is to say that you should treat an InterruptException specially.
There was a problem hiding this comment.
Yes that's indeed the intention, to treat interrupt exception specially.
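A minimal sketch of the catch ordering this thread implies, for anyone reading the javadoc later. The two exception classes are stand-ins that only mirror the InterruptException / KafkaException relationship, and `classify` is a hypothetical helper, not something in this PR.

```java
// Stand-ins that only mirror the real hierarchy (InterruptException extends KafkaException);
// these are NOT the Kafka classes themselves.
class KafkaErrorStandIn extends RuntimeException {
    KafkaErrorStandIn(final String message) {
        super(message);
    }
}

class InterruptStandIn extends KafkaErrorStandIn {
    InterruptStandIn(final String message) {
        super(message);
    }
}

final class InterruptHandlingSketch {
    // Hypothetical helper: runs an action and reports which handler caught the failure.
    static String classify(final Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (final InterruptStandIn e) {
            // The subclass handler has to come first; the compiler rejects the reverse
            // order because the generic handler would make this one unreachable.
            return "interrupted";
        } catch (final KafkaErrorStandIn e) {
            return "kafka-error";
        }
    }
}
```

Listing both exceptions in the javadoc then reads as a hint that the first handler is expected to exist.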
| private final static long serialVersionUID = 1L; | ||
|
|
||
| private final Task task; | ||
| private final TaskId taskId; |
There was a problem hiding this comment.
This is a minor change: since the exception may now be thrown BEFORE the task is created, I've decided to encode just the task-id. This means we can no longer print the full task topology, but I don't think that is a huge step backwards; on second thought, it is actually cleaner in the log4j output not to print the full topology.
There was a problem hiding this comment.
prop: +1 for the motivation, however could we make this a separate minor PR to reduce the current PR size?
There was a problem hiding this comment.
@abbccdda My current proposal is to move all the PRs onto a separate K9113-base branch which is periodically rebased on trunk, and to have consecutive PRs against that base branch, so that we do not have to make sure each PR leaves the branch in a perfect state (system tests, unit tests, etc). So I think this PR's size is okay in this case: as I said, although it is big in LOC the non-testing changes are not so big, and this change is actually needed since I refactored the place where the record collector etc. are constructed.
| */ | ||
| void flushState() { | ||
| try { | ||
| stateMgr.flush(); |
There was a problem hiding this comment.
Push the stateMgr.flush to individual functions of tasks.
|
|
||
| try { | ||
| task.close(true, false); | ||
| task.close(true); |
There was a problem hiding this comment.
This is another cleanup: clean and isZombie are now consolidated into one flag.
|
|
||
| private RuntimeException closeSuspended(final boolean isZombie, | ||
| final StreamTask task) { | ||
| private RuntimeException closeSuspended(final boolean clean, final StreamTask task) { |
There was a problem hiding this comment.
My thoughts on these function cleanups: once we've added the state into Task, we would no longer have such functions, just a single close function that depends on the state. E.g. if it is running we need to 1) close topology, 2) flush state / collector, 3) commit (if clean), and then 4) close state; if it is suspended we only need to do step 4).
There was a problem hiding this comment.
SGTM. I agree it makes sense to tackle that in a separate PR.
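For the follow-up PR, a rough sketch of what a single state-dependent close could look like. The `State` enum, the class name and the recorded step names are all hypothetical; the steps are only recorded in a list rather than executed, and `suspend` here merely flips the state where the real one would also do steps 1) to 3).

```java
import java.util.ArrayList;
import java.util.List;

final class StatefulCloseSketch {
    enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    // Records the steps that would run, so the ordering can be inspected.
    final List<String> steps = new ArrayList<>();

    // Simplified: a real suspend would already have done steps 1) to 3).
    void suspend() {
        if (state == State.RUNNING) {
            state = State.SUSPENDED;
        }
    }

    void close(final boolean clean) {
        if (state == State.CLOSED) {
            return; // closing twice is a no-op in this sketch
        }
        if (state == State.RUNNING) {
            steps.add("closeTopology");      // step 1
            steps.add("flush");              // step 2: state / collector
            if (clean) {
                steps.add("commit");         // step 3: only on a clean close
            }
        }
        steps.add("closeState");             // step 4: always needed
        state = State.CLOSED;
    }
}
```

With this shape, closeSuspended disappears as a separate entry point: the caller always calls close and the task decides which steps still apply.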
| initialLoadedCheckpoints = checkpointFile.read(); | ||
|
|
||
| log.trace("Checkpointable offsets read from checkpoint: {}", initialLoadedCheckpoints); | ||
| try { |
There was a problem hiding this comment.
This is another minor change: we now always delete the checkpoint file after initialization instead of relying on the eos flag (since it is removed). For non-eos this is not a huge regression, since the first commit after initialization writes a checkpoint, so the only "vulnerable window" is between initialization and the first commit, which is around 100ms. I may add this back in a further cleanup once I've straightened out the state manager.
There was a problem hiding this comment.
I'm fine with removing this for now and planning to consider how best to re-introduce it later. Not sure if I totally buy the idea to have some period of time where the file doesn't exist, but I'm happy to re-consider it later.
Can you create a follow-up task, so we don't forget about it?
There was a problem hiding this comment.
Yes. My plan is actually not to change this behavior within the scope of KAFKA-9113; this PR took a step backwards (I kept all of those changes in mind) to clean up some messy dependencies, and in a later PR we can add this logic back once we've figured out the whole hierarchy.
abbccdda
left a comment
There was a problem hiding this comment.
Thanks for the cleanup PR @guozhangwang. One high level comment: as we are planning to roll out KIP-447 (POC here), the record collector will be sharing the same producer, which is owned at the thread level. So the question is whether it's possible for us to control the commit behavior beyond the task level. If not immediately, I think this refactor still makes sense and does not block further changes to commit behavior anyway.
| if (eosEnabled) { | ||
| initializeTransactions(); | ||
| } | ||
| public boolean isEosEnabled() { |
There was a problem hiding this comment.
q: what's the benefit of checking the config flag every time versus one time initialization?
There was a problem hiding this comment.
I tried to abstract the EOS logic away from the task, but due to an optimization it still needed to be kept inside the class, so I've decided to add it back.
| private Producer<byte[], byte[]> producer; | ||
| private final Map<TopicPartition, Long> offsets; | ||
| private final Consumer<byte[], byte[]> consumer; | ||
| private final StreamThread.ProducerSupplier producerSupplier; |
| "the broker was interrupted, or if similar circumstances arise. " + | ||
| "You can increase producer parameter `max.block.ms` to increase this timeout."; | ||
|
|
||
| // used when eosEnabled is true only |
There was a problem hiding this comment.
prop: this comment is not necessary. If we want to enforce the transactionInFlight correctness, we should add a validation to all transaction-related private functions, such as
    if (!eosEnabled && transactionInFlight) {
        throw new IllegalStateException();
    }
There was a problem hiding this comment.
We do indeed check eosEnabled everywhere we condition on transactionInFlight.
|
|
||
| public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) { | ||
| if (eosEnabled) { | ||
| maybeBeginTxn(); |
There was a problem hiding this comment.
req: I don't think we should call maybeBeginTxn, as we do that call during every send. If we are not in a transaction but calling commit(), that sounds like an illegal state to me, or we should just bypass the whole commit logic as it indicates we didn't do any send call in the past when EOS is turned on.
There was a problem hiding this comment.
I thought about that, but I think this is still a legal state. Consider this: you processed some records but sent no outputs via the changelogger (i.e. send was never called), and then you call commit. In this case the transaction contains no data on Kafka, but the local state may still have been updated and the consumed offsets advanced, so you still need to call 1) beginTxn, 2) sendOffsetsToTxn and then 3) commitTxn. Right?
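To make the scenario concrete, here is a small sketch of the call sequence, assuming a collector that only records the producer / consumer calls it would make. The class name, the `calls` list and the recorded labels are hypothetical; only the eosEnabled / transactionInFlight / maybeBeginTxn names come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class EosCommitSketch {
    private final boolean eosEnabled;
    private boolean transactionInFlight = false;
    // Records the client calls that would be made, in order.
    final List<String> calls = new ArrayList<>();

    EosCommitSketch(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    private void maybeBeginTxn() {
        if (eosEnabled && !transactionInFlight) {
            calls.add("beginTransaction");
            transactionInFlight = true;
        }
    }

    void send(final String record) {
        maybeBeginTxn();
        calls.add("send:" + record);
    }

    void commit() {
        if (eosEnabled) {
            // Needed even if send() was never called: consumed offsets still
            // have to be committed inside a transaction.
            maybeBeginTxn();
            calls.add("sendOffsetsToTransaction");
            calls.add("commitTransaction");
            transactionInFlight = false;
        } else {
            calls.add("consumerCommit");
        }
    }
}
```

Calling commit() on a fresh instance with EOS on records beginTransaction, sendOffsetsToTransaction, commitTransaction, which is the empty-but-legal transaction described above.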
| // throw since it would swallow all exception; otherwise if suspension | ||
| // throws we do not need to proceed to further steps and should just notify | ||
| // the caller thread. Therefore it is always safe to proceed without try-catch | ||
| suspend(clean); |
There was a problem hiding this comment.
prop: suspend and closeSuspended could cause confusion, as their meanings are very close. Maybe rename closeSuspended to closeSuspendedIntervalState.
There was a problem hiding this comment.
Yes, my plan is to eventually remove closeSuspended and just let close act differently based on state.
| } | ||
|
|
||
| @Test | ||
| public void shouldFailWithMigrateExceptionOnEOSProcessUknownPid() { |
| id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { | ||
| @Override | ||
| public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { | ||
| callback.onCompletion(null, new OutOfOrderSequenceException("boom")); |
There was a problem hiding this comment.
Why do we use a superclass of UnknownProducerIdException here?
There was a problem hiding this comment.
Just to not lose generality: UnknownProducerIdException is actually the only subclass of OutOfOrderSequenceException, and the other scenarios in which the more general OutOfOrderSequenceException is thrown directly are when the starting / end offset falls out of range. So I think letting Streams handle the general OutOfOrderSequenceException is fine too.
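A tiny sketch of why testing with the superclass is enough, using stand-in classes that only mirror the subclass relationship described above. `indicatesMigration` is a hypothetical check, not the collector's actual code.

```java
// Stand-ins mirroring the relationship described above; not the real Kafka classes.
class OutOfOrderSequenceStandIn extends RuntimeException {
    OutOfOrderSequenceStandIn(final String message) {
        super(message);
    }
}

class UnknownProducerIdStandIn extends OutOfOrderSequenceStandIn {
    UnknownProducerIdStandIn(final String message) {
        super(message);
    }
}

final class SendCallbackSketch {
    // Hypothetical check applied to the exception handed to a send callback:
    // testing against the superclass also covers its only subclass.
    static boolean indicatesMigration(final Exception callbackError) {
        return callbackError instanceof OutOfOrderSequenceStandIn;
    }
}
```

A test that injects the general exception therefore exercises the same branch that the more specific one would hit.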
| } | ||
|
|
||
| @Test | ||
| public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() { |
There was a problem hiding this comment.
prop: for the exception tests, could we build a template test struct like
    testSuite {
        mockProducer;
        expectedExceptionType;
    }
to reduce the amount of test code?
There was a problem hiding this comment.
Just my 2 cents: having a lot of factored-out code in tests usually hinders, rather than helps, maintainability. In the long run, the overall number of lines in the test file doesn't hurt anything, because you rarely sit down to read all the methods (typically, just while doing the review like this). After this PR is merged, you would almost always just be trying to read and understand a single method.
Thus, it pays to optimize for single-method legibility. Having a test harness to go read, and other support methods to go read, just to understand this method is only going to get in the way. As it is right now, this method is 28 lines long, perfectly legible and clear. Trading clarity for de-duplication is a bad deal.
| } | ||
|
|
||
| @Test | ||
| public void shouldMigrateTaskIfFencedDuringProcess() { |
There was a problem hiding this comment.
Prop: we need to add some comments for the large tests that were removed, for example what they were testing at a high level, what functionality has been migrated, and how the same coverage is ensured, to make review easier.
There was a problem hiding this comment.
+1 I personally decided not to worry about it, but just keep a mental todo to come back after all the cleanups are complete to revisit test coverage (lines and semantics). But it would be nice to know why it's ok that they were removed.
There was a problem hiding this comment.
@abbccdda yeah, my plan is to keep each PR against a separate base branch so that we do not need to require each PR to be a "transaction" that always leaves the branch in a consistent and perfect state.
There was a problem hiding this comment.
I will add a TODO to come back after all the cleanups and revamp our unit test coverage.
vvcephei
left a comment
There was a problem hiding this comment.
Hey @guozhangwang ,
Sorry the review took so long. Unfortunately, I had some feedback for you to consider.
Thanks so much for kicking this off!
-john
| } finally { | ||
| producer = null; | ||
| } | ||
| partitionGroup.clear(); |
There was a problem hiding this comment.
It's not clear from context what this does, and hence why it's necessary here. Can you add an explanatory comment (or change clear to something more... clear)?
| suspend(clean); | ||
|
|
||
| closeSuspended(clean); | ||
| } catch (final RuntimeException error) { |
There was a problem hiding this comment.
I initially had some confusion here about whether suspend could throw in an unclean close, leading to closeSuspended never getting called. Then I went back and read the comment... Maybe we can simplify all these code paths if unclean invocations never throw at all, at the lowest levels. Then, we could get rid of all these catch blocks up the hierarchy. WDYT?
There was a problem hiding this comment.
Yes, sounds good to me!
There was a problem hiding this comment.
One tricky thing is that in commit / suspend there are several steps to execute, and when clean = false we should still execute all the steps even if early steps throw.
There was a problem hiding this comment.
Didn't follow this last thought. If we have a pattern where methods with clean=false never throw (which was my suggestion), it would automatically do what you are saying here, right?
There was a problem hiding this comment.
There are two things with clean=false here: 1) we should not throw any more, and 2) we can skip some steps like trying to commit, because we know it is likely to fail, and even if it does not, it's not meaningful anyway. 2) is admittedly an optional optimization.
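A sketch of how both points could combine, assuming the close steps are passed in as plain Runnables. The class, the `runStep` helper and the step names are hypothetical; the point is only that with clean=false every remaining step still runs and nothing propagates, while commit is skipped.

```java
import java.util.ArrayList;
import java.util.List;

final class BestEffortCloseSketch {
    final List<String> executed = new ArrayList<>();
    final List<RuntimeException> suppressed = new ArrayList<>();

    private void runStep(final boolean clean, final String name, final Runnable step) {
        try {
            step.run();
            executed.add(name);
        } catch (final RuntimeException e) {
            if (clean) {
                throw e;           // clean: fail fast and notify the caller
            }
            suppressed.add(e);     // unclean: remember the error and keep going
        }
    }

    void close(final boolean clean, final Runnable flush, final Runnable commit, final Runnable closeState) {
        runStep(clean, "flush", flush);
        if (clean) {
            runStep(true, "commit", commit); // skipped when unclean: likely to fail, and not meaningful
        }
        runStep(clean, "closeState", closeState);
    }
}
```

If the lowest-level methods behaved like this, the catch blocks further up the hierarchy would have nothing left to catch on the unclean path.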
|
|
||
| // used for testing | ||
| // visible for testing | ||
| long streamTime() { |
There was a problem hiding this comment.
If it's no longer used, we should remove it.
@abbccdda @vvcephei thanks for your reviews! I will address / reply to them here and then close this PR and recreate a new PR from
Also a meta reply to "follow-up works" as in KAFKA-9113, I will add a "TODO K9113" comment in the code where we should address (to be distinguished from other TODO), and by the end of the day
1722d6a to
7d1cef5
Closing this PR and re-creating as guozhangwang#2 against
1.a. Producer.send() could throw a KafkaException whose cause is ProducerFenced;
1.b. Producer.send() could return other exceptions passed from the callback (ProducerFenced and UnknownProducerId);
1.c. Consumer.commit() could throw CommitFailedException;
1.d. Producer.[otherAPI] and Consumer.[otherAPI] could throw other KafkaExceptions, including TimeoutException.
1.a/b/c are interpreted as TaskMigratedException and rethrown from RecordCollector; 1.d is interpreted as a general StreamsException and rethrown from RecordCollector.
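The mapping above can be sketched roughly as follows. All classes here are stand-ins for the Kafka client and Streams exception types, the `translate` helper is hypothetical, and having the migrated stand-in extend the streams stand-in is an assumption of this sketch.

```java
// Stand-ins for the Kafka client and Streams exception types named above (not the real classes).
class ClientErrorStandIn extends RuntimeException {
    ClientErrorStandIn(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class FencedStandIn extends ClientErrorStandIn {
    FencedStandIn() { super("producer fenced", null); }
}

class UnknownPidStandIn extends ClientErrorStandIn {
    UnknownPidStandIn() { super("unknown producer id", null); }
}

class CommitFailedStandIn extends ClientErrorStandIn {
    CommitFailedStandIn() { super("commit failed", null); }
}

class StreamsErrorStandIn extends RuntimeException {
    StreamsErrorStandIn(final Throwable cause) { super(cause); }
}

class TaskMigratedStandIn extends StreamsErrorStandIn {
    TaskMigratedStandIn(final Throwable cause) { super(cause); }
}

final class CollectorErrorMappingSketch {
    // Hypothetical translation step applied before rethrowing from the collector.
    static RuntimeException translate(final Exception error) {
        final boolean migrated =
            error.getCause() instanceof FencedStandIn     // 1.a: fenced, wrapped in a client exception
                || error instanceof FencedStandIn         // 1.b: fenced, passed via the callback
                || error instanceof UnknownPidStandIn     // 1.b: unknown producer id via the callback
                || error instanceof CommitFailedStandIn;  // 1.c: consumer commit failed
        // Everything else falls under 1.d and becomes a general streams error.
        return migrated ? new TaskMigratedStandIn(error) : new StreamsErrorStandIn(error);
    }
}
```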
Task would not handle any such exception; all of them are thrown all the way up to the StreamThread. The thread handles TaskMigratedException by closing the task "uncleanly", and treats any other StreamsException as fatal and shuts itself down. This is a major change in exception handling: e.g. today on close(clean) we try to capture the exception immediately within the task and then immediately handle it by retrying close(unclean), which makes for a very messy hierarchy. This is not completely done in this PR, but we are heading in that direction in future PRs.
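A rough sketch of the thread-level handling described here, assuming a single hypothetical `runOnce` loop iteration that records what the thread would do. The nested exception classes are stand-ins, and their subclass relationship is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class ThreadErrorHandlingSketch {
    // Stand-ins; the subclass relationship is an assumption of this sketch.
    static class StreamsFailure extends RuntimeException { }
    static class TaskMigrated extends StreamsFailure { }

    final List<String> actions = new ArrayList<>();
    boolean running = true;

    // Hypothetical single loop iteration: the task itself catches nothing.
    void runOnce(final Runnable taskWork) {
        try {
            taskWork.run();
        } catch (final TaskMigrated e) {
            actions.add("closeTaskUnclean"); // recoverable: close the task uncleanly
        } catch (final StreamsFailure e) {
            actions.add("shutdownThread");   // anything else is treated as fatal
            running = false;
        }
    }
}
```

Keeping both decisions in one place is what removes the close(clean) then close(unclean) retry from inside the task.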
ProcessorStateManager and RecordCollector are created before StreamTask / StandbyTask and are passed in as parameters; this also has the nice effect in unit tests that we can use nice mocks for them when only testing task functionality. Because of this I've also removed a bunch of unit tests from the task level (so do not be afraid of the LOC size, the non-testing part is actually not huge), and will move them to state-manager / record-collector after we've done the whole cleanup. During this period test coverage will drop a bit, but we will eventually add the tests back in other classes.
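To illustrate the testing benefit, a minimal sketch of constructor injection with two hypothetical single-method interfaces standing in for the state manager and the record collector. The flush order shown is illustrative only.

```java
// Hypothetical, heavily reduced stand-ins for the two collaborators.
interface StateManagerSketch {
    void flush();
}

interface RecordCollectorSketch {
    void flush();
}

final class InjectedTaskSketch {
    private final StateManagerSketch stateMgr;
    private final RecordCollectorSketch collector;

    // Both collaborators are created by the caller and passed in,
    // so a test can hand over fakes or mocks instead of real ones.
    InjectedTaskSketch(final StateManagerSketch stateMgr, final RecordCollectorSketch collector) {
        this.stateMgr = stateMgr;
        this.collector = collector;
    }

    void flushState() {
        stateMgr.flush();   // order is illustrative only
        collector.flush();
    }
}
```

A task-level test can then pass lambdas or mocks for both parameters and assert only on task behavior, leaving the collaborators' own logic to their own test classes.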
Task close / suspend procedures are cleaned up based on the clean flag (the isZombie flag is now consolidated into it). As a side effect, we also fixed the double checkpointing issue in committing / closing.