Skip to content

Commit c5a4bf5

Browse files
authored
fix: Error if after the migration there are not null columns that are not part of the new schema (#6282)
#### Summary Fixes #6248 (implements ~~first~~ second solution from the issue) <!--
1 parent d0d867b commit c5a4bf5

4 files changed

Lines changed: 103 additions & 6 deletions

File tree

plugins/destination/postgresql/client/client_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package client
22

33
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
47
"os"
58
"testing"
69

710
"github.com/cloudquery/plugin-sdk/plugins/destination"
11+
"github.com/cloudquery/plugin-sdk/schema"
12+
"github.com/cloudquery/plugin-sdk/specs"
13+
"github.com/cloudquery/plugin-sdk/testdata"
14+
"github.com/rs/zerolog"
15+
"github.com/stretchr/testify/require"
816
)
917

1018
func getTestConnection() string {
@@ -24,3 +32,40 @@ func TestPgPlugin(t *testing.T) {
2432
},
2533
destination.PluginTestSuiteTests{})
2634
}
35+
36+
func TestPgPluginPrimaryKeyRename(t *testing.T) {
37+
tableName := fmt.Sprintf("cq_test_pk_rename_%d", rand.Intn(100))
38+
tableWithStalePk := testdata.TestTable(tableName)
39+
// We simulate that a primary column was renamed
40+
tableWithStalePk.Columns = append(tableWithStalePk.Columns, schema.Column{Name: "stale_pk_1", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}})
41+
tableWithStalePk.Columns = append(tableWithStalePk.Columns, schema.Column{Name: "stale_pk_2", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}})
42+
p := destination.NewPlugin("postgresql", "development", New)
43+
ctx := context.Background()
44+
45+
spec := Spec{
46+
ConnectionString: getTestConnection(),
47+
PgxLogLevel: LogLevelTrace,
48+
}
49+
50+
// Init the plugin so we can call migrate
51+
if err := p.Init(ctx, zerolog.Logger{}, specs.Destination{Name: "stale_pk", Spec: spec}); err != nil {
52+
t.Fatal(err)
53+
}
54+
55+
// Call migrate so the `stale_pk_1` and `stale_pk_2` columns are created
56+
if err := p.Migrate(ctx, []*schema.Table{tableWithStalePk}); err != nil {
57+
t.Fatal(err)
58+
}
59+
60+
// Migrate the table again without the `stale_pk_1` and `stale_pk_2` columns
61+
table := testdata.TestTable(tableName)
62+
err := p.Migrate(ctx, []*schema.Table{table})
63+
expected := `the following primary keys were removed from the schema ["stale_pk_1" "stale_pk_2"] for table "%s".
64+
You can migrate the table manually by running:
65+
-----------------------------------------------------------------------------------------
66+
alter table "%s" drop constraint if exists "%s_cqpk";
67+
alter table "%s" alter column "stale_pk_1" drop not null;
68+
alter table "%s" alter column "stale_pk_2" drop not null;
69+
-----------------------------------------------------------------------------------------`
70+
require.ErrorContains(t, err, fmt.Sprintf(expected, tableName, tableName, tableName, tableName, tableName))
71+
}

plugins/destination/postgresql/client/migrate.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,26 @@ func (c *Client) isTableExistSQL(ctx context.Context, table string) (bool, error
6161
return tableExist == 1, nil
6262
}
6363

64+
func (c *Client) getStalePks(pgPKs map[string]bool, table *schema.Table) []string {
65+
stalePks := []string{}
66+
if c.enabledPks() {
67+
for pk := range pgPKs {
68+
stalePk := true
69+
for _, col := range table.Columns {
70+
if col.Name == pk && col.CreationOptions.PrimaryKey {
71+
stalePk = false
72+
break
73+
}
74+
}
75+
if stalePk {
76+
c.logger.Info().Str("table", table.Name).Str("column", pk).Msg("Column exists with primary key but is not in the schema")
77+
stalePks = append(stalePks, pk)
78+
}
79+
}
80+
}
81+
return stalePks
82+
}
83+
6484
func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) error {
6585
var err error
6686
var pgColumns *pgTableColumns
@@ -76,6 +96,16 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
7696
return fmt.Errorf("failed to get table %s primary key columns: %w", table.Name, err)
7797
}
7898

