Skip to content

KAFKA-9113: Extract Producer to RecordCollector#7846

Closed
guozhangwang wants to merge 17 commits into
apache:trunkfrom
guozhangwang:K9113-record-collector-p2
Closed

KAFKA-9113: Extract Producer to RecordCollector#7846
guozhangwang wants to merge 17 commits into
apache:trunkfrom
guozhangwang:K9113-record-collector-p2

Conversation

@guozhangwang

@guozhangwang guozhangwang commented Dec 17, 2019

Copy link
Copy Markdown
Contributor
  1. Extract producer from StreamTask to the RecordCollector; make the task agnostic of the underlying embedded client and instead let RecordCollector handle clients and also interpret their thrown exception (AbstractTask still has a reference of the consumer in order to pause / resume partition, and also get committed offsets; the latter function may be moved into RecordCollector in a later PR).

1.a Producer.send() could throw KafkaException whose cause is ProducerFenced;
1.b. Producer.send() could return other exceptions as passed from the callback (ProducerFenced and UnknownProducerId);
1.c. Consumer.commit() could throw CommitFailedException;
1.d Producer.[otherAPI] and Consumer.[otherAPI] could throw other KafkaException, including TimeoutException.

1.a/b/c are interpreted as TaskMigratedException and rethrown from RecordCollector; 1.d are interpreted as general StreamsException and rethrown from RecordCollector.

Task would not handle any such exception, all of them thrown all the way up to the StreamThread; thread handles TaskMigratedException by closing the task "uncleanly", and then handles any other StreamsException as fatal and shutdown itself --- this is a major change in exception handling, e.g. today when close(clean) we try to immediately capture the exception within task, and then immediately handle by re-try close(unclean), which causes very messy hierarchy. This is not completely done in this PR but we are going to that direction in future PRs.

  1. ProcessorStateManager and RecordCollector are created before StreamTask / StandbyTask and are passed in as parameters; this also have a good effect in unit tests that we can use nice mocks for them when only testing task functionalities. Also because of this I've removed a bunch of unit tests from the task level (so do not be afraid of the LOC size, the non-testing part is actually not huge), and will move them to state-manager / record-collector after we've done the whole cleanup. During this period the test coverage would be dropped a bit but we will eventually add them back to other classes.

  2. Task close / suspend procedure are cleaned up based on the clean flag (the isZombie flag is now consolidated into the previous one). Also as a side-effect we fixed the issue that double checkpointing in committing / closing.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* RecordBatchTooLargeException
* RecordTooLargeException
* UnknownServerException
* UnknownProducerIdException

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After KIP-360 this would not be thrown, but anyhow the caller like stream thread should always try to capture it since it cannot guarantee to always talk to a 2.5+ broker.

*
* @throws InterruptException If the thread is interrupted while blocked
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a reminder of the possible exception thrown.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, but at the same time, it's a little confusing because InterruptException is a KafkaException. Maybe this is no big deal, though, if the intent of the doc is to say that you should treat an InterruptException specially.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's indeed the intention, to treat interrupt exception specially.

private final static long serialVersionUID = 1L;

private final Task task;
private final TaskId taskId;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor change: since the exception maybe thrown BEFORE the task is created now I've decided to just encode the task-id, which means we cannot print the full task topology but I think it is not a huge step backwards -- actually I think it is cleaner on log4j to not print the full topology after a second thought.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: +1 for the motivation, however could we make this a separate minor PR to reduce the current PR size?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abbccdda My current proposal is to move all the PRs into a separate K9113-base branch which is periodically rebased on trunk, and have consecutive PRs against that base branch, so that we do not have to make sure each PR leaves the branch in a perfect state (system tests, unit tests, etc). So I think in this case this PR's size is okay -- as I said although it is big in LOC the non-testing changes are not so big, and this change is actually needed since I refactored the place when record collector etc are constructed.

