
[pubsub] Subscriber.stopAsync().awaitTerminated() doesn't wait for all acks to be sent #3065

@prodonjs

While building a gRPC emulation service for Cloud Pub/Sub, I tried to validate its behavior by publishing a series of messages, receiving and acknowledging exactly that set of messages, and then checking that the subscription had no backlog.

Here's the general test pattern:

// Imports used by this snippet. Test-class fields (publisher, subscriber,
// subscriptionProperties, PROJECT, LOGGER) are declared elsewhere.
import static org.junit.Assert.assertEquals;

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;

String messagePrefix = subscriptionProperties.getName() + System.currentTimeMillis() + "-";
int messages = 5000;
// Unique payloads for this run.
Set<String> messagesSet = new TreeSet<>();
for (int i = 0; i < messages; i++) {
  messagesSet.add(messagePrefix + i);
}
// Released once every message has been delivered to the receiver.
CountDownLatch countDownLatch = new CountDownLatch(messages);
Set<String> publishedIds = new ConcurrentSkipListSet<>();
Map<String, Integer> receivedIds = new ConcurrentHashMap<>();
for (String data : messagesSet) {
  PubsubMessage message =
      PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build();
  ApiFutures.addCallback(
      publisher.publish(message),
      new ApiFutureCallback<String>() {
        @Override
        public void onFailure(Throwable throwable) {
          LOGGER.warning("Error on Publish " + throwable.getMessage());
        }

        @Override
        public void onSuccess(String s) {
          // Record the server-assigned message ID.
          publishedIds.add(s);
        }
      });
}
// Shut down the publisher, flushing pending publishes, then check every message was accepted.
publisher.shutdown();
assertEquals(messages, publishedIds.size());

subscriber =
    Subscriber.newBuilder(
            ProjectSubscriptionName.of(PROJECT, subscriptionProperties.getName()),
            (message, consumer) -> {
              consumer.ack();
              LOGGER.fine("Received and Acknowledging " + message.getMessageId());
              // Count deliveries per message ID atomically; a value > 1 means a redelivery.
              receivedIds.merge(message.getMessageId(), 1, Integer::sum);
              countDownLatch.countDown();
            })
        //.setChannelProvider(channelProvider)
        //.setCredentialsProvider(credentialsProvider)
        .build();

subscriber.startAsync();
LOGGER.info("Publisher complete, waiting for 5,000 messages to be received by Subscriber");
countDownLatch.await();
LOGGER.info("Shutting down Subscriber");
// Expected to block until every pending ack has been sent.
subscriber.stopAsync().awaitTerminated();

I ran this test against a freshly created topic and subscription. I expected that, once the CountDownLatch reached 0, subscriber.stopAsync().awaitTerminated() would shut down the Subscriber and block until all acknowledgements had been transmitted.
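The contract expected here can be illustrated with a small, self-contained sketch: acks are queued, sent in batches on a thread pool, and shutdown first flushes whatever is still queued and then waits for every in-flight send to finish before returning. This is not the client library's MessageDispatcher; the class AckBatcher, its methods, and the simulated sendAcks "RPC" (a short sleep) are hypothetical stand-ins, assuming acks are sent asynchronously as the multi-threaded log lines suggest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical ack batcher (not the Pub/Sub client's implementation) illustrating
// the expected shutdown contract: once shutdown() returns, every ack has been sent.
class AckBatcher {
  private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
  private final List<Future<?>> inFlight = new ArrayList<>();
  private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
  private final int batchSize;
  // Stands in for the server: IDs whose simulated Acknowledge call completed.
  final Set<String> acknowledgedOnServer = new ConcurrentSkipListSet<>();

  AckBatcher(int batchSize) {
    this.batchSize = batchSize;
  }

  // Called from the message receiver; only queues the ack ID.
  void ack(String ackId) {
    pending.add(ackId);
  }

  // Drains up to batchSize queued IDs and sends them asynchronously.
  synchronized void flushOnce() {
    List<String> batch = new ArrayList<>();
    String id;
    while (batch.size() < batchSize && (id = pending.poll()) != null) {
      batch.add(id);
    }
    if (!batch.isEmpty()) {
      inFlight.add(rpcExecutor.submit(() -> sendAcks(batch)));
    }
  }

  // Simulated, slow Acknowledge call (hypothetical).
  private void sendAcks(List<String> batch) {
    try {
      Thread.sleep(5);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    acknowledgedOnServer.addAll(batch);
  }

  // Flush everything still queued, then block until every send has completed.
  void shutdown() throws Exception {
    while (!pending.isEmpty()) {
      flushOnce();
    }
    synchronized (this) {
      for (Future<?> f : inFlight) {
        f.get(); // without this wait, batches could still be in flight on return
      }
    }
    rpcExecutor.shutdown();
    rpcExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    AckBatcher batcher = new AckBatcher(100);
    for (int i = 0; i < 5000; i++) {
      batcher.ack("ack-" + i);
    }
    batcher.shutdown();
    System.out.println(batcher.acknowledgedOnServer.size()); // 5000
  }
}
```

The key design point is the explicit wait on in-flight sends. If shutdown() only issued the final batches and returned, the caller could proceed while acks were still being transmitted, which would be consistent with the lost acks reported in this issue.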

In the client library's debug logs, I see the following ack lines, which add up to 5,000 as expected. Interestingly, the final 1,517 acks come from a thread named Time-limited test, which is the main thread of my integration test case.

2018-03-20 14:10:41.694 [Thread-50] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 78 acks
2018-03-20 14:10:41.693 [Thread-42] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 70 acks
2018-03-20 14:10:41.696 [Thread-46] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 29 acks
2018-03-20 14:10:41.700 [Thread-48] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 100 acks
2018-03-20 14:10:41.695 [Thread-52] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 169 acks
2018-03-20 14:10:41.702 [Thread-44] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 42 acks
2018-03-20 14:10:41.723 [Thread-48] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 281 acks
2018-03-20 14:10:41.729 [Thread-52] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 266 acks
2018-03-20 14:10:41.823 [Thread-50] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 36 acks
2018-03-20 14:10:41.830 [Thread-42] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 75 acks
2018-03-20 14:10:41.832 [Thread-46] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 609 acks
2018-03-20 14:10:41.831 [Thread-48] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 376 acks
2018-03-20 14:10:41.841 [Thread-44] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 186 acks
2018-03-20 14:10:41.841 [Thread-52] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 510 acks
2018-03-20 14:10:41.845 [Thread-50] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 244 acks
2018-03-20 14:10:41.868 [Thread-42] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 412 acks
2018-03-20 14:10:41.887 [Time-limited test] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 106 acks
2018-03-20 14:10:41.888 [Time-limited test] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 496 acks
2018-03-20 14:10:41.892 [Time-limited test] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 379 acks
2018-03-20 14:10:41.894 [Time-limited test] DEBUG com.google.cloud.pubsub.v1.MessageDispatcher : Sending 536 acks
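As a quick check of the arithmetic, the batch sizes copied from the log lines above can be summed to confirm both the 5,000 total and the 1,517 acks sent from the Time-limited test thread. The class name is arbitrary.

```java
import java.util.List;

class AckLogTotals {
  // "Sending N acks" batch sizes logged by the Thread-NN workers, in log order.
  static final List<Integer> WORKER_BATCHES =
      List.of(78, 70, 29, 100, 169, 42, 281, 266, 36, 75, 609, 376, 186, 510, 244, 412);
  // Batch sizes logged by the "Time-limited test" thread during shutdown.
  static final List<Integer> TEST_THREAD_BATCHES = List.of(106, 496, 379, 536);

  static int sum(List<Integer> batches) {
    return batches.stream().mapToInt(Integer::intValue).sum();
  }

  public static void main(String[] args) {
    int workers = sum(WORKER_BATCHES);
    int testThread = sum(TEST_THREAD_BATCHES);
    System.out.println("worker threads:    " + workers);                // 3483
    System.out.println("Time-limited test: " + testThread);             // 1517
    System.out.println("total:             " + (workers + testThread)); // 5000
  }
}
```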

At this point, I can confirm that a backlog exists by re-running the test with the publishing step disabled. If every ack from the first run had been delivered, the subscriber would receive no messages and the test would time out.

However, grepping my logs for the "Received and Acknowledging" line yields 981 rows. In other words, some of the acks sent during the previous run, which received and acknowledged all 5,000 messages, never reached Cloud Pub/Sub, and those messages were redelivered. 981 is exactly the sum of the first three ack batches sent from the Time-limited test thread after the subscriber shutdown sequence was invoked.
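The 981 figure can be checked the same way. Using the four batch sizes logged by the Time-limited test thread (106, 496, 379, 536), the first three sum to the observed backlog, leaving only the final 536-ack batch apparently delivered. The class and constant names are illustrative.

```java
class BacklogCheck {
  // "Sending N acks" batches from the Time-limited test thread, in log order.
  static final int[] TEST_THREAD_BATCHES = {106, 496, 379, 536};
  // Count of "Received and Acknowledging" lines on the re-run without publishing.
  static final int OBSERVED_BACKLOG = 981;

  // Sum of the first n batches in log order.
  static int sumFirst(int[] batches, int n) {
    int total = 0;
    for (int i = 0; i < n; i++) {
      total += batches[i];
    }
    return total;
  }

  public static void main(String[] args) {
    int firstThree = sumFirst(TEST_THREAD_BATCHES, 3);
    int allFour = sumFirst(TEST_THREAD_BATCHES, TEST_THREAD_BATCHES.length);
    System.out.println(firstThree == OBSERVED_BACKLOG); // true (106 + 496 + 379 = 981)
    System.out.println(allFour - firstThree);           // 536, the last shutdown batch
  }
}
```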

Labels: api: pubsub (Issues related to the Pub/Sub API), priority: p2 (Moderately-important priority. Fix may not be included in next release.), type: question (Request for information or clarification. Not an issue.)
