Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b569116

Browse files
feat: add getInflightWaitSeconds implementation (#1835)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 7e8d900 commit b569116

File tree

3 files changed

+71
-20
lines changed

3 files changed

+71
-20
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,21 @@ public void close(StreamWriter streamWriter) {
392392
}
393393
}
394394

395+
/** Fetch the wait seconds from corresponding worker. */
396+
public long getInflightWaitSeconds(StreamWriter streamWriter) {
397+
lock.lock();
398+
try {
399+
ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter);
400+
if (connectionWorker == null) {
401+
return 0;
402+
} else {
403+
return connectionWorker.getInflightWaitSeconds();
404+
}
405+
} finally {
406+
lock.unlock();
407+
}
408+
}
409+
395410
/** Enable Test related logic. */
396411
public static void enableTestingLogic() {
397412
enableTesting = true;

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,9 @@ public void close(StreamWriter streamWriter) {
141141
}
142142
}
143143

144-
long getInflightWaitSeconds() {
144+
long getInflightWaitSeconds(StreamWriter streamWriter) {
145145
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
146-
throw new IllegalStateException(
147-
"getInflightWaitSeconds is not supported in multiplexing mode.");
146+
return connectionWorkerPool().getInflightWaitSeconds(streamWriter);
148147
}
149148
return connectionWorker().getInflightWaitSeconds();
150149
}
@@ -363,7 +362,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
363362
* stream case.
364363
*/
365364
public long getInflightWaitSeconds() {
366-
return singleConnectionOrConnectionPool.getInflightWaitSeconds();
365+
return singleConnectionOrConnectionPool.getInflightWaitSeconds(this);
367366
}
368367

369368
/** @return a unique Id for the writer. */

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.api.gax.rpc.StatusCode.Code;
3030
import com.google.api.gax.rpc.UnknownException;
3131
import com.google.cloud.bigquery.storage.test.Test.FooType;
32+
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
3233
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
3334
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
3435
import com.google.common.base.Strings;
@@ -60,7 +61,8 @@
6061
@RunWith(JUnit4.class)
6162
public class StreamWriterTest {
6263
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
63-
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
64+
private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s";
65+
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s";
6466
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
6567
private FakeScheduledExecutorService fakeExecutor;
6668
private FakeBigQueryWrite testBigQueryWrite;
@@ -94,7 +96,7 @@ public void tearDown() throws Exception {
9496
}
9597

9698
private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
97-
return StreamWriter.newBuilder(TEST_STREAM, client)
99+
return StreamWriter.newBuilder(TEST_STREAM_1, client)
98100
.setWriterSchema(createProtoSchema())
99101
.setTraceId(TEST_TRACE_ID)
100102
.setLocation("US")
@@ -103,7 +105,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
103105
}
104106

