Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 73ddd7b

Browse files
docs(sample): update WriteToDefaultStream sample to match best practices (#1631)
* feat: Update WriteToDefaultStream sample to match best practices * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 3a33f49 commit 73ddd7b

1 file changed

Lines changed: 157 additions & 28 deletions

File tree

samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java

Lines changed: 157 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,28 @@
1717
package com.example.bigquerystorage;
1818

1919
// [START bigquerystorage_jsonstreamwriter_default]
20+
2021
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutureCallback;
23+
import com.google.api.core.ApiFutures;
2124
import com.google.cloud.bigquery.BigQuery;
2225
import com.google.cloud.bigquery.BigQueryOptions;
2326
import com.google.cloud.bigquery.Schema;
2427
import com.google.cloud.bigquery.Table;
2528
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
29+
import com.google.cloud.bigquery.storage.v1.Exceptions;
30+
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
2631
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
2732
import com.google.cloud.bigquery.storage.v1.TableName;
2833
import com.google.cloud.bigquery.storage.v1.TableSchema;
34+
import com.google.common.collect.ImmutableList;
35+
import com.google.common.util.concurrent.MoreExecutors;
2936
import com.google.protobuf.Descriptors.DescriptorValidationException;
37+
import io.grpc.Status;
38+
import io.grpc.Status.Code;
3039
import java.io.IOException;
31-
import java.util.concurrent.ExecutionException;
40+
import java.util.concurrent.Phaser;
41+
import javax.annotation.concurrent.GuardedBy;
3242
import org.json.JSONArray;
3343
import org.json.JSONObject;
3444

@@ -45,36 +55,155 @@ public static void runWriteToDefaultStream()
4555

4656
public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
4757
throws DescriptorValidationException, InterruptedException, IOException {
48-
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
49-
Table table = bigquery.getTable(datasetName, tableName);
5058
TableName parentTable = TableName.of(projectId, datasetName, tableName);
51-
Schema schema = table.getDefinition().getSchema();
52-
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
53-
54-
// Use the JSON stream writer to send records in JSON format.
55-
// For more information about JsonStreamWriter, see:
56-
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
57-
try (JsonStreamWriter writer =
58-
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
59-
// Write two batches to the stream, each with 10 JSON records. A writer should be used for as
60-
// much writes as possible. Creating a writer for just one write is an antipattern.
61-
for (int i = 0; i < 2; i++) {
62-
// Create a JSON object that is compatible with the table schema.
63-
JSONArray jsonArr = new JSONArray();
64-
for (int j = 0; j < 10; j++) {
65-
JSONObject record = new JSONObject();
66-
record.put("test_string", String.format("record %03d-%03d", i, j));
67-
jsonArr.put(record);
59+
60+
DataWriter writer = new DataWriter();
61+
// One time initialization for the worker.
62+
writer.initialize(parentTable);
63+
64+
// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
65+
// batched up to the maximum request size:
66+
// https://cloud.google.com/bigquery/quotas#write-api-limits
67+
for (int i = 0; i < 2; i++) {
68+
// Create a JSON object that is compatible with the table schema.
69+
JSONArray jsonArr = new JSONArray();
70+
for (int j = 0; j < 10; j++) {
71+
JSONObject record = new JSONObject();
72+
record.put("test_string", String.format("record %03d-%03d", i, j));
73+
jsonArr.put(record);
74+
}
75+
76+
writer.append(new AppendContext(jsonArr, 0));
77+
}
78+
79+
// Final cleanup for the stream during worker teardown.
80+
writer.cleanup();
81+
System.out.println("Appended records successfully.");
82+
}
83+
84+
private static class AppendContext {
85+
86+
JSONArray data;
87+
int retryCount = 0;
88+
89+
AppendContext(JSONArray data, int retryCount) {
90+
this.data = data;
91+
this.retryCount = retryCount;
92+
}
93+
}
94+
95+
/**
 * Long-lived worker that owns one {@code JsonStreamWriter} to the table's default stream.
 *
 * <p>Appends are issued asynchronously; a {@code Phaser} counts in-flight requests so
 * {@code cleanup()} can block until every response (or retry) has completed. The first
 * non-retriable failure is recorded under {@code lock} and re-thrown to the caller on the next
 * {@code append()} or at {@code cleanup()}.
 */
private static class DataWriter {

  // Per-batch retry budget for transient gRPC errors.
  private static final int MAX_RETRY_COUNT = 2;
  // Status codes considered safe to retry on the default stream. See:
  // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
  private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
      ImmutableList.of(Code.INTERNAL, Code.ABORTED, Code.CANCELLED);

  // Track the number of in-flight requests to wait for all responses before shutting down.
  // new Phaser(1) pre-registers this worker itself, so the phase cannot advance until
  // cleanup() arrives.
  private final Phaser inflightRequestCount = new Phaser(1);
  // Guards `error`, which is written from callback threads and read from the caller thread.
  private final Object lock = new Object();
  private JsonStreamWriter streamWriter;

  // First unrecoverable failure seen by any callback; once set it is never overwritten.
  @GuardedBy("lock")
  private RuntimeException error = null;

  /**
   * One-time setup: resolves the table schema and opens a stream writer to the default stream.
   *
   * @param parentTable fully-qualified destination table
   */
  public void initialize(TableName parentTable)
      throws DescriptorValidationException, IOException, InterruptedException {
    // Retrieve table schema information.
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    Table table = bigquery.getTable(parentTable.getDataset(), parentTable.getTable());
    Schema schema = table.getDefinition().getSchema();
    TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

    // Use the JSON stream writer to send records in JSON format. Specify the table name to write
    // to the default stream.
    // For more information about JsonStreamWriter, see:
    // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
    streamWriter = JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build();
  }

  /**
   * Submits one batch asynchronously. Fails fast by re-throwing any error recorded by an
   * earlier append's callback.
   */
  public void append(AppendContext appendContext)
      throws DescriptorValidationException, IOException {
    synchronized (this.lock) {
      // If an earlier append already failed unrecoverably, surface that error to the caller
      // instead of queueing more work.
      if (this.error != null) {
        throw this.error;
      }
    }
    // Append asynchronously for increased throughput.
    ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
    ApiFutures.addCallback(
        future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

    // Increase the count of in-flight requests.
    inflightRequestCount.register();
  }

  /**
   * Worker teardown: waits for every in-flight append, closes the stream, and re-throws any
   * failure recorded by the callbacks.
   */
  public void cleanup() {
    // Wait for all in-flight requests to complete.
    inflightRequestCount.arriveAndAwaitAdvance();

    // Close the connection to the server.
    streamWriter.close();

    // Verify that no error occurred in the stream.
    synchronized (this.lock) {
      if (this.error != null) {
        throw this.error;
      }
    }
  }

  /**
   * Completion callback for one append: retries transient failures, records the first
   * unrecoverable error on the parent, and always deregisters from the in-flight Phaser.
   */
  static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

    private final DataWriter parent;
    private final AppendContext appendContext;

    public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
      this.parent = parent;
      this.appendContext = appendContext;
    }

    public void onSuccess(AppendRowsResponse response) {
      System.out.format("Append success\n");
      done();
    }

    public void onFailure(Throwable throwable) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
      // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      Status status = Status.fromThrowable(throwable);
      if (appendContext.retryCount < MAX_RETRY_COUNT
          && RETRIABLE_ERROR_CODES.contains(status.getCode())) {
        appendContext.retryCount++;
        try {
          // Since default stream appends are not ordered, we can simply retry the appends.
          // Retrying with exclusive streams requires more careful consideration.
          this.parent.append(appendContext);
          // Mark the existing attempt as done since it's being retried; the retry registered
          // its own in-flight slot.
          done();
          return;
        } catch (Exception e) {
          // Fall through to record the original error below.
          System.out.format("Failed to retry append: %s\n", e);
        }
      }

      synchronized (this.parent.lock) {
        // Keep only the first failure; later failures are logged but not stored.
        if (this.parent.error == null) {
          StorageException storageException = Exceptions.toStorageException(throwable);
          this.parent.error =
              (storageException != null) ? storageException : new RuntimeException(throwable);
        }
      }
      System.out.format("Error: %s\n", throwable);
      done();
    }

    private void done() {
      // Reduce the count of in-flight requests so cleanup() can eventually advance.
      this.parent.inflightRequestCount.arriveAndDeregister();
    }
  }
}
80209
}

0 commit comments

Comments
 (0)