@@ -96,14 +96,21 @@ func (c *Client) waitForTableToExist(ctx context.Context, client *bigquery.Clien
9696func (c * Client ) waitForSchemaToMatch (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
9797 c .logger .Debug ().Str ("table" , table .Name ).Msg ("Waiting for schemas to match" )
9898 wantSchema := c .bigQuerySchemaForTable (table )
99+ want , err := wantSchema .ToJSONFields ()
100+ if err != nil {
101+ return fmt .Errorf ("failed to convert schema to JSON: %v" , err )
102+ }
99103 for i := 0 ; i < maxTableChecks ; i ++ {
100104 md , err := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Metadata (ctx )
101105 if err != nil {
102106 return err
103107 }
104- haveSchema := md .Schema
105- if schemasMatch (haveSchema , wantSchema ) {
106- c .logger .Debug ().Str ("table" , table .Name ).Msg ("Schemas match" )
108+ got , err := md .Schema .ToJSONFields ()
109+ if err != nil {
110+ return fmt .Errorf ("failed to convert schema to JSON: %v" , err )
111+ }
112+ if string (got ) == string (want ) {
113+ c .logger .Debug ().Str ("table" , table .Name ).Msg ("Schemas matched" )
107114 return nil
108115 }
109116 c .logger .Debug ().Str ("table" , table .Name ).Int ("i" , i ).Msg ("Waiting for schemas to match" )
@@ -113,78 +120,14 @@ func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Clie
113120}
114121
115122func (c * Client ) autoMigrateTable (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
116- bqTable := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name )
117- md , err := bqTable .Metadata (ctx )
118- if err != nil {
119- return fmt .Errorf ("failed to get table metadata: %w" , err )
120- }
121- haveSchema := md .Schema
122- wantSchema := c .bigQuerySchemaForTable (table )
123- wantSchema , err = mergeSchemas (haveSchema , wantSchema )
124- if err != nil {
125- return fmt .Errorf ("failed to migrate table schema: %w" , err )
126- }
123+ bqSchema := c .bigQuerySchemaForTable (table )
127124 tm := bigquery.TableMetadataToUpdate {
128125 Name : table .Name ,
129126 Description : table .Description ,
130- Schema : wantSchema ,
131- }
132- _ , err = bqTable .Update (ctx , tm , "" )
133- if err != nil {
134- return fmt .Errorf ("failed to update table schema: %w" , err )
135- }
136- return nil
137- }
138-
139- func schemasMatch (haveSchema , wantSchema bigquery.Schema ) bool {
140- // Schemas are considered a match if everything in the want schema is in the have schema,
141- // and they have the same types.
142- // We don't mind if there are extra fields in the have schema.
143- haveMap := make (map [string ]* bigquery.FieldSchema )
144- for _ , f := range haveSchema {
145- haveMap [f .Name ] = f
146- }
147- for _ , wf := range wantSchema {
148- if hf , ok := haveMap [wf .Name ]; ! ok {
149- return false
150- } else if hf .Type != wf .Type {
151- return false
152- }
153- }
154- return true
155- }
156-
157- // mergeSchemas merges the schema we want with the schema we have, to avoid
158- // losing any existing data
159- func mergeSchemas (haveSchema , wantSchema bigquery.Schema ) (bigquery.Schema , error ) {
160- haveMap := make (map [string ]* bigquery.FieldSchema )
161- for _ , f := range haveSchema {
162- haveMap [f .Name ] = f
163- }
164- wantMap := make (map [string ]* bigquery.FieldSchema )
165- for _ , f := range wantSchema {
166- wantMap [f .Name ] = f
167- }
168- merged := make (bigquery.Schema , 0 , len (wantSchema ))
169- // keep everything in the schema we have, as long as the types didn't change
170- // or an unknown column isn't required
171- for _ , f := range haveSchema {
172- if want , ok := wantMap [f .Name ]; ok {
173- if want .Type != f .Type {
174- return nil , fmt .Errorf ("field %v changed type from %v to %v" , f .Name , f .Type , want .Type )
175- }
176- } else if f .Required {
177- return nil , fmt .Errorf ("field %v is required but not in new schema" , f .Name )
178- }
179- merged = append (merged , f )
180- }
181- // add anything new
182- for _ , f := range wantSchema {
183- if _ , ok := haveMap [f .Name ]; ! ok {
184- merged = append (merged , f )
185- }
127+ Schema : bqSchema ,
186128 }
187- return merged , nil
129+ _ , err := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Update (ctx , tm , "" )
130+ return err
188131}
189132
190133func (c * Client ) createTable (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
0 commit comments