Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 5e1f092

Browse files
authored
Merge 1e0ea32 into 7c1bb12
2 parents 7c1bb12 + 1e0ea32 commit 5e1f092

File tree

6 files changed

+396
-492
lines changed

6 files changed

+396
-492
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,19 @@
2828
<differenceType>7002</differenceType>
2929
<method>void flushAll(long)</method>
3030
</difference>
31+
<difference>
32+
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
33+
<differenceType>7002</differenceType>
34+
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setBatchingSettings(com.google.api.gax.batching.BatchingSettings)</method>
35+
</difference>
36+
<difference>
37+
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
38+
<differenceType>7002</differenceType>
39+
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setExecutorProvider(com.google.api.gax.core.ExecutorProvider)</method>
40+
</difference>
41+
<difference>
42+
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
43+
<differenceType>7002</differenceType>
44+
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setRetrySettings(com.google.api.gax.retrying.RetrySettings)</method>
45+
</difference>
3146
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java

Lines changed: 32 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.BatchingSettings;
20+
import com.google.api.gax.batching.FlowControlSettings;
2021
import com.google.api.gax.core.CredentialsProvider;
21-
import com.google.api.gax.core.ExecutorProvider;
22-
import com.google.api.gax.retrying.RetrySettings;
2322
import com.google.api.gax.rpc.TransportChannelProvider;
2423
import com.google.cloud.bigquery.Schema;
2524
import com.google.common.base.Preconditions;
@@ -51,6 +50,7 @@ public class JsonStreamWriter implements AutoCloseable {
5150
private BigQueryWriteClient client;
5251
private String streamName;
5352
private StreamWriter streamWriter;
53+
private StreamWriter.Builder streamWriterBuilder;
5454
private Descriptor descriptor;
5555
private TableSchema tableSchema;
5656

@@ -66,20 +66,16 @@ private JsonStreamWriter(Builder builder)
6666
this.descriptor =
6767
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
6868

69-
StreamWriter.Builder streamWriterBuilder;
7069
if (this.client == null) {
7170
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
7271
} else {
7372
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
7473
}
7574
setStreamWriterSettings(
76-
streamWriterBuilder,
7775
builder.channelProvider,
7876
builder.credentialsProvider,
79-
builder.batchingSettings,
80-
builder.retrySettings,
81-
builder.executorProvider,
8277
builder.endpoint,
78+
builder.flowControlSettings,
8379
builder.createDefaultStream);
8480
this.streamWriter = streamWriterBuilder.build();
8581
this.streamName = this.streamWriter.getStreamNameString();
@@ -134,17 +130,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
134130
}
135131

