Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 8a7da6d

Browse files
committed
docs(sample): Update parallel append sample to use StreamWriterV2
1 parent e41cfba commit 8a7da6d

File tree

1 file changed

+31
-14
lines changed

1 file changed

+31
-14
lines changed

samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,29 @@
2020
import com.google.api.core.ApiFuture;
2121
import com.google.api.core.ApiFutureCallback;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
2324
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
25+
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
2426
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
2527
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
26-
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
28+
import com.google.cloud.bigquery.storage.v1beta2.JsonToProtoMessage;
29+
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
30+
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
31+
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter;
32+
import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
2733
import com.google.cloud.bigquery.storage.v1beta2.TableName;
2834
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
2935
import com.google.common.util.concurrent.MoreExecutors;
36+
import com.google.protobuf.Descriptors.Descriptor;
3037
import com.google.protobuf.Descriptors.DescriptorValidationException;
38+
import com.google.protobuf.Int64Value;
39+
import com.google.protobuf.Message;
3140
import java.io.IOException;
3241
import java.time.Duration;
3342
import java.util.concurrent.ThreadLocalRandom;
3443
import java.util.logging.Logger;
3544
import javax.annotation.Nullable;
3645
import javax.annotation.concurrent.GuardedBy;
37-
import org.json.JSONArray;
3846
import org.json.JSONObject;
3947

4048
public class ParallelWriteCommittedStream {
@@ -151,20 +159,20 @@ private void writeToStream(
151159
lastMetricsSuccessCount = 0;
152160
lastMetricsFailureCount = 0;
153161
}
154-
// Use the JSON stream writer to send records in JSON format.
155-
// For more information about JsonStreamWriter, see:
156-
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
157-
try (JsonStreamWriter writer =
158-
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
159-
.build()) {
162+
Descriptor descriptor =
163+
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
164+
writeStream.getTableSchema());
165+
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
166+
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
160167
while (System.currentTimeMillis() < deadlineMillis) {
161168
synchronized (this) {
162169
if (error != null) {
163170
// Stop writing once we get an error.
164171
throw error;
165172
}
166173
}
167-
ApiFuture<AppendRowsResponse> future = writer.append(createPayload(), -1);
174+
ApiFuture<AppendRowsResponse> future =
175+
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
168176
synchronized (this) {
169177
inflightCount++;
170178
}
@@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) {
189197
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
190198
}
191199

192-
private JSONArray createPayload() {
193-
// Create a JSON object that is compatible with the table schema.
194-
JSONArray jsonArr = new JSONArray();
200+
private AppendRowsRequest createAppendRequest(
201+
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
202+
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
195203
for (int i = 0; i < BATCH_SIZE; i++) {
196204
byte[] payload = new byte[ROW_SIZE];
197205
ThreadLocalRandom.current().nextBytes(payload);
198206
JSONObject record = new JSONObject();
199207
record.put("col1", new String(payload));
200-
jsonArr.put(record);
208+
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
209+
rowsBuilder.addSerializedRows(protoMessage.toByteString());
201210
}
202-
return jsonArr;
211+
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
212+
data.setWriterSchema(protoSchema);
213+
data.setRows(rowsBuilder.build());
214+
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
215+
request.setWriteStream(streamName);
216+
if (offset >= 0) {
217+
request.setOffset(Int64Value.of(offset));
218+
}
219+
return request.build();
203220
}
204221

205222
private void sleepIgnoringInterruption(Duration duration) {

0 commit comments

Comments
 (0)