Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit d8aaed5

Browse files
authored
feat: client unknown fields drives writer refreshment (#1797)
* feat:client unknown fields drives writer refreshment * . * . * . * . * . * . * . * . * . * . * .
1 parent b3ffd77 commit d8aaed5

2 files changed

Lines changed: 153 additions & 22 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class JsonStreamWriter implements AutoCloseable {
4949
"projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
5050
private static Pattern streamPattern = Pattern.compile(streamPatternString);
5151
private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName());
52+
private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L;
5253

5354
private BigQueryWriteClient client;
5455
private String streamName;
@@ -77,6 +78,7 @@ private JsonStreamWriter(Builder builder)
7778
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
7879
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
7980
this.totalMessageSize = protoSchema.getSerializedSize();
81+
this.client = builder.client;
8082
streamWriterBuilder.setWriterSchema(protoSchema);
8183
setStreamWriterSettings(
8284
builder.channelProvider,
@@ -108,6 +110,60 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
108110
return append(jsonArr, -1);
109111
}
110112

113+
private void refreshWriter(TableSchema updatedSchema)
114+
throws DescriptorValidationException, IOException {
115+
Preconditions.checkNotNull(updatedSchema, "updatedSchema is null.");
116+
LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName);
117+
// Close the StreamWriterf
118+
this.streamWriter.close();
119+
// Update JsonStreamWriter's TableSchema and Descriptor
120+
this.tableSchema = updatedSchema;
121+
this.descriptor =
122+
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
123+
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
124+
this.totalMessageSize = protoSchema.getSerializedSize();
125+
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
126+
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
127+
}
128+
129+
private Message buildMessage(JSONObject json)
130+
throws InterruptedException, DescriptorValidationException, IOException {
131+
try {
132+
return JsonToProtoMessage.convertJsonToProtoMessage(
133+
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
134+
} catch (Exceptions.JsonDataHasUnknownFieldException ex) {
135+
// Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before
136+
// trying to get the table schema to increase the chance of succeed. This is to avoid
137+
// client's invalid datfa caused storm of GetWriteStream.
138+
LOG.warning(
139+
"Saw Json unknown field "
140+
+ ex.getFieldName()
141+
+ ", try to refresh the writer with updated schema, stream: "
142+
+ streamName);
143+
GetWriteStreamRequest writeStreamRequest =
144+
GetWriteStreamRequest.newBuilder()
145+
.setName(this.streamName)
146+
.setView(WriteStreamView.FULL)
147+
.build();
148+
WriteStream writeStream = client.getWriteStream(writeStreamRequest);
149+
refreshWriter(writeStream.getTableSchema());
150+
try {
151+
return JsonToProtoMessage.convertJsonToProtoMessage(
152+
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
153+
} catch (Exceptions.JsonDataHasUnknownFieldException exex) {
154+
LOG.warning(
155+
"First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName);
156+
Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS);
157+
writeStream = client.getWriteStream(writeStreamRequest);
158+
// TODO(yiru): We should let TableSchema return a timestamp so that we can simply
159+
// compare the timestamp to see if the table schema is the same. If it is the
160+
// same, we don't need to go refresh the writer again.
161+
refreshWriter(writeStream.getTableSchema());
162+
return JsonToProtoMessage.convertJsonToProtoMessage(
163+
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
164+
}
165+
}
166+
}
111167
/**
112168
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
113169
* data to protobuf messages, then using StreamWriter's append() to write the data at the
@@ -126,17 +182,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
126182
// Schema update only works when the connection pool is not enabled.
127183
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
128184
&& this.streamWriter.getUpdatedSchema() != null) {
129-
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
130-
// Close the StreamWriter
131-
this.streamWriter.close();
132-
// Update JsonStreamWriter's TableSchema and Descriptor
133-
this.tableSchema = updatedSchema;
134-
this.descriptor =
135-
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
136-
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
137-
this.totalMessageSize = protoSchema.getSerializedSize();
138-
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
139-
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
185+
refreshWriter(this.streamWriter.getUpdatedSchema());
140186
}
141187

142188
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
@@ -150,9 +196,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
150196
for (int i = 0; i < jsonArr.length(); i++) {
151197
JSONObject json = jsonArr.getJSONObject(i);
152198
try {
153-
Message protoMessage =
154-
JsonToProtoMessage.convertJsonToProtoMessage(
155-
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
199+
Message protoMessage = buildMessage(json);
156200
rowsBuilder.addSerializedRows(protoMessage.toByteString());
157201
currentRequestSize += protoMessage.getSerializedSize();
158202
} catch (IllegalArgumentException exception) {
@@ -169,6 +213,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
169213
} else {
170214
rowIndexToErrorMessage.put(i, exception.getMessage());
171215
}
216+
} catch (InterruptedException ex) {
217+
throw new RuntimeException(ex);
172218
}
173219
}
174220

@@ -277,7 +323,7 @@ public static Builder newBuilder(String streamOrTableName, TableSchema tableSche
277323
*/
278324
public static Builder newBuilder(
279325
String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
280-
Preconditions.checkNotNull(streamOrTableName, "StreamName is null.");
326+
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
281327
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
282328
Preconditions.checkNotNull(client, "BigQuery client is null.");
283329
return new Builder(streamOrTableName, tableSchema, client);
@@ -359,6 +405,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
359405

360406
WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
361407
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
408+
362409
this.tableSchema = writeStreamTableSchema;
363410
} else {
364411
this.tableSchema = tableSchema;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.cloud.bigquery.storage.test.Test.FooType;
3434
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
3535
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
36+
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
3637
import com.google.protobuf.Descriptors.DescriptorValidationException;
3738
import com.google.protobuf.Int64Value;
3839
import com.google.protobuf.Timestamp;
@@ -67,6 +68,7 @@ public class JsonStreamWriterTest {
6768
private FakeScheduledExecutorService fakeExecutor;
6869
private FakeBigQueryWrite testBigQueryWrite;
6970
private static MockServiceHelper serviceHelper;
71+
private BigQueryWriteClient client;
7072

7173
private final TableFieldSchema FOO =
7274
TableFieldSchema.newBuilder()
@@ -116,14 +118,15 @@ public void setUp() throws Exception {
116118
channelProvider = serviceHelper.createChannelProvider();
117119
fakeExecutor = new FakeScheduledExecutorService();
118120
testBigQueryWrite.setExecutor(fakeExecutor);
121+
BigQueryWriteSettings settings =
122+
BigQueryWriteSettings.newBuilder()
123+
.setTransportChannelProvider(channelProvider)
124+
.setCredentialsProvider(NoCredentialsProvider.create())
125+
.build();
126+
client = BigQueryWriteClient.create(settings);
119127
Instant time = Instant.now();
120128
Timestamp timestamp =
121129
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
122-
// Add enough GetWriteStream response.
123-
for (int i = 0; i < 4; i++) {
124-
testBigQueryWrite.addResponse(
125-
WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build());
126-
}
127130
}
128131

129132
@After
@@ -133,7 +136,7 @@ public void tearDown() throws Exception {
133136

134137
private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
135138
String testStream, TableSchema BQTableSchema) {
136-
return JsonStreamWriter.newBuilder(testStream, BQTableSchema)
139+
return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client)
137140
.setChannelProvider(channelProvider)
138141
.setCredentialsProvider(NoCredentialsProvider.create());
139142
}
@@ -507,8 +510,85 @@ public void testSimpleSchemaUpdate() throws Exception {
507510
}
508511

509512
@Test
510-
public void testWithoutIgnoreUnknownFields() throws Exception {
513+
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
514+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
515+
TableSchema updatedSchema =
516+
TableSchema.newBuilder()
517+
.addFields(0, TEST_INT)
518+
.addFields(
519+
1,
520+
TableFieldSchema.newBuilder()
521+
.setName("test_string")
522+
.setType(TableFieldSchema.Type.STRING)
523+
.setMode(Mode.NULLABLE))
524+
.build();
525+
// GetWriteStream is called once and the writer is fixed to accept unknown fields.
526+
testBigQueryWrite.addResponse(
527+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
528+
testBigQueryWrite.addResponse(
529+
AppendRowsResponse.newBuilder()
530+
.setAppendResult(
531+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
532+
.build());
533+
try (JsonStreamWriter writer =
534+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
535+
JSONObject foo = new JSONObject();
536+
foo.put("test_int", 10);
537+
JSONObject bar = new JSONObject();
538+
bar.put("test_string", "a");
539+
JSONArray jsonArr = new JSONArray();
540+
jsonArr.put(foo);
541+
jsonArr.put(bar);
542+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
543+
appendFuture.get();
544+
}
545+
}
546+
547+
@Test
548+
public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception {
549+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
550+
TableSchema updatedSchema =
551+
TableSchema.newBuilder()
552+
.addFields(0, TEST_INT)
553+
.addFields(
554+
1,
555+
TableFieldSchema.newBuilder()
556+
.setName("test_string")
557+
.setType(TableFieldSchema.Type.STRING)
558+
.setMode(Mode.NULLABLE))
559+
.build();
560+
// GetWriteStream is called twice and got the updated schema
561+
testBigQueryWrite.addResponse(
562+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
563+
testBigQueryWrite.addResponse(
564+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
565+
testBigQueryWrite.addResponse(
566+
AppendRowsResponse.newBuilder()
567+
.setAppendResult(
568+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
569+
.build());
570+
try (JsonStreamWriter writer =
571+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
572+
JSONObject foo = new JSONObject();
573+
foo.put("test_int", 10);
574+
JSONObject bar = new JSONObject();
575+
bar.put("test_string", "a");
576+
JSONArray jsonArr = new JSONArray();
577+
jsonArr.put(foo);
578+
jsonArr.put(bar);
579+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
580+
appendFuture.get();
581+
}
582+
}
583+
584+
@Test
585+
public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
511586
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
587+
// GetWriteStream is called twice, but neither response carries the updated schema.
588+
testBigQueryWrite.addResponse(
589+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
590+
testBigQueryWrite.addResponse(
591+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
512592
try (JsonStreamWriter writer =
513593
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
514594
JSONObject foo = new JSONObject();
@@ -626,6 +706,10 @@ public void testMultipleAppendSerializtionErrors()
626706
jsonArr.put(foo);
627707
jsonArr.put(foo1);
628708
jsonArr.put(foo2);
709+
testBigQueryWrite.addResponse(
710+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
711+
testBigQueryWrite.addResponse(
712+
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
629713

630714
try (JsonStreamWriter writer =
631715
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {

0 commit comments

Comments
 (0)