PubSub: Subscriber not pulling from pubsub queue #2480

@maoxianxian

Environment details

OS type and version: GKE node n2d-highmem-8
Java version: 21
version(s): google-cloud-pubsub-v1-1.120.0

Steps to reproduce

  1. Create a subscription with 120s ack deadline
  2. Publish 4k messages and leave them unacked
  3. Create a subscriber with the following settings:
    100 max outstanding element count
    8 parallel pull
    ThreadPoolExecutor size of 150
    MaxAckExtensionPeriod of 3 minutes
    Sleep 30 seconds upon receiving message
  4. Run it for a few minutes

Code example

        Subscriber.newBuilder(ProjectSubscriptionName.of(PROJECT_ID, "air-report-queue-dl-sub")) { message: PubsubMessage, consumer: AckReplyConsumer ->
            Thread.sleep(30000)
            consumer.ack()
        }.apply {
            setParallelPullCount(8)
            setExecutorProvider(FixedExecutorProvider.create(
                ScheduledThreadPoolExecutor(
                    150
                )
            ))
            setMaxAckExtensionPeriod(Duration.ofMinutes(3))
            setFlowControlSettings(
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(100)
                    .build()
            )
        }.build().apply { startAsync().awaitRunning() }

Stacktrace:

Jul 09, 2025 1:07:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1307)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1070)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:819)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:651)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:621)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.GrpcLoggingInterceptor$1$1.onClose(GrpcLoggingInterceptor.java:98)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
	at io.grpc.Status.asRuntimeException(Status.java:532)
	... 21 more

I was logging the number of messages being processed in parallel. Initially, the subscriber was processing 100 messages at a time.

Outstanding messages: 0
Outstanding messages: 36
Outstanding messages: 100
Outstanding messages: 100
Outstanding messages: 100
Outstanding messages: 73
Outstanding messages: 19
Outstanding messages: 0
Outstanding messages: 0
Outstanding messages: 0
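
For reference, a minimal sketch of how an "Outstanding messages" count like the one above could be tracked. The `InFlightCounter` class and its `track` helper are hypothetical names made up for illustration; they are not part of the Pub/Sub client library and not necessarily how the count in this report was produced.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not a library class): counts handlers that have
// started running but have not yet returned.
class InFlightCounter {
    private val inFlight = AtomicInteger(0)

    // Number of handlers currently running.
    fun current(): Int = inFlight.get()

    // Runs the handler and decrements the counter even if the handler throws.
    fun <T> track(handler: () -> T): T {
        inFlight.incrementAndGet()
        try {
            return handler()
        } finally {
            inFlight.decrementAndGet()
        }
    }
}

fun main() {
    val counter = InFlightCounter()
    // Inside the handler the counter includes the current message.
    counter.track { println("Outstanding messages: ${counter.current()}") }
    // After the handler returns, the count has dropped again.
    println("Outstanding messages: ${counter.current()}")
}
```

In a receiver callback, the sleep and `consumer.ack()` would go inside `track { ... }`, and a separate scheduled task could print `current()` at a fixed interval.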

But as the above error accumulates, throughput drops to zero within 5 minutes.
My guess is that messages are held in a client-side buffer before they are processed, so they exceed the 2-minute ack deadline even though the client spends only 30s processing each one. Once the client has failed to ack more than 100 messages, it considers itself to have reached MaxOutstandingElementCount and stops pulling.
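
To make the timing in this guess concrete, here is a back-of-the-envelope sketch. It assumes, purely for illustration, that messages wait in a FIFO client-side buffer and are processed in waves of 100 that each take 30s. That is a model of the hypothesis, not confirmed behavior of the client library, and `waitSeconds` and `secondsUntilAck` are made-up names.

```kotlin
// Values taken from the reproduction settings in this report.
const val CONCURRENCY = 100          // max outstanding element count
const val PROCESSING_SECONDS = 30.0  // Thread.sleep(30000) in the receiver

// Assumption: a message at 0-based buffer position `position` waits for all
// earlier waves of CONCURRENCY messages to finish before it starts.
fun waitSeconds(position: Int): Double = (position / CONCURRENCY) * PROCESSING_SECONDS

// Time from entering the buffer until ack() would be called.
fun secondsUntilAck(position: Int): Double = waitSeconds(position) + PROCESSING_SECONDS

fun main() {
    for (position in listOf(0, 100, 300, 400, 600, 3999)) {
        val total = secondsUntilAck(position)
        println(
            "position=$position ackAfter=${total}s " +
                "pastAckDeadline=${total > 120} pastMaxExtension=${total > 180}"
        )
    }
}
```

Under this model, anything buffered behind roughly the first 400 messages would be acked after the 120s deadline, and anything behind roughly the first 600 would be acked after the 3-minute MaxAckExtensionPeriod, which would be consistent with the INVALID_ARGUMENT errors if such buffering does occur.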
I tried setting setMinDurationPerAckExtensionDuration(Duration.ofMinutes(2)) as suggested in this doc, but no luck: I still get the same error.

Labels: api: pubsub (Issues related to the googleapis/java-pubsub API.)