
Pub/Sub StreamingPull receives many duplicates when there is a backlog #3383

@yonran

Description


The Pub/Sub StreamingPull API delivers many duplicate messages when the messages are small and there is a backlog. This differs from Pull, which does not exhibit this behavior. After a while, more than 50% of received messages can be duplicates, which makes it very hard to drain a backlog. @kir-titievsky suggested that I create a new issue when I described it in #2465.

I created two test programs (plus a variant of the first) to replicate this issue:

  1. CloudPubSub.java uses the Asynchronous Pull (MessageReceiver) Java API to receive messages and write them to a file of JSON records. Optionally, you can also use my custom google-cloud-java branch, in which I instrumented MessageDispatcher.java to log the message IDs that are acked.
    • MyProducer.java publishes a backlog of messages (e.g. --initial-publish-messages=10000) and then publishes a steady stream of messages (default --publish-period=400, i.e. one message every 400 ms, or 2.5 messages/second).
    • LogMessagesReceiver.java sleeps a fixed duration per message (default --message-processing-time=5000ms), throttles acks to one every --period=333 ms (3 messages/second), and then acks the message. Note that it should make progress, since its ack --period is shorter than the publisher’s --publish-period, but it doesn’t because of the duplicate messages.
    • CloudPubSub.java configures the FlowControlSettings (a sketch of this configuration appears after this list). By default, --concurrent-messages=20 means that 20 receivers sleep in parallel. Since 5000 < 333 × 20 = 6660, there are enough concurrent threads that the 5000 ms sleep does not reduce the subscriber throughput below the desired 3 messages per second.
    • Result: After an hour or two, there are many duplicate message IDs, as indicated by running jq < /tmp/inv-log-cloudpubsub-pub2.5-sub3.jsons --slurp '[.[] | .messageId] | {unique: sort|unique|length, total: length} | .duplicates = .total - .unique'
    • Stackdriver metrics show that the backlog is growing even though the consumer (3 messages/second) is faster than the producer (2.5 messages/second).
      [Stackdriver charts: undelivered messages and oldest acknowledged message age]
  2. RawPubSub.java uses the same low-level gRPC StreamingPull API that the high-level MessageReceiver API uses. Like CloudPubSub.java, it logs the message IDs to stdout and to a file of JSON records.
    • In the publisher thread, MyProducer.java publishes a backlog of messages (e.g. --initial-publish-messages=10000) and then publishes a stream of messages (default --publish-period=400, i.e. 2.5 messages/second).
    • onError is implemented on the ack StreamObserver so that we can detect acks that failed. I have not seen any failures.
    • The receiver thread calls request(1) to get one message at a time, queues the messages up, and processes them with timing similar to CloudPubSub.java (i.e., waits 5 seconds per message and then throttles acks to 3 messages per second); a sketch of this flow also appears after this list. It also sends modacks to extend deadlines.
    • Result: After an hour or two, there are many duplicate message IDs, as indicated by running jq < /tmp/inv-log-grpc-pub2.5-sub3.jsons --slurp '[.[] | .messageId] | {unique: sort|unique|length, total: length} | .duplicates = .total - .unique'
    • Message IDs are logged to stdout both when they are received and when they are acked. By checking the log, we can see that the client acks messages within the subscription’s ackDeadlineSeconds (10 s by default), yet the server sends duplicates regardless.
  3. The pubsub-0.21.1-beta branch of CloudPubSub.java uses the same MessageReceiver Java API, but at version 0.21.1-beta, which still used Pull instead of StreamingPull. It does not produce a significant number of duplicates.
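
For reference, here is a minimal sketch of the subscriber configuration that CloudPubSub.java and LogMessagesReceiver.java implement, written against the google-cloud-pubsub client. The project and subscription names, the inline throttle, and the class name are illustrative assumptions based on the flag defaults above, not the actual test code:

```java
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class SlowSubscriberSketch {
  // Throttle state: allow at most one ack every 333 ms (about 3 acks/second).
  private static final Object ACK_LOCK = new Object();
  private static long nextAckAtMillis = 0;

  public static void main(String[] args) throws Exception {
    MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
      try {
        Thread.sleep(5000); // simulate --message-processing-time=5000ms
        synchronized (ACK_LOCK) {
          long now = System.currentTimeMillis();
          if (now < nextAckAtMillis) {
            Thread.sleep(nextAckAtMillis - now); // enforce --period=333 between acks
          }
          nextAckAtMillis = System.currentTimeMillis() + 333;
        }
        System.out.println("acking " + message.getMessageId());
        consumer.ack();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        consumer.nack();
      }
    };

    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.of("my-project", "my-subscription"), receiver)
            .setFlowControlSettings(
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(20L) // --concurrent-messages=20
                    .build())
            .build();
    subscriber.startAsync().awaitRunning();
    Thread.sleep(Long.MAX_VALUE); // keep the process alive
  }
}
```

With 20 messages in flight and one ack every 333 ms, the 5000 ms sleep per message still permits about 3 acks/second (5000 < 20 × 333 = 6660), so this subscriber should outpace the 2.5 messages/second publisher.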
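
And here is a hedged sketch of the raw StreamingPull flow that RawPubSub.java is described as using, following gRPC's manual flow-control pattern (disableAutoInboundFlowControl plus request(1)). The class name and the inline ack are simplifications and assumptions; the actual test queues messages, throttles acks, and sends modacks from separate threads:

```java
import com.google.auth.oauth2.GoogleCredentials;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;

public class RawStreamingPullSketch {
  public static void main(String[] args) throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("pubsub.googleapis.com:443").build();
    SubscriberGrpc.SubscriberStub stub =
        SubscriberGrpc.newStub(channel)
            .withCallCredentials(
                MoreCallCredentials.from(GoogleCredentials.getApplicationDefault()));

    ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
        new ClientResponseObserver<StreamingPullRequest, StreamingPullResponse>() {
          private ClientCallStreamObserver<StreamingPullRequest> requestStream;

          @Override
          public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> stream) {
            this.requestStream = stream;
            // Pull responses one at a time instead of letting gRPC buffer freely.
            stream.disableAutoInboundFlowControl();
          }

          @Override
          public void onNext(StreamingPullResponse response) {
            for (ReceivedMessage m : response.getReceivedMessagesList()) {
              System.out.println("received " + m.getMessage().getMessageId());
              // Simplification: ack inline. The real test queues the message,
              // sleeps ~5 s, throttles to ~3 acks/second, and sends modacks to
              // extend the deadline before acking.
              requestStream.onNext(
                  StreamingPullRequest.newBuilder().addAckIds(m.getAckId()).build());
            }
            requestStream.request(1); // ask for the next response
          }

          @Override
          public void onError(Throwable t) {
            t.printStackTrace(); // surface failed streams/acks
          }

          @Override
          public void onCompleted() {}
        };

    StreamObserver<StreamingPullRequest> requestObserver = stub.streamingPull(responseObserver);
    // The first request on the stream must name the subscription and ack deadline.
    requestObserver.onNext(
        StreamingPullRequest.newBuilder()
            .setSubscription("projects/my-project/subscriptions/my-subscription")
            .setStreamAckDeadlineSeconds(10)
            .build());
    Thread.sleep(Long.MAX_VALUE); // keep the process alive while messages stream in
  }
}
```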

I opened a support ticket for this: Case 15877623, 2018-05-21, "Pub/Sub subscriber receives duplicated MessageIds and never catches up." On 2018-05-29, the representative said that this is a known issue with StreamingPull, that there is no ETA for a fix, and that I should watch the Pub/Sub release notes for updates.

Labels

  • api: pubsub (Issues related to the Pub/Sub API)
  • priority: p2 (Moderately-important priority. Fix may not be included in next release.)
  • status: blocked (Resolving the issue is dependent on other work.)
  • type: question (Request for information or clarification. Not an issue.)
