Skip to content

Commit f3b989f

Browse files
authored
fix(deps): Update plugin-sdk to v1.21.0 for destinations (#6419)
This updates plugin-sdk to v1.21.0 for all destinations, and ensures that the new migration test either passes or gets skipped for all of them. This also restores some code that was accidentally removed in #6382
1 parent 26e995c commit f3b989f

File tree

41 files changed

+197
-69
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+197
-69
lines changed

plugins/destination/azblob/client/client_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ func TestPluginCSV(t *testing.T) {
2121
NoRotate: true,
2222
},
2323
destination.PluginTestSuiteTests{
24-
SkipOverwrite: true,
25-
SkipDeleteStale: true,
26-
SkipSecondAppend: true,
24+
SkipOverwrite: true,
25+
SkipDeleteStale: true,
26+
SkipSecondAppend: true,
27+
SkipMigrateAppend: true,
2728
},
2829
)
2930
}
@@ -40,9 +41,10 @@ func TestPluginJSON(t *testing.T) {
4041
NoRotate: true,
4142
},
4243
destination.PluginTestSuiteTests{
43-
SkipOverwrite: true,
44-
SkipDeleteStale: true,
45-
SkipSecondAppend: true,
44+
SkipOverwrite: true,
45+
SkipDeleteStale: true,
46+
SkipSecondAppend: true,
47+
SkipMigrateAppend: true,
4648
},
4749
)
4850
}

plugins/destination/azblob/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0
77
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1
88
github.com/cloudquery/filetypes v1.0.2
9-
github.com/cloudquery/plugin-sdk v1.19.0
9+
github.com/cloudquery/plugin-sdk v1.21.0
1010
github.com/google/uuid v1.3.0
1111
github.com/rs/zerolog v1.28.0
1212
)

plugins/destination/azblob/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ github.com/cloudquery/filetypes v1.0.2 h1:UxZ6aoZld2EQZzPgE8JIzmn98V6Px0cmK7rsJy
5454
github.com/cloudquery/filetypes v1.0.2/go.mod h1:d9YllN+dlQNrT/+fFLwdjeXXE3a2Mj9kffO9aMwzOrg=
5555
github.com/cloudquery/plugin-sdk v1.19.0 h1:rA2FHLgon5J+VB6tK+w3LLJyegsL/vAkj3Xi9N1Xk1c=
5656
github.com/cloudquery/plugin-sdk v1.19.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
57+
github.com/cloudquery/plugin-sdk v1.21.0 h1:Oon4fFeUpc/QNkSwLPaoHcv9EVzcXK/6Y3ZaAcg7VOk=
58+
github.com/cloudquery/plugin-sdk v1.21.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
5759
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5860
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
5961
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