*/
void flushState() {
try {
stateMgr.flush();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Push the stateMgr.flush to individual functions of tasks.


try {
task.close(true, false);
task.close(true);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another cleanup: clean and isZombie are now consolidated into one flag.


private RuntimeException closeSuspended(final boolean isZombie,
final StreamTask task) {
private RuntimeException closeSuspended(final boolean clean, final StreamTask task) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thoughts around those function cleanup are: after we've added the state into Task, then we would not have such functions any more but just a single close function that depends on the state: e.g. if it is running we need to 1) close topology, 2) flush state / collector, 3) commit (if clean), and then 4) close state; if it is suspended we only need to do step 4).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. I agree it makes sense to tackle that in a separate PR.

initialLoadedCheckpoints = checkpointFile.read();

log.trace("Checkpointable offsets read from checkpoint: {}", initialLoadedCheckpoints);
try {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another minor change: now we always delete the checkpoint file after initialization and not rely on the eos flag (since it is removed); for non-eos it is not a huge regression since the first commit after initialization would write one checkpoint so the only "vulnerable window" is during the initialization and the first commit, which is around 100ms --- but maybe I will add this back in further cleanup after I've straight-out the state manager.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with removing this for now and planning to consider how best to re-introduce it later. Not sure if I totally buy the idea to have some period of time where the file doesn't exist, but I'm happy to re-consider it later.
Can you create a follow-up task, so we don't forget about it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes -- actually my plan is to not change this behavior inside the scope of KAFKA-9113, but this PR took a step backwards (I kept all of those changes in mind) to clean up some messy dependencies, but in a later PR we could add this logic back after we've figured out the whole hierarchy.

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cleanup PR @guozhangwang. One high level comment is that as we are planning to rollout KIP-447 (POC here), the record collector will be sharing the same producer which is owned by thread level. So the question is whether it's possible for us to control the commit behavior beyond task level? If not immediately, I think this refactor still makes sense and is not blocking further changes to commit behavior anyway

private final static long serialVersionUID = 1L;

private final Task task;
private final TaskId taskId;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: +1 for the motivation, however could we make this a separate minor PR to reduce the current PR size?

if (eosEnabled) {
initializeTransactions();
}
public boolean isEosEnabled() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: what's the benefit of checking the config flag every time versus one time initialization?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to abstract the EOS logic away from task, but due to an optimization it ended up still needed to be kept inside the class, so I've decided to add it back.

private Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final Consumer<byte[], byte[]> consumer;
private final StreamThread.ProducerSupplier producerSupplier;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: this member should be removed.

"the broker was interrupted, or if similar circumstances arise. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.";

// used when eosEnabled is true only

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: this comment is not necessary. If we want to enforce the transactionInFlight correctness, we should add a validation for all transaction related private functions as

if (!eosEnabled && transactionInFlight) {
  throw new IllegalStateException();
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We indeed check eosEnabled everywhere we condition on transationInFlight.


public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (eosEnabled) {
maybeBeginTxn();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: I don't think we should call maybeBeginTxn, as we do that call during every send. If we are not in a transaction but calling commit(), that sounds like an illegal state to me, or we should just bypass the whole commit logic as it indicates we didn't do any send call in the past when EOS is turned on.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that but I think this is still a legal state, consider this: you processed some records but no outputs sent via the changelogger (i.e. send never called), then you call commit. In this case this transaction contains no data on Kafka but the local states may still be updated and the consumed offsets still advanced, in this case you still need to call 1) beginTxn, 2) sendOffsetsToTxn and then 3) commitTxn. Right?

// throw since it would swallow all exception; otherwise if suspension
// throws we do not need to proceed to further steps and should just notify
// the caller thread. Therefore it is always safe to proceed without try-catch
suspend(clean);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: suspend and closeSuspended could cause confusions as the meaning was very close. Maybe name closeSuspended to closeSuspendedIntervalState

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my plan is to eventually remove closeSuspended at just let close to act differently based on state.

}

@Test
public void shouldFailWithMigrateExceptionOnEOSProcessUknownPid() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

req: typo unknown Pid

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
callback.onCompletion(null, new OutOfOrderSequenceException("boom"));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we use a super class of UnknownProducerIdException here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to not lose generality: UnknownProducerIdException is actually the only extended class of OutOfOrderSequenceException, and other scenarios that the more general OutOfOrderSequenceException is thrown directly is for starting / end offset falling out of range. So I think letting streams to handle the general OutOfOrderSequenceException is fine too.

}

@Test
public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: for the exception tests, could we build a template test struct like

testSuite {
   mockProducer;
  expectedExceptionType;
}

to reduce the amount of test code?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my 2 cents: having a lot of factored-out code in tests usually hinders, rather than helps, maintainability. In the long run, the overall number of lines in the test file doesn't hurt anything, because you rarely sit down to read all the methods (typically, just while doing the review like this). After this PR is merged, you would almost always just be trying to read and understand a single method.

Thus, it pays to optimize for single-method legibility. Having a test harness to go read, and other support methods to go read, just to understand this method is only going to get in the way. As it is right now, this method is 28 lines long, perfectly legible and clear. Trading clarity for de-duplication is a bad deal.

}

@Test
public void shouldMigrateTaskIfFencedDuringProcess() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prop: we need to add some comments for removed large tests, for example in high level what are they testing, what functionality has been migrated and how it's ensured the same coverage to make review easier.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I personally decided not to worry about it, but just keep a mental todo to come back after all the cleanups are complete to revisit test coverage (lines and semantics). But it would be nice to know why it's ok that they were removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abbccdda yeah my plan is to keep each PR against a separate base branch so that we do not need require each PR to be a "transaction" that always leave the branch still in a consistent and perfect state.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add TODO to come back after all the cleanups for revamping our unit test coverage.

@vvcephei vvcephei left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang ,

Sorry the review took so long. Unfortunately, I had some feedback for you to consider.

Thanks so much for kicking this off!
-john

Comment thread checkstyle/checkstyle.xml
*
* @throws InterruptException If the thread is interrupted while blocked
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, but at the same time, it's a little confusing because InterruptException is a KafkaException. Maybe this is no big deal, though, if the intent of the doc is to say that you should treat an InterruptException specially.

private final static long serialVersionUID = 1L;

private final Task task;
private final TaskId taskId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


private RuntimeException closeSuspended(final boolean isZombie,
final StreamTask task) {
private RuntimeException closeSuspended(final boolean clean, final StreamTask task) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. I agree it makes sense to tackle that in a separate PR.

} finally {
producer = null;
}
partitionGroup.clear();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from context what this does, and hence why it's necessary here. Can you add an explanatory comment (or change clear to something more... clear)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

suspend(clean);

closeSuspended(clean);
} catch (final RuntimeException error) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had some confusion here about whether suspend could throw in an unclean close, leading to closeSuspended never getting called. Then I went back and read the comment... Maybe we can simplify all these code paths if unclean invocations never throw at all, at the lowest levels. Then, we could get rid of all these catch blocks up the hierarchy. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sounds good to me!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One tricky thing is that in commit / suspend there are several steps to execute, and when clean = false we should still execute all the steps even if early steps throw.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't follow this last thought. If we have a pattern where methods with clean=false never throw (which was my suggestion), it would automatically do what you are saying here, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two things with clean=false here: 1) we should not throw any more, and 2) we can skip some steps like trying to commit, because we know it is likely to fail, and even it does not it's not meaningful anyways. 2) is an optional optimization admittedly.


