@@ -96,21 +96,14 @@ func (c *Client) waitForTableToExist(ctx context.Context, client *bigquery.Clien
9696func (c * Client ) waitForSchemaToMatch (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
9797 c .logger .Debug ().Str ("table" , table .Name ).Msg ("Waiting for schemas to match" )
9898 wantSchema := c .bigQuerySchemaForTable (table )
99- want , err := wantSchema .ToJSONFields ()
100- if err != nil {
101- return fmt .Errorf ("failed to convert schema to JSON: %v" , err )
102- }
10399 for i := 0 ; i < maxTableChecks ; i ++ {
104100 md , err := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Metadata (ctx )
105101 if err != nil {
106102 return err
107103 }
108- got , err := md .Schema .ToJSONFields ()
109- if err != nil {
110- return fmt .Errorf ("failed to convert schema to JSON: %v" , err )
111- }
112- if string (got ) == string (want ) {
113- c .logger .Debug ().Str ("table" , table .Name ).Msg ("Schemas matched" )
104+ haveSchema := md .Schema
105+ if schemasMatch (haveSchema , wantSchema ) {
106+ c .logger .Debug ().Str ("table" , table .Name ).Msg ("Schemas match" )
114107 return nil
115108 }
116109 c .logger .Debug ().Str ("table" , table .Name ).Int ("i" , i ).Msg ("Waiting for schemas to match" )
@@ -120,14 +113,78 @@ func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Clie
120113}
121114
122115func (c * Client ) autoMigrateTable (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
123- bqSchema := c .bigQuerySchemaForTable (table )
116+ bqTable := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name )
117+ md , err := bqTable .Metadata (ctx )
118+ if err != nil {
119+ return fmt .Errorf ("failed to get table metadata: %w" , err )
120+ }
121+ haveSchema := md .Schema
122+ wantSchema := c .bigQuerySchemaForTable (table )
123+ wantSchema , err = mergeSchemas (haveSchema , wantSchema )
124+ if err != nil {
125+ return fmt .Errorf ("failed to migrate table schema: %w" , err )
126+ }
124127 tm := bigquery.TableMetadataToUpdate {
125128 Name : table .Name ,
126129 Description : table .Description ,
127- Schema : bqSchema ,
130+ Schema : wantSchema ,
131+ }
132+ _ , err = bqTable .Update (ctx , tm , "" )
133+ if err != nil {
134+ return fmt .Errorf ("failed to update table schema: %w" , err )
135+ }
136+ return nil
137+ }
138+
139+ func schemasMatch (haveSchema , wantSchema bigquery.Schema ) bool {
140+ // Schemas are considered a match if everything in the want schema is in the have schema,
141+ // and they have the same types.
142+ // We don't mind if there are extra fields in the have schema.
143+ haveMap := make (map [string ]* bigquery.FieldSchema )
144+ for _ , f := range haveSchema {
145+ haveMap [f .Name ] = f
146+ }
147+ for _ , wf := range wantSchema {
148+ if hf , ok := haveMap [wf .Name ]; ! ok {
149+ return false
150+ } else if hf .Type != wf .Type {
151+ return false
152+ }
153+ }
154+ return true
155+ }
156+
157+ // mergeSchemas merges the schema we want with the schema we have, to avoid
158+ // losing any existing data
159+ func mergeSchemas (haveSchema , wantSchema bigquery.Schema ) (bigquery.Schema , error ) {
160+ haveMap := make (map [string ]* bigquery.FieldSchema )
161+ for _ , f := range haveSchema {
162+ haveMap [f .Name ] = f
163+ }
164+ wantMap := make (map [string ]* bigquery.FieldSchema )
165+ for _ , f := range wantSchema {
166+ wantMap [f .Name ] = f
167+ }
168+ merged := make (bigquery.Schema , 0 , len (wantSchema ))
169+ // keep everything in the schema we have, as long as the types didn't change
170+ // or an unknown column isn't required
171+ for _ , f := range haveSchema {
172+ if want , ok := wantMap [f .Name ]; ok {
173+ if want .Type != f .Type {
174+ return nil , fmt .Errorf ("field %v changed type from %v to %v" , f .Name , f .Type , want .Type )
175+ }
176+ } else if f .Required {
177+ return nil , fmt .Errorf ("field %v is required but not in new schema" , f .Name )
178+ }
179+ merged = append (merged , f )
180+ }
181+ // add anything new
182+ for _ , f := range wantSchema {
183+ if _ , ok := haveMap [f .Name ]; ! ok {
184+ merged = append (merged , f )
185+ }
128186 }
129- _ , err := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Update (ctx , tm , "" )
130- return err
187+ return merged , nil
131188}
132189
133190func (c * Client ) createTable (ctx context.Context , client * bigquery.Client , table * schema.Table ) error {
0 commit comments