Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 3eb1475

Browse files
feat: wire connection pool to stream writer without implementing updated schema (#1790)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 6b3a974 commit 3eb1475

4 files changed

Lines changed: 276 additions & 26 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public static Builder builder() {
167167
/** Builder for the options to config {@link ConnectionWorkerPool}. */
168168
@AutoValue.Builder
169169
public abstract static class Builder {
170+
// TODO(gaole) rename to per location for easier understanding.
170171
public abstract Builder setMinConnectionsPerPool(int value);
171172

172173
public abstract Builder setMaxConnectionsPerPool(int value);
@@ -387,4 +388,20 @@ int getCreateConnectionCount() {
387388
int getTotalConnectionCount() {
388389
return connectionWorkerPool.size();
389390
}
391+
392+
String getTraceId() {
393+
return traceId;
394+
}
395+
396+
boolean ownsBigQueryWriteClient() {
397+
return ownsBigQueryWriteClient;
398+
}
399+
400+
FlowController.LimitExceededBehavior limitExceededBehavior() {
401+
return limitExceededBehavior;
402+
}
403+
404+
BigQueryWriteClient bigQueryWriteClient() {
405+
return client;
406+
}
390407
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 201 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,19 @@
2020
import com.google.api.gax.core.CredentialsProvider;
2121
import com.google.api.gax.rpc.FixedHeaderProvider;
2222
import com.google.api.gax.rpc.TransportChannelProvider;
23+
import com.google.auto.value.AutoOneOf;
24+
import com.google.auto.value.AutoValue;
25+
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
26+
import com.google.common.annotations.VisibleForTesting;
2327
import com.google.common.base.Preconditions;
2428
import io.grpc.Status;
2529
import io.grpc.Status.Code;
2630
import io.grpc.StatusRuntimeException;
2731
import java.io.IOException;
32+
import java.util.Map;
33+
import java.util.Objects;
2834
import java.util.UUID;
35+
import java.util.concurrent.ConcurrentHashMap;
2936
import java.util.logging.Logger;
3037

3138
/**
@@ -36,8 +43,6 @@
3643
public class StreamWriter implements AutoCloseable {
3744
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
3845

39-
private final ConnectionWorker connectionWorker;
40-
4146
/*
4247
* The identifier of stream to write to.
4348
*/
@@ -51,11 +56,108 @@ public class StreamWriter implements AutoCloseable {
5156
*/
5257
private final String writerId = UUID.randomUUID().toString();
5358

59+
/**
60+
* Stream can access a single connection or a pool of connection depending on whether multiplexing
61+
* is enabled.
62+
*/
63+
private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool;
64+
65+
/**
66+
* Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be
67+
* shared by every stream writer in the same process.
68+
*/
69+
private static final Map<ConnectionPoolKey, ConnectionWorkerPool> connectionPoolMap =
70+
new ConcurrentHashMap<>();
71+
5472
/** The maximum size of one request. Defined by the API. */
5573
public static long getApiMaxRequestBytes() {
5674
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
5775
}
5876

77+
/**
78+
* Connection pool with different key will be split.
79+
*
80+
* <p>Shard based only on location right now.
81+
*/
82+
@AutoValue
83+
abstract static class ConnectionPoolKey {
84+
abstract String location();
85+
86+
public static ConnectionPoolKey create(String location) {
87+
return new AutoValue_StreamWriter_ConnectionPoolKey(location);
88+
}
89+
}
90+
91+
/**
92+
* When in single table mode, append directly to connectionWorker. Otherwise append to connection
93+
* pool in multiplexing mode.
94+
*/
95+
@AutoOneOf(SingleConnectionOrConnectionPool.Kind.class)
96+
public abstract static class SingleConnectionOrConnectionPool {
97+
/** Kind of connection operation mode. */
98+
public enum Kind {
99+
CONNECTION_WORKER,
100+
CONNECTION_WORKER_POOL
101+
}
102+
103+
public abstract Kind getKind();
104+
105+
public abstract ConnectionWorker connectionWorker();
106+
107+
public abstract ConnectionWorkerPool connectionWorkerPool();
108+
109+
public ApiFuture<AppendRowsResponse> append(
110+
StreamWriter streamWriter, ProtoRows protoRows, long offset) {
111+
if (getKind() == Kind.CONNECTION_WORKER) {
112+
return connectionWorker()
113+
.append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset);
114+
} else {
115+
return connectionWorkerPool().append(streamWriter, protoRows, offset);
116+
}
117+
}
118+
119+
public void close(StreamWriter streamWriter) {
120+
if (getKind() == Kind.CONNECTION_WORKER) {
121+
connectionWorker().close();
122+
} else {
123+
connectionWorkerPool().close(streamWriter);
124+
}
125+
}
126+
127+
long getInflightWaitSeconds() {
128+
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
129+
throw new IllegalStateException(
130+
"getInflightWaitSeconds is not supported in multiplexing mode.");
131+
}
132+
return connectionWorker().getInflightWaitSeconds();
133+
}
134+
135+
TableSchema getUpdatedSchema() {
136+
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
137+
// TODO(gaole): implement updated schema support for multiplexing.
138+
throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing.");
139+
}
140+
return connectionWorker().getUpdatedSchema();
141+
}
142+
143+
String getWriterId(String streamWriterId) {
144+
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
145+
return streamWriterId;
146+
}
147+
return connectionWorker().getWriterId();
148+
}
149+
150+
public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) {
151+
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection);
152+
}
153+
154+
public static SingleConnectionOrConnectionPool ofConnectionPool(
155+
ConnectionWorkerPool connectionPool) {
156+
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorkerPool(
157+
connectionPool);
158+
}
159+
}
160+
59161
private StreamWriter(Builder builder) throws IOException {
60162
BigQueryWriteClient client;
61163
this.streamName = builder.streamName;
@@ -78,16 +180,66 @@ private StreamWriter(Builder builder) throws IOException {
78180
client = builder.client;
79181
ownsBigQueryWriteClient = false;
80182
}
81-
connectionWorker =
82-
new ConnectionWorker(
83-
builder.streamName,
84-
builder.writerSchema,
85-
builder.maxInflightRequest,
86-
builder.maxInflightBytes,
87-
builder.limitExceededBehavior,
88-
builder.traceId,
89-
client,
90-
ownsBigQueryWriteClient);
183+
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
184+
this.singleConnectionOrConnectionPool =
185+
SingleConnectionOrConnectionPool.ofSingleConnection(
186+
new ConnectionWorker(
187+
builder.streamName,
188+
builder.writerSchema,
189+
builder.maxInflightRequest,
190+
builder.maxInflightBytes,
191+
builder.limitExceededBehavior,
192+
builder.traceId,
193+
client,
194+
ownsBigQueryWriteClient));
195+
} else {
196+
if (builder.location == "") {
197+
throw new IllegalArgumentException("Location must be specified for multiplexing client!");
198+
}
199+
// Assume the connection in the same pool share the same client and trace id.
200+
// The first StreamWriter for a new stub will create the pool for the other
201+
// streams in the same region, meaning the per StreamWriter settings are no
202+
// longer working unless all streams share the same set of settings
203+
this.singleConnectionOrConnectionPool =
204+
SingleConnectionOrConnectionPool.ofConnectionPool(
205+
connectionPoolMap.computeIfAbsent(
206+
ConnectionPoolKey.create(builder.location),
207+
(key) ->
208+
new ConnectionWorkerPool(
209+
builder.maxInflightRequest,
210+
builder.maxInflightBytes,
211+
builder.limitExceededBehavior,
212+
builder.traceId,
213+
client,
214+
ownsBigQueryWriteClient)));
215+
validateFetchedConnectonPool(client, builder);
216+
}
217+
}
218+
219+
// Validate whether the fetched connection pool matched certain properties.
220+
private void validateFetchedConnectonPool(
221+
BigQueryWriteClient client, StreamWriter.Builder builder) {
222+
String paramsValidatedFailed = "";
223+
if (!Objects.equals(
224+
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
225+
builder.traceId)) {
226+
paramsValidatedFailed = "Trace id";
227+
} else if (!Objects.equals(
228+
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
229+
client)) {
230+
paramsValidatedFailed = "Bigquery write client";
231+
} else if (!Objects.equals(
232+
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
233+
builder.limitExceededBehavior)) {
234+
paramsValidatedFailed = "Limit Exceeds Behavior";
235+
}
236+
237+
if (!paramsValidatedFailed.isEmpty()) {
238+
throw new IllegalArgumentException(
239+
String.format(
240+
"%s used for the same connection pool for the same location must be the same!",
241+
paramsValidatedFailed));
242+
}
91243
}
92244