// used for testing
// visible for testing
long streamTime() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's no longer used, we should remove it.

}

@Test
public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my 2 cents: having a lot of factored-out code in tests usually hinders, rather than helps, maintainability. In the long run, the overall number of lines in the test file doesn't hurt anything, because you rarely sit down to read all the methods (typically, just while doing the review like this). After this PR is merged, you would almost always just be trying to read and understand a single method.

Thus, it pays to optimize for single-method legibility. Having a test harness to go read, and other support methods to go read, just to understand this method is only going to get in the way. As it is right now, this method is 28 lines long, perfectly legible and clear. Trading clarity for de-duplication is a bad deal.

}

@Test
public void shouldMigrateTaskIfFencedDuringProcess() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I personally decided not to worry about it, but just keep a mental todo to come back after all the cleanups are complete to revisit test coverage (lines and semantics). But it would be nice to know why it's ok that they were removed.

@guozhangwang

Copy link
Copy Markdown
Contributor Author

@abbccdda @vvcephei thanks for your reviews! I will address / reply them here and then close this PR and recreate a new PR from K9113-record-collector-p2 to K9113-base as I mentioned in my planned procedure.

Also a meta reply to "follow-up works" as in KAFKA-9113, I will add a "TODO K9113" comment in the code where we should address (to be distinguished from other TODO), and by the end of the day K9113-base should clear all such TODOs before being merged back to trunk.

@guozhangwang

Copy link
Copy Markdown
Contributor Author

Closing this PR and re-creating as guozhangwang#2 against k9113-base.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants