1717
1818import com .google .api .core .ApiFuture ;
1919import com .google .api .core .SettableApiFuture ;
20+ import com .google .api .gax .core .CredentialsProvider ;
21+ import com .google .api .gax .rpc .TransportChannelProvider ;
2022import com .google .cloud .bigquery .storage .v1beta2 .StreamConnection .DoneCallback ;
2123import com .google .cloud .bigquery .storage .v1beta2 .StreamConnection .RequestCallback ;
2224import com .google .common .base .Preconditions ;
2325import com .google .common .util .concurrent .Uninterruptibles ;
2426import io .grpc .Status ;
2527import io .grpc .Status .Code ;
2628import io .grpc .StatusRuntimeException ;
29+ import java .io .IOException ;
2730import java .util .Deque ;
2831import java .util .LinkedList ;
2932import java .util .concurrent .TimeUnit ;
3639/**
3740 * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
3841 *
39- * <p>TODO: Add credential support.
40- *
4142 * <p>TODO: Attach schema.
4243 *
4344 * <p>TODO: Attach traceId.
@@ -104,6 +105,16 @@ public class StreamWriterV2 implements AutoCloseable {
104105 @ GuardedBy ("lock" )
105106 private final Deque <AppendRequestAndResponse > inflightRequestQueue ;
106107
108+ /*
109+ * A client used to interact with BigQuery.
110+ */
111+ private BigQueryWriteClient client ;
112+
113+ /*
114+ * If true, the client above is created by this writer and should be closed.
115+ */
116+ private boolean ownsBigQueryWriteClient = false ;
117+
107118 /*
108119 * Wraps the underlying bi-directional stream connection with server.
109120 */
@@ -119,7 +130,7 @@ public static long getApiMaxRequestBytes() {
119130 return 8L * 1000L * 1000L ; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
120131 }
121132
122- private StreamWriterV2 (Builder builder ) {
133+ private StreamWriterV2 (Builder builder ) throws IOException {
123134 this .lock = new ReentrantLock ();
124135 this .hasMessageInWaitingQueue = lock .newCondition ();
125136 this .inflightReduced = lock .newCondition ();
@@ -128,9 +139,22 @@ private StreamWriterV2(Builder builder) {
128139 this .maxInflightBytes = builder .maxInflightBytes ;
129140 this .waitingRequestQueue = new LinkedList <AppendRequestAndResponse >();
130141 this .inflightRequestQueue = new LinkedList <AppendRequestAndResponse >();
142+ if (builder .client == null ) {
143+ BigQueryWriteSettings stubSettings =
144+ BigQueryWriteSettings .newBuilder ()
145+ .setCredentialsProvider (builder .credentialsProvider )
146+ .setTransportChannelProvider (builder .channelProvider )
147+ .setEndpoint (builder .endpoint )
148+ .build ();
149+ this .client = BigQueryWriteClient .create (stubSettings );
150+ this .ownsBigQueryWriteClient = true ;
151+ } else {
152+ this .client = builder .client ;
153+ this .ownsBigQueryWriteClient = false ;
154+ }
131155 this .streamConnection =
132156 new StreamConnection (
133- builder .client ,
157+ this .client ,
134158 new RequestCallback () {
135159 @ Override
136160 public void run (AppendRowsResponse response ) {
@@ -261,6 +285,9 @@ public void close() {
261285 log .warning (
262286 "Append handler join is interrupted. Stream: " + streamName + " Error: " + e .toString ());
263287 }
288+ if (this .ownsBigQueryWriteClient ) {
289+ this .client .close ();
290+ }
264291 }
265292
266293 /*
@@ -405,6 +432,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite
405432 return new StreamWriterV2 .Builder (streamName , client );
406433 }
407434
435+ /** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */
436+ public static StreamWriterV2 .Builder newBuilder (String streamName ) {
437+ return new StreamWriterV2 .Builder (streamName );
438+ }
439+
408440 /** A builder of {@link StreamWriterV2}s. */
409441 public static final class Builder {
410442
@@ -420,6 +452,19 @@ public static final class Builder {
420452
421453 private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES ;
422454
455+ private String endpoint = BigQueryWriteSettings .getDefaultEndpoint ();
456+
457+ private TransportChannelProvider channelProvider =
458+ BigQueryWriteSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
459+
460+ private CredentialsProvider credentialsProvider =
461+ BigQueryWriteSettings .defaultCredentialsProviderBuilder ().build ();
462+
463+ private Builder (String streamName ) {
464+ this .streamName = Preconditions .checkNotNull (streamName );
465+ this .client = null ;
466+ }
467+
423468 private Builder (String streamName , BigQueryWriteClient client ) {
424469 this .streamName = Preconditions .checkNotNull (streamName );
425470 this .client = Preconditions .checkNotNull (client );
@@ -435,8 +480,34 @@ public Builder setMaxInflightBytes(long value) {
435480 return this ;
436481 }
437482
483+ /** Gives the ability to override the gRPC endpoint. */
484+ public Builder setEndpoint (String endpoint ) {
485+ this .endpoint = Preconditions .checkNotNull (endpoint , "Endpoint is null." );
486+ return this ;
487+ }
488+
489+ /**
490+ * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
491+ * API endpoint.
492+ *
493+ * <p>For performance, this client benefits from having multiple underlying connections. See
494+ * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
495+ */
496+ public Builder setChannelProvider (TransportChannelProvider channelProvider ) {
497+ this .channelProvider =
498+ Preconditions .checkNotNull (channelProvider , "ChannelProvider is null." );
499+ return this ;
500+ }
501+
502+ /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
503+ public Builder setCredentialsProvider (CredentialsProvider credentialsProvider ) {
504+ this .credentialsProvider =
505+ Preconditions .checkNotNull (credentialsProvider , "CredentialsProvider is null." );
506+ return this ;
507+ }
508+
438509 /** Builds the {@code StreamWriterV2}. */
439- public StreamWriterV2 build () {
510+ public StreamWriterV2 build () throws IOException {
440511 return new StreamWriterV2 (this );
441512 }
442513 }
0 commit comments