@@ -61,6 +61,26 @@ func (c *Client) isTableExistSQL(ctx context.Context, table string) (bool, error
6161 return tableExist == 1 , nil
6262}
6363
64+ func (c * Client ) getStalePks (pgPKs map [string ]bool , table * schema.Table ) []string {
65+ stalePks := []string {}
66+ if c .enabledPks () {
67+ for pk := range pgPKs {
68+ stalePk := true
69+ for _ , col := range table .Columns {
70+ if col .Name == pk && col .CreationOptions .PrimaryKey {
71+ stalePk = false
72+ break
73+ }
74+ }
75+ if stalePk {
76+ c .logger .Info ().Str ("table" , table .Name ).Str ("column" , pk ).Msg ("Column exists with primary key but is not in the schema" )
77+ stalePks = append (stalePks , pk )
78+ }
79+ }
80+ }
81+ return stalePks
82+ }
83+
6484func (c * Client ) autoMigrateTable (ctx context.Context , table * schema.Table ) error {
6585 var err error
6686 var pgColumns * pgTableColumns
@@ -76,6 +96,16 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
7696 return fmt .Errorf ("failed to get table %s primary key columns: %w" , table .Name , err )
7797 }
7898
99+ stalePks := c .getStalePks (pgPKs , table )
100+ constraintName := pgx.Identifier {table .Name + "_cqpk" }.Sanitize ()
101+ dropConstraintSQL := "alter table " + tableName + " drop constraint if exists " + constraintName
102+ if len (stalePks ) > 0 {
103+ message := "the following primary keys were removed from the schema %q for table %q.\n You can migrate the table manually by running:\n %s"
104+ sep := strings .Repeat ("-" , len (dropConstraintSQL )+ 1 )
105+ query := fmt .Sprintf ("%s\n %s;\n %s\n %s" , sep , dropConstraintSQL , getDropNotNullQuery (table , stalePks ), sep )
106+ return fmt .Errorf (message , stalePks , table .Name , query )
107+ }
108+
79109 reCreatePrimaryKeys := false
80110
81111 for _ , col := range table .Columns {
@@ -123,26 +153,25 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
123153 reCreatePrimaryKeys = true
124154 }
125155 }
156+
126157 if reCreatePrimaryKeys {
127158 c .logger .Info ().Str ("table" , table .Name ).Msg ("Recreating primary keys" )
128- if err := c .setNullOnPks (ctx , table ); err != nil {
159+ if err := c .setNotNullOnPks (ctx , table ); err != nil {
129160 return fmt .Errorf ("failed to enforce not null on primary keys: %w" , err )
130161 }
131162
132163 tx , err := c .conn .Begin (ctx )
133164 if err != nil {
134165 return fmt .Errorf ("failed to begin transaction to recreate primary keys: %w" , err )
135166 }
136- constraintName := pgx.Identifier {table .Name + "_cqpk" }.Sanitize ()
137- sql := "alter table " + tableName + " drop constraint if exists " + constraintName
138- if _ , err := tx .Exec (ctx , sql ); err != nil {
167+ if _ , err := tx .Exec (ctx , dropConstraintSQL ); err != nil {
139168 if err := tx .Rollback (ctx ); err != nil {
140169 c .logger .Error ().Err (err ).Msg ("failed to rollback transaction" )
141170 }
142171 return fmt .Errorf ("failed to drop primary key constraint on table %s: %w" , table .Name , err )
143172 }
144173
145- sql = "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings .Join (table .PrimaryKeys (), "," ) + ")"
174+ sql : = "alter table " + tableName + " add constraint " + constraintName + " primary key (" + strings .Join (table .PrimaryKeys (), "," ) + ")"
146175 if _ , err := tx .Exec (ctx , sql ); err != nil {
147176 if err := tx .Rollback (ctx ); err != nil {
148177 c .logger .Error ().Err (err ).Msg ("failed to rollback transaction" )
@@ -153,10 +182,11 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table) erro
153182 return fmt .Errorf ("failed to commit transaction to recreate primary keys: %w" , err )
154183 }
155184 }
185+
156186 return nil
157187}
158188
159- func (c * Client ) setNullOnPks (ctx context.Context , table * schema.Table ) error {
189+ func (c * Client ) setNotNullOnPks (ctx context.Context , table * schema.Table ) error {
160190 for _ , col := range table .PrimaryKeys () {
161191 sql := "alter table " + pgx.Identifier {table .Name }.Sanitize () + " alter column " + pgx.Identifier {col }.Sanitize () + " set not null"
162192 if _ , err := c .conn .Exec (ctx , sql ); err != nil {
@@ -166,6 +196,15 @@ func (c *Client) setNullOnPks(ctx context.Context, table *schema.Table) error {
166196 return nil
167197}
168198
199+ func getDropNotNullQuery (table * schema.Table , stalePks []string ) string {
200+ queries := []string {}
201+ for _ , col := range stalePks {
202+ queries = append (queries , "alter table " + pgx.Identifier {table .Name }.Sanitize ()+ " alter column " + pgx.Identifier {col }.Sanitize ()+ " drop not null;" )
203+ }
204+
205+ return strings .Join (queries , "\n " )
206+ }
207+
169208func (c * Client ) createTableIfNotExist (ctx context.Context , table * schema.Table ) error {
170209 var sb strings.Builder
171210 tableName := pgx.Identifier {table .Name }.Sanitize ()
0 commit comments