136132
/**
137-
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then calling
138-
* refreshAppend(), and finally setting the descriptor. All of these actions need to be performed
139-
* atomically to avoid having synchronization issues with append(). Flushing all rows first is
140-
* necessary since if there are rows remaining when the connection refreshes, it will send out the
141-
* old writer schema instead of the new one.
133+
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then
134+
* recreates stream writer, and finally setting the descriptor. All of these actions need to be
135+
* performed atomically to avoid having synchronization issues with append(). Flushing all rows
136+
* first is necessary since if there are rows remaining when the connection refreshes, it will
137+
* send out the old writer schema instead of the new one.
142138
*/
143139
void refreshConnection()
144140
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
145141
synchronized (this) {
146-
this.streamWriter.writeAllOutstanding();
147-
this.streamWriter.refreshAppend();
142+
this.streamWriter.shutdown();
143+
this.streamWriter = streamWriterBuilder.build();
148144
this.descriptor =
149145
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
150146
}
@@ -170,39 +166,37 @@ public Descriptor getDescriptor() {
170166

171167
/** Sets all StreamWriter settings. */
172168
private void setStreamWriterSettings(
173-
StreamWriter.Builder builder,
174169
@Nullable TransportChannelProvider channelProvider,
175170
@Nullable CredentialsProvider credentialsProvider,
176-
@Nullable BatchingSettings batchingSettings,
177-
@Nullable RetrySettings retrySettings,
178-
@Nullable ExecutorProvider executorProvider,
179171
@Nullable String endpoint,
172+
@Nullable FlowControlSettings flowControlSettings,
180173
Boolean createDefaultStream) {
181174
if (channelProvider != null) {
182-
builder.setChannelProvider(channelProvider);
175+
streamWriterBuilder.setChannelProvider(channelProvider);
183176
}
184177
if (credentialsProvider != null) {
185-
builder.setCredentialsProvider(credentialsProvider);
178+
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
186179
}
187-
if (batchingSettings != null) {
188-
builder.setBatchingSettings(batchingSettings);
189-
}
190-
if (retrySettings != null) {
191-
builder.setRetrySettings(retrySettings);
192-
}
193-
if (executorProvider != null) {
194-
builder.setExecutorProvider(executorProvider);
180+
BatchingSettings.Builder batchSettingBuilder =
181+
BatchingSettings.newBuilder()
182+
.setElementCountThreshold(1L)
183+
.setRequestByteThreshold(4 * 1024 * 1024L);
184+
if (flowControlSettings != null) {
185+
streamWriterBuilder.setBatchingSettings(
186+
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
187+
} else {
188+
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
195189
}
196190
if (endpoint != null) {
197-
builder.setEndpoint(endpoint);
191+
streamWriterBuilder.setEndpoint(endpoint);
198192
}
199193
if (createDefaultStream) {
200-
builder.createDefaultStream();
194+
streamWriterBuilder.createDefaultStream();
201195
}
202196
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
203197
new JsonStreamWriterOnSchemaUpdateRunnable();
204198
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
205-
builder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
199+
streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
206200
}
207201

208202
/**
@@ -313,9 +307,7 @@ public static final class Builder {
313307

314308
private TransportChannelProvider channelProvider;
315309
private CredentialsProvider credentialsProvider;
316-
private BatchingSettings batchingSettings;
317-
private RetrySettings retrySettings;
318-
private ExecutorProvider executorProvider;
310+
private FlowControlSettings flowControlSettings;
319311
private String endpoint;
320312
private boolean createDefaultStream = false;
321313

@@ -359,37 +351,15 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
359351
}
360352

361353
/**
362-
* Setter for the underlying StreamWriter's BatchingSettings.
363-
*
364-
* @param batchingSettings
365-
* @return Builder
366-
*/
367-
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
368-
this.batchingSettings =
369-
Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null.");
370-
return this;
371-
}
372-
373-
/**
374-
* Setter for the underlying StreamWriter's RetrySettings.
375-
*
376-
* @param retrySettings
377-
* @return Builder
378-
*/
379-
public Builder setRetrySettings(RetrySettings retrySettings) {
380-
this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null.");
381-
return this;
382-
}
383-
384-
/**
385-
* Setter for the underlying StreamWriter's ExecutorProvider.
354+
* Setter for the underlying StreamWriter's FlowControlSettings.
386355
*
387-
* @param executorProvider
356+
* @param flowControlSettings
388357
* @return Builder
389358
*/
390-
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
391-
this.executorProvider =
392-
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
359+
public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
360+
Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
361+
this.flowControlSettings =
362+
Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
393363
return this;
394364
}
395365

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 22 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,9 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
265265
List<InflightBatch> batchesToSend;
266266
batchesToSend = messagesBatch.add(outstandingAppend);
267267
// Setup the next duration based delivery alarm if there are messages batched.
268-
setupAlarm();
268+
if (batchingSettings.getDelayThreshold() != null) {
269+
setupAlarm();
270+
}
269271
if (!batchesToSend.isEmpty()) {
270272
for (final InflightBatch batch : batchesToSend) {
271273
LOG.fine("Scheduling a batch for immediate sending");
@@ -738,58 +740,31 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
738740
if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) {
739741
builder.setRequestByteThreshold(getApiMaxRequestBytes());
740742
}
741-
Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
742-
Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0);
743+
LOG.info("here" + batchingSettings.getFlowControlSettings());
743744
if (batchingSettings.getFlowControlSettings() == null) {
744745
builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS);
745746
} else {
746-
747-
if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() == null) {
748-
builder.setFlowControlSettings(
749-
batchingSettings
750-
.getFlowControlSettings()
751-
.toBuilder()
752-
.setMaxOutstandingElementCount(
753-
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount())
754-
.build());
755-
} else {
756-
Preconditions.checkArgument(
757-
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() > 0);
758-
if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount()
759-
> getApiMaxInflightRequests()) {
760-
builder.setFlowControlSettings(
761-
batchingSettings
762-
.getFlowControlSettings()
763-
.toBuilder()
764-
.setMaxOutstandingElementCount(getApiMaxInflightRequests())
765-
.build());
766-
}
747+
Long elementCount =
748+
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount();
749+
if (elementCount == null || elementCount > getApiMaxInflightRequests()) {
750+
elementCount = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount();
767751
}
768-
if (batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null) {
769-
builder.setFlowControlSettings(
770-
batchingSettings
771-
.getFlowControlSettings()
772-
.toBuilder()
773-
.setMaxOutstandingRequestBytes(
774-
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes())
775-
.build());
776-
} else {
777-
Preconditions.checkArgument(
778-
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() > 0);
752+
Long elementSize =
753+
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes();
754+
if (elementSize == null || elementSize < 0) {
755+
elementSize = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes();
779756
}
780-
if (batchingSettings.getFlowControlSettings().getLimitExceededBehavior() == null) {
781-
builder.setFlowControlSettings(
782-
batchingSettings
783-
.getFlowControlSettings()
784-
.toBuilder()
785-
.setLimitExceededBehavior(
786-
DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior())
787-
.build());
788-
} else {
789-
Preconditions.checkArgument(
790-
batchingSettings.getFlowControlSettings().getLimitExceededBehavior()
791-
!= FlowController.LimitExceededBehavior.Ignore);
757+
FlowController.LimitExceededBehavior behavior =
758+
batchingSettings.getFlowControlSettings().getLimitExceededBehavior();
759+
if (behavior == null || behavior == FlowController.LimitExceededBehavior.Ignore) {
760+
behavior = DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior();
792761
}
762+
builder.setFlowControlSettings(
763+
FlowControlSettings.newBuilder()
764+
.setMaxOutstandingElementCount(elementCount)
765+
.setMaxOutstandingRequestBytes(elementSize)
766+
.setLimitExceededBehavior(behavior)
767+
.build());
793768
}
794769
this.batchingSettings = builder.build();
795770
return this;

0 commit comments

Comments
 (0)