Skip to content

Commit d08b5f6

Browse files
committed
pr comment
1 parent 28d348d commit d08b5f6

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,40 +267,41 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
267267

268268
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
269269
for (ReceivedMessage pubsubMessage : responseMessages) {
270-
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), pubsubMessage.getMessage().getSerializedSize()));
270+
int size = pubsubMessage.getMessage().getSerializedSize();
271+
AckHandler handler = new AckHandler(pubsubMessage.getAckId(), size);
272+
ackHandlers.add(handler);
271273
}
272274

273275
Instant now = new Instant(clock.millisTime());
274276
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
275277
logger.log(
276278
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
277279

280+
// We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281+
// Otherwise, the alarm might go off before we can add the handlers.
278282
synchronized (outstandingAckHandlers) {
279283
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
280284
// We will also later iterate over ackHandlers when we give messages to user code.
281285
// We must create a new list to pass to outstandingAckHandlers,
282286
// so that we can't iterate and modify the list concurrently.
287+
ArrayList<AckHandler> ackHandlersCopy = new ArrayList<>(ackHandlers);
283288
outstandingAckHandlers.add(
284-
new ExtensionJob(
285-
expiration,
286-
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
287-
new ArrayList<AckHandler>(ackHandlers)));
289+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlersCopy));
288290
}
289-
setupNextAckDeadlineExtensionAlarm(expiration);
290291

291292
// Deadline extension must be set up before we reserve flow control.
292293
// Flow control might block for a while, and extension will keep messages from expiring.
294+
setupNextAckDeadlineExtensionAlarm(expiration);
293295

296+
// Reserving flow control must happen before we give the messages to the user,
297+
// otherwise the user code might be given too many messages to process at once.
294298
try {
295-
flowController.reserve(responseMessages.size(), totalMessageSize(responseMessages));
299+
flowController.reserve(responseMessages.size(), getTotalMessageSize(responseMessages));
296300
} catch (FlowController.FlowControlException e) {
297301
throw new IllegalStateException("Flow control unexpected exception", e);
298302
}
299303
messagesWaiter.incrementPendingMessages(responseMessages.size());
300304

301-
// Reserving flow control must happen before we give the messages to the user,
302-
// otherwise the user code might be given too many messages to process at once.
303-
304305
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
305306
for (ReceivedMessage userMessage : responseMessages) {
306307
final PubsubMessage message = userMessage.getMessage();
@@ -328,7 +329,7 @@ public void run() {
328329
}
329330
}
330331

331-
private static int totalMessageSize(Collection<ReceivedMessage> messages) {
332+
private static int getTotalMessageSize(Collection<ReceivedMessage> messages) {
332333
int total = 0;
333334
for (ReceivedMessage message : messages) {
334335
total += message.getMessage().getSerializedSize();

0 commit comments

Comments
 (0)