1818import com .google .api .core .ApiFuture ;
1919import com .google .api .gax .batching .FlowController ;
2020import com .google .auto .value .AutoValue ;
21+ import com .google .cloud .bigquery .storage .v1 .ConnectionWorker .Load ;
22+ import com .google .common .base .Stopwatch ;
23+ import com .google .common .collect .ImmutableList ;
24+ import java .io .IOException ;
25+ import java .util .Collections ;
26+ import java .util .Comparator ;
27+ import java .util .HashSet ;
28+ import java .util .List ;
29+ import java .util .Map ;
30+ import java .util .Set ;
31+ import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .atomic .AtomicInteger ;
33+ import java .util .concurrent .locks .Lock ;
34+ import java .util .concurrent .locks .ReentrantLock ;
35+ import java .util .logging .Logger ;
2136import javax .annotation .concurrent .GuardedBy ;
2237
38+ /** Pool of connections that accepts appends and distributes them to different connections. */
2339public class ConnectionWorkerPool {
40+ private static final Logger log = Logger .getLogger (ConnectionWorkerPool .class .getName ());
2441 /*
2542 * Max allowed inflight requests in the stream. Method append is blocked at this.
2643 */
@@ -36,11 +53,29 @@ public class ConnectionWorkerPool {
3653 */
3754 private final FlowController .LimitExceededBehavior limitExceededBehavior ;
3855
56+ /** Map from write stream to corresponding connection. */
57+ private final Map <StreamWriter , ConnectionWorker > streamWriterToConnection =
58+ new ConcurrentHashMap <>();
59+
60+ /** Map from a connection to the set of write streams that have sent requests onto it. */
61+ private final Map <ConnectionWorker , Set <StreamWriter >> connectionToWriteStream =
62+ new ConcurrentHashMap <>();
63+
64+ /** Collection of all the created connections. */
65+ private final Set <ConnectionWorker > connectionWorkerPool =
66+ Collections .synchronizedSet (new HashSet <>());
67+
68+ /** Enable test related logic. */
69+ private static boolean enableTesting = false ;
70+
3971 /*
4072 * TraceId for debugging purpose.
4173 */
4274 private final String traceId ;
4375
76+ /** Used for test on the number of times createWorker is called. */
77+ private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger (0 );
78+
4479 /*
4580 * Tracks current inflight requests in the stream.
4681 */
@@ -102,6 +137,15 @@ public class ConnectionWorkerPool {
102137 */
103138 private boolean ownsBigQueryWriteClient = false ;
104139
140+ /**
141+ * The current maximum connection count. This value is gradually increased till the user defined
142+ * maximum connection count.
143+ */
144+ private int currentMaxConnectionCount ;
145+
146+ /** Lock for controlling concurrent operation on add / delete connections. */
147+ private final Lock lock = new ReentrantLock ();
148+
105149 /** Settings for connection pool. */
106150 @ AutoValue
107151 public abstract static class Settings {
@@ -147,6 +191,7 @@ public ConnectionWorkerPool(
147191 this .traceId = traceId ;
148192 this .client = client ;
149193 this .ownsBigQueryWriteClient = ownsBigQueryWriteClient ;
194+ this .currentMaxConnectionCount = settings .minConnectionsPerPool ();
150195 }
151196
152197 /**
@@ -160,13 +205,149 @@ public static void setOptions(Settings settings) {
160205
161206 /** Distributes the writing of a message to an underlying connection. */
162207 public ApiFuture <AppendRowsResponse > append (StreamWriter streamWriter , ProtoRows rows ) {
163- throw new RuntimeException ( "Append is not implemented!" );
208+ return append ( streamWriter , rows , - 1 );
164209 }
165210
166211 /** Distributes the writing of a message to an underlying connection. */
167212 public ApiFuture <AppendRowsResponse > append (
168213 StreamWriter streamWriter , ProtoRows rows , long offset ) {
169- throw new RuntimeException ("append with offset is not implemented on connection pool!" );
214+ // We are in multiplexing mode after entering the following logic.
215+ ConnectionWorker connectionWorker =
216+ streamWriterToConnection .compute (
217+ streamWriter ,
218+ (key , existingStream ) -> {
219+ // Though compute on concurrent map is atomic, we still do explicit locking as we
220+ // may have concurrent close(...) triggered.
221+ lock .lock ();
222+ try {
223+ // Stick to the existing stream if it's not overwhelmed.
224+ if (existingStream != null && !existingStream .getLoad ().isOverwhelmed ()) {
225+ return existingStream ;
226+ }
227+ // Try to create or find another existing stream to reuse.
228+ ConnectionWorker createdOrExistingConnection = null ;
229+ try {
230+ createdOrExistingConnection =
231+ createOrReuseConnectionWorker (streamWriter , existingStream );
232+ } catch (IOException e ) {
233+ throw new IllegalStateException (e );
234+ }
235+ // Update connection to write stream relationship.
236+ connectionToWriteStream .computeIfAbsent (
237+ createdOrExistingConnection , (ConnectionWorker k ) -> new HashSet <>());
238+ connectionToWriteStream .get (createdOrExistingConnection ).add (streamWriter );
239+ return createdOrExistingConnection ;
240+ } finally {
241+ lock .unlock ();
242+ }
243+ });
244+ Stopwatch stopwatch = Stopwatch .createStarted ();
245+ ApiFuture <AppendRowsResponse > responseFuture =
246+ connectionWorker .append (
247+ streamWriter .getStreamName (), streamWriter .getProtoSchema (), rows , offset );
248+ return responseFuture ;
249+ }
250+
251+ /**
252+ * Create a new connection if we haven't reached current maximum, or reuse an existing connection
253+ * with least load.
254+ */
255+ private ConnectionWorker createOrReuseConnectionWorker (
256+ StreamWriter streamWriter , ConnectionWorker existingConnectionWorker ) throws IOException {
257+ String streamReference = streamWriter .getStreamName ();
258+ if (connectionWorkerPool .size () < currentMaxConnectionCount ) {
259+ // Always create a new connection if we haven't reached current maximum.
260+ return createConnectionWorker (streamWriter .getStreamName (), streamWriter .getProtoSchema ());
261+ } else {
262+ ConnectionWorker existingBestConnection =
263+ pickBestLoadConnection (
264+ enableTesting ? Load .TEST_LOAD_COMPARATOR : Load .LOAD_COMPARATOR ,
265+ ImmutableList .copyOf (connectionWorkerPool ));
266+ if (!existingBestConnection .getLoad ().isOverwhelmed ()) {
267+ return existingBestConnection ;
268+ } else if (currentMaxConnectionCount < settings .maxConnectionsPerPool ()) {
269+ // At this point, we have reached the connection cap and the selected connection is
270+ // overwhelmed, we can try scale up the connection pool.
271+ // The connection count will go up one by one until `maxConnectionsPerPool` is reached.
272+ currentMaxConnectionCount += 1 ;
273+ if (currentMaxConnectionCount > settings .maxConnectionsPerPool ()) {
274+ currentMaxConnectionCount = settings .maxConnectionsPerPool ();
275+ }
276+ return createConnectionWorker (streamWriter .getStreamName (), streamWriter .getProtoSchema ());
277+ } else {
278+ // Stick to the original connection if all the connections are overwhelmed.
279+ if (existingConnectionWorker != null ) {
280+ return existingConnectionWorker ;
281+ }
282+ // If we are at this branch, it means we reached the maximum connections.
283+ return existingBestConnection ;
284+ }
285+ }
286+ }
287+
288+ /** Select out the best connection worker among the given connection workers. */
289+ static ConnectionWorker pickBestLoadConnection (
290+ Comparator <Load > comparator , List <ConnectionWorker > connectionWorkerList ) {
291+ if (connectionWorkerList .isEmpty ()) {
292+ throw new IllegalStateException (
293+ String .format (
294+ "Bug in code! At least one connection worker should be passed in "
295+ + "pickSemiBestLoadConnection(...)" ));
296+ }
297+ // Compare all connection workers to find the connection worker with the smallest load.
298+ // Loop and find the connection with the least load.
299+ // The load comparision and computation process
300+ int currentBestIndex = 0 ;
301+ Load currentBestLoad = connectionWorkerList .get (currentBestIndex ).getLoad ();
302+ for (int i = 1 ; i < connectionWorkerList .size (); i ++) {
303+ Load loadToCompare = connectionWorkerList .get (i ).getLoad ();
304+ if (comparator .compare (loadToCompare , currentBestLoad ) <= 0 ) {
305+ currentBestIndex = i ;
306+ currentBestLoad = loadToCompare ;
307+ }
308+ }
309+ return connectionWorkerList .get (currentBestIndex );
310+ }
311+
312+ /**
313+ * Creates a single connection worker.
314+ *
315+ * <p>Note this function need to be thread-safe across different stream reference but no need for
316+ * a single stream reference. This is because createConnectionWorker(...) is called via
317+ * computeIfAbsent(...) which is at most once per key.
318+ */
319+ private ConnectionWorker createConnectionWorker (String streamName , ProtoSchema writeSchema )
320+ throws IOException {
321+ if (enableTesting ) {
322+ // Though atomic integer is super lightweight, add extra if check in case adding future logic.
323+ testValueCreateConnectionCount .getAndIncrement ();
324+ }
325+ ConnectionWorker connectionWorker =
326+ new ConnectionWorker (
327+ streamName ,
328+ writeSchema ,
329+ maxInflightRequests ,
330+ maxInflightBytes ,
331+ limitExceededBehavior ,
332+ traceId ,
333+ client ,
334+ ownsBigQueryWriteClient );
335+ connectionWorkerPool .add (connectionWorker );
336+ log .info (
337+ String .format (
338+ "Scaling up new connection for stream name: %s, pool size after scaling up %s" ,
339+ streamName , connectionWorkerPool .size ()));
340+ return connectionWorker ;
341+ }
342+
343+ /** Enable Test related logic. */
344+ public static void enableTestingLogic () {
345+ enableTesting = true ;
346+ }
347+
348+ /** Returns how many times createConnectionWorker(...) is called. */
349+ int getCreateConnectionCount () {
350+ return testValueCreateConnectionCount .get ();
170351 }
171352
172353 /** Close the stream writer. Shut down all resources. */
0 commit comments