Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 1bb8e26

Browse files
feat: add multiplexing client core algorithm and basic testing, plus fix a tiny bug in fake server (#1787)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent a869a1d commit 1bb8e26

5 files changed

Lines changed: 427 additions & 9 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
4949
If you are using Gradle 5.x or later, add this to your dependencies:
5050

5151
```Groovy
52-
implementation platform('com.google.cloud:libraries-bom:26.1.1')
52+
implementation platform('com.google.cloud:libraries-bom:26.1.2')
5353
5454
implementation 'com.google.cloud:google-cloud-bigquerystorage'
5555
```

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 183 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,26 @@
1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.FlowController;
2020
import com.google.auto.value.AutoValue;
21+
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
22+
import com.google.common.base.Stopwatch;
23+
import com.google.common.collect.ImmutableList;
24+
import java.io.IOException;
25+
import java.util.Collections;
26+
import java.util.Comparator;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.logging.Logger;
2136
import javax.annotation.concurrent.GuardedBy;
2237

38+
/** Pool of connections to accept appends and distirbute to different connections. */
2339
public class ConnectionWorkerPool {
40+
private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
2441
/*
2542
* Max allowed inflight requests in the stream. Method append is blocked at this.
2643
*/
@@ -36,11 +53,29 @@ public class ConnectionWorkerPool {
3653
*/
3754
private final FlowController.LimitExceededBehavior limitExceededBehavior;
3855

56+
/** Map from write stream to corresponding connection. */
57+
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
58+
new ConcurrentHashMap<>();
59+
60+
/** Map from a connection to a set of write stream that have sent requests onto it. */
61+
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
62+
new ConcurrentHashMap<>();
63+
64+
/** Collection of all the created connections. */
65+
private final Set<ConnectionWorker> connectionWorkerPool =
66+
Collections.synchronizedSet(new HashSet<>());
67+
68+
/** Enable test related logic. */
69+
private static boolean enableTesting = false;
70+
3971
/*
4072
* TraceId for debugging purpose.
4173
*/
4274
private final String traceId;
4375

76+
/** Used for test on the number of times createWorker is called. */
77+
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
78+
4479
/*
4580
* Tracks current inflight requests in the stream.
4681
*/
@@ -102,6 +137,15 @@ public class ConnectionWorkerPool {
102137
*/
103138
private boolean ownsBigQueryWriteClient = false;
104139

140+
/**
141+
* The current maximum connection count. This value is gradually increased till the user defined
142+
* maximum connection count.
143+
*/
144+
private int currentMaxConnectionCount;
145+
146+
/** Lock for controlling concurrent operation on add / delete connections. */
147+
private final Lock lock = new ReentrantLock();
148+
105149
/** Settings for connection pool. */
106150
@AutoValue
107151
public abstract static class Settings {
@@ -147,6 +191,7 @@ public ConnectionWorkerPool(
147191
this.traceId = traceId;
148192
this.client = client;
149193
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
194+
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
150195
}
151196

152197
/**
@@ -160,13 +205,149 @@ public static void setOptions(Settings settings) {
160205

161206
/** Distributes the writing of a message to an underlying connection. */
162207
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
163-
throw new RuntimeException("Append is not implemented!");
208+
return append(streamWriter, rows, -1);
164209
}
165210

166211
/** Distributes the writing of a message to an underlying connection. */
167212
public ApiFuture<AppendRowsResponse> append(
168213
StreamWriter streamWriter, ProtoRows rows, long offset) {
169-
throw new RuntimeException("append with offset is not implemented on connection pool!");
214+
// We are in multiplexing mode after entering the following logic.
215+
ConnectionWorker connectionWorker =
216+
streamWriterToConnection.compute(
217+
streamWriter,
218+
(key, existingStream) -> {
219+
// Though compute on concurrent map is atomic, we still do explicit locking as we
220+
// may have concurrent close(...) triggered.
221+
lock.lock();
222+
try {
223+
// Stick to the existing stream if it's not overwhelmed.
224+
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
225+
return existingStream;
226+
}
227+
// Try to create or find another existing stream to reuse.
228+
ConnectionWorker createdOrExistingConnection = null;
229+
try {
230+
createdOrExistingConnection =
231+
createOrReuseConnectionWorker(streamWriter, existingStream);
232+
} catch (IOException e) {
233+
throw new IllegalStateException(e);
234+
}
235+
// Update connection to write stream relationship.
236+
connectionToWriteStream.computeIfAbsent(
237+
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
238+
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
239+
return createdOrExistingConnection;
240+
} finally {
241+
lock.unlock();
242+
}
243+
});
244+
Stopwatch stopwatch = Stopwatch.createStarted();
245+
ApiFuture<AppendRowsResponse> responseFuture =
246+
connectionWorker.append(
247+
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
248+
return responseFuture;
249+
}
250+
251+
/**
252+
* Create a new connection if we haven't reached current maximum, or reuse an existing connection
253+
* with least load.
254+
*/
255+
private ConnectionWorker createOrReuseConnectionWorker(
256+
StreamWriter streamWriter, ConnectionWorker existingConnectionWorker) throws IOException {
257+
String streamReference = streamWriter.getStreamName();
258+
if (connectionWorkerPool.size() < currentMaxConnectionCount) {
259+
// Always create a new connection if we haven't reached current maximum.
260+
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
261+
} else {
262+
ConnectionWorker existingBestConnection =
263+
pickBestLoadConnection(
264+
enableTesting ? Load.TEST_LOAD_COMPARATOR : Load.LOAD_COMPARATOR,
265+
ImmutableList.copyOf(connectionWorkerPool));
266+
if (!existingBestConnection.getLoad().isOverwhelmed()) {
267+
return existingBestConnection;
268+
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
269+
// At this point, we have reached the connection cap and the selected connection is
270+
// overwhelmed, we can try scale up the connection pool.
271+
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
272+
currentMaxConnectionCount += 1;
273+
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
274+
currentMaxConnectionCount = settings.maxConnectionsPerPool();
275+
}
276+
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
277+
} else {
278+
// Stick to the original connection if all the connections are overwhelmed.
279+
if (existingConnectionWorker != null) {
280+
return existingConnectionWorker;
281+
}
282+
// If we are at this branch, it means we reached the maximum connections.
283+
return existingBestConnection;
284+
}
285+
}
286+
}
287+
288+
/** Select out the best connection worker among the given connection workers. */
289+
static ConnectionWorker pickBestLoadConnection(
290+
Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) {
291+
if (connectionWorkerList.isEmpty()) {
292+
throw new IllegalStateException(
293+
String.format(
294+
"Bug in code! At least one connection worker should be passed in "
295+
+ "pickSemiBestLoadConnection(...)"));
296+
}
297+
// Compare all connection workers to find the connection worker with the smallest load.
298+
// Loop and find the connection with the least load.
299+
// The load comparision and computation process
300+
int currentBestIndex = 0;
301+
Load currentBestLoad = connectionWorkerList.get(currentBestIndex).getLoad();
302+
for (int i = 1; i < connectionWorkerList.size(); i++) {
303+
Load loadToCompare = connectionWorkerList.get(i).getLoad();
304+
if (comparator.compare(loadToCompare, currentBestLoad) <= 0) {
305+
currentBestIndex = i;
306+
currentBestLoad = loadToCompare;
307+
}
308+
}
309+
return connectionWorkerList.get(currentBestIndex);
310+
}
311+
312+
/**
313+
* Creates a single connection worker.
314+
*
315+
* <p>Note this function need to be thread-safe across different stream reference but no need for
316+
* a single stream reference. This is because createConnectionWorker(...) is called via
317+
* computeIfAbsent(...) which is at most once per key.
318+
*/
319+
private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema)
320+
throws IOException {
321+
if (enableTesting) {
322+
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
323+
testValueCreateConnectionCount.getAndIncrement();
324+
}
325+
ConnectionWorker connectionWorker =
326+
new ConnectionWorker(
327+
streamName,
328+
writeSchema,
329+
maxInflightRequests,
330+
maxInflightBytes,
331+
limitExceededBehavior,
332+
traceId,
333+
client,
334+
ownsBigQueryWriteClient);
335+
connectionWorkerPool.add(connectionWorker);
336+
log.info(
337+
String.format(
338+
"Scaling up new connection for stream name: %s, pool size after scaling up %s",
339+
streamName, connectionWorkerPool.size()));
340+
return connectionWorker;
341+
}
342+
343+
/** Enable Test related logic. */
344+
public static void enableTestingLogic() {
345+
enableTesting = true;
346+
}
347+
348+
/** Returns how many times createConnectionWorker(...) is called. */
349+
int getCreateConnectionCount() {
350+
return testValueCreateConnectionCount.get();
170351
}
171352

172353
/** Close the stream writer. Shut down all resources. */

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ public String getStreamName() {
152152
return streamName;
153153
}
154154

155+
/** @return the passed in user schema. */
156+
public ProtoSchema getProtoSchema() {
157+
return writerSchema;
158+
}
159+
155160
/** Close the stream writer. Shut down all resources. */
156161
@Override
157162
public void close() {

0 commit comments

Comments
 (0)