Skip to content

Commit 371be8e

Browse files
committed
fix: Better error message
1 parent 14790b6 commit 371be8e

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

plugins/destination/postgresql/client/client_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,12 @@ func TestPgPluginPrimaryKeyRename(t *testing.T) {
6060
// Migrate the table again without the `stale_pk_1` and `stale_pk_2` columns
6161
table := testdata.TestTable(tableName)
6262
err := p.Migrate(ctx, []*schema.Table{table})
63-
require.ErrorContains(t, err, fmt.Sprintf("the following primary keys were removed from the schema [\"stale_pk_1\" \"stale_pk_2\"] for table \"%s\"", tableName))
63+
expected := `the following primary keys were removed from the schema ["stale_pk_1" "stale_pk_2"] for table "%s".
64+
You can migrate the table manually by running:
65+
-----------------------------------------------------------------------------------------
66+
alter table "%s" drop constraint if exists "%s_cqpk";
67+
alter table "%s" alter column "stale_pk_1" drop not null;
68+
alter table "%s" alter column "stale_pk_2" drop not null;
69+
-----------------------------------------------------------------------------------------`
70+
require.ErrorContains(t, err, fmt.Sprintf(expected, tableName, tableName, tableName, tableName, tableName))
6471
}

plugins/destination/postgresql/client/migrate.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
8888

8989
// create the new column as it doesn't exist
9090
tableName := pgx.Identifier{table.Name}.Sanitize()
91+
9192
if pgColumns, err = c.getPgTableColumns(ctx, table.Name); err != nil {
9293
return fmt.Errorf("failed to get table %s columns types: %w", table.Name, err)
9394
}
@@ -97,9 +98,13 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
9798
}
9899

99100
stalePks := c.getStalePks(pgPKs, table)
101+
constraintName := pgx.Identifier{table.Name + "_cqpk"}.Sanitize()
102+
dropConstraintSQL := "alter table " + tableName + " drop constraint if exists " + constraintName
100103
if len(stalePks) > 0 {
101-
message := "the following primary keys were removed from the schema %q for table %q.\nEither drop the columns or remove their \"not null\" constraint"
102-
return fmt.Errorf(message, stalePks, table.Name)
104+
message := "the following primary keys were removed from the schema %q for table %q.\nYou can migrate the table manually by running:\n%s"
105+
sep := strings.Repeat("-", len(dropConstraintSQL)+1)
106+
query := fmt.Sprintf("%s\n%s;\n%s\n%s", sep, dropConstraintSQL, getDropNotNullQuery(table, stalePks), sep)
107+
return fmt.Errorf(message, stalePks, table.Name, query)
103108
}
104109

105110
reCreatePrimaryKeys := false
@@ -160,16 +165,14 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
160165
if err != nil {
161166
return fmt.Errorf("failed to begin transaction to recreate primary keys: %w", err)
162167
}
163-
constraintName := pgx.Identifier{table.Name + "_cqpk"}.Sanitize()
164-
sql := "alter table " + tableName + " drop constraint if exists " + constraintName
165-
if _, err := tx.Exec(ctx, sql); err != nil {
168+
if _, err := tx.Exec(ctx, dropConstraintSQL); err != nil {
166169
if err := tx.Rollback(ctx); err != nil {
167170
c.logger.Error().Err(err).Msg("failed to rollback transaction")
168171
}
169172
return fmt.Errorf("failed to drop primary key constraint on table %s: %w", table.Name, err)
170173
}
171174

172-
sql = "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings.Join(table.PrimaryKeys(), ",") + ")"
175+
sql := "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings.Join(table.PrimaryKeys(), ",") + ")"
173176
if _, err := tx.Exec(ctx, sql); err != nil {
174177
if err := tx.Rollback(ctx); err != nil {
175178
c.logger.Error().Err(err).Msg("failed to rollback transaction")
@@ -194,6 +197,15 @@ func (c *Client) setNotNullOnPks(ctx context.Context, table *schema.Table) error
194197
return nil
195198
}
196199

200+
func getDropNotNullQuery(table *schema.Table, stalePks []string) string {
201+
queries := []string{}
202+
for _, col := range stalePks {
203+
queries = append(queries, "alter table "+pgx.Identifier{table.Name}.Sanitize()+" alter column "+pgx.Identifier{col}.Sanitize()+" drop not null;")
204+
}
205+
206+
return strings.Join(queries, "\n")
207+
}
208+
197209
func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) error {
198210
var sb strings.Builder
199211
tableName := pgx.Identifier{table.Name}.Sanitize()

0 commit comments

Comments
 (0)