99+
stalePks := c.getStalePks(pgPKs, table)
100+
constraintName := pgx.Identifier{table.Name + "_cqpk"}.Sanitize()
101+
dropConstraintSQL := "alter table " + tableName + " drop constraint if exists " + constraintName
102+
if len(stalePks) > 0 {
103+
message := "the following primary keys were removed from the schema %q for table %q.\nYou can migrate the table manually by running:\n%s"
104+
sep := strings.Repeat("-", len(dropConstraintSQL)+1)
105+
query := fmt.Sprintf("%s\n%s;\n%s\n%s", sep, dropConstraintSQL, getDropNotNullQuery(table, stalePks), sep)
106+
return fmt.Errorf(message, stalePks, table.Name, query)
107+
}
108+
79109
reCreatePrimaryKeys := false
80110

81111
for _, col := range table.Columns {
@@ -123,26 +153,25 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
123153
reCreatePrimaryKeys = true
124154
}
125155
}
156+
126157
if reCreatePrimaryKeys {
127158
c.logger.Info().Str("table", table.Name).Msg("Recreating primary keys")
128-
if err := c.setNullOnPks(ctx, table); err != nil {
159+
if err := c.setNotNullOnPks(ctx, table); err != nil {
129160
return fmt.Errorf("failed to enforce not null on primary keys: %w", err)
130161
}
131162

132163
tx, err := c.conn.Begin(ctx)
133164
if err != nil {
134165
return fmt.Errorf("failed to begin transaction to recreate primary keys: %w", err)
135166
}
136-
constraintName := pgx.Identifier{table.Name + "_cqpk"}.Sanitize()
137-
sql := "alter table " + tableName + " drop constraint if exists " + constraintName
138-
if _, err := tx.Exec(ctx, sql); err != nil {
167+
if _, err := tx.Exec(ctx, dropConstraintSQL); err != nil {
139168
if err := tx.Rollback(ctx); err != nil {
140169
c.logger.Error().Err(err).Msg("failed to rollback transaction")
141170
}
142171
return fmt.Errorf("failed to drop primary key constraint on table %s: %w", table.Name, err)
143172
}
144173

145-
sql = "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings.Join(table.PrimaryKeys(), ",") + ")"
174+
sql := "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings.Join(table.PrimaryKeys(), ",") + ")"
146175
if _, err := tx.Exec(ctx, sql); err != nil {
147176
if err := tx.Rollback(ctx); err != nil {
148177
c.logger.Error().Err(err).Msg("failed to rollback transaction")
@@ -153,10 +182,11 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
153182
return fmt.Errorf("failed to commit transaction to recreate primary keys: %w", err)
154183
}
155184
}
185+
156186
return nil
157187
}
158188

159-
func (c *Client) setNullOnPks(ctx context.Context, table *schema.Table) error {
189+
func (c *Client) setNotNullOnPks(ctx context.Context, table *schema.Table) error {
160190
for _, col := range table.PrimaryKeys() {
161191
sql := "alter table " + pgx.Identifier{table.Name}.Sanitize() + " alter column " + pgx.Identifier{col}.Sanitize() + " set not null"
162192
if _, err := c.conn.Exec(ctx, sql); err != nil {
@@ -166,6 +196,15 @@ func (c *Client) setNullOnPks(ctx context.Context, table *schema.Table) error {
166196
return nil
167197
}
168198

199+
func getDropNotNullQuery(table *schema.Table, stalePks []string) string {
200+
queries := []string{}
201+
for _, col := range stalePks {
202+
queries = append(queries, "alter table "+pgx.Identifier{table.Name}.Sanitize()+" alter column "+pgx.Identifier{col}.Sanitize()+" drop not null;")
203+
}
204+
205+
return strings.Join(queries, "\n")
206+
}
207+
169208
func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) error {
170209
var sb strings.Builder
171210
tableName := pgx.Identifier{table.Name}.Sanitize()

plugins/destination/postgresql/go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ require (
77
github.com/jackc/pgx-zerolog v0.0.0-20220923130014-7856b90a65ae
88
github.com/jackc/pgx/v5 v5.2.0
99
github.com/rs/zerolog v1.28.0
10+
github.com/stretchr/testify v1.8.1
11+
)
12+
13+
require (
14+
github.com/davecgh/go-spew v1.1.1 // indirect
15+
github.com/pmezard/go-difflib v1.0.0 // indirect
16+
gopkg.in/yaml.v3 v3.0.1 // indirect
1017
)
1118

1219
require (

plugins/destination/postgresql/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1
4949
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
5050
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5151
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
52+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5253
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
5354
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
5455
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -182,11 +183,16 @@ github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUq
182183
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
183184
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
184185
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
186+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
187+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
185188
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
186189
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
187190
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
188191
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
192+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
193+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
189194
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
195+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
190196
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
191197
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
192198
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

0 commit comments

Comments
 (0)