Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 6380f71

Browse files
authored
feat: ExecutorProvider can now be replaced (#1770)
An arbitrary ExecutorProvider can be set to control the generation of gax threads. Fixes #1769
1 parent 419637e commit 6380f71

3 files changed

Lines changed: 36 additions & 1 deletion

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.FlowControlSettings;
2020
import com.google.api.gax.core.CredentialsProvider;
21+
import com.google.api.gax.core.ExecutorProvider;
2122
import com.google.api.gax.rpc.TransportChannelProvider;
2223
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
2324
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
@@ -83,6 +84,7 @@ private JsonStreamWriter(Builder builder)
8384
setStreamWriterSettings(
8485
builder.channelProvider,
8586
builder.credentialsProvider,
87+
builder.executorProvider,
8688
builder.endpoint,
8789
builder.flowControlSettings,
8890
builder.traceId);
@@ -270,6 +272,7 @@ public long getInflightWaitSeconds() {
270272
private void setStreamWriterSettings(
271273
@Nullable TransportChannelProvider channelProvider,
272274
@Nullable CredentialsProvider credentialsProvider,
275+
@Nullable ExecutorProvider executorProvider,
273276
@Nullable String endpoint,
274277
@Nullable FlowControlSettings flowControlSettings,
275278
@Nullable String traceId) {
@@ -279,6 +282,9 @@ private void setStreamWriterSettings(
279282
if (credentialsProvider != null) {
280283
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
281284
}
285+
if (executorProvider != null) {
286+
streamWriterBuilder.setExecutorProvider(executorProvider);
287+
}
282288
if (endpoint != null) {
283289
streamWriterBuilder.setEndpoint(endpoint);
284290
}
@@ -366,6 +372,7 @@ public static final class Builder {
366372

367373
private TransportChannelProvider channelProvider;
368374
private CredentialsProvider credentialsProvider;
375+
private ExecutorProvider executorProvider;
369376
private FlowControlSettings flowControlSettings;
370377
private String endpoint;
371378
private boolean createDefaultStream = false;
@@ -446,6 +453,18 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
446453
return this;
447454
}
448455

456+
/**
457+
* Setter for the underlying StreamWriter's ExecutorProvider.
458+
*
459+
* @param executorProvider
460+
* @return
461+
*/
462+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
463+
this.executorProvider =
464+
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
465+
return this;
466+
}
467+
449468
/**
450469
* Setter for the underlying StreamWriter's FlowControlSettings.
451470
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.gax.batching.FlowController;
2020
import com.google.api.gax.core.CredentialsProvider;
21+
import com.google.api.gax.core.ExecutorProvider;
2122
import com.google.api.gax.rpc.FixedHeaderProvider;
2223
import com.google.api.gax.rpc.TransportChannelProvider;
2324
import com.google.auto.value.AutoOneOf;
@@ -174,6 +175,7 @@ private StreamWriter(Builder builder) throws IOException {
174175
BigQueryWriteSettings.newBuilder()
175176
.setCredentialsProvider(builder.credentialsProvider)
176177
.setTransportChannelProvider(builder.channelProvider)
178+
.setBackgroundExecutorProvider(builder.executorProvider)
177179
.setEndpoint(builder.endpoint)
178180
// (b/185842996): Temporily fix this by explicitly providing the header.
179181
.setHeaderProvider(
@@ -383,6 +385,9 @@ public static final class Builder {
383385
private CredentialsProvider credentialsProvider =
384386
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
385387

388+
private ExecutorProvider executorProvider =
389+
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
390+
386391
private FlowController.LimitExceededBehavior limitExceededBehavior =
387392
FlowController.LimitExceededBehavior.Block;
388393

@@ -459,6 +464,12 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
459464
return this;
460465
}
461466

467+
/** {@code ExecutorProvider} to use to create Executor to run background jobs. */
468+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
469+
this.executorProvider = executorProvider;
470+
return this;
471+
}
472+
462473
/**
463474
* Sets traceId for debuging purpose. TraceId must follow the format of
464475
* CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
138138
String testStream, TableSchema BQTableSchema) {
139139
return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client)
140140
.setChannelProvider(channelProvider)
141-
.setCredentialsProvider(NoCredentialsProvider.create());
141+
.setCredentialsProvider(NoCredentialsProvider.create())
142+
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
142143
}
143144

144145
@Test
@@ -403,6 +404,7 @@ public void testCreateDefaultStream() throws Exception {
403404
JsonStreamWriter.newBuilder(TEST_TABLE, client)
404405
.setChannelProvider(channelProvider)
405406
.setCredentialsProvider(NoCredentialsProvider.create())
407+
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
406408
.build()) {
407409
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
408410
assertEquals("aa", writer.getLocation());
@@ -651,6 +653,7 @@ public void testWithIgnoreUnknownFields() throws Exception {
651653
.setChannelProvider(channelProvider)
652654
.setIgnoreUnknownFields(true)
653655
.setCredentialsProvider(NoCredentialsProvider.create())
656+
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
654657
.build()) {
655658
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
656659
JSONObject foo = new JSONObject();
@@ -672,6 +675,7 @@ public void testFlowControlSetting() throws Exception {
672675
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
673676
.setChannelProvider(channelProvider)
674677
.setCredentialsProvider(NoCredentialsProvider.create())
678+
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
675679
.setFlowControlSettings(
676680
FlowControlSettings.newBuilder()
677681
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
@@ -709,6 +713,7 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception {
709713
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
710714
.setChannelProvider(channelProvider)
711715
.setCredentialsProvider(NoCredentialsProvider.create())
716+
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
712717
.setFlowControlSettings(
713718
FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build())
714719
.build()) {

0 commit comments

Comments
 (0)