Skip to content

Pub/Sub streamingPull subscriber: large number of duplicate messages, modifyAckDeadline calls observed #2465

@kir-titievsky

Description

@kir-titievsky

Large number of duplicate messages is observed with Pub/Sub streamingPull client library using code that pulls from a Pub/Sub subscription and inserts messages into BigQuery with a synchronous blocking operation, with flowControl set to max 500 outstanding messages. See [1] for code.

For the same code, we also observe an excessive number of modifyAckDeadline operations (>> streamingPull message operations). And tracing a single message, we see modifyAcks and Acks in alternating order for the same message (modAck, modAck, Ack, modAck, Ack) [2]. This suggest that the implementation might fail to remove Ack'ed messages from a queue of messages to process and keep re-processing messages already on the client. This also suggests that ack requests may not actually be sent.

[2] https://docs.google.com/spreadsheets/d/1mqtxxm0guZcOcRy8ORG0ri787XLQZNF_FLBujAiayFI/edit?ts=59cac0f0#gid=2139642597

[1]


package kir.pubsub;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.bigquery.*;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.AckReplyConsumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Sub {

// Instantiate an asynchronous message receiver

public static void main(String... args) throws Exception {
    final String projectId = args[0];
    final String subscriptionId = args[1];

    final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    final String datasetName = "pubsub_debug";
    final String tableName = "gke_subscriber";
    final AtomicInteger messageCount = new AtomicInteger(0);
    final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    MessageReceiver receiver = new MessageReceiver() {
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                    // handle incoming message, then ack/nack the received message
                    System.out.printf("%s,\t%s,\t%d\n",message.getData().toStringUtf8()
                            , message.getMessageId()
                            , messageCount.incrementAndGet());
                    Map<String,Object> row = new HashMap<String,Object>();
                    long timestampMs = message.getPublishTime().getSeconds()*1000 + message.getPublishTime().getNanos() / 1000000;
                    Date timestampDate = new Date(timestampMs);

                    row.put("messageId", message.getMessageId());
                    row.put("messageData", message.getData().toStringUtf8());
                    row.put("messagePublishTime", dateFormat.format(timestampDate));
                    // a version of this code without the bq.insert was ran, where consumer.ack() 
                    // was called immediately. The results were the same.
                    InsertAllResponse response = bq.insertAll(
                                InsertAllRequest.newBuilder(TableId.of(projectId, datasetName, tableName)).addRow(row).build()
                    );
                    if (response.hasErrors()) {
                        System.err.println("Error inserting into BigQuery " + response.toString() );
                        consumer.nack();
                    } else{
                        consumer.ack();
                    }
                }

            };

    SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
    Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).setFlowControlSettings(
            FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()
    ).build();
    subscriber.startAsync();
    subscriber.awaitRunning();
    System.out.println("Started async subscriber.");
    subscriber.awaitTerminated();
}

}

Metadata

Metadata

Assignees

Labels

🚨This issue needs some love.api: pubsubIssues related to the Pub/Sub API.priority: p2Moderately-important priority. Fix may not be included in next release.status: blockedResolving the issue is dependent on other work.triaged for GAtype: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions