KAFKA-5886: Introduce delivery.timeout.ms producer config (KIP-91)#5270
Conversation
|
@apurvam, @becketqin, @guozhangwang, @ijuma could you help to review this change? |
|
Thanks @yuyang08 , we will take a look at this PR asap. |
|
@guozhangwang , @ijuma , @apurvam , @becketqin friendly ping ... your feedback will help us to iterate faster |
hachikuji
left a comment
There was a problem hiding this comment.
Left a few initial comments. Still making my way through the full PR.
There was a problem hiding this comment.
This check is a little weird to me. It seems arbitrary to check only lingerMs in the case of overflow. Wouldn't it make more sense to use Integer.MAX_VALUE?
There was a problem hiding this comment.
updated the sanity checking logic. changed lingerMs and deliveryTimeoutMs to integer type. with that, we convert them to long for addition and do not need to worry about the overflow.
There was a problem hiding this comment.
I'm wondering if we should set this value automatically if the value is not overridden. It would be annoying to get this config error after an upgrade if I have overridden the request timeout. Maybe we could use the max of the default delivery timeout and the sum of requestTimeoutMs and lingerMs?
There was a problem hiding this comment.
my concern is that setting the value automatically may hide some issues from the user, and it also requires more explanation. It might be better to keep it simple. The issue that the users can run into an upgrade is that they override requestTimeoutMs with a large value, and that violates the required variant deliveryTimeout > requestTimeoutMs + lingerMs. In that case, the user will be notify the error during producer initialization, and it shall not take much effort to fix it.
There was a problem hiding this comment.
Yeah, I appreciate the concern. However, there is already some precedent for redefining defaults based on provided configurations. For example, when idempotence is enabled, we override retries if it has not been explicitly provided by the user. We log an info message so that the user knows the default has been adjusted.
In general, I think we should try to avoid compatibility issues even in configuration as long as it is safe to do so. By using a larger request timeout, the user has already declared willingness to await for the request timeout to receive a produce acknowledgement, so redefining the default delivery timeout seems reasonable and saves an annoying config update.
There was a problem hiding this comment.
A reason for the error is that a longer request timeout is not unlikely to have been done as a workaround for the issue that delivery timeout is fixing. So, it might provide the user with an opportunity to fix that. A warning might be a less heavy handed way to achieve this though.
There was a problem hiding this comment.
updated the change to use requestTimeoutMs + lingerMs if it is larger than deliveryTimeoutMs setting, and log a warning message.
There was a problem hiding this comment.
The latter half here may not be very clear to users since it's a bit low level. Is there actually a lower bound for the timeout or can it be arbitrarily small depending on how long the batch remains open?
There was a problem hiding this comment.
updated the doc string on the lower bound explanation.
There was a problem hiding this comment.
Can you mention this default change in the upgrade notes?
There was a problem hiding this comment.
updated the upgrade doc on this.
There was a problem hiding this comment.
Maybe helpful to mention the baseOffset in this message?
There was a problem hiding this comment.
added baseOffset info in the log message
There was a problem hiding this comment.
Can we explain in this comment why we use this order? Intuitively, I would expect that we'd expire the oldest stuff first so that the callbacks are invoked in the order of sending.
There was a problem hiding this comment.
this is a comment that was in #3849. updated the comment.
There was a problem hiding this comment.
Hmm.. Shouldn't we be expiring the batch in the else case? Otherwise it seems like we may lose track of the batch.
There was a problem hiding this comment.
good catch. expire the batch in else branch.
There was a problem hiding this comment.
Unless I'm missing something, we only remove from the inFlightBatches collection when we reenqueue and when we encounter a delivery timeout. Don't we need to remove on completion or failure as well?
There was a problem hiding this comment.
Good catch. This would have to be called from Sender.failBatch and Sender.completeBatch as well, similar to how the transactionManager.removeInflightBatch is called in those methods.
There was a problem hiding this comment.
good catch! updated the code to call maybeRemoveFromInflightBatches from Sender.failBatch and Sender.completeBatch.
There was a problem hiding this comment.
restored the space
There was a problem hiding this comment.
In what scenario will batch.createdMs + deliveryTimeoutMs be negative? batch.createdMs is the wall clock time when the batch is created, and should always be positive. The config def for deliveryTimeoutMs enforces the value is atleast(0). So then how could the sum of the two be negative? Are you accounting for wrap around?
There was a problem hiding this comment.
yes, this is to guard us against potential overflow due to setting a large value for deliveryTimeoutMs.
There was a problem hiding this comment.
Ok. It would be good mention this in a comment. It is very non obvious.
There was a problem hiding this comment.
update the code to use if ... statement and add the comment
There was a problem hiding this comment.
I am assuming this method and drainBatchesForOneNode is just a refactor? Was any logic changed here? It is hard to tell from the diff and the logic is super intricate so subtle changes are easy to miss. I assume nothing should have to change in this portion of the code since it is totally orthogonal to expiring batches.
There was a problem hiding this comment.
yes, this is a refactoring. otherwise, the method RecordAccumulator.drain fails in style checking due to too many possible paths in one method. comparing with the previous code, the change is to add the following lines on updating inflightBatches
// put this batch in the infligh list
List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
if (inflightBatchList == null) {
inflightBatchList = new LinkedList<>();
inFlightBatches.put(tp, inflightBatchList);
}
inflightBatchList.add(batch);
There was a problem hiding this comment.
here and elsewhere, the code style for kafka requires that there be braces even around single line if statements like this.
There was a problem hiding this comment.
restored the curly braces.
There was a problem hiding this comment.
The preceding comment needs to be updated to account for this new logic.
There was a problem hiding this comment.
good point. updated the comment
There was a problem hiding this comment.
This seems not to be used.
There was a problem hiding this comment.
good catch. removed the unused code.
There was a problem hiding this comment.
Good catch. This would have to be called from Sender.failBatch and Sender.completeBatch as well, similar to how the transactionManager.removeInflightBatch is called in those methods.
apurvam
left a comment
There was a problem hiding this comment.
Hi @yuyang08 , thanks for the updates.
It is looking good. I left some comments, but I think the larger point is that we should add some checks that the inflight batches tracked in the accumulator are actually cleared.
I added some suggestions for adding checks for when batches expire.
But we should also add checks that the accumulator has no inflight batches when batches complete successfully and when they fail.
There was a problem hiding this comment.
nit: move this to a helper named markBatchInflight or something similar.
There was a problem hiding this comment.
updated the code to capture this in markBatchInflight method
There was a problem hiding this comment.
We should log a warning if we have overflowed and are hence not updating the next batch expiry time.
There was a problem hiding this comment.
good point. added the logging here.
There was a problem hiding this comment.
We should add some sort of accumulator.hasInflightBatches method, and then check that it returns false here. This would check that expired batches are not reenqueued, which is logic added in this patch.
There was a problem hiding this comment.
added the method public List<ProducerBatch> inFlightBatches(TopicPartition tp) in RecordAccumulator, and updated the test with two assertions:
line 1912: assertEquals("Expect one in-flight batch in accumulator", 1, accumulator.inFlightBatches(tp0).size());
.....
line 1920: assertEquals("Expect zero in-flight batch in accumulator", 0, accumulator.inFlightBatches(tp0).size());
There was a problem hiding this comment.
How is this different from the test testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed?
There was a problem hiding this comment.
testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed initialize sender with guaranteeMessageOrder = false, while testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder initialize sender with guaranteeMessageOrder = true. The inflightBatches size is different when we set the parameter to true/false.
There was a problem hiding this comment.
THis should be dropped, or be log.debug.
There was a problem hiding this comment.
good catch. removed this debugging line.
There was a problem hiding this comment.
If we had accumulator.hasInflightRequests we could assert false here.
There was a problem hiding this comment.
How is this testing that we don't double deallocate. Ideally, we would expire the inflight batch, then get a response, and then check that the batch is not deallocated twice. In this case, it would de allocate only once, unless I am missing something.
There was a problem hiding this comment.
updated the test a bit to add another sender.run(time.milliseconds()); call after the batch expiry. Not sure if that fits expire the inflight batch, then get a response.
my expectation was that if there is double deallocation, we would get an IllegalStateException exception from MatchingBufferPool.deallocate.
|
@hachikuji, @apurvam I've updated |
|
@hachikuji , @apurvam friendly ping ... could you help to review the updated change to allow us iterate faster? thanks! |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the updates. Left a few more comments.
There was a problem hiding this comment.
To clarify, I think we should only override the default value. If the user has explicitly provided an inconsistent value, then we should throw an exception. We can check this using config.originals().
There was a problem hiding this comment.
nit: createdMs for consistency?
By the way, we have a function below createdTimeMs which is currently unused and seems incorrect anyway. Can you remove it?
There was a problem hiding this comment.
updated to createdMs, and removed the unused method.
There was a problem hiding this comment.
nit: "Ignores" -> "Ignored"?
There was a problem hiding this comment.
It seems tryFinalState can only be SUCCEEDED or FAILED, so one of these transitions is not possible anyway.
There was a problem hiding this comment.
ProducerBatcn.finalState can also be updated to FinalState.ABORTED through Sender.run() --> RecordAccumulator.abortIncompleteBatches() or abortUndrainedBatches() --> ... -> ProducerBatch.abort() .
There was a problem hiding this comment.
My point is that tryFinalState can only be SUCCEEDED or FAILED.
final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;So the only possibilities are FAILED -> FAILED and ABORTED -> FAILED.
There was a problem hiding this comment.
i see. misunderstood your comment earlier. updated the change
There was a problem hiding this comment.
Are the order of the first two arguments backwards? I think tryFinalState is the state we're trying to transition to.
There was a problem hiding this comment.
good catch! fixed the arguments order.
There was a problem hiding this comment.
nit: add back the newline
There was a problem hiding this comment.
removed the new line
There was a problem hiding this comment.
nit: topic-partition as we did above?
There was a problem hiding this comment.
Aren't both bounds inclusive? The inconsistency is a little annoying and this is a private constructor anyway. I think mentioning this in the javadoc below is good enough.
There was a problem hiding this comment.
added java doc comments and changed the parameter name to max
There was a problem hiding this comment.
Can you mention that this is for the producer? For example:
The default value for the producer's
retriesconfig was changed toINT.MAX_VALUE...
Might also be worthwhile including a link to KIP-91
There was a problem hiding this comment.
updated to include KIP-91 link
|
@hachikuji @apurvam @ijuma have updated the change to address your comments. could you take a look again? thanks! |
|
@hachikuji , @apurvam friendly ping... mind to take another look? thanks! |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, left a few more comments.
There was a problem hiding this comment.
I'm not sure this is sufficient to solve the issue mentioned above. If we do not reenqueue because the timeout has been reached, then who is responsible for completing the batch? I think I would probably suggest that we skip the check for delivery timeout and just reenqueue. We will detect the expiration the next time we iterate the deque.
There was a problem hiding this comment.
I think the only issue with reenqueing unconditionally is that the batch will then be drained even if it is expired, since accumulator.drain is called before accumulator.expiredBatches in the background thread. This could violate the contract.
That said, we need to complete the batch and deallocate it over here, otherwise it seems to be dropped on the flor.
There was a problem hiding this comment.
update to check whether a batch has reached deliveryTimeoutMs or not in Sender.canRetry. With this change, we do not need to check whether a batch has timeout or not in RecordAccumuator.reenqueue, and cleaning up of timed-out batches is handled by Sender.failBatch.
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)
&& ...
There was a problem hiding this comment.
Is this call necessary for in-flight batches? Presumably we have already closed the append stream since the batch was already sent once.
There was a problem hiding this comment.
good point. removed this line.
There was a problem hiding this comment.
If it is an expected invariant, I would suggest we raise an exception if it does not hold. Otherwise, bugs will go undetected since we don't have a way to verify logging output when running tests.
There was a problem hiding this comment.
+1. Better to fail with an exception if the invariant is violated.
There was a problem hiding this comment.
updated the change to throw IllegalBatchFinalStateException when the invariant is violated.
There was a problem hiding this comment.
nit: introduces -> introduced
There was a problem hiding this comment.
Why changing linger to int ?
There was a problem hiding this comment.
we have request.timeout.ms and delivery.timeout.ms of int type. this is to make the type of linger.ms be consistent with other timeout related settings.
There was a problem hiding this comment.
Should this be of type long ?
With long, there is no overflow on line 478
In ProducerBatch, deliveryTimeoutMs is long in hasReachedDeliveryTimeout
There was a problem hiding this comment.
ProducerBatch.hasReachedDeliveryTimeout is called by RecordAccumulator. In RecordAccumulator's construct, we have had using long type for lingerMs, and retryBackoffMs. It will be inconsistent to use int for deliveryTimeoutMs. And it will require changes at many places (especially in the test cases) if we use int type for lingerMs and retryBackoffMs. I thought that it would be better to have another PR for data type related changes for lingerMs etc.
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
...
There was a problem hiding this comment.
Doesn't need to be warn.
Can be info since there is no action from user
There was a problem hiding this comment.
This is try to get the user's attention as we are overriding the default delivery.timeout.ms setting. Previously the user may set a long request.timeout.ms as a work around. The user may want to explicitly set delivery.timeout.ms and give a smaller value for request.timeout.ms.
apurvam
left a comment
There was a problem hiding this comment.
Sorry for the delayed response @yuyang08 . The week of july 4 was extremely short, and last week we had a company event and multi-day off site, which took a lot of cycles.
I left a few more comments, the biggest one around synchronization of inFlightBatches.
There was a problem hiding this comment.
It seems to me that we need to synchronize on accesses to the contained List<ProducerBatch>. This list is updated in maybeRemoveFromInflightBatches, which called both from the sender background thread and from the network client threads. This unsafe access can corrupt the list in the present patch.
There was a problem hiding this comment.
I am not fully convinced that there is concurrent access to List<ProducerBatch> inFlightBatches. In KafkaProducer, there is only one background I/O thread (Sender) that turns the records into requests and transmit them to the cluster. inFlightBatches is only being modified by RecordAccumulator methods that are called by Sender thread. Sender.run(long now) --> NetworkClient.poll --> NetworkClient.handle... --> Sender.handleProduceResponse --> Sender.completeBatch --> Sender.reenqueueBatch --> RecordAccumulator.reenque --> RecordAccumulator.maybeRemoveFromInflightBatches is the call stack. NetworkClient uses Selector for non-blocking i/o multiplexing, but it is not a thread.
We need to synchronize access on Deque<ProducerBatch> deque = entry.getValue(); as both the producer thread and the sender thread can concurrently access deque at the same time.
There was a problem hiding this comment.
I couldn't find the concurrent access either, so it may be ok. It actually makes me wonder why we are using a ConcurrentMap? To be honest, it would be more intuitive for the inflight batches to be tracked inside Sender. I'd suggest doing that if it's straightforward, but I'm fine leaving it here if it takes a lot of work.
There was a problem hiding this comment.
Yea. I think I was mistaken. If the callbacks from the network client are happening in the context of the sender thread, then there is no concurrent access on inflightBatches.
There was a problem hiding this comment.
good point on tracking the inflight batches in Sender. updated the change to move inFlightBatches to Sender, and changed its type from ConcurrentMap to Map.
There was a problem hiding this comment.
+1. Better to fail with an exception if the invariant is violated.
There was a problem hiding this comment.
As mentioned above, we should synchronize on this list while iterating, and then synchronize again in maybeRemoveFromInflightBatches
There was a problem hiding this comment.
As I mentioned earlier, i might miss something, but it seems to me that there is no concurrent access to inFlightBatches, and we do not need synchronize qualifier for it.
There was a problem hiding this comment.
I think the only issue with reenqueing unconditionally is that the batch will then be drained even if it is expired, since accumulator.drain is called before accumulator.expiredBatches in the background thread. This could violate the contract.
That said, we need to complete the batch and deallocate it over here, otherwise it seems to be dropped on the flor.
There was a problem hiding this comment.
Why did those tests fail if you leave the isMuted in there?
|
FAILURE |
|
@hachikuji , @apurvam, @tedyu thanks for your review! I've address all of your comments. The change also passed the tests. could you take a look again? |
There was a problem hiding this comment.
I'm not sure we need a new exception type for an error that should not happen. Maybe we can just use IllegalStateException?
There was a problem hiding this comment.
updated to use IllegalStateException
There was a problem hiding this comment.
nit: can we use ArrayList? https://twitter.com/joshbloch/status/583813919019573248
There was a problem hiding this comment.
updated to use ArrayList
There was a problem hiding this comment.
We're not using the keys, so can we just iterate the values?
There was a problem hiding this comment.
update to iterate through batches.values()
There was a problem hiding this comment.
nit: extra space before method name
There was a problem hiding this comment.
removed the extra space
There was a problem hiding this comment.
Can you clarify why use this for the iteration? I would have expected we would iterate over the entry set of inFlightBatches.
There was a problem hiding this comment.
this is really not needed. updated to iterate through inFlightBatches directly.
There was a problem hiding this comment.
removed this unused method
There was a problem hiding this comment.
This message needs to be updated.
There was a problem hiding this comment.
updated the comment:
// expireBatches is called in Sender.sendProducerData, before client.poll.
// The batch.finalState() == null invariant should always hold. An IllegalStateException
// exception will be thrown if the invariant is violated.
There was a problem hiding this comment.
If batches is empty, should we remove the list from inFlightBatches?
There was a problem hiding this comment.
updated the change to remove the list from inFlightBatches when it is empty.
|
retest this please |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM, merging to trunk. Thanks for the patch!
|
Note that I removed an unused method and made some tweaks to the upgrade notes. |
| int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); | ||
| int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); | ||
|
|
||
| if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { |
There was a problem hiding this comment.
(deliveryTimeoutMs < lingerMs + requestTimeoutMs) implies (deliveryTimeoutMs < Integer.MAX_VALUE), why do we need to check (deliveryTimeoutMs < Integer.MAX_VALUE), logically it can be removed.
This change is based on @sutambe 's change #3849 earlier.
primary changes in this pr:
In RecordAccumulator.java, use
inFlightBatchesto track the in-flight batches, instead of usingsoonToExpireInFlightsBatchesto only track the soon-to-expire batches. With this change, in RecordAccumulator.expiredBatches, we check bothinFlightBatchesandbatchesto find the expired batches.Fixed the test failures in SenderTest.java and RecordAccumulatorTest.java.