Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b26265e

Browse files
authored
feat: add connection worker skeleton used for multiplexing client (#1778)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client
1 parent 7dd447d commit b26265e

3 files changed

Lines changed: 196 additions & 2 deletions

File tree

.github/.OwlBot.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ deep-preserve-regex:
7878
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/Waiter.java"
7979
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java"
8080
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java"
81+
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java"
8182

8283
deep-copy-regex:
8384
- source: "/google/cloud/bigquery/storage/(v.*)/.*-java/proto-google-.*/src"

google-cloud-bigquerystorage/pom.xml

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@
4444
<groupId>org.codehaus.mojo</groupId>
4545
<artifactId>flatten-maven-plugin</artifactId>
4646
</plugin>
47+
<plugin>
48+
<groupId>org.apache.maven.plugins</groupId>
49+
<artifactId>maven-dependency-plugin</artifactId>
50+
<configuration>
51+
<usedDependencies>
52+
<dependency>com.google.auto.value:auto-value</dependency>
53+
</usedDependencies>
54+
</configuration>
55+
</plugin>
4756
</plugins>
4857
</build>
4958
<dependencies>
@@ -63,6 +72,16 @@
6372
<groupId>com.google.api</groupId>
6473
<artifactId>api-common</artifactId>
6574
</dependency>
75+
<dependency>
76+
<groupId>com.google.auto.value</groupId>
77+
<artifactId>auto-value</artifactId>
78+
<version>1.9</version>
79+
</dependency>
80+
<dependency>
81+
<groupId>com.google.auto.value</groupId>
82+
<artifactId>auto-value-annotations</artifactId>
83+
<version>1.9</version>
84+
</dependency>
6685
<dependency>
6786
<groupId>com.google.protobuf</groupId>
6887
<artifactId>protobuf-java</artifactId>
@@ -71,7 +90,6 @@
7190
<groupId>com.google.api.grpc</groupId>
7291
<artifactId>proto-google-common-protos</artifactId>
7392
</dependency>
74-
7593
<dependency>
7694
<groupId>com.google.api.grpc</groupId>
7795
<artifactId>proto-google-cloud-bigquerystorage-v1beta1</artifactId>
@@ -134,7 +152,6 @@
134152
<artifactId>junit</artifactId>
135153
<scope>test</scope>
136154
</dependency>
137-
138155
<dependency>
139156
<groupId>com.google.truth</groupId>
140157
<artifactId>truth</artifactId>
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.gax.batching.FlowController;
20+
import com.google.auto.value.AutoValue;
21+
import javax.annotation.concurrent.GuardedBy;
22+
23+
public class ConnectionWorkerPool {
24+
/*
25+
* Max allowed inflight requests in the stream. Method append is blocked at this.
26+
*/
27+
private final long maxInflightRequests;
28+
29+
/*
30+
* Max allowed inflight bytes in the stream. Method append is blocked at this.
31+
*/
32+
private final long maxInflightBytes;
33+
34+
/*
35+
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
36+
*/
37+
private final FlowController.LimitExceededBehavior limitExceededBehavior;
38+
39+
/*
40+
* TraceId for debugging purpose.
41+
*/
42+
private final String traceId;
43+
44+
/*
45+
* Tracks current inflight requests in the stream.
46+
*/
47+
@GuardedBy("lock")
48+
private long inflightRequests = 0;
49+
50+
/*
51+
* Tracks current inflight bytes in the stream.
52+
*/
53+
@GuardedBy("lock")
54+
private long inflightBytes = 0;
55+
56+
/*
57+
* Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
58+
* count hits a threshold. Streaming should only be halted, if it isn't possible to establish a
59+
* connection. Keep track of the number of reconnections in succession. This will be reset if
60+
* a row is successfully called back.
61+
*/
62+
@GuardedBy("lock")
63+
private long conectionRetryCountWithoutCallback = 0;
64+
65+
/*
66+
* If false, streamConnection needs to be reset.
67+
*/
68+
@GuardedBy("lock")
69+
private boolean streamConnectionIsConnected = false;
70+
71+
/*
72+
* A boolean to track if we cleaned up inflight queue.
73+
*/
74+
@GuardedBy("lock")
75+
private boolean inflightCleanuped = false;
76+
77+
/*
78+
* Indicates whether user has called Close() or not.
79+
*/
80+
@GuardedBy("lock")
81+
private boolean userClosed = false;
82+
83+
/*
84+
* The final status of connection. Set to nonnull when connection is permanently closed.
85+
*/
86+
@GuardedBy("lock")
87+
private Throwable connectionFinalStatus = null;
88+
89+
/*
90+
* Contains the updated TableSchema.
91+
*/
92+
@GuardedBy("lock")
93+
private TableSchema updatedSchema;
94+
95+
/*
96+
* A client used to interact with BigQuery.
97+
*/
98+
private BigQueryWriteClient client;
99+
100+
/*
101+
* If true, the client above is created by this writer and should be closed.
102+
*/
103+
private boolean ownsBigQueryWriteClient = false;
104+
105+
/** Settings for connection pool. */
106+
@AutoValue
107+
public abstract static class Settings {
108+
/**
109+
* The minimum connections each pool created before trying to reuse the previously created
110+
* connection in multiplexing mode.
111+
*/
112+
abstract int minConnectionsPerPool();
113+
114+
/** The maximum connections per connection pool. */
115+
abstract int maxConnectionsPerPool();
116+
117+
public static Builder builder() {
118+
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
119+
.setMinConnectionsPerPool(2)
120+
.setMaxConnectionsPerPool(10);
121+
}
122+
123+
/** Builder for the options to config {@link ConnectionWorkerPool}. */
124+
@AutoValue.Builder
125+
public abstract static class Builder {
126+
public abstract Builder setMinConnectionsPerPool(int value);
127+
128+
public abstract Builder setMaxConnectionsPerPool(int value);
129+
130+
public abstract Settings build();
131+
}
132+
}
133+
134+
/** Static setting for connection pool. */
135+
private static Settings settings = Settings.builder().build();
136+
137+
public ConnectionWorkerPool(
138+
long maxInflightRequests,
139+
long maxInflightBytes,
140+
FlowController.LimitExceededBehavior limitExceededBehavior,
141+
String traceId,
142+
BigQueryWriteClient client,
143+
boolean ownsBigQueryWriteClient) {
144+
this.maxInflightRequests = maxInflightRequests;
145+
this.maxInflightBytes = maxInflightBytes;
146+
this.limitExceededBehavior = limitExceededBehavior;
147+
this.traceId = traceId;
148+
this.client = client;
149+
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
150+
}
151+
152+
/**
153+
* Sets static connection pool options.
154+
*
155+
* <p>Note: this method should be triggered prior to the construction of connection pool.
156+
*/
157+
public static void setOptions(Settings settings) {
158+
ConnectionWorkerPool.settings = settings;
159+
}
160+
161+
/** Distributes the writing of a message to an underlying connection. */
162+
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
163+
throw new RuntimeException("Append is not implemented!");
164+
}
165+
166+
/** Distributes the writing of a message to an underlying connection. */
167+
public ApiFuture<AppendRowsResponse> append(
168+
StreamWriter streamWriter, ProtoRows rows, long offset) {
169+
throw new RuntimeException("append with offset is not implemented on connection pool!");
170+
}
171+
172+
/** Close the stream writer. Shut down all resources. */
173+
public void close(StreamWriter streamWriter) {
174+
throw new RuntimeException("close is implemented on connection pool");
175+
}
176+
}

0 commit comments

Comments
 (0)