-
Notifications
You must be signed in to change notification settings - Fork 99
Description
Description
This recently merged PR introduces a concurrent-modification exception when exactly-once-delivery is enabled. I believe the source is this unsynchronized call, as all other usages seem to be synchronized.
I've also noticed that the subscriber often stops processing messages completely shortly after this is thrown (usually the 2nd or 3rd time the exception is thrown), which makes this even more critical to fix. I've not fully verified why the subscriber completely stops processing messages (it's still considered in a running state at the top-level, maybe the cause would be messages sitting in-memory but not getting sent to the user to ack / nack 🤷 ), so it's possible the soft-crashes are caused by something else - but the pattern is very consistent.
Environment details
Tested using library version 1.125.6, Java 17.
Steps to reproduce
- Create a subscriber with exactly-once-delivery enabled as shown in the official docs. I've tested by simply copying the exampel straight off, removing the shutdown at the end.
- Create a publisher, also as shown in the official docs. I've changed it publish 500 messages every 10 seconds.
- Start publishing messages, eventually a listener call will throw.
Stack trace
RuntimeException while executing runnable CallbackListener{com.google.api.core.ApiFutures$1@28557790} with executor MoreExecutors.directExecutor() java.util.ConcurrentModificationException: null
at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:788)
at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:786)
at com.google.cloud.pubsub.v1.MessageDispatcher.notifyAckSuccess(MessageDispatcher.java:423)
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:530)
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:523)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:546)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
[com.google.common.util.concurrent.AbstractFuture.executeListener() @ 1291]