Closed
Labels
api: pubsub (Issues related to the Pub/Sub API.)
priority: p2 (Moderately-important priority. Fix may not be included in next release.)
Description
Hi,
When I call stopAsync on a subscriber, I get the following exception:
com.google.common.util.concurrent.AbstractFuture executeListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$4@22553be0 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@6e1d503c
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2c92d002 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@31c73eaa[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 18]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675)
at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:141)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:385)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:422)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:61)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:504)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:425)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:536)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The Javadoc for startAsync suggests waiting for a stop signal:
/**
 * Initiates service startup and returns immediately.
 *
 * <p>Example of receiving a specific number of messages.
 *
 * <pre>{@code
 * Subscriber subscriber = Subscriber.defaultBuilder(subscription, receiver).build();
 * subscriber.addListener(new Subscriber.Listener() {
 *   public void failed(Subscriber.State from, Throwable failure) {
 *     // Handle error.
 *   }
 * }, executor);
 * subscriber.startAsync();
 *
 * // Wait for a stop signal.
 * done.get();
 * subscriber.stopAsync().awaitTerminated();
 * }</pre>
 */
But I don't know what is meant by done.get(); in that example.
Version: 0.25.0-beta
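One plausible reading, sketched here for discussion: `done` is a future that some other part of the program completes when it is time to shut down, so `done.get()` simply blocks until that happens. The Javadoc does not define `done`, so this is an assumption. The sketch below uses only the JDK; `FakeSubscriber`, `stopAndAwaitTerminated` and `runUntilStopSignal` are hypothetical stand-ins, not part of the Pub/Sub client, and `CompletableFuture` is just one possible future type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StopSignalSketch {

    /** Hypothetical stand-in for the real Subscriber; it only records lifecycle calls. */
    static class FakeSubscriber {
        volatile String state = "NEW";

        void startAsync() {
            state = "RUNNING";
        }

        // Stands in for subscriber.stopAsync().awaitTerminated() in the Javadoc example.
        void stopAndAwaitTerminated() {
            state = "TERMINATED";
        }
    }

    /** Starts the subscriber, blocks until the stop signal arrives, then stops it. */
    static String runUntilStopSignal(FakeSubscriber subscriber, CompletableFuture<Void> done)
            throws Exception {
        subscriber.startAsync();
        // Wait for a stop signal: get() blocks until another thread completes the future.
        done.get();
        subscriber.stopAndAwaitTerminated();
        return subscriber.state;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Assumption: some other part of the program (a message counter, a shutdown
        // hook, ...) decides when to stop and completes the future.
        executor.execute(() -> done.complete(null));

        String finalState = runUntilStopSignal(new FakeSubscriber(), done);
        System.out.println(finalState);

        // The executor is shut down only after the subscriber has terminated.
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In this sketch the executor is shut down last. A RejectedExecutionException is what a terminated executor throws when handed new work, but whether shutdown ordering is the cause of the trace above is not established here.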