Skip to content

KAFKA-5886: Introduce delivery.timeout.ms producer config (KIP-91)#5270

Merged
hachikuji merged 4 commits into
apache:trunkfrom
yuyang08:kip91
Jul 26, 2018
Merged

KAFKA-5886: Introduce delivery.timeout.ms producer config (KIP-91)#5270
hachikuji merged 4 commits into
apache:trunkfrom
yuyang08:kip91

Conversation

@yuyang08

Copy link
Copy Markdown
Contributor

This change is based on @sutambe 's change #3849 earlier.

primary changes in this pr:

  1. In RecordAccumulator.java, use inFlightBatches to track the in-flight batches, instead of using soonToExpireInFlightsBatches to only track the soon-to-expire batches. With this change, in RecordAccumulator.expiredBatches, we check both inFlightBatches and batches to find the expired batches.

  2. Fixed the test failures in SenderTest.java and RecordAccumulatorTest.java.

@yuyang08

yuyang08 commented Jun 21, 2018

Copy link
Copy Markdown
Contributor Author

@apurvam, @becketqin, @guozhangwang, @ijuma could you help to review this change?

@guozhangwang

Copy link
Copy Markdown
Contributor

Thanks @yuyang08 , we will take a look at this PR asap.

@ijuma ijuma added this to the 2.1.0 milestone Jun 23, 2018
@ijuma ijuma added the producer label Jun 23, 2018
@yuyang08

yuyang08 commented Jun 26, 2018

Copy link
Copy Markdown
Contributor Author

@guozhangwang , @ijuma , @apurvam , @becketqin friendly ping ... your feedback will help us to iterate faster

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few initial comments. Still making my way through the full PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is a little weird to me. It seems arbitrary to check only lingerMs in the case of overflow. Wouldn't it make more sense to use Integer.MAX_VALUE?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the sanity checking logic. changed lingerMs and deliveryTimeoutMs to integer type. with that, we convert them to long for addition and do not need to worry about the overflow.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should set this value automatically if the value is not overridden. It would be annoying to get this config error after an upgrade if I have overridden the request timeout. Maybe we could use the max of the default delivery timeout and the sum of requestTimeoutMs and lingerMs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my concern is that setting the value automatically may hide some issues from the user, and it also requires more explanation. It might be better to keep it simple. The issue that the users can run into an upgrade is that they override requestTimeoutMs with a large value, and that violates the required variant deliveryTimeout > requestTimeoutMs + lingerMs. In that case, the user will be notify the error during producer initialization, and it shall not take much effort to fix it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I appreciate the concern. However, there is already some precedent for redefining defaults based on provided configurations. For example, when idempotence is enabled, we override retries if it has not been explicitly provided by the user. We log an info message so that the user knows the default has been adjusted.

In general, I think we should try to avoid compatibility issues even in configuration as long as it is safe to do so. By using a larger request timeout, the user has already declared willingness to await for the request timeout to receive a produce acknowledgement, so redefining the default delivery timeout seems reasonable and saves an annoying config update.

@ijuma ijuma Jun 28, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A reason for the error is that a longer request timeout is not unlikely to have been done as a workaround for the issue that delivery timeout is fixing. So, it might provide the user with an opportunity to fix that. A warning might be a less heavy handed way to achieve this though.

@yuyang08 yuyang08 Jun 28, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the change to use requestTimeoutMs + lingerMs if it is larger than deliveryTimeoutMs setting, and log a warning message.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter half here may not be very clear to users since it's a bit low level. Is there actually a lower bound for the timeout or can it be arbitrarily small depending on how long the batch remains open?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the doc string on the lower bound explanation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention this default change in the upgrade notes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the upgrade doc on this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe helpful to mention the baseOffset in this message?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added baseOffset info in the log message

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is unused

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we explain in this comment why we use this order? Intuitively, I would expect that we'd expire the oldest stuff first so that the callbacks are invoked in the order of sending.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a comment that was in #3849. updated the comment.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. Shouldn't we be expiring the batch in the else case? Otherwise it seems like we may lose track of the batch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. expire the batch in else branch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm missing something, we only remove from the inFlightBatches collection when we reenqueue and when we encounter a delivery timeout. Don't we need to remove on completion or failure as well?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This would have to be called from Sender.failBatch and Sender.completeBatch as well, similar to how the transactionManager.removeInflightBatch is called in those methods.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! updated the code to call maybeRemoveFromInflightBatches from Sender.failBatch and Sender.completeBatch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unintentional?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored the space

