@@ -49,6 +49,7 @@ public class JsonStreamWriter implements AutoCloseable {
4949 "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" ;
5050 private static Pattern streamPattern = Pattern .compile (streamPatternString );
5151 private static final Logger LOG = Logger .getLogger (JsonStreamWriter .class .getName ());
52+ private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L ;
5253
5354 private BigQueryWriteClient client ;
5455 private String streamName ;
@@ -77,6 +78,7 @@ private JsonStreamWriter(Builder builder)
7778 streamWriterBuilder = StreamWriter .newBuilder (builder .streamName );
7879 this .protoSchema = ProtoSchemaConverter .convert (this .descriptor );
7980 this .totalMessageSize = protoSchema .getSerializedSize ();
81+ this .client = builder .client ;
8082 streamWriterBuilder .setWriterSchema (protoSchema );
8183 setStreamWriterSettings (
8284 builder .channelProvider ,
@@ -108,6 +110,60 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
108110 return append (jsonArr , -1 );
109111 }
110112
113+ private void refreshWriter (TableSchema updatedSchema )
114+ throws DescriptorValidationException , IOException {
115+ Preconditions .checkNotNull (updatedSchema , "updatedSchema is null." );
116+ LOG .info ("Refresh internal writer due to schema update, stream: " + this .streamName );
117+ // Close the StreamWriterf
118+ this .streamWriter .close ();
119+ // Update JsonStreamWriter's TableSchema and Descriptor
120+ this .tableSchema = updatedSchema ;
121+ this .descriptor =
122+ BQTableSchemaToProtoDescriptor .convertBQTableSchemaToProtoDescriptor (updatedSchema );
123+ this .protoSchema = ProtoSchemaConverter .convert (this .descriptor );
124+ this .totalMessageSize = protoSchema .getSerializedSize ();
125+ // Create a new underlying StreamWriter with the updated TableSchema and Descriptor
126+ this .streamWriter = streamWriterBuilder .setWriterSchema (this .protoSchema ).build ();
127+ }
128+
129+ private Message buildMessage (JSONObject json )
130+ throws InterruptedException , DescriptorValidationException , IOException {
131+ try {
132+ return JsonToProtoMessage .convertJsonToProtoMessage (
133+ this .descriptor , this .tableSchema , json , ignoreUnknownFields );
134+ } catch (Exceptions .JsonDataHasUnknownFieldException ex ) {
135+ // Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before
136+ // trying to get the table schema to increase the chance of succeed. This is to avoid
137+ // client's invalid datfa caused storm of GetWriteStream.
138+ LOG .warning (
139+ "Saw Json unknown field "
140+ + ex .getFieldName ()
141+ + ", try to refresh the writer with updated schema, stream: "
142+ + streamName );
143+ GetWriteStreamRequest writeStreamRequest =
144+ GetWriteStreamRequest .newBuilder ()
145+ .setName (this .streamName )
146+ .setView (WriteStreamView .FULL )
147+ .build ();
148+ WriteStream writeStream = client .getWriteStream (writeStreamRequest );
149+ refreshWriter (writeStream .getTableSchema ());
150+ try {
151+ return JsonToProtoMessage .convertJsonToProtoMessage (
152+ this .descriptor , this .tableSchema , json , ignoreUnknownFields );
153+ } catch (Exceptions .JsonDataHasUnknownFieldException exex ) {
154+ LOG .warning (
155+ "First attempt failed, waiting for 30 seconds to retry, stream: " + this .streamName );
156+ Thread .sleep (UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS );
157+ writeStream = client .getWriteStream (writeStreamRequest );
158+ // TODO(yiru): We should let TableSchema return a timestamp so that we can simply
159+ // compare the timestamp to see if the table schema is the same. If it is the
160+ // same, we don't need to go refresh the writer again.
161+ refreshWriter (writeStream .getTableSchema ());
162+ return JsonToProtoMessage .convertJsonToProtoMessage (
163+ this .descriptor , this .tableSchema , json , ignoreUnknownFields );
164+ }
165+ }
166+ }
111167 /**
112168 * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
113169 * data to protobuf messages, then using StreamWriter's append() to write the data at the
@@ -126,17 +182,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
126182 // Update schema only work when connection pool is not enabled.
127183 if (this .streamWriter .getConnectionOperationType () == Kind .CONNECTION_WORKER
128184 && this .streamWriter .getUpdatedSchema () != null ) {
129- TableSchema updatedSchema = this .streamWriter .getUpdatedSchema ();
130- // Close the StreamWriter
131- this .streamWriter .close ();
132- // Update JsonStreamWriter's TableSchema and Descriptor
133- this .tableSchema = updatedSchema ;
134- this .descriptor =
135- BQTableSchemaToProtoDescriptor .convertBQTableSchemaToProtoDescriptor (updatedSchema );
136- this .protoSchema = ProtoSchemaConverter .convert (this .descriptor );
137- this .totalMessageSize = protoSchema .getSerializedSize ();
138- // Create a new underlying StreamWriter with the updated TableSchema and Descriptor
139- this .streamWriter = streamWriterBuilder .setWriterSchema (this .protoSchema ).build ();
185+ refreshWriter (this .streamWriter .getUpdatedSchema ());
140186 }
141187
142188 ProtoRows .Builder rowsBuilder = ProtoRows .newBuilder ();
@@ -150,9 +196,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
150196 for (int i = 0 ; i < jsonArr .length (); i ++) {
151197 JSONObject json = jsonArr .getJSONObject (i );
152198 try {
153- Message protoMessage =
154- JsonToProtoMessage .convertJsonToProtoMessage (
155- this .descriptor , this .tableSchema , json , ignoreUnknownFields );
199+ Message protoMessage = buildMessage (json );
156200 rowsBuilder .addSerializedRows (protoMessage .toByteString ());
157201 currentRequestSize += protoMessage .getSerializedSize ();
158202 } catch (IllegalArgumentException exception ) {
@@ -169,6 +213,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
169213 } else {
170214 rowIndexToErrorMessage .put (i , exception .getMessage ());
171215 }
216+ } catch (InterruptedException ex ) {
217+ throw new RuntimeException (ex );
172218 }
173219 }
174220
@@ -277,7 +323,7 @@ public static Builder newBuilder(String streamOrTableName, TableSchema tableSche
277323 */
278324 public static Builder newBuilder (
279325 String streamOrTableName , TableSchema tableSchema , BigQueryWriteClient client ) {
280- Preconditions .checkNotNull (streamOrTableName , "StreamName is null." );
326+ Preconditions .checkNotNull (streamOrTableName , "StreamOrTableName is null." );
281327 Preconditions .checkNotNull (tableSchema , "TableSchema is null." );
282328 Preconditions .checkNotNull (client , "BigQuery client is null." );
283329 return new Builder (streamOrTableName , tableSchema , client );
@@ -359,6 +405,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
359405
360406 WriteStream writeStream = this .client .getWriteStream (writeStreamRequest );
361407 TableSchema writeStreamTableSchema = writeStream .getTableSchema ();
408+
362409 this .tableSchema = writeStreamTableSchema ;
363410 } else {
364411 this .tableSchema = tableSchema ;
0 commit comments