@@ -229,6 +229,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
229229 t .Parallel ()
230230 testDefaultStreamDynamicJSON (ctx , t , mwClient , bqClient , dataset )
231231 })
232+ t .Run ("DefaultStreamJSONData" , func (t * testing.T ) {
233+ t .Parallel ()
234+ testDefaultStreamJSONData (ctx , t , mwClient , bqClient , dataset )
235+ })
232236 t .Run ("CommittedStream" , func (t * testing.T ) {
233237 t .Parallel ()
234238 testCommittedStream (ctx , t , mwClient , bqClient , dataset )
@@ -455,6 +459,63 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
455459 withDistinctValues ("public" , int64 (2 )))
456460}
457461
462+ func testDefaultStreamJSONData (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
463+ testTable := dataset .Table (tableIDs .New ())
464+ if err := testTable .Create (ctx , & bigquery.TableMetadata {Schema : testdata .ComplexTypeSchema }); err != nil {
465+ t .Fatalf ("failed to create test table %s: %v" , testTable .FullyQualifiedName (), err )
466+ }
467+
468+ md , descriptorProto := setupDynamicDescriptors (t , testdata .ComplexTypeSchema )
469+
470+ ms , err := mwClient .NewManagedStream (ctx ,
471+ WithDestinationTable (TableParentFromParts (testTable .ProjectID , testTable .DatasetID , testTable .TableID )),
472+ WithType (DefaultStream ),
473+ WithSchemaDescriptor (descriptorProto ),
474+ )
475+ if err != nil {
476+ t .Fatalf ("NewManagedStream: %v" , err )
477+ }
478+ validateTableConstraints (ctx , t , bqClient , testTable , "before send" ,
479+ withExactRowCount (0 ))
480+
481+ sampleJSONData := [][]byte {
482+ []byte (`{"json_type":"{\"foo\": \"bar\"}"}` ),
483+ []byte (`{"json_type":"{\"key\": \"value\"}"}` ),
484+ []byte (`{"json_type":"\"a string\""}` ),
485+ }
486+
487+ var result * AppendResult
488+ for k , v := range sampleJSONData {
489+ message := dynamicpb .NewMessage (md )
490+
491+ // First, json->proto message
492+ err = protojson .Unmarshal (v , message )
493+ if err != nil {
494+ t .Fatalf ("failed to Unmarshal json message for row %d: %v" , k , err )
495+ }
496+ // Then, proto message -> bytes.
497+ b , err := proto .Marshal (message )
498+ if err != nil {
499+ t .Fatalf ("failed to marshal proto bytes for row %d: %v" , k , err )
500+ }
501+ result , err = ms .AppendRows (ctx , [][]byte {b })
502+ if err != nil {
503+ t .Errorf ("single-row append %d failed: %v" , k , err )
504+ }
505+ }
506+
507+ // Wait for the result to indicate ready, then validate.
508+ o , err := result .GetResult (ctx )
509+ if err != nil {
510+ t .Errorf ("result error for last send: %v" , err )
511+ }
512+ if o != NoStreamOffset {
513+ t .Errorf ("offset mismatch, got %d want %d" , o , NoStreamOffset )
514+ }
515+ validateTableConstraints (ctx , t , bqClient , testTable , "after send" ,
516+ withExactRowCount (int64 (len (sampleJSONData ))))
517+ }
518+
458519func testBufferedStream (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
459520 testTable := dataset .Table (tableIDs .New ())
460521 if err := testTable .Create (ctx , & bigquery.TableMetadata {Schema : testdata .SimpleMessageSchema }); err != nil {
@@ -1389,6 +1450,7 @@ func TestIntegration_ProtoNormalization(t *testing.T) {
13891450 t .Run ("ComplexType" , func (t * testing.T ) {
13901451 t .Parallel ()
13911452 schema := testdata .ComplexTypeSchema
1453+ jsonData := "{\" foo\" : \" bar\" }"
13921454 mesg := & testdata.ComplexType {
13931455 NestedRepeatedType : []* testdata.NestedType {
13941456 {
@@ -1404,6 +1466,7 @@ func TestIntegration_ProtoNormalization(t *testing.T) {
14041466 RangeType : & testdata.RangeTypeTimestamp {
14051467 End : proto .Int64 (time .Now ().UnixNano ()),
14061468 },
1469+ JsonType : & jsonData ,
14071470 }
14081471 b , err := proto .Marshal (mesg )
14091472 if err != nil {
0 commit comments