@apurvam apurvam left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch @yuyang08 .. I made a pass over the core logic and left some comments. The logic is looking good.

I still need to go over the tests and make a second pass over the core code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what scenario will batch.createdMs + deliveryTimeoutMs be negative? batch.createdMs is the wall clock time when the batch is created, and should always be positive. The config def for deliveryTimeoutMs enforces the value is atleast(0). So then how could the sum of the two be negative? Are you accounting for wrap around?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is to guard us against potential overflow due to setting a large value for deliveryTimeoutMs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. It would be good mention this in a comment. It is very non obvious.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the code to use if ... statement and add the comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming this method and drainBatchesForOneNode is just a refactor? Was any logic changed here? It is hard to tell from the diff and the logic is super intricate so subtle changes are easy to miss. I assume nothing should have to change in this portion of the code since it is totally orthogonal to expiring batches.

@yuyang08 yuyang08 Jun 28, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is a refactoring. otherwise, the method RecordAccumulator.drain fails in style checking due to too many possible paths in one method. comparing with the previous code, the change is to add the following lines on updating inflightBatches

               // put this batch in the infligh list
                List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
                if (inflightBatchList == null) {
                    inflightBatchList = new LinkedList<>();
                    inFlightBatches.put(tp, inflightBatchList);
                }
                inflightBatchList.add(batch);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here and elsewhere, the code style for kafka requires that there be braces even around single line if statements like this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored the curly braces.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The preceding comment needs to be updated to account for this new logic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. updated the comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not to be used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. removed the unused code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This would have to be called from Sender.failBatch and Sender.completeBatch as well, similar to how the transactionManager.removeInflightBatch is called in those methods.

@apurvam apurvam left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @yuyang08 , thanks for the updates.

It is looking good. I left some comments, but I think the larger point is that we should add some checks that the inflight batches tracked in the accumulator are actually cleared.

I added some suggestions for adding checks for when batches expire.

But we should also add checks that the accumulator has no inflight batches when batches complete successfully and when they fail.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this to a helper named markBatchInflight or something similar.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the code to capture this in markBatchInflight method

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should log a warning if we have overflowed and are hence not updating the next batch expiry time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. added the logging here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some sort of accumulator.hasInflightBatches method, and then check that it returns false here. This would check that expired batches are not reenqueued, which is logic added in this patch.

