Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b3ffd77

Browse files
feat: some fixes for multiplexing client (#1798)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent f6083be commit b3ffd77

6 files changed

Lines changed: 117 additions & 45 deletions

File tree

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,24 @@
4040
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
4141
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
4242
</difference>
43+
<difference>
44+
<differenceType>7002</differenceType>
45+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
46+
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
47+
</difference>
48+
<difference>
49+
<differenceType>7013</differenceType>
50+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
51+
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
52+
</difference>
53+
<difference>
54+
<differenceType>7002</differenceType>
55+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
56+
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
57+
</difference>
58+
<difference>
59+
<differenceType>7013</differenceType>
60+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
61+
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
62+
</difference>
4363
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.FlowController;
20+
import com.google.api.gax.rpc.FixedHeaderProvider;
2021
import com.google.auto.value.AutoValue;
2122
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
2223
import com.google.common.base.Stopwatch;
@@ -153,24 +154,24 @@ public abstract static class Settings {
153154
* The minimum connections each pool created before trying to reuse the previously created
154155
* connection in multiplexing mode.
155156
*/
156-
abstract int minConnectionsPerPool();
157+
abstract int minConnectionsPerRegion();
157158

158159
/** The maximum connections per connection pool. */
159-
abstract int maxConnectionsPerPool();
160+
abstract int maxConnectionsPerRegion();
160161

161162
public static Builder builder() {
162163
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
163-
.setMinConnectionsPerPool(2)
164-
.setMaxConnectionsPerPool(10);
164+
.setMinConnectionsPerRegion(2)
165+
.setMaxConnectionsPerRegion(10);
165166
}
166167

167168
/** Builder for the options to config {@link ConnectionWorkerPool}. */
168169
@AutoValue.Builder
169170
public abstract static class Builder {
170171
// TODO(gaole) rename to per location for easier understanding.
171-
public abstract Builder setMinConnectionsPerPool(int value);
172+
public abstract Builder setMinConnectionsPerRegion(int value);
172173

173-
public abstract Builder setMaxConnectionsPerPool(int value);
174+
public abstract Builder setMaxConnectionsPerRegion(int value);
174175

175176
public abstract Settings build();
176177
}
@@ -192,7 +193,7 @@ public ConnectionWorkerPool(
192193
this.traceId = traceId;
193194
this.client = client;
194195
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
195-
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
196+
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
196197
}
197198

198199
/**
@@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
266267
ImmutableList.copyOf(connectionWorkerPool));
267268
if (!existingBestConnection.getLoad().isOverwhelmed()) {
268269
return existingBestConnection;
269-
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
270+
} else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
270271
// At this point, we have reached the connection cap and the selected connection is
271272
// overwhelmed, we can try scale up the connection pool.
272273
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
273274
currentMaxConnectionCount += 1;
274-
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
275-
currentMaxConnectionCount = settings.maxConnectionsPerPool();
275+
if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
276+
currentMaxConnectionCount = settings.maxConnectionsPerRegion();
276277
}
277278
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
278279
} else {
@@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
323324
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
324325
testValueCreateConnectionCount.getAndIncrement();
325326
}
327+
// TODO(gaole): figure out a better way to handle header / request body mismatch
328+
// currently we use different header for the client in each connection worker to be different
329+
// as the backend require the header to have the same write_stream field as request body.
330+
BigQueryWriteClient clientAfterModification = client;
331+
if (ownsBigQueryWriteClient) {
332+
BigQueryWriteSettings settings = client.getSettings();
333+
BigQueryWriteSettings stubSettings =
334+
settings
335+
.toBuilder()
336+
.setHeaderProvider(
337+
FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
338+
.build();
339+
clientAfterModification = BigQueryWriteClient.create(stubSettings);
340+
}
326341
ConnectionWorker connectionWorker =
327342
new ConnectionWorker(
328343
streamName,
@@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
331346
maxInflightBytes,
332347
limitExceededBehavior,
333348
traceId,
334-
client,
349+
clientAfterModification,
335350
ownsBigQueryWriteClient);
336351
connectionWorkerPool.add(connectionWorker);
337352
log.info(

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.gax.core.CredentialsProvider;
2121
import com.google.api.gax.rpc.TransportChannelProvider;
2222
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
23+
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
2324
import com.google.common.base.Preconditions;
2425
import com.google.protobuf.Descriptors;
2526
import com.google.protobuf.Descriptors.Descriptor;
@@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
6061
private long totalMessageSize = 0;
6162
private long absTotal = 0;
6263
private ProtoSchema protoSchema;
64+
private boolean enableConnectionPool = false;
6365

6466
/**
6567
* Constructs the JsonStreamWriter
@@ -69,7 +71,6 @@ public class JsonStreamWriter implements AutoCloseable {
6971
private JsonStreamWriter(Builder builder)
7072
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
7173
InterruptedException {
72-
this.client = builder.client;
7374
this.descriptor =
7475
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
7576

@@ -83,6 +84,8 @@ private JsonStreamWriter(Builder builder)
8384
builder.endpoint,
8485
builder.flowControlSettings,
8586
builder.traceId);
87+
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
88+
streamWriterBuilder.setLocation(builder.location);
8689
this.streamWriter = streamWriterBuilder.build();
8790
this.streamName = builder.streamName;
8891
this.tableSchema = builder.tableSchema;
@@ -120,8 +123,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
120123
throws IOException, DescriptorValidationException {
121124
// Handle schema updates in a Thread-safe way by locking down the operation
122125
synchronized (this) {
123-
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
124-
if (updatedSchema != null) {
126+
// Update schema only work when connection pool is not enabled.
127+
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
128+
&& this.streamWriter.getUpdatedSchema() != null) {
129+
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
125130
// Close the StreamWriter
126131
this.streamWriter.close();
127132
// Update JsonStreamWriter's TableSchema and Descriptor
@@ -312,6 +317,9 @@ public static final class Builder {
312317
private String traceId;
313318
private boolean ignoreUnknownFields = false;
314319
private boolean reconnectAfter10M = false;
320+
// Indicte whether multiplexing mode is enabled.
321+
private boolean enableConnectionPool = false;
322+
private String location;
315323

316324
private static String streamPatternString =
317325
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -450,6 +458,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
450458
return this;
451459
}
452460

461+
/**
462+
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
463+
* connection if possible until the connection is overwhelmed. This feature is still under
464+
* development, please contact write api team before using.
465+
*
466+
* @param enableConnectionPool
467+
* @return Builder
468+
*/
469+
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
470+
this.enableConnectionPool = enableConnectionPool;
471+
return this;
472+
}
473+
474+
/**
475+
* Location of the table this stream writer is targeting. Connection pools are shared by
476+
* location.
477+
*
478+
* @param location
479+
* @return Builder
480+
*/
481+
public Builder setLocation(String location) {
482+
this.location = location;
483+
return this;
484+
}
485+
453486
/**
454487
* Builds JsonStreamWriter
455488
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.api.gax.rpc.TransportChannelProvider;
2323
import com.google.auto.value.AutoOneOf;
2424
import com.google.auto.value.AutoValue;
25-
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
2625
import com.google.common.annotations.VisibleForTesting;
2726
import com.google.common.base.Preconditions;
2827
import io.grpc.Status;
@@ -33,6 +32,7 @@
3332
import java.util.Objects;
3433
import java.util.UUID;
3534
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.TimeUnit;
3636
import java.util.logging.Logger;
3737

3838
/**
@@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
180180
client = builder.client;
181181
ownsBigQueryWriteClient = false;
182182
}
183-
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
183+
if (!builder.enableConnectionPool) {
184184
this.singleConnectionOrConnectionPool =
185185
SingleConnectionOrConnectionPool.ofSingleConnection(
186186
new ConnectionWorker(
@@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
212212
builder.traceId,
213213
client,
214214
ownsBigQueryWriteClient)));
215-
validateFetchedConnectonPool(client, builder);
215+
validateFetchedConnectonPool(builder);
216+
// Shut down the passed in client. Internally we will create another client inside connection
217+
// pool for every new connection worker.
218+
// TODO(gaole): instead of perform close outside of pool approach, change to always create
219+
// new client in connection.
220+
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
221+
&& ownsBigQueryWriteClient) {
222+
client.shutdown();
223+
try {
224+
client.awaitTermination(150, TimeUnit.SECONDS);
225+
} catch (InterruptedException unused) {
226+
// Ignore interruption as this client is not used.
227+
}
228+
client.close();
229+
}
216230
}
217231
}
218232

219233
// Validate whether the fetched connection pool matched certain properties.
220-
private void validateFetchedConnectonPool(
221-
BigQueryWriteClient client, StreamWriter.Builder builder) {
234+
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
222235
String paramsValidatedFailed = "";
223236
if (!Objects.equals(
224237
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
225238
builder.traceId)) {
226239
paramsValidatedFailed = "Trace id";
227-
} else if (!Objects.equals(
228-
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
229-
client)) {
230-
paramsValidatedFailed = "Bigquery write client";
231240
} else if (!Objects.equals(
232241
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
233242
builder.limitExceededBehavior)) {
@@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
341350

342351
/** A builder of {@link StreamWriter}s. */
343352
public static final class Builder {
344-
/** Operation mode for the internal connection pool. */
345-
public enum ConnectionMode {
346-
// Create a connection per given write stream.
347-
SINGLE_TABLE,
348-
// Share a connection for multiple tables. This mode is only effective in default stream case.
349-
// Some key characteristics:
350-
// 1. tables within the same pool has to be in the same location.
351-
// 2. Close(streamReference) will not close connection immediately until all tables on
352-
// this connection is closed.
353-
// 3. Try to use one stream per table at first and share stream later.
354-
MULTIPLEXING
355-
}
356-
357353
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
358354

359355
private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
@@ -379,14 +375,14 @@ public enum ConnectionMode {
379375
private FlowController.LimitExceededBehavior limitExceededBehavior =
380376
FlowController.LimitExceededBehavior.Block;
381377

382-
private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;
383-
384378
private String traceId = null;
385379

386380
private TableSchema updatedTableSchema = null;
387381

388382
private String location;
389383

384+
private boolean enableConnectionPool = false;
385+
390386
private Builder(String streamName) {
391387
this.streamName = Preconditions.checkNotNull(streamName);
392388
this.client = null;
@@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
419415
return this;
420416
}
421417

422-
public Builder enableConnectionPool() {
423-
this.connectionMode = ConnectionMode.MULTIPLEXING;
418+
/**
419+
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
420+
* connection if possible until the connection is overwhelmed. This feature is still under
421+
* development, please contact write api team before using.
422+
*
423+
* @param enableConnectionPool
424+
* @return Builder
425+
*/
426+
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
427+
this.enableConnectionPool = enableConnectionPool;
424428
return this;
425429
}
426430

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable(
146146
throws IOException, ExecutionException, InterruptedException {
147147
ConnectionWorkerPool.setOptions(
148148
Settings.builder()
149-
.setMinConnectionsPerPool(2)
150-
.setMaxConnectionsPerPool(maxConnections)
149+
.setMinConnectionsPerRegion(2)
150+
.setMaxConnectionsPerRegion(maxConnections)
151151
.build());
152152
ConnectionWorkerPool connectionWorkerPool =
153153
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
@@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable(
201201
@Test
202202
public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
203203
ConnectionWorkerPool.setOptions(
204-
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
204+
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
205205
ConnectionWorkerPool connectionWorkerPool =
206206
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
207207

@@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
250250
@Test
251251
public void testMultiStreamAppend_appendWhileClosing() throws Exception {
252252
ConnectionWorkerPool.setOptions(
253-
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
253+
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
254254
ConnectionWorkerPool connectionWorkerPool =
255255
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
256256

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
9696
.setWriterSchema(createProtoSchema())
9797
.setTraceId(TEST_TRACE_ID)
9898
.setLocation("US")
99-
.enableConnectionPool()
99+
.setEnableConnectionPool(true)
100100
.build();
101101
}
102102

0 commit comments

Comments
 (0)