Skip to content

Commit 6faac7c

Browse files
committed
pr comment
1 parent 0da6d38 commit 6faac7c

3 files changed

Lines changed: 19 additions & 1 deletion

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations.
4646
*/
4747
final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor {
48-
static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);
48+
static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
4949

5050
private static final int MAX_PER_REQUEST_CHANGES = 1000;
5151
private static final int DEFAULT_MAX_MESSAGES = 1000;

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
5151
private String subscription = "";
5252
private final AtomicInteger messageAckDeadline =
5353
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
54+
private final AtomicInteger getSubscriptionCalled = new AtomicInteger();
5455
private final List<Stream> openedStreams = new ArrayList<>();
5556
private final List<Stream> closedStreams = new ArrayList<>();
5657
private final List<String> acks = new ArrayList<>();
@@ -225,6 +226,7 @@ public void enqueuePullResponse(PullResponse response) {
225226
@Override
226227
public void getSubscription(
227228
GetSubscriptionRequest request, StreamObserver<Subscription> responseObserver) {
229+
getSubscriptionCalled.incrementAndGet();
228230
responseObserver.onNext(
229231
Subscription.newBuilder()
230232
.setName(request.getSubscription())
@@ -234,6 +236,10 @@ public void getSubscription(
234236
responseObserver.onCompleted();
235237
}
236238

239+
public int getSubscriptionCalledNum() {
240+
return getSubscriptionCalled.get();
241+
}
242+
237243
@Override
238244
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
239245
synchronized (receivedPullRequest) {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,18 @@ public void testAckSingleMessage() throws Exception {
203203
assertEquivalent(testAckIds, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(1));
204204
}
205205

206+
@Test
207+
public void testGetSubscriptionOnce() throws Exception {
208+
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));
209+
210+
sendMessages(ImmutableList.of("A"));
211+
212+
// Trigger ack sending
213+
subscriber.stopAsync().awaitTerminated();
214+
215+
assertEquals(1, fakeSubscriberServiceImpl.getSubscriptionCalledNum());
216+
}
217+
206218
@Test
207219
public void testNackSingleMessage() throws Exception {
208220
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));

0 commit comments

Comments
 (0)