1717package com .example .bigquerystorage ;
1818
1919// [START bigquerystorage_jsonstreamwriter_default]
20+
2021import com .google .api .core .ApiFuture ;
22+ import com .google .api .core .ApiFutureCallback ;
23+ import com .google .api .core .ApiFutures ;
2124import com .google .cloud .bigquery .BigQuery ;
2225import com .google .cloud .bigquery .BigQueryOptions ;
2326import com .google .cloud .bigquery .Schema ;
2427import com .google .cloud .bigquery .Table ;
2528import com .google .cloud .bigquery .storage .v1 .AppendRowsResponse ;
29+ import com .google .cloud .bigquery .storage .v1 .Exceptions ;
30+ import com .google .cloud .bigquery .storage .v1 .Exceptions .StorageException ;
2631import com .google .cloud .bigquery .storage .v1 .JsonStreamWriter ;
2732import com .google .cloud .bigquery .storage .v1 .TableName ;
2833import com .google .cloud .bigquery .storage .v1 .TableSchema ;
34+ import com .google .common .collect .ImmutableList ;
35+ import com .google .common .util .concurrent .MoreExecutors ;
2936import com .google .protobuf .Descriptors .DescriptorValidationException ;
37+ import io .grpc .Status ;
38+ import io .grpc .Status .Code ;
3039import java .io .IOException ;
31- import java .util .concurrent .ExecutionException ;
40+ import java .util .concurrent .Phaser ;
41+ import javax .annotation .concurrent .GuardedBy ;
3242import org .json .JSONArray ;
3343import org .json .JSONObject ;
3444
@@ -45,36 +55,155 @@ public static void runWriteToDefaultStream()
4555
4656 public static void writeToDefaultStream (String projectId , String datasetName , String tableName )
4757 throws DescriptorValidationException , InterruptedException , IOException {
48- BigQuery bigquery = BigQueryOptions .getDefaultInstance ().getService ();
49- Table table = bigquery .getTable (datasetName , tableName );
5058 TableName parentTable = TableName .of (projectId , datasetName , tableName );
51- Schema schema = table .getDefinition ().getSchema ();
52- TableSchema tableSchema = BqToBqStorageSchemaConverter .convertTableSchema (schema );
53-
54- // Use the JSON stream writer to send records in JSON format.
55- // For more information about JsonStreamWriter, see:
56- // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
57- try (JsonStreamWriter writer =
58- JsonStreamWriter .newBuilder (parentTable .toString (), tableSchema ).build ()) {
59- // Write two batches to the stream, each with 10 JSON records. A writer should be used for as
60- // much writes as possible. Creating a writer for just one write is an antipattern.
61- for (int i = 0 ; i < 2 ; i ++) {
62- // Create a JSON object that is compatible with the table schema.
63- JSONArray jsonArr = new JSONArray ();
64- for (int j = 0 ; j < 10 ; j ++) {
65- JSONObject record = new JSONObject ();
66- record .put ("test_string" , String .format ("record %03d-%03d" , i , j ));
67- jsonArr .put (record );
59+
60+ DataWriter writer = new DataWriter ();
61+ // One time initialization for the worker.
62+ writer .initialize (parentTable );
63+
64+ // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
65+ // batched up to the maximum request size:
66+ // https://cloud.google.com/bigquery/quotas#write-api-limits
67+ for (int i = 0 ; i < 2 ; i ++) {
68+ // Create a JSON object that is compatible with the table schema.
69+ JSONArray jsonArr = new JSONArray ();
70+ for (int j = 0 ; j < 10 ; j ++) {
71+ JSONObject record = new JSONObject ();
72+ record .put ("test_string" , String .format ("record %03d-%03d" , i , j ));
73+ jsonArr .put (record );
74+ }
75+
76+ writer .append (new AppendContext (jsonArr , 0 ));
77+ }
78+
79+ // Final cleanup for the stream during worker teardown.
80+ writer .cleanup ();
81+ System .out .println ("Appended records successfully." );
82+ }
83+
84+ private static class AppendContext {
85+
86+ JSONArray data ;
87+ int retryCount = 0 ;
88+
89+ AppendContext (JSONArray data , int retryCount ) {
90+ this .data = data ;
91+ this .retryCount = retryCount ;
92+ }
93+ }
94+
95+ private static class DataWriter {
96+
97+ private static final int MAX_RETRY_COUNT = 2 ;
98+ private static final ImmutableList <Code > RETRIABLE_ERROR_CODES =
99+ ImmutableList .of (Code .INTERNAL , Code .ABORTED , Code .CANCELLED );
100+
101+ // Track the number of in-flight requests to wait for all responses before shutting down.
102+ private final Phaser inflightRequestCount = new Phaser (1 );
103+ private final Object lock = new Object ();
104+ private JsonStreamWriter streamWriter ;
105+
106+ @ GuardedBy ("lock" )
107+ private RuntimeException error = null ;
108+
109+ public void initialize (TableName parentTable )
110+ throws DescriptorValidationException , IOException , InterruptedException {
111+ // Retrive table schema information.
112+ BigQuery bigquery = BigQueryOptions .getDefaultInstance ().getService ();
113+ Table table = bigquery .getTable (parentTable .getDataset (), parentTable .getTable ());
114+ Schema schema = table .getDefinition ().getSchema ();
115+ TableSchema tableSchema = BqToBqStorageSchemaConverter .convertTableSchema (schema );
116+
117+ // Use the JSON stream writer to send records in JSON format. Specify the table name to write
118+ // to the default stream.
119+ // For more information about JsonStreamWriter, see:
120+ // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
121+ streamWriter = JsonStreamWriter .newBuilder (parentTable .toString (), tableSchema ).build ();
122+ }
123+
124+ public void append (AppendContext appendContext )
125+ throws DescriptorValidationException , IOException {
126+ synchronized (this .lock ) {
127+ // If earlier appends have failed, we need to reset before continuing.
128+ if (this .error != null ) {
129+ throw this .error ;
68130 }
69- ApiFuture <AppendRowsResponse > future = writer .append (jsonArr );
70- AppendRowsResponse response = future .get ();
71131 }
72- System .out .println ("Appended records successfully." );
73- } catch (ExecutionException e ) {
74- // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
75- // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
76- // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
77- System .out .println ("Failed to append records. \n " + e .toString ());
132+ // Append asynchronously for increased throughput.
133+ ApiFuture <AppendRowsResponse > future = streamWriter .append (appendContext .data );
134+ ApiFutures .addCallback (
135+ future , new AppendCompleteCallback (this , appendContext ), MoreExecutors .directExecutor ());
136+
137+ // Increase the count of in-flight requests.
138+ inflightRequestCount .register ();
139+ }
140+
141+ public void cleanup () {
142+ // Wait for all in-flight requests to complete.
143+ inflightRequestCount .arriveAndAwaitAdvance ();
144+
145+ // Close the connection to the server.
146+ streamWriter .close ();
147+
148+ // Verify that no error occurred in the stream.
149+ synchronized (this .lock ) {
150+ if (this .error != null ) {
151+ throw this .error ;
152+ }
153+ }
154+ }
155+
156+ static class AppendCompleteCallback implements ApiFutureCallback <AppendRowsResponse > {
157+
158+ private final DataWriter parent ;
159+ private final AppendContext appendContext ;
160+
161+ public AppendCompleteCallback (DataWriter parent , AppendContext appendContext ) {
162+ this .parent = parent ;
163+ this .appendContext = appendContext ;
164+ }
165+
166+ public void onSuccess (AppendRowsResponse response ) {
167+ System .out .format ("Append success\n " );
168+ done ();
169+ }
170+
171+ public void onFailure (Throwable throwable ) {
172+ // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
173+ // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
174+ // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
175+ Status status = Status .fromThrowable (throwable );
176+ if (appendContext .retryCount < MAX_RETRY_COUNT
177+ && RETRIABLE_ERROR_CODES .contains (status .getCode ())) {
178+ appendContext .retryCount ++;
179+ try {
180+ // Since default stream appends are not ordered, we can simply retry the appends.
181+ // Retrying with exclusive streams requires more careful consideration.
182+ this .parent .append (appendContext );
183+ // Mark the existing attempt as done since it's being retried.
184+ done ();
185+ return ;
186+ } catch (Exception e ) {
187+ // Fall through to return error.
188+ System .out .format ("Failed to retry append: %s\n " , e );
189+ }
190+ }
191+
192+ synchronized (this .parent .lock ) {
193+ if (this .parent .error == null ) {
194+ StorageException storageException = Exceptions .toStorageException (throwable );
195+ this .parent .error =
196+ (storageException != null ) ? storageException : new RuntimeException (throwable );
197+ }
198+ }
199+ System .out .format ("Error: %s\n " , throwable );
200+ done ();
201+ }
202+
203+ private void done () {
204+ // Reduce the count of in-flight requests.
205+ this .parent .inflightRequestCount .arriveAndDeregister ();
206+ }
78207 }
79208 }
80209}
0 commit comments