ConcurrentModificationException in subscriber callbacks when using exactly-once-delivery #1778

@Sourc

Description

This recently merged PR introduces a ConcurrentModificationException when exactly-once delivery is enabled. I believe the source is this unsynchronized call, as all other usages appear to be synchronized.

I've also noticed that the subscriber often stops processing messages completely shortly after this is thrown (usually the 2nd or 3rd time), which makes this even more critical to fix. I haven't fully verified why it stops: the subscriber is still considered to be in a running state at the top level, so one possible cause is messages sitting in memory without ever being handed to the user to ack / nack 🤷. It's possible the soft-crashes are caused by something else, but the pattern is very consistent.
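To illustrate the failure mode I suspect, here is a minimal sketch that does not use the library's actual code. `PendingAckSketch`, `track`, and `markAllAcked` are hypothetical names, and the real `MessageDispatcher` internals may differ. It replays the unsafe interleaving deterministically on a `LinkedHashMap` (the map type in the stack trace below), then shows that the exception does not occur when iteration and mutation hold the same lock.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a dispatcher that tracks outstanding ack IDs.
// This is NOT the library's implementation, only an illustration.
public class PendingAckSketch {
  private final Map<String, Boolean> pending = new LinkedHashMap<>();

  // Mutation guarded by the instance lock.
  synchronized void track(String ackId) {
    pending.put(ackId, false);
  }

  // Iteration guarded by the same lock, so track() cannot interleave with it.
  synchronized int markAllAcked() {
    int count = 0;
    for (Map.Entry<String, Boolean> entry : pending.entrySet()) {
      entry.setValue(true); // not a structural modification
      count++;
    }
    return count;
  }

  // Single-threaded replay of what an unsynchronized iteration can hit:
  // the map is structurally modified between two calls to next().
  static boolean unsynchronizedIterationFails() {
    Map<String, Boolean> map = new LinkedHashMap<>();
    map.put("ack-1", false);
    map.put("ack-2", false);
    Iterator<Map.Entry<String, Boolean>> it = map.entrySet().iterator();
    it.next();
    map.put("ack-3", false); // what another thread would do mid-iteration
    try {
      it.next();
      return false;
    } catch (ConcurrentModificationException expected) {
      return true;
    }
  }

  // A writer thread adds entries while this thread iterates repeatedly.
  // With both paths synchronized, no exception is thrown.
  static int synchronizedRun() {
    PendingAckSketch sketch = new PendingAckSketch();
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        sketch.track("ack-" + i);
      }
    });
    writer.start();
    for (int i = 0; i < 100; i++) {
      sketch.markAllAcked();
    }
    try {
      writer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sketch.markAllAcked();
  }

  public static void main(String[] args) {
    System.out.println("unsynchronized iteration failed: " + unsynchronizedIterationFails());
    System.out.println("entries seen with locking: " + synchronizedRun());
  }
}
```

If the real call site looks anything like the unsynchronized variant, wrapping the iteration in the same lock as the other usages (or iterating over a copy taken under that lock) should be enough to avoid the exception.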

Environment details

Tested using library version 1.125.6, Java 17.

Steps to reproduce

  1. Create a subscriber with exactly-once delivery enabled, as shown in the official docs. I tested by copying the example as-is and removing the shutdown at the end.
  2. Create a publisher, also as shown in the official docs. I changed it to publish 500 messages every 10 seconds.
  3. Start publishing messages. Eventually a listener call will throw.

Stack trace

RuntimeException while executing runnable CallbackListener{com.google.api.core.ApiFutures$1@28557790} with executor MoreExecutors.directExecutor() java.util.ConcurrentModificationException: null
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
	at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:788)
	at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:786)
	at com.google.cloud.pubsub.v1.MessageDispatcher.notifyAckSuccess(MessageDispatcher.java:423)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:530)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:523)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:546)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
 [com.google.common.util.concurrent.AbstractFuture.executeListener() @ 1291]

Metadata

Labels

api: pubsub (Issues related to the googleapis/java-pubsub API.)
priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
