Skip to content

com.google.cloud.pubsub.v1.Subscriber how to await stop signal? #2485

@peter-gerhard

Description

@peter-gerhard

Hi,
when im calling stopAsync on a subscriber i get the following exception:

com.google.common.util.concurrent.AbstractFuture executeListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$4@22553be0 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@6e1d503c
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2c92d002 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@31c73eaa[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 18]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675)
	at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:141)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:385)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:422)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:61)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:504)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:425)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:536)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

the doc of the startAsync suggests to wait for a stop signal:

  /**
   * Initiates service startup and returns immediately.
   *
   * <p>Example of receiving a specific number of messages.
   *
   * <pre>{@code
   * Subscriber subscriber = Subscriber.defaultBuilder(subscription, receiver).build();
   * subscriber.addListener(new Subscriber.Listener() {
   *   public void failed(Subscriber.State from, Throwable failure) {
   *     // Handle error.
   *   }
   * }, executor);
   * subscriber.startAsync();
   *
   * // Wait for a stop signal.
   * done.get();
   * subscriber.stopAsync().awaitTerminated();
   * }</pre>
   */

But i don't know what is meant by done.get();

version 0.25.0-beta

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the Pub/Sub API.priority: p2Moderately-important priority. Fix may not be included in next release.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions