KAFKA-5886: Implement KIP-91 delivery.timeout.ms #3849
deliveryTimeoutMs should be mentioned
Thanks for the PR, @sutambe. Looking over the changes, it seems that there are two cases from the KIP which don't seem to be covered:
Have I missed something? Or are you planning on adding the functionality above?
becketqin left a comment
Thanks for the patch. Left some comments.
It seems better to say Producer.send() instead of send.
We are passing `now` everywhere else. Maybe we can just keep the argument name the same.
The actual argument is `now`. However, I prefer createTime as the formal argument name because it is an immutable value once the batch is constructed, whereas `now` is, by definition, changing.
Should we validate that delivery.timeout.ms is greater than request.timeout.ms?
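A minimal sketch of such a check, assuming the KIP-91 rule that delivery.timeout.ms should be at least linger.ms + request.timeout.ms. The class and method names are hypothetical and are not taken from the patch; the exception type is a stand-in for whatever the producer config code would use.

```java
// Hypothetical helper for illustration; not taken from the patch.
final class DeliveryTimeoutCheck {

    /** Rejects a delivery timeout that cannot cover linger.ms plus request.timeout.ms. */
    static void validate(int deliveryTimeoutMs, int lingerMs, int requestTimeoutMs) {
        // Add in long arithmetic so that two large int settings cannot overflow.
        long minimum = (long) lingerMs + requestTimeoutMs;
        if (deliveryTimeoutMs < minimum) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms (" + deliveryTimeoutMs + ") should be equal to or larger than "
                    + "linger.ms + request.timeout.ms (" + minimum + ")");
        }
    }

    public static void main(String[] args) {
        validate(120_000, 0, 30_000); // accepted
        try {
            validate(10_000, 0, 30_000);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```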
It is probably cleaner to have an explicit EXPIRED state.
I did some digging around. An expired batch's final state is FAILED. I don't feel great about adding yet another finalState. We already have ABORTED and FAILED. The ProducerBatch.done will get even more complicated.
Maybe it's not a big deal, but I just want to call out that this is a behavior change. Currently the producer throws an exception on a transition from the FAILED state to another state for any reason other than expiration. If we change this logic, we may miss cases where a batch did not fail by expiration but still got its state updated twice. It may not be that important if we do not have programming bugs.
Personally I think it is better to clearly define the states of the batches even if additional complexity is necessary.
The comments should probably also cover the force close case for completeness.
The logic here probably needs more comments. There are three cases in which the state of a batch may have been updated before the ProduceResponse returns:
- A transaction abort happens. The state of the batches would have been updated to ABORTED.
- The producer is closed forcefully. The state of the batches would have been updated to ABORTED.
- The batch is expired while it is in-flight. The state of the batch would have been updated to EXPIRED.
In the other cases, we should throw IllegalStateException.
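The three cases can be sketched as a small state holder. This is only an illustration under assumptions: EXPIRED is the state proposed in this review rather than one that exists in the patch, and the class and method names are hypothetical and do not mirror ProducerBatch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the proposed transition rules; not the ProducerBatch code.
final class BatchStateSketch {
    enum FinalState { ABORTED, FAILED, EXPIRED, SUCCEEDED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /**
     * Returns true if this call set the final state, and false if the batch had
     * already been completed by an abort, a forced close, or an expiration.
     */
    boolean done(FinalState newState) {
        if (finalState.compareAndSet(null, newState)) {
            return true; // the first completion wins
        }
        FinalState previous = finalState.get();
        if (previous == FinalState.ABORTED || previous == FinalState.EXPIRED) {
            // A ProduceResponse arrived after the batch was already completed; ignore it.
            return false;
        }
        throw new IllegalStateException(
            "Batch already completed with " + previous + ", cannot move to " + newState);
    }

    public static void main(String[] args) {
        BatchStateSketch batch = new BatchStateSketch();
        System.out.println(batch.done(FinalState.EXPIRED));   // first completion
        System.out.println(batch.done(FinalState.SUCCEEDED)); // late response is ignored
    }
}
```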
Please review the updated method documentation.
The batches still need to be expired in order if max.in.flight.requests.per.connection is set to 1. So we probably still want to check whether the partition is muted. That said, if we guarantee that when RecordAccumulator.expiredBatches() returns a non-empty list, all the earlier batches have already been expired, we can remove the muted check here.
BTW, I did not see the logic of expiring an in-flight batch in the current patch. Am I missing something?
isFull is no longer used.
Friendly reminder that the feature freeze is this Wednesday.
@ijuma Just want to check. Do you think this feature is a "minor" feature?
@becketqin, it is possible to classify this as a minor feature, but the fact that it affects a core part of the Producer puts it in a bit of a grey area. If the PR is almost ready and we miss the feature freeze, my take is that it would be OK to merge it by the end of this week. Later than that and it seems a bit risky. It's a bit worrying that the merge conflicts haven't been resolved since last week.
@ijuma @becketqin I have a new PR, but after a rebase I have to fix one more test. Working on that now.
Thanks @sutambe!
@apurvam It's not clear to me why
This variable can be dropped.
00145bf to 9d8b7ea
I added the following to testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail before the first sender.run() call. The test still fails.
@sutambe where are those tests failing? The latest PR builder suggests that the clients and core tests all passed.
@apurvam @ijuma @becketqin The Sender and RecordAccumulator tests are passing now. The failing tests are connect tests that are irrelevant.
@sutambe I don't think the test failures are irrelevant since the same 3 tests failed in all the runs. Further, the cause of the failure is:
I think their mocks may need to be updated to take account of the new configs and attendant
The check 'if (deliveryTimeoutMs <= (now - this.createdMs))' inside maybeExpire() would be true.
Looks like another method can be created inside ProducerBatch which expires the batch.
maybeExpire has a side effect of setting errorMessage internally. Hence it is called again in the if.
Understand.
That part can be refactored - goal is to reduce unnecessary comparison.
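One way such a refactoring could look, as a sketch only: a side-effect-free check is separated from the method that records the expiry, so the comparison runs once. The class and method names are hypothetical and only loosely follow the names used in this thread.

```java
// Hypothetical sketch: a pure timeout check kept separate from the mutating expire step.
final class ExpirableBatch {
    private final long createdMs;
    private String errorMessage; // set only once the batch has been expired

    ExpirableBatch(long createdMs) {
        this.createdMs = createdMs;
    }

    /** Pure check with no side effects; safe to call repeatedly. */
    boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
        return deliveryTimeoutMs <= now - createdMs;
    }

    /** Records why the batch expired; meant to be called once after the check returned true. */
    void expire(long deliveryTimeoutMs, long now) {
        errorMessage = (now - createdMs) + " ms has passed since batch creation, exceeding "
            + deliveryTimeoutMs + " ms";
    }

    String errorMessage() {
        return errorMessage;
    }

    public static void main(String[] args) {
        ExpirableBatch batch = new ExpirableBatch(1_000L);
        if (batch.hasReachedDeliveryTimeout(500L, 1_500L)) {
            batch.expire(500L, 1_500L);
        }
        System.out.println(batch.errorMessage());
    }
}
```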
@apurvam Those tests don't even compile or run on my machine. What's up with those tests?
They can't construct a kafka producer with the changes made in this PR.
Assuming inFlightBatches is a TreeSet as suggested above, this code can be simplified to:
while (!inFlightBatches.isEmpty() &&
inFlightBatches.first().maybeExpire(deliveryTimeoutMs, now)) {
expiredBatches.add(inFlightBatches.pollFirst());
}
Is this comment accurate? The new state is not necessarily SUCCEEDED.
Some typos in this comment. "Expire the batch if no outcome is known within delivery.timeout.ms"
Does this have to be a per partition Map? Intuitively we just need a TreeSet<ProducerBatch> with a comparator?
Apparently my understanding of TreeSet was not accurate. It uses the comparator to decide whether two entries are the same. We can use a TreeMap<Long, Set> then. We may also want to bucket the timestamps a little, say by one second, to avoid creating a huge number of Sets, one for each ms, in the TreeMap.
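The TreeSet behavior described above can be reproduced with a small standalone example. The Batch class here is a hypothetical stand-in, not ProducerBatch: with a comparator that looks only at the creation time, a second batch with the same timestamp is treated as a duplicate and silently dropped.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Standalone demo; Batch is a hypothetical stand-in for ProducerBatch.
final class TreeSetEqualityDemo {
    static final class Batch {
        final String name;
        final long createdMs;

        Batch(String name, long createdMs) {
            this.name = name;
            this.createdMs = createdMs;
        }
    }

    /** Two batches created in the same millisecond collapse into one entry. */
    static int sizeWithTimestampOnly() {
        TreeSet<Batch> set = new TreeSet<>(Comparator.comparingLong((Batch b) -> b.createdMs));
        set.add(new Batch("a", 100L));
        set.add(new Batch("b", 100L)); // compares equal to "a", so add() returns false
        return set.size();
    }

    /** A tie-breaker in the comparator keeps both batches. */
    static int sizeWithTieBreaker() {
        Comparator<Batch> byTimeThenName =
            Comparator.comparingLong((Batch b) -> b.createdMs).thenComparing(b -> b.name);
        TreeSet<Batch> set = new TreeSet<>(byTimeThenName);
        set.add(new Batch("a", 100L));
        set.add(new Batch("b", 100L));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWithTimestampOnly());
        System.out.println(sizeWithTieBreaker());
    }
}
```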
I was thinking about this too. Using millisecond as unit for Map key is not prudent.
After the switch to second as unit, we may need to check the two adjacent buckets keyed by ts-1 (sec) and ts+1 (sec).
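A rough sketch of the bucketed structure discussed here, under several assumptions: buckets are keyed by the expiry time rounded down to a whole second, every batch in a due bucket is still checked individually, and createdMs + deliveryTimeoutMs is assumed not to overflow. With this particular keying, only buckets up to the current second need to be scanned. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of a TreeMap bucketed by second; not the patch's data structure.
final class BucketedExpiryIndex {
    static final class Batch {
        final long createdMs;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    private final long deliveryTimeoutMs;
    // Key: expiry time in whole seconds (rounded down). Value: batches expiring in that second.
    private final TreeMap<Long, Set<Batch>> buckets = new TreeMap<>();

    BucketedExpiryIndex(long deliveryTimeoutMs) {
        this.deliveryTimeoutMs = deliveryTimeoutMs;
    }

    private long bucketOf(Batch batch) {
        // Assumes the sum does not overflow; a very large timeout would need special handling.
        return (batch.createdMs + deliveryTimeoutMs) / 1000L;
    }

    void add(Batch batch) {
        buckets.computeIfAbsent(bucketOf(batch), k -> new HashSet<>()).add(batch);
    }

    /** Removes and returns only the batches that are actually due at the given time. */
    List<Batch> pollExpired(long now) {
        List<Batch> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<Batch>>> it =
            buckets.headMap(now / 1000L, true).entrySet().iterator();
        while (it.hasNext()) {
            Set<Batch> bucket = it.next().getValue();
            bucket.removeIf(batch -> {
                boolean due = now - batch.createdMs >= deliveryTimeoutMs;
                if (due) {
                    expired.add(batch);
                }
                return due;
            });
            if (bucket.isEmpty()) {
                it.remove(); // drop the empty bucket from the backing TreeMap
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        BucketedExpiryIndex index = new BucketedExpiryIndex(1_000L);
        index.add(new Batch(0L));
        index.add(new Batch(900L));
        System.out.println(index.pollExpired(1_500L).size());
    }
}
```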
As we discussed, TreeSet does not cut it. The naming is consistent. A TreeSet is a set. It's just that the equality criterion is different.
This logic would become inFlightRequests.remove(batch) when a TreeSet is used for this.
This would be just inFlightBatches.add(batch)
We usually just use earliestDeliveryTimeout in Kafka.
It seems we don't need the deliveryTimeoutMs in the sender. It is only used as an argument passed to the accumulator. But the accumulator already has the config.
This seems to be an existing issue. When we expire the batches here, the memory of those batches is deallocated. It seems that we will deallocate the same batch again when the ProduceResponse returns?
@sutambe I had a look at the failing Sender expiry tests. What is happening is that the tests have not been modified to account for the fact that the inflight batches can be expired. In the tests, we used to expire a batch sitting in the accumulator but not the inflight batch. When the inflight batch returned, it would be requeued. But now the test sends the response for the inflight batch, and when it goes to requeue, it discovers that there shouldn't be an inflight request and raises an exception. The tests should be updated to account for the new behavior and make sure that the inflight batch is not expired.
Actually, the test reveals a bug in the current patch: the response for the inflight batch which expired is not being handled correctly. We should not be trying to requeue it to start with. So we need two tests: one where the inflight batch is not expired, and the current case. The reenqueue logic in the sender needs to be updated to not reenqueue the expired batches.
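The reenqueue rule described above could be expressed as a single predicate. This is a sketch under assumptions, with hypothetical names and parameters; it is not the Sender code.

```java
// Hypothetical sketch of a retry predicate that refuses to reenqueue expired batches.
final class RetryDecision {

    /**
     * A batch is requeued only if the error is retriable, retries remain,
     * and the delivery timeout has not yet elapsed since the batch was created.
     */
    static boolean canRetry(boolean retriableError, int attempts, int maxRetries,
                            long createdMs, long deliveryTimeoutMs, long now) {
        boolean expired = now - createdMs >= deliveryTimeoutMs;
        return retriableError && attempts < maxRetries && !expired;
    }

    public static void main(String[] args) {
        // Still within the delivery timeout: requeue.
        System.out.println(canRetry(true, 0, 3, 0L, 1_000L, 500L));
        // Delivery timeout reached while in flight: do not requeue.
        System.out.println(canRetry(true, 0, 3, 0L, 1_000L, 1_000L));
    }
}
```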
becketqin left a comment
@sutambe Thanks for updating the patch. A few comments:
- For a batch that got expired prematurely, we should not reenqueue the batch (as @apurvam noticed), and we should not double deallocate the memory.
- There are a few earlier review comments that are not addressed yet (such as unused local variables).
- We may want to revisit some of the tests and see if they still make sense.
- It would be good to add more unit tests to the patch. More specifically, we may want to have the following tests:
  - Test that a batch is correctly inserted into the in.flight.batches if needed, and not inserted if not needed.
  - Test that the callback of an expired batch is fired in time, whether it is in-flight or not in-flight.
  - Test that when we expire an in-flight batch, we still wait for the request to finish before sending the next batch.
  - Test that we do not retry an already expired batch.
  - Test that when a batch is expired prematurely, the buffer pool is only deallocated after the response is returned (because we are still holding the batch until the response is returned).
This test has nothing to do with linger.ms anymore...
We should change the test name to something like testBatchExpiration, and the test below to testBatchExpirationAfterReenqueue.
Similar to above we should rename this.
The typo is still there.
Should we still expire each batch when it is actually expired, instead of expiring the whole bucket? Having buckets with second granularity does not prevent us from doing that, right?
@apurvam @becketqin I updated the implementation to use
becketqin left a comment
Thanks for updating the patch. Left some comments.
We don't need a PriorityQueue for this because the batches in the RecordAccumulator are already in order. So we just need to keep the draining order.
If we always insert the batch to the inFlightBatches queue and there is no bug, the batch to be removed should always be the first batch. Can we assert on that?
The original reason we had this optimization is that we used to have a big sorted data structure, so avoiding insertions into it made sense. Given that the batch order in the RecordAccumulator is now already guaranteed, it seems we can just put all the drained batches into the inFlightBatches queue, which is simpler.
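A sketch of the simpler queue suggested in the last few comments, assuming one queue per partition and batches drained in creation order. The names are hypothetical. Expired batches are dropped from the queue here, so a late response for them would have to be handled separately.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: in-flight batches kept in drain order, no sorting structure needed.
final class InFlightQueueSketch {
    static final class Batch {
        final long createdMs;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    private final Deque<Batch> inFlight = new ArrayDeque<>();

    /** Every drained batch is appended, preserving the accumulator's order. */
    void onDrained(Batch batch) {
        inFlight.addLast(batch);
    }

    /** Asserts that the completed batch is the oldest one, as suggested in the review. */
    void onCompleted(Batch batch) {
        if (inFlight.peekFirst() != batch) {
            throw new IllegalStateException("Completed batch is not the head of the in-flight queue");
        }
        inFlight.pollFirst();
    }

    /** Expires from the head only; the first batch that is not yet due ends the scan. */
    List<Batch> expire(long deliveryTimeoutMs, long now) {
        List<Batch> expired = new ArrayList<>();
        while (!inFlight.isEmpty() && now - inFlight.peekFirst().createdMs >= deliveryTimeoutMs) {
            expired.add(inFlight.pollFirst());
        }
        return expired;
    }

    public static void main(String[] args) {
        InFlightQueueSketch queue = new InFlightQueueSketch();
        queue.onDrained(new Batch(0L));
        queue.onDrained(new Batch(500L));
        System.out.println(queue.expire(1_000L, 1_200L).size());
    }
}
```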
The while loop may break if the request size limit has been reached. So there is no guarantee that it will iterate over all the partitions. One alternative is to find the nextBatchExpiryTimeMs in the expireBatches.
It seems intuitively this should be the earliest batch in the entire record accumulator?
It seems we may release the memory for the expired batches before the response is returned. This means the underlying ByteBuffer is still referenced by the ProducerBatch instance in the inFlightRequests. I am not sure if this would cause any problem, but it seems a little dangerous.
Is the response preparation needed in this case?
retest this please
So the Given that these changes are on the same code, and given the consistent failure of this test, it is probably a regression. @sutambe can you reproduce the failure locally?
Just looking at the stack trace and the test, it may be that an expired batch is being closed twice in some cases.
@sutambe @becketqin It would be nice to unblock this. Can someone else pick up the work?
@hachikuji Yeah, this has been pending for too long. I have spoken to @sutambe and he said he still wants to finish the patch. He will figure out the ETA and see if that works.
@sutambe @becketqin is there any update on the status of this PR? It would be great if we could get this in the next release.
@guozhangwang @junrao @bbejeck @becketqin We also hit this issue when running the Kafka Streams library with some high-volume output topics. It would be nice to get this moving and push it to the next release.
Becket can't load this page for some reason (some weird issue with his GitHub profile?).
@bbejeck @apurvam @becketqin @hachikuji I don't have any update since Dec last year. Sorry, the work has stalled and it has been very hard to find cycles for this effort. I don't mind if Confluent wants to take this effort forward. Better late than never.
Avoiding overflow when deliveryTimeoutMs is MAX_VALUE
per-partition map for tracking soon to expire batches
Updated tests
@sutambe I made some changes based on your pull request to fix the style check and the test failures. Do you mind if I amend the changes to this pull request? cc @becketqin @apurvam @hachikuji
@yuyang08 I'd suggest you create your own PR against apache kafka trunk and let other reviewers continue reviewing that one.
@guozhangwang Sure, will create a separate pull request.
@guozhangwang @apurvam @becketqin Created new PR #5270 for KAFKA-5886.
This has been merged via a different PR, closing.
…ache#3849) This issue has been there for multiple years. Also adjust the logging to only include overridden topic configs, I _think_ this behavior changed unintentionally as part of the kraft work (and made the original issue worse). Unit test included and also tested manually. Reviewer: Alok Nikhil <anikhil@confluent.io>, Kowshik Prakasam <kprakasam@confluent.io>
First shot at implementing kip-91 delivery.timeout.ms.
Summary
- delivery.timeout.ms config. Default 120,000
- retries to MAX_INT.
- RecordAccumulator.expiredBatches.
- RecordAccumulatorTest is removed. It has three additional tests: testExpiredBatchSingle, testExpiredBatchesSize, testExpiredBatchesRetry. All of them test that expiry is independent of muted.