-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
A large number of duplicate messages is observed with the Pub/Sub streamingPull client library, using code that pulls from a Pub/Sub subscription and inserts messages into BigQuery with a synchronous blocking operation, with flow control set to a maximum of 500 outstanding messages (note: the attached code actually sets 1000). See [1] for the code.
For the same code, we also observe an excessive number of modifyAckDeadline operations (far exceeding the number of streamingPull message operations). Tracing a single message, we see modifyAcks and Acks in alternating order for the same message (modAck, modAck, Ack, modAck, Ack) [2]. This suggests that the implementation might fail to remove Ack'ed messages from a queue of messages to process and keeps re-processing messages already on the client. It also suggests that ack requests may not actually be sent.
[1]
package kir.pubsub;import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.bigquery.*;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.AckReplyConsumer;import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;public class Sub {
// Instantiate an asynchronous message receiver public static void main(String... args) throws Exception { final String projectId = args[0]; final String subscriptionId = args[1]; final BigQuery bq = BigQueryOptions.getDefaultInstance().getService(); final String datasetName = "pubsub_debug"; final String tableName = "gke_subscriber"; final AtomicInteger messageCount = new AtomicInteger(0); final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); MessageReceiver receiver = new MessageReceiver() { public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { // handle incoming message, then ack/nack the received message System.out.printf("%s,\t%s,\t%d\n",message.getData().toStringUtf8() , message.getMessageId() , messageCount.incrementAndGet()); Map<String,Object> row = new HashMap<String,Object>(); long timestampMs = message.getPublishTime().getSeconds()*1000 + message.getPublishTime().getNanos() / 1000000; Date timestampDate = new Date(timestampMs); row.put("messageId", message.getMessageId()); row.put("messageData", message.getData().toStringUtf8()); row.put("messagePublishTime", dateFormat.format(timestampDate)); // a version of this code without the bq.insert was ran, where consumer.ack() // was called immediately. The results were the same. 
InsertAllResponse response = bq.insertAll( InsertAllRequest.newBuilder(TableId.of(projectId, datasetName, tableName)).addRow(row).build() ); if (response.hasErrors()) { System.err.println("Error inserting into BigQuery " + response.toString() ); consumer.nack(); } else{ consumer.ack(); } } }; SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId); Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).setFlowControlSettings( FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build() ).build(); subscriber.startAsync(); subscriber.awaitRunning(); System.out.println("Started async subscriber."); subscriber.awaitTerminated(); }}