2929import com .google .api .gax .rpc .StatusCode .Code ;
3030import com .google .api .gax .rpc .UnknownException ;
3131import com .google .cloud .bigquery .storage .test .Test .FooType ;
32+ import com .google .cloud .bigquery .storage .v1 .ConnectionWorkerPool .Settings ;
3233import com .google .cloud .bigquery .storage .v1 .StorageError .StorageErrorCode ;
3334import com .google .cloud .bigquery .storage .v1 .StreamWriter .SingleConnectionOrConnectionPool .Kind ;
3435import com .google .common .base .Strings ;
6061@ RunWith (JUnit4 .class )
6162public class StreamWriterTest {
6263 private static final Logger log = Logger .getLogger (StreamWriterTest .class .getName ());
63- private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s" ;
64+ private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s" ;
65+ private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s" ;
6466 private static final String TEST_TRACE_ID = "DATAFLOW:job_id" ;
6567 private FakeScheduledExecutorService fakeExecutor ;
6668 private FakeBigQueryWrite testBigQueryWrite ;
@@ -94,7 +96,7 @@ public void tearDown() throws Exception {
9496 }
9597
9698 private StreamWriter getMultiplexingTestStreamWriter () throws IOException {
97- return StreamWriter .newBuilder (TEST_STREAM , client )
99+ return StreamWriter .newBuilder (TEST_STREAM_1 , client )
98100 .setWriterSchema (createProtoSchema ())
99101 .setTraceId (TEST_TRACE_ID )
100102 .setLocation ("US" )
@@ -103,7 +105,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
103105 }
104106
105107 private StreamWriter getTestStreamWriter () throws IOException {
106- return StreamWriter .newBuilder (TEST_STREAM , client )
108+ return StreamWriter .newBuilder (TEST_STREAM_1 , client )
107109 .setWriterSchema (createProtoSchema ())
108110 .setTraceId (TEST_TRACE_ID )
109111 .build ();
@@ -197,7 +199,7 @@ private void verifyAppendRequests(long appendCount) {
197199 if (i == 0 ) {
198200 // First request received by server should have schema and stream name.
199201 assertTrue (serverRequest .getProtoRows ().hasWriterSchema ());
200- assertEquals (serverRequest .getWriteStream (), TEST_STREAM );
202+ assertEquals (serverRequest .getWriteStream (), TEST_STREAM_1 );
201203 assertEquals (serverRequest .getTraceId (), TEST_TRACE_ID );
202204 } else {
203205 // Following request should not have schema and stream name.
@@ -210,7 +212,7 @@ private void verifyAppendRequests(long appendCount) {
210212
211213 public void testBuildBigQueryWriteClientInWriter () throws Exception {
212214 StreamWriter writer =
213- StreamWriter .newBuilder (TEST_STREAM )
215+ StreamWriter .newBuilder (TEST_STREAM_1 )
214216 .setCredentialsProvider (NoCredentialsProvider .create ())
215217 .setChannelProvider (serviceHelper .createChannelProvider ())
216218 .setWriterSchema (createProtoSchema ())
@@ -253,7 +255,7 @@ public void testNoSchema() throws Exception {
253255 new ThrowingRunnable () {
254256 @ Override
255257 public void run () throws Throwable {
256- StreamWriter .newBuilder (TEST_STREAM , client ).build ();
258+ StreamWriter .newBuilder (TEST_STREAM_1 , client ).build ();
257259 }
258260 });
259261 assertEquals (ex .getStatus ().getCode (), Status .INVALID_ARGUMENT .getCode ());
@@ -267,23 +269,23 @@ public void testInvalidTraceId() throws Exception {
267269 new ThrowingRunnable () {
268270 @ Override
269271 public void run () throws Throwable {
270- StreamWriter .newBuilder (TEST_STREAM ).setTraceId ("abc" );
272+ StreamWriter .newBuilder (TEST_STREAM_1 ).setTraceId ("abc" );
271273 }
272274 });
273275 assertThrows (
274276 IllegalArgumentException .class ,
275277 new ThrowingRunnable () {
276278 @ Override
277279 public void run () throws Throwable {
278- StreamWriter .newBuilder (TEST_STREAM ).setTraceId ("abc:" );
280+ StreamWriter .newBuilder (TEST_STREAM_1 ).setTraceId ("abc:" );
279281 }
280282 });
281283 assertThrows (
282284 IllegalArgumentException .class ,
283285 new ThrowingRunnable () {
284286 @ Override
285287 public void run () throws Throwable {
286- StreamWriter .newBuilder (TEST_STREAM ).setTraceId (":abc" );
288+ StreamWriter .newBuilder (TEST_STREAM_1 ).setTraceId (":abc" );
287289 }
288290 });
289291 }
@@ -487,7 +489,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
487489 @ Test
488490 public void testZeroMaxInflightRequests () throws Exception {
489491 StreamWriter writer =
490- StreamWriter .newBuilder (TEST_STREAM , client )
492+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
491493 .setWriterSchema (createProtoSchema ())
492494 .setMaxInflightRequests (0 )
493495 .build ();
@@ -499,7 +501,7 @@ public void testZeroMaxInflightRequests() throws Exception {
499501 @ Test
500502 public void testZeroMaxInflightBytes () throws Exception {
501503 StreamWriter writer =
502- StreamWriter .newBuilder (TEST_STREAM , client )
504+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
503505 .setWriterSchema (createProtoSchema ())
504506 .setMaxInflightBytes (0 )
505507 .build ();
@@ -511,7 +513,7 @@ public void testZeroMaxInflightBytes() throws Exception {
511513 @ Test
512514 public void testOneMaxInflightRequests () throws Exception {
513515 StreamWriter writer =
514- StreamWriter .newBuilder (TEST_STREAM , client )
516+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
515517 .setWriterSchema (createProtoSchema ())
516518 .setMaxInflightRequests (1 )
517519 .build ();
@@ -525,10 +527,45 @@ public void testOneMaxInflightRequests() throws Exception {
525527 writer .close ();
526528 }
527529
530+ @ Test
531+ public void testOneMaxInflightRequests_MultiplexingCase () throws Exception {
532+ ConnectionWorkerPool .setOptions (Settings .builder ().setMaxConnectionsPerRegion (2 ).build ());
533+ StreamWriter writer1 =
534+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
535+ .setWriterSchema (createProtoSchema ())
536+ .setLocation ("US" )
537+ .setEnableConnectionPool (true )
538+ .setMaxInflightRequests (1 )
539+ .build ();
540+ StreamWriter writer2 =
541+ StreamWriter .newBuilder (TEST_STREAM_2 , client )
542+ .setWriterSchema (createProtoSchema ())
543+ .setMaxInflightRequests (1 )
544+ .setEnableConnectionPool (true )
545+ .setMaxInflightRequests (1 )
546+ .setLocation ("US" )
547+ .build ();
548+
549+ // Server will sleep 1 second before every response.
550+ testBigQueryWrite .setResponseSleep (Duration .ofSeconds (1 ));
551+ testBigQueryWrite .addResponse (createAppendResponse (0 ));
552+ testBigQueryWrite .addResponse (createAppendResponse (1 ));
553+
554+ ApiFuture <AppendRowsResponse > appendFuture1 = sendTestMessage (writer1 , new String [] {"A" });
555+ ApiFuture <AppendRowsResponse > appendFuture2 = sendTestMessage (writer2 , new String [] {"A" });
556+
557+ assertTrue (writer1 .getInflightWaitSeconds () >= 1 );
558+ assertTrue (writer2 .getInflightWaitSeconds () >= 1 );
559+ assertEquals (0 , appendFuture1 .get ().getAppendResult ().getOffset ().getValue ());
560+ assertEquals (1 , appendFuture2 .get ().getAppendResult ().getOffset ().getValue ());
561+ writer1 .close ();
562+ writer2 .close ();
563+ }
564+
528565 @ Test
529566 public void testAppendsWithTinyMaxInflightBytes () throws Exception {
530567 StreamWriter writer =
531- StreamWriter .newBuilder (TEST_STREAM , client )
568+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
532569 .setWriterSchema (createProtoSchema ())
533570 .setMaxInflightBytes (1 )
534571 .build ();
@@ -560,7 +597,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
560597 @ Test
561598 public void testAppendsWithTinyMaxInflightBytesThrow () throws Exception {
562599 StreamWriter writer =
563- StreamWriter .newBuilder (TEST_STREAM , client )
600+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
564601 .setWriterSchema (createProtoSchema ())
565602 .setMaxInflightBytes (1 )
566603 .setLimitExceededBehavior (FlowController .LimitExceededBehavior .ThrowException )
@@ -595,7 +632,7 @@ public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
595632 @ Override
596633 public void run () throws Throwable {
597634 StreamWriter writer =
598- StreamWriter .newBuilder (TEST_STREAM , client )
635+ StreamWriter .newBuilder (TEST_STREAM_1 , client )
599636 .setWriterSchema (createProtoSchema ())
600637 .setMaxInflightBytes (1 )
601638 .setLimitExceededBehavior (FlowController .LimitExceededBehavior .Ignore )
@@ -745,7 +782,7 @@ public void testExtractDatasetName() throws Exception {
745782 @ Test (timeout = 10000 )
746783 public void testCloseDisconnectedStream () throws Exception {
747784 StreamWriter writer =
748- StreamWriter .newBuilder (TEST_STREAM )
785+ StreamWriter .newBuilder (TEST_STREAM_1 )
749786 .setCredentialsProvider (NoCredentialsProvider .create ())
750787 .setChannelProvider (serviceHelper .createChannelProvider ())
751788 .setWriterSchema (createProtoSchema ())
0 commit comments