1717
1818import com .google .api .core .ApiFuture ;
1919import com .google .api .gax .batching .BatchingSettings ;
20+ import com .google .api .gax .batching .FlowControlSettings ;
2021import com .google .api .gax .core .CredentialsProvider ;
21- import com .google .api .gax .core .ExecutorProvider ;
22- import com .google .api .gax .retrying .RetrySettings ;
2322import com .google .api .gax .rpc .TransportChannelProvider ;
2423import com .google .cloud .bigquery .Schema ;
2524import com .google .common .base .Preconditions ;
@@ -51,6 +50,7 @@ public class JsonStreamWriter implements AutoCloseable {
5150 private BigQueryWriteClient client ;
5251 private String streamName ;
5352 private StreamWriter streamWriter ;
53+ private StreamWriter .Builder streamWriterBuilder ;
5454 private Descriptor descriptor ;
5555 private TableSchema tableSchema ;
5656
@@ -66,20 +66,16 @@ private JsonStreamWriter(Builder builder)
6666 this .descriptor =
6767 BQTableSchemaToProtoDescriptor .convertBQTableSchemaToProtoDescriptor (builder .tableSchema );
6868
69- StreamWriter .Builder streamWriterBuilder ;
7069 if (this .client == null ) {
7170 streamWriterBuilder = StreamWriter .newBuilder (builder .streamOrTableName );
7271 } else {
7372 streamWriterBuilder = StreamWriter .newBuilder (builder .streamOrTableName , builder .client );
7473 }
7574 setStreamWriterSettings (
76- streamWriterBuilder ,
7775 builder .channelProvider ,
7876 builder .credentialsProvider ,
79- builder .batchingSettings ,
80- builder .retrySettings ,
81- builder .executorProvider ,
8277 builder .endpoint ,
78+ builder .flowControlSettings ,
8379 builder .createDefaultStream );
8480 this .streamWriter = streamWriterBuilder .build ();
8581 this .streamName = this .streamWriter .getStreamNameString ();
@@ -134,17 +130,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
134130 }
135131
136132 /**
137- * Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then calling
138- * refreshAppend() , and finally setting the descriptor. All of these actions need to be performed
139- * atomically to avoid having synchronization issues with append(). Flushing all rows first is
140- * necessary since if there are rows remaining when the connection refreshes, it will send out the
141- * old writer schema instead of the new one.
133+ * Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then
134+ * recreates stream writer , and finally setting the descriptor. All of these actions need to be
135+ * performed atomically to avoid having synchronization issues with append(). Flushing all rows
136+ * first is necessary since if there are rows remaining when the connection refreshes, it will
137+ * send out the old writer schema instead of the new one.
142138 */
143139 void refreshConnection ()
144140 throws IOException , InterruptedException , Descriptors .DescriptorValidationException {
145141 synchronized (this ) {
146- this .streamWriter .writeAllOutstanding ();
147- this .streamWriter . refreshAppend ();
142+ this .streamWriter .shutdown ();
143+ this .streamWriter = streamWriterBuilder . build ();
148144 this .descriptor =
149145 BQTableSchemaToProtoDescriptor .convertBQTableSchemaToProtoDescriptor (this .tableSchema );
150146 }
@@ -170,39 +166,37 @@ public Descriptor getDescriptor() {
170166
171167 /** Sets all StreamWriter settings. */
172168 private void setStreamWriterSettings (
173- StreamWriter .Builder builder ,
174169 @ Nullable TransportChannelProvider channelProvider ,
175170 @ Nullable CredentialsProvider credentialsProvider ,
176- @ Nullable BatchingSettings batchingSettings ,
177- @ Nullable RetrySettings retrySettings ,
178- @ Nullable ExecutorProvider executorProvider ,
179171 @ Nullable String endpoint ,
172+ @ Nullable FlowControlSettings flowControlSettings ,
180173 Boolean createDefaultStream ) {
181174 if (channelProvider != null ) {
182- builder .setChannelProvider (channelProvider );
175+ streamWriterBuilder .setChannelProvider (channelProvider );
183176 }
184177 if (credentialsProvider != null ) {
185- builder .setCredentialsProvider (credentialsProvider );
178+ streamWriterBuilder .setCredentialsProvider (credentialsProvider );
186179 }
187- if (batchingSettings != null ) {
188- builder .setBatchingSettings (batchingSettings );
189- }
190- if (retrySettings != null ) {
191- builder .setRetrySettings (retrySettings );
192- }
193- if (executorProvider != null ) {
194- builder .setExecutorProvider (executorProvider );
180+ BatchingSettings .Builder batchSettingBuilder =
181+ BatchingSettings .newBuilder ()
182+ .setElementCountThreshold (1L )
183+ .setRequestByteThreshold (4 * 1024 * 1024L );
184+ if (flowControlSettings != null ) {
185+ streamWriterBuilder .setBatchingSettings (
186+ batchSettingBuilder .setFlowControlSettings (flowControlSettings ).build ());
187+ } else {
188+ streamWriterBuilder .setBatchingSettings (batchSettingBuilder .build ());
195189 }
196190 if (endpoint != null ) {
197- builder .setEndpoint (endpoint );
191+ streamWriterBuilder .setEndpoint (endpoint );
198192 }
199193 if (createDefaultStream ) {
200- builder .createDefaultStream ();
194+ streamWriterBuilder .createDefaultStream ();
201195 }
202196 JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
203197 new JsonStreamWriterOnSchemaUpdateRunnable ();
204198 jsonStreamWriterOnSchemaUpdateRunnable .setJsonStreamWriter (this );
205- builder .setOnSchemaUpdateRunnable (jsonStreamWriterOnSchemaUpdateRunnable );
199+ streamWriterBuilder .setOnSchemaUpdateRunnable (jsonStreamWriterOnSchemaUpdateRunnable );
206200 }
207201
208202 /**
@@ -313,9 +307,7 @@ public static final class Builder {
313307
314308 private TransportChannelProvider channelProvider ;
315309 private CredentialsProvider credentialsProvider ;
316- private BatchingSettings batchingSettings ;
317- private RetrySettings retrySettings ;
318- private ExecutorProvider executorProvider ;
310+ private FlowControlSettings flowControlSettings ;
319311 private String endpoint ;
320312 private boolean createDefaultStream = false ;
321313
@@ -359,37 +351,15 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
359351 }
360352
361353 /**
362- * Setter for the underlying StreamWriter's BatchingSettings.
363- *
364- * @param batchingSettings
365- * @return Builder
366- */
367- public Builder setBatchingSettings (BatchingSettings batchingSettings ) {
368- this .batchingSettings =
369- Preconditions .checkNotNull (batchingSettings , "BatchingSettings is null." );
370- return this ;
371- }
372-
373- /**
374- * Setter for the underlying StreamWriter's RetrySettings.
375- *
376- * @param retrySettings
377- * @return Builder
378- */
379- public Builder setRetrySettings (RetrySettings retrySettings ) {
380- this .retrySettings = Preconditions .checkNotNull (retrySettings , "RetrySettings is null." );
381- return this ;
382- }
383-
384- /**
385- * Setter for the underlying StreamWriter's ExecutorProvider.
354+ * Setter for the underlying StreamWriter's FlowControlSettings.
386355 *
387- * @param executorProvider
356+ * @param flowControlSettings
388357 * @return Builder
389358 */
390- public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
391- this .executorProvider =
392- Preconditions .checkNotNull (executorProvider , "ExecutorProvider is null." );
359+ public Builder setFlowControlSettings (FlowControlSettings flowControlSettings ) {
360+ Preconditions .checkNotNull (flowControlSettings , "FlowControlSettings is null." );
361+ this .flowControlSettings =
362+ Preconditions .checkNotNull (flowControlSettings , "FlowControlSettings is null." );
393363 return this ;
394364 }
395365
0 commit comments