File tree Expand file tree Collapse file tree
main/java/com/google/cloud/pubsub/v1
test/java/com/google/cloud/pubsub/v1 Expand file tree Collapse file tree Original file line number Diff line number Diff line change 4545 * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations.
4646 */
4747final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor {
48- static final Duration DEFAULT_TIMEOUT = Duration .ofSeconds (10 );
48+ static final Duration DEFAULT_TIMEOUT = Duration .ofSeconds (60 );
4949
5050 private static final int MAX_PER_REQUEST_CHANGES = 1000 ;
5151 private static final int DEFAULT_MAX_MESSAGES = 1000 ;
Original file line number Diff line number Diff line change @@ -51,6 +51,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
5151 private String subscription = "" ;
5252 private final AtomicInteger messageAckDeadline =
5353 new AtomicInteger (Subscriber .MIN_ACK_DEADLINE_SECONDS );
54+ private final AtomicInteger getSubscriptionCalled = new AtomicInteger ();
5455 private final List <Stream > openedStreams = new ArrayList <>();
5556 private final List <Stream > closedStreams = new ArrayList <>();
5657 private final List <String > acks = new ArrayList <>();
@@ -225,6 +226,7 @@ public void enqueuePullResponse(PullResponse response) {
225226 @ Override
226227 public void getSubscription (
227228 GetSubscriptionRequest request , StreamObserver <Subscription > responseObserver ) {
229+ getSubscriptionCalled .incrementAndGet ();
228230 responseObserver .onNext (
229231 Subscription .newBuilder ()
230232 .setName (request .getSubscription ())
@@ -234,6 +236,10 @@ public void getSubscription(
234236 responseObserver .onCompleted ();
235237 }
236238
239+ public int getSubscriptionCalledNum () {
240+ return getSubscriptionCalled .get ();
241+ }
242+
237243 @ Override
238244 public void pull (PullRequest request , StreamObserver <PullResponse > responseObserver ) {
239245 synchronized (receivedPullRequest ) {
Original file line number Diff line number Diff line change @@ -203,6 +203,18 @@ public void testAckSingleMessage() throws Exception {
203203 assertEquivalent (testAckIds , fakeSubscriberServiceImpl .waitAndConsumeReceivedAcks (1 ));
204204 }
205205
206+ @ Test
207+ public void testGetSubscriptionOnce () throws Exception {
208+ Subscriber subscriber = startSubscriber (getTestSubscriberBuilder (testReceiver ));
209+
210+ sendMessages (ImmutableList .of ("A" ));
211+
212+ // Trigger ack sending
213+ subscriber .stopAsync ().awaitTerminated ();
214+
215+ assertEquals (1 , fakeSubscriberServiceImpl .getSubscriptionCalledNum ());
216+ }
217+
206218 @ Test
207219 public void testNackSingleMessage () throws Exception {
208220 Subscriber subscriber = startSubscriber (getTestSubscriberBuilder (testReceiver ));
You can’t perform that action at this time.
0 commit comments