93245
/**
@@ -127,7 +279,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
127279
* @return the append response wrapped in a future.
128280
*/
129281
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
130-
return this.connectionWorker.append(streamName, writerSchema, rows, offset);
282+
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
131283
}
132284

133285
/**
@@ -139,12 +291,12 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
139291
* stream case.
140292
*/
141293
public long getInflightWaitSeconds() {
142-
return connectionWorker.getInflightWaitSeconds();
294+
return singleConnectionOrConnectionPool.getInflightWaitSeconds();
143295
}
144296

145297
/** @return a unique Id for the writer. */
146298
public String getWriterId() {
147-
return connectionWorker.getWriterId();
299+
return singleConnectionOrConnectionPool.getWriterId(writerId);
148300
}
149301

150302
/** @return name of the Stream that this writer is working on. */
@@ -160,7 +312,7 @@ public ProtoSchema getProtoSchema() {
160312
/** Close the stream writer. Shut down all resources. */
161313
@Override
162314
public void close() {
163-
this.connectionWorker.close();
315+
singleConnectionOrConnectionPool.close(this);
164316
}
165317

166318
/**
@@ -179,11 +331,28 @@ public static StreamWriter.Builder newBuilder(String streamName) {
179331

180332
/** Thread-safe getter of updated TableSchema */
181333
public synchronized TableSchema getUpdatedSchema() {
182-
return connectionWorker.getUpdatedSchema();
334+
return singleConnectionOrConnectionPool.getUpdatedSchema();
335+
}
336+
337+
@VisibleForTesting
338+
SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
339+
return singleConnectionOrConnectionPool.getKind();
183340
}
184341