plugins/destination/bigquery/client/migrate.go

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,21 +96,14 @@ func (c *Client) waitForTableToExist(ctx context.Context, client *bigquery.Clien
9696
func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Client, table *schema.Table) error {
9797
c.logger.Debug().Str("table", table.Name).Msg("Waiting for schemas to match")
9898
wantSchema := c.bigQuerySchemaForTable(table)
99-
want, err := wantSchema.ToJSONFields()
100-
if err != nil {
101-
return fmt.Errorf("failed to convert schema to JSON: %v", err)
102-
}
10399
for i := 0; i < maxTableChecks; i++ {
104100
md, err := client.Dataset(c.pluginSpec.DatasetID).Table(table.Name).Metadata(ctx)
105101
if err != nil {
106102
return err
107103
}
108-
got, err := md.Schema.ToJSONFields()
109-
if err != nil {
110-
return fmt.Errorf("failed to convert schema to JSON: %v", err)
111-
}
112-
if string(got) == string(want) {
113-
c.logger.Debug().Str("table", table.Name).Msg("Schemas matched")
104+
haveSchema := md.Schema
105+
if schemasMatch(haveSchema, wantSchema) {
106+
c.logger.Debug().Str("table", table.Name).Msg("Schemas match")
114107
return nil
115108
}
116109
c.logger.Debug().Str("table", table.Name).Int("i", i).Msg("Waiting for schemas to match")
@@ -120,14 +113,78 @@ func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Clie
120113
}
121114

122115
func (c *Client) autoMigrateTable(ctx context.Context, client *bigquery.Client, table *schema.Table) error {
123-
bqSchema := c.bigQuerySchemaForTable(table)
116+
bqTable := client.Dataset(c.pluginSpec.DatasetID).Table(table.Name)
117+
md, err := bqTable.Metadata(ctx)
118+
if err != nil {
119+
return fmt.Errorf("failed to get table metadata: %w", err)
120+
}
121+
haveSchema := md.Schema
122+
wantSchema := c.bigQuerySchemaForTable(table)
123+
wantSchema, err = mergeSchemas(haveSchema, wantSchema)
124+
if err != nil {
125+
return fmt.Errorf("failed to migrate table schema: %w", err)
126+
}
124127
tm := bigquery.TableMetadataToUpdate{
125128
Name: table.Name,
126129
Description: table.Description,
127-
Schema: bqSchema,
130+
Schema: wantSchema,
131+
}
132+
_, err = bqTable.Update(ctx, tm, "")
133+
if err != nil {
134+
return fmt.Errorf("failed to update table schema: %w", err)
135+
}
136+
return nil
137+
}
138+
139+
func schemasMatch(haveSchema, wantSchema bigquery.Schema) bool {
140+
// Schemas are considered a match if everything in the want schema is in the have schema,
141+
// and they have the same types.
142+
// We don't mind if there are extra fields in the have schema.
143+
haveMap := make(map[string]*bigquery.FieldSchema)
144+
for _, f := range haveSchema {
145+
haveMap[f.Name] = f
146+
}
147+
for _, wf := range wantSchema {
148+
if hf, ok := haveMap[wf.Name]; !ok {
149+
return false
150+
} else if hf.Type != wf.Type {
151+
return false
152+
}
153+
}
154+
return true
155+
}
156+
157+
// mergeSchemas merges the schema we want with the schema we have, to avoid
158+
// losing any existing data
159+
func mergeSchemas(haveSchema, wantSchema bigquery.Schema) (bigquery.Schema, error) {
160+
haveMap := make(map[string]*bigquery.FieldSchema)
161+
for _, f := range haveSchema {
162+
haveMap[f.Name] = f
163+
}
164+
wantMap := make(map[string]*bigquery.FieldSchema)
165+
for _, f := range wantSchema {
166+
wantMap[f.Name] = f
167+
}
168+
merged := make(bigquery.Schema, 0, len(wantSchema))
169+
// keep everything in the schema we have, as long as the types didn't change
170+
// or an unknown column isn't required
171+
for _, f := range haveSchema {
172+
if want, ok := wantMap[f.Name]; ok {
173+
if want.Type != f.Type {
174+
return nil, fmt.Errorf("field %v changed type from %v to %v", f.Name, f.Type, want.Type)
175+
}
176+
} else if f.Required {
177+
return nil, fmt.Errorf("field %v is required but not in new schema", f.Name)
178+
}
179+
merged = append(merged, f)
180+
}
181+
// add anything new
182+
for _, f := range wantSchema {
183+
if _, ok := haveMap[f.Name]; !ok {
184+
merged = append(merged, f)
185+
}
128186
}
129-
_, err := client.Dataset(c.pluginSpec.DatasetID).Table(table.Name).Update(ctx, tm, "")
130-
return err
187+
return merged, nil
131188
}
132189

133190
func (c *Client) createTable(ctx context.Context, client *bigquery.Client, table *schema.Table) error {

plugins/destination/bigquery/client/read.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"strings"
78
"time"
89

910
"cloud.google.com/go/bigquery"
@@ -12,7 +13,7 @@ import (
1213
)
1314

1415
const (
15-
readSQL = "SELECT * FROM `%s.%s.%s` WHERE `_cq_source_name` = @cq_source_name order by _cq_sync_time asc"
16+
readSQL = "SELECT %s FROM `%s.%s.%s` WHERE `_cq_source_name` = @cq_source_name order by _cq_sync_time asc"
1617
)
1718

1819
func (*Client) createResultsArray(table *schema.Table) []bigquery.Value {
@@ -78,7 +79,12 @@ func (*Client) createResultsArray(table *schema.Table) []bigquery.Value {
7879
}
7980

8081
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
81-
stmt := fmt.Sprintf(readSQL, c.pluginSpec.ProjectID, c.pluginSpec.DatasetID, table.Name)
82+
colNames := make([]string, 0, len(table.Columns))
83+
for _, col := range table.Columns {
84+
colNames = append(colNames, col.Name)
85+
}
86+
cols := "`" + strings.Join(colNames, "`, `") + "`"
87+
stmt := fmt.Sprintf(readSQL, cols, c.pluginSpec.ProjectID, c.pluginSpec.DatasetID, table.Name)
8288
q := c.client.Query(stmt)
8389
q.Parameters = []bigquery.QueryParameter{
8490
{

plugins/destination/bigquery/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.19
44

55
require (
66
cloud.google.com/go/bigquery v1.44.0
7-
github.com/cloudquery/plugin-sdk v1.19.0
7+
github.com/cloudquery/plugin-sdk v1.21.0
88
github.com/rs/zerolog v1.28.0
99
golang.org/x/sync v0.1.0
1010
google.golang.org/api v0.103.0

plugins/destination/bigquery/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
5555
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
5656
github.com/cloudquery/plugin-sdk v1.19.0 h1:rA2FHLgon5J+VB6tK+w3LLJyegsL/vAkj3Xi9N1Xk1c=
5757
github.com/cloudquery/plugin-sdk v1.19.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
58+
github.com/cloudquery/plugin-sdk v1.21.0 h1:Oon4fFeUpc/QNkSwLPaoHcv9EVzcXK/6Y3ZaAcg7VOk=
59+
github.com/cloudquery/plugin-sdk v1.21.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
5860
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5961
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
6062
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

plugins/destination/csv/client/client_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ func TestPlugin(t *testing.T) {
4949
Directory: t.TempDir(),
5050
},
5151
destination.PluginTestSuiteTests{
52-
SkipOverwrite: true,
53-
SkipDeleteStale: true,
52+
SkipOverwrite: true,
53+
SkipDeleteStale: true,
54+
SkipMigrateAppend: true,
5455
},
5556
)
5657
}

plugins/destination/csv/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/cloudquery/cloudquery/plugins/destination/csv
33
go 1.19
44

55
require (
6-
github.com/cloudquery/plugin-sdk v1.16.1
6+
github.com/cloudquery/plugin-sdk v1.21.0
77
github.com/rs/zerolog v1.28.0
88
)
99

plugins/destination/csv/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
4242
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
4343
github.com/cloudquery/plugin-sdk v1.16.1 h1:FstBNQkdAFZRh5F3Y0ugL/pLTg/tPAzljxnpjNda4po=
4444
github.com/cloudquery/plugin-sdk v1.16.1/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
45+
github.com/cloudquery/plugin-sdk v1.21.0 h1:Oon4fFeUpc/QNkSwLPaoHcv9EVzcXK/6Y3ZaAcg7VOk=
46+
github.com/cloudquery/plugin-sdk v1.21.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI=
4547
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
4648
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
4749
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

0 commit comments

Comments
 (0)