Description
Environment details
OS type and version: GKE node n2d-highmem-8
Java version: 21
version(s): google-cloud-pubsub-v1-1.120.0
Steps to reproduce
- Create a subscription with a 120s ack deadline
- Publish 4k unacked messages
- Create a subscriber with the following settings:
  - 100 max outstanding element count
  - 8 parallel pulls
  - ThreadPoolExecutor size of 150
  - MaxAckExtensionPeriod of 3 minutes
  - Sleep 30 seconds upon receiving each message
- Run it for a few minutes
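As a rough sanity check on these settings, here is a best-case throughput estimate. It assumes every message spends exactly 30s in the receiver and that concurrency is limited only by flow control and the executor size. It ignores the executor threads the client itself uses for lease extension and acks, as well as the 8 parallel pulls. `capacity` and `Capacity` are hypothetical names used only for this estimate.

```kotlin
// Hypothetical result type for the estimate below.
data class Capacity(
    val throughputPerSec: Double, // messages acked per second at full concurrency
    val minDrainSeconds: Double,  // lower bound on the time needed to clear the backlog
    val threadBound: Boolean      // true if the executor, not flow control, caps concurrency
)

// Best-case estimate: at most min(maxOutstanding, executorThreads) messages are inside
// the receiver at once, and each one occupies it for processingSec seconds.
fun capacity(backlog: Int, maxOutstanding: Int, processingSec: Double, executorThreads: Int): Capacity {
    val concurrency = minOf(maxOutstanding, executorThreads)
    val throughput = concurrency / processingSec
    return Capacity(
        throughputPerSec = throughput,
        minDrainSeconds = backlog / throughput,
        threadBound = executorThreads < maxOutstanding
    )
}
```

With the reported values, `capacity(4000, 100, 30.0, 150)` gives about 3.33 messages/s, so draining the 4k backlog should take roughly 20 minutes even with no errors, and the 150-thread pool is not the limiting factor.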
Code example
Subscriber.newBuilder(ProjectSubscriptionName.of(PROJECT_ID, "air-report-queue-dl-sub")) { message: PubsubMessage, consumer: AckReplyConsumer ->
    Thread.sleep(30000)
    consumer.ack()
}.apply {
    setParallelPullCount(8)
    setExecutorProvider(FixedExecutorProvider.create(ScheduledThreadPoolExecutor(150)))
    setMaxAckExtensionPeriod(Duration.ofMinutes(3))
    setFlowControlSettings(
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(100)
            .build()
    )
}.build().apply { startAsync().awaitRunning() }

Stacktrace:
Jul 09, 2025 1:07:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1307)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1070)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:819)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:651)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:621)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.GrpcLoggingInterceptor$1$1.onClose(GrpcLoggingInterceptor.java:98)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
at io.grpc.Status.asRuntimeException(Status.java:532)
... 21 more
I was logging the number of messages we're processing in parallel. Initially, the subscriber was processing 100 messages at once:
Outstanding messages: 0
Outstanding messages: 36
Outstanding messages: 100
Outstanding messages: 100
Outstanding messages: 100
Outstanding messages: 73
Outstanding messages: 19
Outstanding messages: 0
Outstanding messages: 0
Outstanding messages: 0
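How the outstanding count above was logged isn't shown. One way to produce a log like this is a counter that is incremented when the receiver starts work on a message and decremented when it finishes, sampled on a timer. `OutstandingTracker` and `startLogging` are hypothetical helpers, not part of the Pub/Sub client.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: counts messages currently inside the receiver callback.
class OutstandingTracker {
    private val inFlight = AtomicInteger(0)

    // Runs `work`, incrementing the counter before it starts and decrementing it
    // afterwards, even if `work` throws.
    fun <T> track(work: () -> T): T {
        inFlight.incrementAndGet()
        try {
            return work()
        } finally {
            inFlight.decrementAndGet()
        }
    }

    fun current(): Int = inFlight.get()
}

// Prints the current count every `periodSeconds`; the caller shuts the scheduler down.
fun startLogging(tracker: OutstandingTracker, periodSeconds: Long): ScheduledExecutorService {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
        { println("Outstanding messages: ${tracker.current()}") },
        0, periodSeconds, TimeUnit.SECONDS
    )
    return scheduler
}
```

In the receiver from the code example, the body would become `tracker.track { Thread.sleep(30000); consumer.ack() }`. Note that this counts only messages inside the callback, not messages the client may be holding before the callback runs.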
But as soon as the above error starts accumulating, throughput drops to zero within 5 minutes.
My guess is that messages sit in a client-side buffer before they're processed, so they exceed the 2-minute ack deadline even though the client spends only 30s processing each one. And as soon as the client fails to ack more than 100 messages, it thinks it has reached MaxOutstandingElementCount and stops pulling.
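The buffering part of this guess can be illustrated with a toy model. It assumes (hypothetically) that `held` messages reach the client at t = 0 and are processed FIFO in waves of at most `maxOutstanding`, each taking `processingSec`. The model does not reflect the client library's real scheduling or lease extension; `windowSec` stands for whichever bound actually applies to a buffered message, e.g. the 120s ack deadline or the 3-minute MaxAckExtensionPeriod. The function names and the `held` count are illustrative assumptions, not measurements.

```kotlin
// Seconds from a message's arrival at the client until ack() is called, assuming
// FIFO processing in waves of maxOutstanding messages, each taking processingSec.
fun secondsUntilAck(index: Int, maxOutstanding: Int, processingSec: Int): Int {
    val wave = index / maxOutstanding   // number of full waves that must finish first
    return (wave + 1) * processingSec
}

// Number of held messages whose ack would be sent later than windowSec after arrival.
fun lateAcks(held: Int, maxOutstanding: Int, processingSec: Int, windowSec: Int): Int =
    (0 until held).count { secondsUntilAck(it, maxOutstanding, processingSec) > windowSec }
```

Under these assumptions, if the client were holding 1000 messages, `lateAcks(1000, 100, 30, 120)` is 600 and `lateAcks(1000, 100, 30, 180)` is 400, while holding at most 400 messages gives no late acks against the 120s deadline. In other words, a 30s handler can still produce expired ack IDs whenever enough messages queue up ahead of it.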
I tried setting setMinDurationPerAckExtensionDuration(Duration.ofMinutes(2)) as suggested in this doc, but no luck: I still get the same error.