@@ -267,40 +267,41 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
267267
268268 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
269269 for (ReceivedMessage pubsubMessage : responseMessages ) {
270- ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), pubsubMessage .getMessage ().getSerializedSize ()));
270+ int size = pubsubMessage .getMessage ().getSerializedSize ();
271+ AckHandler handler = new AckHandler (pubsubMessage .getAckId (), size );
272+ ackHandlers .add (handler );
271273 }
272274
273275 Instant now = new Instant (clock .millisTime ());
274276 Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
275277 logger .log (
276278 Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
277279
280+ // We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281+ // Otherwise, the alarm might go off before we can add the handlers.
278282 synchronized (outstandingAckHandlers ) {
279283 // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
280284 // We will also later iterate over ackHandlers when we give messages to user code.
281285 // We must create a new list to pass to outstandingAckHandlers,
282286 // so that we can't iterate and modify the list concurrently.
287+ ArrayList <AckHandler > ackHandlersCopy = new ArrayList <>(ackHandlers );
283288 outstandingAckHandlers .add (
284- new ExtensionJob (
285- expiration ,
286- INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
287- new ArrayList <AckHandler >(ackHandlers )));
289+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlersCopy ));
288290 }
289- setupNextAckDeadlineExtensionAlarm (expiration );
290291
291292 // Deadline extension must be set up before we reserve flow control.
292293 // Flow control might block for a while, and extension will keep messages from expiring.
294+ setupNextAckDeadlineExtensionAlarm (expiration );
293295
296+ // Reserving flow control must happen before we give the messages to the user,
297+ // otherwise the user code might be given too many messages to process at once.
294298 try {
295- flowController .reserve (responseMessages .size (), totalMessageSize (responseMessages ));
299+ flowController .reserve (responseMessages .size (), getTotalMessageSize (responseMessages ));
296300 } catch (FlowController .FlowControlException e ) {
297301 throw new IllegalStateException ("Flow control unexpected exception" , e );
298302 }
299303 messagesWaiter .incrementPendingMessages (responseMessages .size ());
300304
301- // Reserving flow control must happen before we give the messages to the user,
302- // otherwise the user code might be given too many messages to process at once.
303-
304305 Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
305306 for (ReceivedMessage userMessage : responseMessages ) {
306307 final PubsubMessage message = userMessage .getMessage ();
@@ -328,7 +329,7 @@ public void run() {
328329 }
329330 }
330331
331- private static int totalMessageSize (Collection <ReceivedMessage > messages ) {
332+ private static int getTotalMessageSize (Collection <ReceivedMessage > messages ) {
332333 int total = 0 ;
333334 for (ReceivedMessage message : messages ) {
334335 total += message .getMessage ().getSerializedSize ();
0 commit comments