@@ -447,133 +447,6 @@ public void testJsonStreamWriterWithDefaultStream()
447447 }
448448 }
449449
450- @ Test
451- public void testJsonStreamWriterSchemaUpdate ()
452- throws IOException , InterruptedException , ExecutionException ,
453- Descriptors .DescriptorValidationException {
454- String tableName = "SchemaUpdateTable" ;
455- TableInfo tableInfo =
456- TableInfo .newBuilder (
457- TableId .of (DATASET , tableName ),
458- StandardTableDefinition .of (
459- Schema .of (
460- com .google .cloud .bigquery .Field .newBuilder ("foo" , LegacySQLTypeName .STRING )
461- .build ())))
462- .build ();
463-
464- // Create a Bigquery table.
465- bigquery .create (tableInfo );
466- TableName parent = TableName .of (ServiceOptions .getDefaultProjectId (), DATASET , tableName );
467- // Create a Write Api stream.
468- WriteStream writeStream =
469- client .createWriteStream (
470- CreateWriteStreamRequest .newBuilder ()
471- .setParent (parent .toString ())
472- .setWriteStream (
473- WriteStream .newBuilder ().setType (WriteStream .Type .COMMITTED ).build ())
474- .build ());
475-
476- try (JsonStreamWriter jsonStreamWriter =
477- JsonStreamWriter .newBuilder (writeStream .getName (), writeStream .getTableSchema ()).build ()) {
478- // 1). Send 1 row
479- JSONObject foo = new JSONObject ();
480- foo .put ("foo" , "aaa" );
481- JSONArray jsonArr = new JSONArray ();
482- jsonArr .put (foo );
483-
484- ApiFuture <AppendRowsResponse > response = jsonStreamWriter .append (jsonArr , 0 );
485- assertEquals (0 , response .get ().getAppendResult ().getOffset ().getValue ());
486- // 2). Schema update and wait until querying it returns a new schema.
487- try {
488- com .google .cloud .bigquery .Table table = bigquery .getTable (DATASET , tableName );
489- Schema schema = table .getDefinition ().getSchema ();
490- FieldList fields = schema .getFields ();
491- Field newField =
492- Field .newBuilder ("bar" , LegacySQLTypeName .STRING ).setMode (Field .Mode .NULLABLE ).build ();
493-
494- List <Field > fieldList = new ArrayList <Field >();
495- fieldList .add (fields .get (0 ));
496- fieldList .add (newField );
497- Schema newSchema = Schema .of (fieldList );
498- // Update the table with the new schema
499- com .google .cloud .bigquery .Table updatedTable =
500- table .toBuilder ().setDefinition (StandardTableDefinition .of (newSchema )).build ();
501- updatedTable .update ();
502- int millis = 0 ;
503- while (millis <= 10000 ) {
504- if (newSchema .equals (table .reload ().getDefinition ().getSchema ())) {
505- break ;
506- }
507- Thread .sleep (1000 );
508- millis += 1000 ;
509- }
510- newSchema = schema ;
511- LOG .info (
512- "bar column successfully added to table in "
513- + millis
514- + " millis: "
515- + bigquery .getTable (DATASET , tableName ).getDefinition ().getSchema ());
516- } catch (BigQueryException e ) {
517- LOG .severe ("bar column was not added. \n " + e .toString ());
518- }
519- // 3). Send rows to wait for updatedSchema to be returned.
520- JSONObject foo2 = new JSONObject ();
521- foo2 .put ("foo" , "bbb" );
522- JSONArray jsonArr2 = new JSONArray ();
523- jsonArr2 .put (foo2 );
524-
525- int next = 0 ;
526- for (int i = 1 ; i < 100 ; i ++) {
527- ApiFuture <AppendRowsResponse > response2 = jsonStreamWriter .append (jsonArr2 , i );
528- assertEquals (i , response2 .get ().getAppendResult ().getOffset ().getValue ());
529- if (response2 .get ().hasUpdatedSchema ()) {
530- next = i ;
531- break ;
532- } else {
533- Thread .sleep (1000 );
534- }
535- }
536-
537- int millis = 0 ;
538- while (millis <= 10000 ) {
539- if (jsonStreamWriter .getDescriptor ().getFields ().size () == 2 ) {
540- LOG .info ("JsonStreamWriter successfully updated internal descriptor!" );
541- break ;
542- }
543- Thread .sleep (100 );
544- millis += 100 ;
545- }
546- assertTrue (jsonStreamWriter .getDescriptor ().getFields ().size () == 2 );
547- // 4). Send rows with updated schema.
548- JSONObject updatedFoo = new JSONObject ();
549- updatedFoo .put ("foo" , "ccc" );
550- updatedFoo .put ("bar" , "ddd" );
551- JSONArray updatedJsonArr = new JSONArray ();
552- updatedJsonArr .put (updatedFoo );
553- for (int i = 0 ; i < 10 ; i ++) {
554- ApiFuture <AppendRowsResponse > response3 =
555- jsonStreamWriter .append (updatedJsonArr , next + 1 + i );
556- assertEquals (next + 1 + i , response3 .get ().getAppendResult ().getOffset ().getValue ());
557- response3 .get ();
558- }
559-
560- TableResult result3 =
561- bigquery .listTableData (
562- tableInfo .getTableId (), BigQuery .TableDataListOption .startIndex (0L ));
563- Iterator <FieldValueList > iter3 = result3 .getValues ().iterator ();
564- assertEquals ("aaa" , iter3 .next ().get (0 ).getStringValue ());
565- for (int j = 1 ; j <= next ; j ++) {
566- assertEquals ("bbb" , iter3 .next ().get (0 ).getStringValue ());
567- }
568- for (int j = next + 1 ; j < next + 1 + 10 ; j ++) {
569- FieldValueList temp = iter3 .next ();
570- assertEquals ("ccc" , temp .get (0 ).getStringValue ());
571- assertEquals ("ddd" , temp .get (1 ).getStringValue ());
572- }
573- assertEquals (false , iter3 .hasNext ());
574- }
575- }
576-
577450 @ Test
578451 public void testComplicateSchemaWithPendingStream ()
579452 throws IOException , InterruptedException , ExecutionException {
0 commit comments