105107
private StreamWriter getTestStreamWriter() throws IOException {
106-
return StreamWriter.newBuilder(TEST_STREAM, client)
108+
return StreamWriter.newBuilder(TEST_STREAM_1, client)
107109
.setWriterSchema(createProtoSchema())
108110
.setTraceId(TEST_TRACE_ID)
109111
.build();
@@ -197,7 +199,7 @@ private void verifyAppendRequests(long appendCount) {
197199
if (i == 0) {
198200
// First request received by server should have schema and stream name.
199201
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
200-
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
202+
assertEquals(serverRequest.getWriteStream(), TEST_STREAM_1);
201203
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
202204
} else {
203205
// Following request should not have schema and stream name.
@@ -210,7 +212,7 @@ private void verifyAppendRequests(long appendCount) {
210212

211213
public void testBuildBigQueryWriteClientInWriter() throws Exception {
212214
StreamWriter writer =
213-
StreamWriter.newBuilder(TEST_STREAM)
215+
StreamWriter.newBuilder(TEST_STREAM_1)
214216
.setCredentialsProvider(NoCredentialsProvider.create())
215217
.setChannelProvider(serviceHelper.createChannelProvider())
216218
.setWriterSchema(createProtoSchema())
@@ -253,7 +255,7 @@ public void testNoSchema() throws Exception {
253255
new ThrowingRunnable() {
254256
@Override
255257
public void run() throws Throwable {
256-
StreamWriter.newBuilder(TEST_STREAM, client).build();
258+
StreamWriter.newBuilder(TEST_STREAM_1, client).build();
257259
}
258260
});
259261
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
@@ -267,23 +269,23 @@ public void testInvalidTraceId() throws Exception {
267269
new ThrowingRunnable() {
268270
@Override
269271
public void run() throws Throwable {
270-
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc");
272+
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc");
271273
}
272274
});
273275
assertThrows(
274276
IllegalArgumentException.class,
275277
new ThrowingRunnable() {
276278
@Override
277279
public void run() throws Throwable {
278-
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc:");
280+
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc:");
279281
}
280282
});
281283
assertThrows(
282284
IllegalArgumentException.class,
283285
new ThrowingRunnable() {
284286
@Override
285287
public void run() throws Throwable {
286-
StreamWriter.newBuilder(TEST_STREAM).setTraceId(":abc");
288+
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId(":abc");
287289
}
288290
});
289291
}
@@ -487,7 +489,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
487489
@Test
488490
public void testZeroMaxInflightRequests() throws Exception {
489491
StreamWriter writer =
490-
StreamWriter.newBuilder(TEST_STREAM, client)
492+
StreamWriter.newBuilder(TEST_STREAM_1, client)
491493
.setWriterSchema(createProtoSchema())
492494
.setMaxInflightRequests(0)
493495
.build();
@@ -499,7 +501,7 @@ public void testZeroMaxInflightRequests() throws Exception {
499501
@Test
500502
public void testZeroMaxInflightBytes() throws Exception {
501503
StreamWriter writer =
502-
StreamWriter.newBuilder(TEST_STREAM, client)
504+
StreamWriter.newBuilder(TEST_STREAM_1, client)
503505
.setWriterSchema(createProtoSchema())
504506
.setMaxInflightBytes(0)
505507
.build();
@@ -511,7 +513,7 @@ public void testZeroMaxInflightBytes() throws Exception {
511513
@Test
512514
public void testOneMaxInflightRequests() throws Exception {
513515
StreamWriter writer =
514-
StreamWriter.newBuilder(TEST_STREAM, client)
516+
StreamWriter.newBuilder(TEST_STREAM_1, client)
515517
.setWriterSchema(createProtoSchema())
516518
.setMaxInflightRequests(1)
517519
.build();
@@ -525,10 +527,45 @@ public void testOneMaxInflightRequests() throws Exception {
525527
writer.close();
526528
}
527529

530+
@Test
531+
public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
532+
ConnectionWorkerPool.setOptions(Settings.builder().setMaxConnectionsPerRegion(2).build());
533+
StreamWriter writer1 =
534+
StreamWriter.newBuilder(TEST_STREAM_1, client)
535+
.setWriterSchema(createProtoSchema())
536+
.setLocation("US")
537+
.setEnableConnectionPool(true)
538+
.setMaxInflightRequests(1)
539+
.build();
540+
StreamWriter writer2 =
541+
StreamWriter.newBuilder(TEST_STREAM_2, client)
542+
.setWriterSchema(createProtoSchema())
543+
.setMaxInflightRequests(1)
544+
.setEnableConnectionPool(true)
545+
.setMaxInflightRequests(1)
546+
.setLocation("US")
547+
.build();
548+
549+
// Server will sleep 1 second before every response.
550+
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
551+
testBigQueryWrite.addResponse(createAppendResponse(0));
552+
testBigQueryWrite.addResponse(createAppendResponse(1));
553+
554+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"});
555+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"A"});
556+
557+
assertTrue(writer1.getInflightWaitSeconds() >= 1);
558+
assertTrue(writer2.getInflightWaitSeconds() >= 1);
559+
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
560+
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
561+
writer1.close();
562+
writer2.close();
563+
}
564+
528565
@Test
529566
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
530567
StreamWriter writer =
531-
StreamWriter.newBuilder(TEST_STREAM, client)
568+
StreamWriter.newBuilder(TEST_STREAM_1, client)
532569
.setWriterSchema(createProtoSchema())
533570
.setMaxInflightBytes(1)
534571
.build();
@@ -560,7 +597,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
560597
@Test
561598
public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
562599
StreamWriter writer =
563-
StreamWriter.newBuilder(TEST_STREAM, client)
600+
StreamWriter.newBuilder(TEST_STREAM_1, client)
564601
.setWriterSchema(createProtoSchema())
565602
.setMaxInflightBytes(1)
566603
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
@@ -595,7 +632,7 @@ public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
595632
@Override
596633
public void run() throws Throwable {
597634
StreamWriter writer =
598-
StreamWriter.newBuilder(TEST_STREAM, client)
635+
StreamWriter.newBuilder(TEST_STREAM_1, client)
599636
.setWriterSchema(createProtoSchema())
600637
.setMaxInflightBytes(1)
601638
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
@@ -745,7 +782,7 @@ public void testExtractDatasetName() throws Exception {
745782
@Test(timeout = 10000)
746783
public void testCloseDisconnectedStream() throws Exception {
747784
StreamWriter writer =
748-
StreamWriter.newBuilder(TEST_STREAM)
785+
StreamWriter.newBuilder(TEST_STREAM_1)
749786
.setCredentialsProvider(NoCredentialsProvider.create())
750787
.setChannelProvider(serviceHelper.createChannelProvider())
751788
.setWriterSchema(createProtoSchema())

0 commit comments

Comments
 (0)