@yuyang08 yuyang08 Jun 29, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the method public List<ProducerBatch> inFlightBatches(TopicPartition tp) in RecordAccumulator, and updated the test with two assertions:

   line 1912: assertEquals("Expect one in-flight batch in accumulator", 1, accumulator.inFlightBatches(tp0).size());
    .....
   line 1920: assertEquals("Expect zero in-flight batch in accumulator", 0, accumulator.inFlightBatches(tp0).size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this different from the test testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed?

@yuyang08 yuyang08 Jun 29, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed initialize sender with guaranteeMessageOrder = false, while testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder initialize sender with guaranteeMessageOrder = true. The inflightBatches size is different when we set the parameter to true/false.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THis should be dropped, or be log.debug.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. removed this debugging line.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had accumulator.hasInflightRequests we could assert false here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this testing that we don't double deallocate. Ideally, we would expire the inflight batch, then get a response, and then check that the batch is not deallocated twice. In this case, it would de allocate only once, unless I am missing something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the test a bit to add another sender.run(time.milliseconds()); call after the batch expiry. Not sure if that fits expire the inflight batch, then get a response.

my expectation was that if there is double deallocation, we would get an IllegalStateException exception from MatchingBufferPool.deallocate.

@yuyang08

yuyang08 commented Jun 29, 2018

Copy link
Copy Markdown
Contributor Author

@hachikuji, @apurvam I've updated SenderTest.java with checks to ensure that the inflight batches tracked in the accumulator are cleared properly, and addressed your comments. could you check again?

@yuyang08

yuyang08 commented Jul 2, 2018

Copy link
Copy Markdown
Contributor Author

@hachikuji , @apurvam friendly ping ... could you help to review the updated change to allow us iterate faster? thanks!

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. Left a few more comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, I think we should only override the default value. If the user has explicitly provided an inconsistent value, then we should throw an exception. We can check this using config.originals().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: createdMs for consistency?

By the way, we have a function below createdTimeMs which is currently unused and seems incorrect anyway. Can you remove it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to createdMs, and removed the unused method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Ignores" -> "Ignored"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems tryFinalState can only be SUCCEEDED or FAILED, so one of these transitions is not possible anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerBatcn.finalState can also be updated to FinalState.ABORTED through Sender.run() --> RecordAccumulator.abortIncompleteBatches() or abortUndrainedBatches() --> ... -> ProducerBatch.abort() .

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that tryFinalState can only be SUCCEEDED or FAILED.

final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;

So the only possibilities are FAILED -> FAILED and ABORTED -> FAILED.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see. misunderstood your comment earlier. updated the change

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the order of the first two arguments backwards? I think tryFinalState is the state we're trying to transition to.

@yuyang08 yuyang08 Jul 3, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! fixed the arguments order.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add back the newline

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added it back

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unneeded newline

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the new line

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: topic-partition as we did above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: realign

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't both bounds inclusive? The inconsistency is a little annoying and this is a private constructor anyway. I think mentioning this in the javadoc below is good enough.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added java doc comments and changed the parameter name to max

Comment thread docs/upgrade.html Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention that this is for the producer? For example:

The default value for the producer's retries config was changed to INT.MAX_VALUE ...

Might also be worthwhile including a link to KIP-91

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to include KIP-91 link

@yuyang08

yuyang08 commented Jul 9, 2018

Copy link
Copy Markdown
Contributor Author

@hachikuji @apurvam @ijuma have updated the change to address your comments. could you take a look again? thanks!

@yuyang08

Copy link
Copy Markdown
Contributor Author

@hachikuji , @apurvam friendly ping... mind to take another look? thanks!

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, left a few more comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is sufficient to solve the issue mentioned above. If we do not reenqueue because the timeout has been reached, then who is responsible for completing the batch? I think I would probably suggest that we skip the check for delivery timeout and just reenqueue. We will detect the expiration the next time we iterate the deque.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only issue with reenqueing unconditionally is that the batch will then be drained even if it is expired, since accumulator.drain is called before accumulator.expiredBatches in the background thread. This could violate the contract.

That said, we need to complete the batch and deallocate it over here, otherwise it seems to be dropped on the flor.

@yuyang08 yuyang08 Jul 18, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to check whether a batch has reached deliveryTimeoutMs or not in Sender.canRetry. With this change, we do not need to check whether a batch has timeout or not in RecordAccumuator.reenqueue, and cleaning up of timed-out batches is handled by Sender.failBatch.

   private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
        return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) 
                && ...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this call necessary for in-flight batches? Presumably we have already closed the append stream since the batch was already sent once.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. removed this line.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is an expected invariant, I would suggest we raise an exception if it does not hold. Otherwise, bugs will go undetected since we don't have a way to verify logging output when running tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Better to fail with an exception if the invariant is violated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the change to throw IllegalBatchFinalStateException when the invariant is violated.

Comment thread docs/upgrade.html Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: introduces -> introduced

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the typo.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why changing linger to int ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have request.timeout.ms and delivery.timeout.ms of int type. this is to make the type of linger.ms be consistent with other timeout related settings.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be of type long ?
With long, there is no overflow on line 478

In ProducerBatch, deliveryTimeoutMs is long in hasReachedDeliveryTimeout

@yuyang08 yuyang08 Jul 18, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerBatch.hasReachedDeliveryTimeout is called by RecordAccumulator. In RecordAccumulator's construct, we have had using long type for lingerMs, and retryBackoffMs. It will be inconsistent to use int for deliveryTimeoutMs. And it will require changes at many places (especially in the test cases) if we use int type for lingerMs and retryBackoffMs. I thought that it would be better to have another PR for data type related changes for lingerMs etc.

    public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             ...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to be warn.
Can be info since there is no action from user

@yuyang08 yuyang08 Jul 13, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is try to get the user's attention as we are overriding the default delivery.timeout.ms setting. Previously the user may set a long request.timeout.ms as a work around. The user may want to explicitly set delivery.timeout.ms and give a smaller value for request.timeout.ms.

@apurvam apurvam left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delayed response @yuyang08 . The week of july 4 was extremely short, and last week we had a company event and multi-day off site, which took a lot of cycles.

I left a few more comments, the biggest one around synchronization of inFlightBatches.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we need to synchronize on accesses to the contained List<ProducerBatch>. This list is updated in maybeRemoveFromInflightBatches, which called both from the sender background thread and from the network client threads. This unsafe access can corrupt the list in the present patch.

