2020import com .google .api .gax .core .CredentialsProvider ;
2121import com .google .api .gax .rpc .FixedHeaderProvider ;
2222import com .google .api .gax .rpc .TransportChannelProvider ;
23+ import com .google .auto .value .AutoOneOf ;
24+ import com .google .auto .value .AutoValue ;
25+ import com .google .cloud .bigquery .storage .v1 .StreamWriter .Builder .ConnectionMode ;
26+ import com .google .common .annotations .VisibleForTesting ;
2327import com .google .common .base .Preconditions ;
2428import io .grpc .Status ;
2529import io .grpc .Status .Code ;
2630import io .grpc .StatusRuntimeException ;
2731import java .io .IOException ;
32+ import java .util .Map ;
33+ import java .util .Objects ;
2834import java .util .UUID ;
35+ import java .util .concurrent .ConcurrentHashMap ;
2936import java .util .logging .Logger ;
3037
3138/**
3643public class StreamWriter implements AutoCloseable {
3744 private static final Logger log = Logger .getLogger (StreamWriter .class .getName ());
3845
39- private final ConnectionWorker connectionWorker ;
40-
4146 /*
4247 * The identifier of stream to write to.
4348 */
@@ -51,11 +56,108 @@ public class StreamWriter implements AutoCloseable {
5156 */
5257 private final String writerId = UUID .randomUUID ().toString ();
5358
59+ /**
60+ * Stream can access a single connection or a pool of connection depending on whether multiplexing
61+ * is enabled.
62+ */
63+ private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool ;
64+
65+ /**
66+ * Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be
67+ * shared by every stream writer in the same process.
68+ */
69+ private static final Map <ConnectionPoolKey , ConnectionWorkerPool > connectionPoolMap =
70+ new ConcurrentHashMap <>();
71+
5472 /** The maximum size of one request. Defined by the API. */
5573 public static long getApiMaxRequestBytes () {
5674 return 10L * 1000L * 1000L ; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
5775 }
5876
77+ /**
78+ * Connection pool with different key will be split.
79+ *
80+ * <p>Shard based only on location right now.
81+ */
82+ @ AutoValue
83+ abstract static class ConnectionPoolKey {
84+ abstract String location ();
85+
86+ public static ConnectionPoolKey create (String location ) {
87+ return new AutoValue_StreamWriter_ConnectionPoolKey (location );
88+ }
89+ }
90+
91+ /**
92+ * When in single table mode, append directly to connectionWorker. Otherwise append to connection
93+ * pool in multiplexing mode.
94+ */
95+ @ AutoOneOf (SingleConnectionOrConnectionPool .Kind .class )
96+ public abstract static class SingleConnectionOrConnectionPool {
97+ /** Kind of connection operation mode. */
98+ public enum Kind {
99+ CONNECTION_WORKER ,
100+ CONNECTION_WORKER_POOL
101+ }
102+
103+ public abstract Kind getKind ();
104+
105+ public abstract ConnectionWorker connectionWorker ();
106+
107+ public abstract ConnectionWorkerPool connectionWorkerPool ();
108+
109+ public ApiFuture <AppendRowsResponse > append (
110+ StreamWriter streamWriter , ProtoRows protoRows , long offset ) {
111+ if (getKind () == Kind .CONNECTION_WORKER ) {
112+ return connectionWorker ()
113+ .append (streamWriter .getStreamName (), streamWriter .getProtoSchema (), protoRows , offset );
114+ } else {
115+ return connectionWorkerPool ().append (streamWriter , protoRows , offset );
116+ }
117+ }
118+
119+ public void close (StreamWriter streamWriter ) {
120+ if (getKind () == Kind .CONNECTION_WORKER ) {
121+ connectionWorker ().close ();
122+ } else {
123+ connectionWorkerPool ().close (streamWriter );
124+ }
125+ }
126+
127+ long getInflightWaitSeconds () {
128+ if (getKind () == Kind .CONNECTION_WORKER_POOL ) {
129+ throw new IllegalStateException (
130+ "getInflightWaitSeconds is not supported in multiplexing mode." );
131+ }
132+ return connectionWorker ().getInflightWaitSeconds ();
133+ }
134+
135+ TableSchema getUpdatedSchema () {
136+ if (getKind () == Kind .CONNECTION_WORKER_POOL ) {
137+ // TODO(gaole): implement updated schema support for multiplexing.
138+ throw new IllegalStateException ("getUpdatedSchema is not implemented for multiplexing." );
139+ }
140+ return connectionWorker ().getUpdatedSchema ();
141+ }
142+
143+ String getWriterId (String streamWriterId ) {
144+ if (getKind () == Kind .CONNECTION_WORKER_POOL ) {
145+ return streamWriterId ;
146+ }
147+ return connectionWorker ().getWriterId ();
148+ }
149+
150+ public static SingleConnectionOrConnectionPool ofSingleConnection (ConnectionWorker connection ) {
151+ return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool .connectionWorker (connection );
152+ }
153+
154+ public static SingleConnectionOrConnectionPool ofConnectionPool (
155+ ConnectionWorkerPool connectionPool ) {
156+ return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool .connectionWorkerPool (
157+ connectionPool );
158+ }
159+ }
160+
59161 private StreamWriter (Builder builder ) throws IOException {
60162 BigQueryWriteClient client ;
61163 this .streamName = builder .streamName ;
@@ -78,16 +180,66 @@ private StreamWriter(Builder builder) throws IOException {
78180 client = builder .client ;
79181 ownsBigQueryWriteClient = false ;
80182 }
81- connectionWorker =
82- new ConnectionWorker (
83- builder .streamName ,
84- builder .writerSchema ,
85- builder .maxInflightRequest ,
86- builder .maxInflightBytes ,
87- builder .limitExceededBehavior ,
88- builder .traceId ,
89- client ,
90- ownsBigQueryWriteClient );
183+ if (builder .connectionMode == ConnectionMode .SINGLE_TABLE ) {
184+ this .singleConnectionOrConnectionPool =
185+ SingleConnectionOrConnectionPool .ofSingleConnection (
186+ new ConnectionWorker (
187+ builder .streamName ,
188+ builder .writerSchema ,
189+ builder .maxInflightRequest ,
190+ builder .maxInflightBytes ,
191+ builder .limitExceededBehavior ,
192+ builder .traceId ,
193+ client ,
194+ ownsBigQueryWriteClient ));
195+ } else {
196+ if (builder .location == "" ) {
197+ throw new IllegalArgumentException ("Location must be specified for multiplexing client!" );
198+ }
199+ // Assume the connection in the same pool share the same client and trace id.
200+ // The first StreamWriter for a new stub will create the pool for the other
201+ // streams in the same region, meaning the per StreamWriter settings are no
202+ // longer working unless all streams share the same set of settings
203+ this .singleConnectionOrConnectionPool =
204+ SingleConnectionOrConnectionPool .ofConnectionPool (
205+ connectionPoolMap .computeIfAbsent (
206+ ConnectionPoolKey .create (builder .location ),
207+ (key ) ->
208+ new ConnectionWorkerPool (
209+ builder .maxInflightRequest ,
210+ builder .maxInflightBytes ,
211+ builder .limitExceededBehavior ,
212+ builder .traceId ,
213+ client ,
214+ ownsBigQueryWriteClient )));
215+ validateFetchedConnectonPool (client , builder );
216+ }
217+ }
218+
219+ // Validate whether the fetched connection pool matched certain properties.
220+ private void validateFetchedConnectonPool (
221+ BigQueryWriteClient client , StreamWriter .Builder builder ) {
222+ String paramsValidatedFailed = "" ;
223+ if (!Objects .equals (
224+ this .singleConnectionOrConnectionPool .connectionWorkerPool ().getTraceId (),
225+ builder .traceId )) {
226+ paramsValidatedFailed = "Trace id" ;
227+ } else if (!Objects .equals (
228+ this .singleConnectionOrConnectionPool .connectionWorkerPool ().bigQueryWriteClient (),
229+ client )) {
230+ paramsValidatedFailed = "Bigquery write client" ;
231+ } else if (!Objects .equals (
232+ this .singleConnectionOrConnectionPool .connectionWorkerPool ().limitExceededBehavior (),
233+ builder .limitExceededBehavior )) {
234+ paramsValidatedFailed = "Limit Exceeds Behavior" ;
235+ }
236+
237+ if (!paramsValidatedFailed .isEmpty ()) {
238+ throw new IllegalArgumentException (
239+ String .format (
240+ "%s used for the same connection pool for the same location must be the same!" ,
241+ paramsValidatedFailed ));
242+ }
91243 }
92244
93245 /**
@@ -127,7 +279,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
127279 * @return the append response wrapped in a future.
128280 */
129281 public ApiFuture <AppendRowsResponse > append (ProtoRows rows , long offset ) {
130- return this .connectionWorker .append (streamName , writerSchema , rows , offset );
282+ return this .singleConnectionOrConnectionPool .append (this , rows , offset );
131283 }
132284
133285 /**
@@ -139,12 +291,12 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
139291 * stream case.
140292 */
141293 public long getInflightWaitSeconds () {
142- return connectionWorker .getInflightWaitSeconds ();
294+ return singleConnectionOrConnectionPool .getInflightWaitSeconds ();
143295 }
144296
145297 /** @return a unique Id for the writer. */
146298 public String getWriterId () {
147- return connectionWorker .getWriterId ();
299+ return singleConnectionOrConnectionPool .getWriterId (writerId );
148300 }
149301
150302 /** @return name of the Stream that this writer is working on. */
@@ -160,7 +312,7 @@ public ProtoSchema getProtoSchema() {
160312 /** Close the stream writer. Shut down all resources. */
161313 @ Override
162314 public void close () {
163- this . connectionWorker . close ();
315+ singleConnectionOrConnectionPool . close (this );
164316 }
165317
166318 /**
@@ -179,11 +331,28 @@ public static StreamWriter.Builder newBuilder(String streamName) {
179331
180332 /** Thread-safe getter of updated TableSchema */
181333 public synchronized TableSchema getUpdatedSchema () {
182- return connectionWorker .getUpdatedSchema ();
334+ return singleConnectionOrConnectionPool .getUpdatedSchema ();
335+ }
336+
337+ @ VisibleForTesting
338+ SingleConnectionOrConnectionPool .Kind getConnectionOperationType () {
339+ return singleConnectionOrConnectionPool .getKind ();
183340 }
184341
185342 /** A builder of {@link StreamWriter}s. */
186343 public static final class Builder {
344+ /** Operation mode for the internal connection pool. */
345+ public enum ConnectionMode {
346+ // Create a connection per given write stream.
347+ SINGLE_TABLE ,
348+ // Share a connection for multiple tables. This mode is only effective in default stream case.
349+ // Some key characteristics:
350+ // 1. tables within the same pool has to be in the same location.
351+ // 2. Close(streamReference) will not close connection immediately until all tables on
352+ // this connection is closed.
353+ // 3. Try to use one stream per table at first and share stream later.
354+ MULTIPLEXING
355+ }
187356
188357 private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L ;
189358
@@ -210,10 +379,14 @@ public static final class Builder {
210379 private FlowController .LimitExceededBehavior limitExceededBehavior =
211380 FlowController .LimitExceededBehavior .Block ;
212381
382+ private ConnectionMode connectionMode = ConnectionMode .SINGLE_TABLE ;
383+
213384 private String traceId = null ;
214385
215386 private TableSchema updatedTableSchema = null ;
216387
388+ private String location ;
389+
217390 private Builder (String streamName ) {
218391 this .streamName = Preconditions .checkNotNull (streamName );
219392 this .client = null ;
@@ -246,6 +419,11 @@ public Builder setEndpoint(String endpoint) {
246419 return this ;
247420 }
248421
422+ public Builder enableConnectionPool () {
423+ this .connectionMode = ConnectionMode .MULTIPLEXING ;
424+ return this ;
425+ }
426+
249427 /**
250428 * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
251429 * API endpoint.
@@ -280,6 +458,12 @@ public Builder setTraceId(String traceId) {
280458 return this ;
281459 }
282460
461+ /** Location of the table this stream writer is targeting. */
462+ public Builder setLocation (String location ) {
463+ this .location = location ;
464+ return this ;
465+ }
466+
283467 /**
284468 * Sets the limit exceeded behavior.
285469 *
0 commit comments