Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 237c827

Browse files
authored
feat: Support building a BigQueryWriteClient within the StreamWriterV2 (#876)
* Support building a BigQueryWriteClient within the StreamWriterV2. * feat: Support building a BigQueryWriteClient within the StreamWriterV2
1 parent 854c81e commit 237c827

2 files changed

Lines changed: 93 additions & 7 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.api.core.SettableApiFuture;
20+
import com.google.api.gax.core.CredentialsProvider;
21+
import com.google.api.gax.rpc.TransportChannelProvider;
2022
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
2123
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
2224
import com.google.common.base.Preconditions;
2325
import com.google.common.util.concurrent.Uninterruptibles;
2426
import io.grpc.Status;
2527
import io.grpc.Status.Code;
2628
import io.grpc.StatusRuntimeException;
29+
import java.io.IOException;
2730
import java.util.Deque;
2831
import java.util.LinkedList;
2932
import java.util.concurrent.TimeUnit;
@@ -36,8 +39,6 @@
3639
/**
3740
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
3841
*
39-
* <p>TODO: Add credential support.
40-
*
4142
* <p>TODO: Attach schema.
4243
*
4344
* <p>TODO: Attach traceId.
@@ -104,6 +105,16 @@ public class StreamWriterV2 implements AutoCloseable {
104105
@GuardedBy("lock")
105106
private final Deque<AppendRequestAndResponse> inflightRequestQueue;
106107

108+
/*
109+
* A client used to interact with BigQuery.
110+
*/
111+
private BigQueryWriteClient client;
112+
113+
/*
114+
* If true, the client above is created by this writer and should be closed.
115+
*/
116+
private boolean ownsBigQueryWriteClient = false;
117+
107118
/*
108119
* Wraps the underlying bi-directional stream connection with server.
109120
*/
@@ -119,7 +130,7 @@ public static long getApiMaxRequestBytes() {
119130
return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
120131
}
121132

122-
private StreamWriterV2(Builder builder) {
133+
private StreamWriterV2(Builder builder) throws IOException {
123134
this.lock = new ReentrantLock();
124135
this.hasMessageInWaitingQueue = lock.newCondition();
125136
this.inflightReduced = lock.newCondition();
@@ -128,9 +139,22 @@ private StreamWriterV2(Builder builder) {
128139
this.maxInflightBytes = builder.maxInflightBytes;
129140
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
130141
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
142+
if (builder.client == null) {
143+
BigQueryWriteSettings stubSettings =
144+
BigQueryWriteSettings.newBuilder()
145+
.setCredentialsProvider(builder.credentialsProvider)
146+
.setTransportChannelProvider(builder.channelProvider)
147+
.setEndpoint(builder.endpoint)
148+
.build();
149+
this.client = BigQueryWriteClient.create(stubSettings);
150+
this.ownsBigQueryWriteClient = true;
151+
} else {
152+
this.client = builder.client;
153+
this.ownsBigQueryWriteClient = false;
154+
}
131155
this.streamConnection =
132156
new StreamConnection(
133-
builder.client,
157+
this.client,
134158
new RequestCallback() {
135159
@Override
136160
public void run(AppendRowsResponse response) {
@@ -261,6 +285,9 @@ public void close() {
261285
log.warning(
262286
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
263287
}
288+
if (this.ownsBigQueryWriteClient) {
289+
this.client.close();
290+
}
264291
}
265292

266293
/*
@@ -405,6 +432,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite
405432
return new StreamWriterV2.Builder(streamName, client);
406433
}
407434

435+
/** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */
436+
public static StreamWriterV2.Builder newBuilder(String streamName) {
437+
return new StreamWriterV2.Builder(streamName);
438+
}
439+
408440
/** A builder of {@link StreamWriterV2}s. */
409441
public static final class Builder {
410442

@@ -420,6 +452,19 @@ public static final class Builder {
420452

421453
private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
422454

455+
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
456+
457+
private TransportChannelProvider channelProvider =
458+
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
459+
460+
private CredentialsProvider credentialsProvider =
461+
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
462+
463+
private Builder(String streamName) {
464+
this.streamName = Preconditions.checkNotNull(streamName);
465+
this.client = null;
466+
}
467+
423468
private Builder(String streamName, BigQueryWriteClient client) {
424469
this.streamName = Preconditions.checkNotNull(streamName);
425470
this.client = Preconditions.checkNotNull(client);
@@ -435,8 +480,34 @@ public Builder setMaxInflightBytes(long value) {
435480
return this;
436481
}
437482

483+
/** Gives the ability to override the gRPC endpoint. */
484+
public Builder setEndpoint(String endpoint) {
485+
this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
486+
return this;
487+
}
488+
489+
/**
490+
* {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
491+
* API endpoint.
492+
*
493+
* <p>For performance, this client benefits from having multiple underlying connections. See
494+
* {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
495+
*/
496+
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
497+
this.channelProvider =
498+
Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
499+
return this;
500+
}
501+
502+
/** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
503+
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
504+
this.credentialsProvider =
505+
Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
506+
return this;
507+
}
508+
438509
/** Builds the {@code StreamWriterV2}. */
439-
public StreamWriterV2 build() {
510+
public StreamWriterV2 build() throws IOException {
440511
return new StreamWriterV2(this);
441512
}
442513
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.protobuf.Int64Value;
3232
import io.grpc.Status;
3333
import io.grpc.StatusRuntimeException;
34+
import java.io.IOException;
3435
import java.util.ArrayList;
3536
import java.util.Arrays;
3637
import java.util.List;
@@ -81,7 +82,7 @@ public void tearDown() throws Exception {
8182
serviceHelper.stop();
8283
}
8384

84-
private StreamWriterV2 getTestStreamWriterV2() {
85+
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
8586
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
8687
}
8788

@@ -158,6 +159,20 @@ public void run() {
158159
appendThread.interrupt();
159160
}
160161

162+
@Test
163+
public void testBuildBigQueryWriteClientInWriter() throws Exception {
164+
StreamWriterV2 writer =
165+
StreamWriterV2.newBuilder(TEST_STREAM)
166+
.setCredentialsProvider(NoCredentialsProvider.create())
167+
.setChannelProvider(serviceHelper.createChannelProvider())
168+
.build();
169+
170+
testBigQueryWrite.addResponse(createAppendResponse(0));
171+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
172+
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
173+
writer.close();
174+
}
175+
161176
@Test
162177
public void testAppendSuccess() throws Exception {
163178
StreamWriterV2 writer = getTestStreamWriterV2();
@@ -371,7 +386,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
371386
}
372387

373388
@Test
374-
public void testMessageTooLarge() {
389+
public void testMessageTooLarge() throws Exception {
375390
StreamWriterV2 writer = getTestStreamWriterV2();
376391

377392
String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));

0 commit comments

Comments
 (0)