185342
/** A builder of {@link StreamWriter}s. */
186343
public static final class Builder {
344+
/** Operation mode for the internal connection pool. */
345+
public enum ConnectionMode {
346+
// Create a connection per given write stream.
347+
SINGLE_TABLE,
348+
// Share a connection for multiple tables. This mode is only effective in default stream case.
349+
// Some key characteristics:
350+
// 1. tables within the same pool has to be in the same location.
351+
// 2. Close(streamReference) will not close connection immediately until all tables on
352+
// this connection is closed.
353+
// 3. Try to use one stream per table at first and share stream later.
354+
MULTIPLEXING
355+
}
187356

188357
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
189358

@@ -210,10 +379,14 @@ public static final class Builder {
210379
private FlowController.LimitExceededBehavior limitExceededBehavior =
211380
FlowController.LimitExceededBehavior.Block;
212381

382+
private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;
383+
213384
private String traceId = null;
214385

215386
private TableSchema updatedTableSchema = null;
216387

388+
private String location;
389+
217390
private Builder(String streamName) {
218391
this.streamName = Preconditions.checkNotNull(streamName);
219392
this.client = null;
@@ -246,6 +419,11 @@ public Builder setEndpoint(String endpoint) {
246419
return this;
247420
}
248421

422+
public Builder enableConnectionPool() {
423+
this.connectionMode = ConnectionMode.MULTIPLEXING;
424+
return this;
425+
}
426+
249427
/**
250428
* {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
251429
* API endpoint.
@@ -280,6 +458,12 @@ public Builder setTraceId(String traceId) {
280458
return this;
281459
}
282460

461+
/** Location of the table this stream writer is targeting. */
462+
public Builder setLocation(String location) {
463+
this.location = location;
464+
return this;
465+
}
466+
283467
/**
284468
* Sets the limit exceeded behavior.
285469
*

0 commit comments

Comments
 (0)