2020import com .google .api .core .ApiFuture ;
2121import com .google .api .core .ApiFutureCallback ;
2222import com .google .api .core .ApiFutures ;
23+ import com .google .cloud .bigquery .storage .v1beta2 .AppendRowsRequest ;
2324import com .google .cloud .bigquery .storage .v1beta2 .AppendRowsResponse ;
25+ import com .google .cloud .bigquery .storage .v1beta2 .BQTableSchemaToProtoDescriptor ;
2426import com .google .cloud .bigquery .storage .v1beta2 .BigQueryWriteClient ;
2527import com .google .cloud .bigquery .storage .v1beta2 .CreateWriteStreamRequest ;
26- import com .google .cloud .bigquery .storage .v1beta2 .JsonStreamWriter ;
28+ import com .google .cloud .bigquery .storage .v1beta2 .JsonToProtoMessage ;
29+ import com .google .cloud .bigquery .storage .v1beta2 .ProtoRows ;
30+ import com .google .cloud .bigquery .storage .v1beta2 .ProtoSchema ;
31+ import com .google .cloud .bigquery .storage .v1beta2 .ProtoSchemaConverter ;
32+ import com .google .cloud .bigquery .storage .v1beta2 .StreamWriterV2 ;
2733import com .google .cloud .bigquery .storage .v1beta2 .TableName ;
2834import com .google .cloud .bigquery .storage .v1beta2 .WriteStream ;
2935import com .google .common .util .concurrent .MoreExecutors ;
36+ import com .google .protobuf .Descriptors .Descriptor ;
3037import com .google .protobuf .Descriptors .DescriptorValidationException ;
38+ import com .google .protobuf .Int64Value ;
39+ import com .google .protobuf .Message ;
3140import java .io .IOException ;
3241import java .time .Duration ;
3342import java .util .concurrent .ThreadLocalRandom ;
3443import java .util .logging .Logger ;
3544import javax .annotation .Nullable ;
3645import javax .annotation .concurrent .GuardedBy ;
37- import org .json .JSONArray ;
3846import org .json .JSONObject ;
3947
4048public class ParallelWriteCommittedStream {
@@ -151,20 +159,20 @@ private void writeToStream(
151159 lastMetricsSuccessCount = 0 ;
152160 lastMetricsFailureCount = 0 ;
153161 }
154- // Use the JSON stream writer to send records in JSON format.
155- // For more information about JsonStreamWriter, see:
156- // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
157- try (JsonStreamWriter writer =
158- JsonStreamWriter .newBuilder (writeStream .getName (), writeStream .getTableSchema (), client )
159- .build ()) {
162+ Descriptor descriptor =
163+ BQTableSchemaToProtoDescriptor .convertBQTableSchemaToProtoDescriptor (
164+ writeStream .getTableSchema ());
165+ ProtoSchema protoSchema = ProtoSchemaConverter .convert (descriptor );
166+ try (StreamWriterV2 writer = StreamWriterV2 .newBuilder (writeStream .getName ()).build ()) {
160167 while (System .currentTimeMillis () < deadlineMillis ) {
161168 synchronized (this ) {
162169 if (error != null ) {
163170 // Stop writing once we get an error.
164171 throw error ;
165172 }
166173 }
167- ApiFuture <AppendRowsResponse > future = writer .append (createPayload (), -1 );
174+ ApiFuture <AppendRowsResponse > future =
175+ writer .append (createAppendRequest (writeStream .getName (), descriptor , protoSchema , -1 ));
168176 synchronized (this ) {
169177 inflightCount ++;
170178 }
@@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) {
189197 throw new RuntimeException ("Timeout waiting for inflight count to reach 0" );
190198 }
191199
192- private JSONArray createPayload () {
193- // Create a JSON object that is compatible with the table schema.
194- JSONArray jsonArr = new JSONArray ();
200+ private AppendRowsRequest createAppendRequest (
201+ String streamName , Descriptor descriptor , ProtoSchema protoSchema , long offset ) {
202+ ProtoRows . Builder rowsBuilder = ProtoRows . newBuilder ();
195203 for (int i = 0 ; i < BATCH_SIZE ; i ++) {
196204 byte [] payload = new byte [ROW_SIZE ];
197205 ThreadLocalRandom .current ().nextBytes (payload );
198206 JSONObject record = new JSONObject ();
199207 record .put ("col1" , new String (payload ));
200- jsonArr .put (record );
208+ Message protoMessage = JsonToProtoMessage .convertJsonToProtoMessage (descriptor , record );
209+ rowsBuilder .addSerializedRows (protoMessage .toByteString ());
201210 }
202- return jsonArr ;
211+ AppendRowsRequest .ProtoData .Builder data = AppendRowsRequest .ProtoData .newBuilder ();
212+ data .setWriterSchema (protoSchema );
213+ data .setRows (rowsBuilder .build ());
214+ AppendRowsRequest .Builder request = AppendRowsRequest .newBuilder ().setProtoRows (data .build ());
215+ request .setWriteStream (streamName );
216+ if (offset >= 0 ) {
217+ request .setOffset (Int64Value .of (offset ));
218+ }
219+ return request .build ();
203220 }
204221
205222 private void sleepIgnoringInterruption (Duration duration ) {
0 commit comments