@yuyang08 yuyang08 Jul 19, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not fully convinced that there is concurrent access to List<ProducerBatch> inFlightBatches. In KafkaProducer, there is only one background I/O thread (Sender) that turns the records into requests and transmit them to the cluster. inFlightBatches is only being modified by RecordAccumulator methods that are called by Sender thread. Sender.run(long now) --> NetworkClient.poll --> NetworkClient.handle... --> Sender.handleProduceResponse --> Sender.completeBatch --> Sender.reenqueueBatch --> RecordAccumulator.reenque --> RecordAccumulator.maybeRemoveFromInflightBatches is the call stack. NetworkClient uses Selector for non-blocking i/o multiplexing, but it is not a thread.

We need to synchronize access on Deque<ProducerBatch> deque = entry.getValue(); as both the producer thread and the sender thread can concurrently access deque at the same time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find the concurrent access either, so it may be ok. It actually makes me wonder why we are using a ConcurrentMap? To be honest, it would be more intuitive for the inflight batches to be tracked inside Sender. I'd suggest doing that if it's straightforward, but I'm fine leaving it here if it takes a lot of work.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea. I think I was mistaken. If the callbacks from the network client are happening in the context of the sender thread, then there is no concurrent access on inflightBatches.

@yuyang08 yuyang08 Jul 20, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point on tracking the inflight batches in Sender. updated the change to move inFlightBatches to Sender, and changed its type from ConcurrentMap to Map.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Better to fail with an exception if the invariant is violated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "created".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the typo

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, we should synchronize on this list while iterating, and then synchronize again in maybeRemoveFromInflightBatches

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned earlier, i might miss something, but it seems to me that there is no concurrent access to inFlightBatches, and we do not need synchronize qualifier for it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only issue with reenqueing unconditionally is that the batch will then be drained even if it is expired, since accumulator.drain is called before accumulator.expiredBatches in the background thread. This could violate the contract.

That said, we need to complete the batch and deallocate it over here, otherwise it seems to be dropped on the flor.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did those tests fail if you leave the isMuted in there?

@asfgit

asfgit commented Jul 20, 2018

Copy link
Copy Markdown

FAILURE
4401 tests run, 1 skipped, 1 failed.
--none--

@yuyang08

Copy link
Copy Markdown
Contributor Author

@hachikuji , @apurvam, @tedyu thanks for your review! I've address all of your comments. The change also passed the tests. could you take a look again?

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuyang08 Thanks for the updates. I think we're pretty close. I had a few more small comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need a new exception type for an error that should not happen. Maybe we can just use IllegalStateException?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to use IllegalStateException

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to use ArrayList

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not using the keys, so can we just iterate the values?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to iterate through batches.values()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra space before method name

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the extra space

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why use this for the iteration? I would have expected we would iterate over the entry set of inFlightBatches.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really not needed. updated to iterate through inFlightBatches directly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is unused

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this unused method

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message needs to be updated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the comment:

                    // expireBatches is called in Sender.sendProducerData, before client.poll.
                    // The batch.finalState() == null invariant should always hold. An IllegalStateException
                    // exception will be thrown if the invariant is violated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If batches is empty, should we remove the list from inFlightBatches?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the change to remove the list from inFlightBatches when it is empty.

@hachikuji

Copy link
Copy Markdown
Contributor

retest this please

@apurvam apurvam left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch @yuyang08 and for the patience during the review!

I went over the core flow and the tests once more, and it has been simplified quite nicely and LGTM!

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, merging to trunk. Thanks for the patch!

@hachikuji

Copy link
Copy Markdown
Contributor

Note that I removed an unused method and made some tweaks to the upgrade notes.

@hachikuji hachikuji merged commit 7fc7136 into apache:trunk Jul 26, 2018
@ijuma

ijuma commented Jul 26, 2018

Copy link
Copy Markdown
Member

@yuyang08 Thanks for picking up this important improvement. And thanks @sutambe for submitting the original PR.

@yuyang08 yuyang08 deleted the kip91 branch August 31, 2018 01:06
int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);

if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(deliveryTimeoutMs < lingerMs + requestTimeoutMs) implies (deliveryTimeoutMs < Integer.MAX_VALUE), why do we need to check (deliveryTimeoutMs < Integer.MAX_VALUE), logically it can be removed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants