Skip to content

Commit cfcffba

Browse files
authored
feat: Retry on Postgres deadlock (#21268)
#### Summary - [We do not recommend customers configure overlapping syncs](https://docs.cloudquery.io/docs/advanced-topics/running-cloudquery-in-parallel#no-overlapping-syncs). - Overlapping syncs can cause deadlocks in the Postgres destination. - Avoiding overlapping syncs is the primary recommendation to prevent deadlocks. - As a secondary safeguard, this PR introduces retries to handle most Postgres deadlock scenarios (other destinations are out of scope). - Retries are attempted up to 5 times, with random jitter between attempts. - Deadlocks are rare, even with multiple concurrent syncs. Five retries balance resilience and respect for Postgres backpressure. - Users can enable this behaviour via the `retry_on_deadlock` option.
1 parent 3b47338 commit cfcffba

File tree

10 files changed

+275
-20
lines changed

10 files changed

+275
-20
lines changed

plugins/destination/postgresql/client/client.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,18 @@ type pkConstraintDetails struct {
2727
columns []string
2828
}
2929

30+
// DBPool is an interface that abstracts the pgxpool.Pool and pgxpool.Conn types for easier fine-grained testing.
type DBPool interface {
	// Acquire returns a dedicated connection from the pool.
	Acquire(ctx context.Context) (*pgxpool.Conn, error)
	// Close releases the resources held by the pool.
	Close()
	// Exec runs a single SQL statement and returns its command tag.
	Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
	// Query runs a SQL statement that returns rows.
	Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
	// QueryRow runs a SQL statement expected to return at most one row.
	QueryRow(ctx context.Context, query string, args ...any) pgx.Row
	// SendBatch submits a batch of queued queries for execution.
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
}
39+
3040
type Client struct {
31-
conn *pgxpool.Pool
41+
conn DBPool
3242
logger zerolog.Logger
3343
currentDatabaseName string
3444
currentSchemaName string
@@ -137,7 +147,7 @@ func (c *Client) Close(ctx context.Context) error {
137147
return nil
138148
}
139149

140-
func currentDatabase(ctx context.Context, conn *pgxpool.Pool) (string, error) {
150+
func currentDatabase(ctx context.Context, conn DBPool) (string, error) {
141151
var db string
142152
err := conn.QueryRow(ctx, "select current_database()").Scan(&db)
143153
if err != nil {
@@ -146,7 +156,7 @@ func currentDatabase(ctx context.Context, conn *pgxpool.Pool) (string, error) {
146156
return db, nil
147157
}
148158

149-
func currentSchema(ctx context.Context, conn *pgxpool.Pool) (string, error) {
159+
func currentSchema(ctx context.Context, conn DBPool) (string, error) {
150160
var schema string
151161
err := conn.QueryRow(ctx, "select current_schema()").Scan(&schema)
152162
if err != nil {

plugins/destination/postgresql/client/insert.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strconv"
88
"strings"
99

10+
"github.com/avast/retry-go/v4"
1011
"github.com/cloudquery/plugin-sdk/v4/message"
1112
"github.com/cloudquery/plugin-sdk/v4/schema"
1213
"github.com/jackc/pgx/v5"
@@ -87,21 +88,37 @@ func (c *Client) InsertBatch(ctx context.Context, messages message.WriteInserts)
8788
}
8889

8990
func (c *Client) flushBatch(ctx context.Context, batch *pgx.Batch) error {
90-
if batch.Len() == 0 {
91-
return nil
92-
}
93-
err := c.conn.SendBatch(ctx, batch).Close()
94-
if err == nil {
91+
err := retry.Do(func() error {
92+
if batch.Len() == 0 {
93+
return nil
94+
}
95+
err := c.conn.SendBatch(ctx, batch).Close()
96+
if err != nil {
97+
return err
98+
}
99+
95100
return nil
96-
}
101+
}, retry.RetryIf(func(err error) bool {
102+
var pgErr *pgconn.PgError
103+
if errors.As(err, &pgErr) {
104+
return pgErr.Code == "40P01"
105+
}
106+
107+
return false
108+
}),
109+
retry.Attempts(uint(c.spec.RetryOnDeadlock)+1),
110+
retry.LastErrorOnly(true),
111+
)
97112

98-
var pgErr *pgconn.PgError
99-
if errors.As(err, &pgErr) {
100-
return fmt.Errorf("failed to execute batch with pgerror: %s: %w", pgErrToStr(pgErr), err)
113+
if err != nil {
114+
var pgErr *pgconn.PgError
115+
if errors.As(err, &pgErr) {
116+
return fmt.Errorf("failed to execute batch with pgerror: %s: %w", pgErrToStr(pgErr), err)
117+
}
118+
return fmt.Errorf("failed to execute batch: %w", err)
101119
}
102120

103-
// not recoverable error
104-
return fmt.Errorf("failed to execute batch: %w", err)
121+
return nil
105122
}
106123

107124
func (*Client) insert(table *schema.Table) string {
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/apache/arrow-go/v18/arrow"
10+
"github.com/apache/arrow-go/v18/arrow/array"
11+
"github.com/apache/arrow-go/v18/arrow/memory"
12+
"github.com/cloudquery/cloudquery/plugins/destination/postgresql/v8/client/spec"
13+
internalPlugin "github.com/cloudquery/cloudquery/plugins/destination/postgresql/v8/resources/plugin"
14+
"github.com/cloudquery/plugin-sdk/v4/message"
15+
"github.com/cloudquery/plugin-sdk/v4/plugin"
16+
"github.com/cloudquery/plugin-sdk/v4/schema"
17+
"github.com/goccy/go-json"
18+
"github.com/google/uuid"
19+
"github.com/jackc/pgx/v5"
20+
"github.com/jackc/pgx/v5/pgconn"
21+
"github.com/jackc/pgx/v5/pgxpool"
22+
"github.com/rs/zerolog"
23+
"github.com/stretchr/testify/require"
24+
"golang.org/x/sync/errgroup"
25+
)
26+
27+
// MockBatchResults is a pgx.BatchResults stub whose Close result is scripted,
// letting tests simulate batch failures such as deadlocks.
type MockBatchResults struct {
	closeErr error // error returned by Close; nil means the batch "succeeded"
}

// Close returns the scripted error for this batch.
func (m *MockBatchResults) Close() error { return m.closeErr }

// Exec is a no-op that satisfies pgx.BatchResults.
func (*MockBatchResults) Exec() (pgconn.CommandTag, error) {
	return pgconn.CommandTag{}, nil
}

// Query is a no-op that satisfies pgx.BatchResults.
func (*MockBatchResults) Query() (pgx.Rows, error) { return nil, nil }

// QueryRow is a no-op that satisfies pgx.BatchResults.
func (*MockBatchResults) QueryRow() pgx.Row { return nil }
40+
41+
// MockDBPool is a DBPool stub whose SendBatch outcomes are scripted per call,
// letting tests drive the deadlock-retry logic deterministically.
type MockDBPool struct {
	sendBatchErrs []error // error for the i-th SendBatch call; calls past the script succeed
	callCount     int     // number of SendBatch calls made so far
}

// Acquire satisfies DBPool; unused by these tests.
func (*MockDBPool) Acquire(ctx context.Context) (*pgxpool.Conn, error) { return nil, nil }

// Close satisfies DBPool; unused by these tests.
func (*MockDBPool) Close() {}

// Exec satisfies DBPool; unused by these tests.
func (*MockDBPool) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
	return pgconn.CommandTag{}, nil
}

// Query satisfies DBPool; unused by these tests.
func (*MockDBPool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
	return nil, nil
}

// QueryRow satisfies DBPool; unused by these tests.
func (*MockDBPool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { return nil }

// SendBatch returns a MockBatchResults whose Close yields the next scripted
// error (or nil once the script is exhausted).
func (m *MockDBPool) SendBatch(ctx context.Context, batch *pgx.Batch) pgx.BatchResults {
	var err error
	if m.callCount < len(m.sendBatchErrs) {
		err = m.sendBatchErrs[m.callCount]
	}
	m.callCount++
	return &MockBatchResults{closeErr: err}
}
64+
65+
// TestClient_flushBatch verifies flushBatch's deadlock-retry behaviour using a
// scripted MockDBPool: deadlock errors (40P01) are retried up to the
// retry_on_deadlock count, while a persistent deadlock eventually fails.
func TestClient_flushBatch(t *testing.T) {
	pgErr := &pgconn.PgError{Code: "40P01", Message: "deadlock detected"}
	ctx := context.Background()
	batch := &pgx.Batch{}
	// Add a dummy query so batch.Len() > 0
	batch.Queue("SELECT 1")

	tests := []struct {
		name            string
		sendBatchErrs   []error // scripted result of each SendBatch().Close() call
		retryOnDeadlock int64   // spec.RetryOnDeadlock value under test
		wantErr         bool
	}{
		{
			name:            "happy path",
			sendBatchErrs:   []error{nil},
			retryOnDeadlock: 5,
			wantErr:         false,
		},
		{
			name: "two retries then success",
			sendBatchErrs: []error{
				pgErr,
				pgErr,
				nil,
			},
			retryOnDeadlock: 5,
			wantErr:         false,
		},
		{
			// Six deadlocks exhaust the 1 initial attempt + 5 retries.
			name: "six retries, always deadlock, fail",
			sendBatchErrs: []error{
				pgErr,
				pgErr,
				pgErr,
				pgErr,
				pgErr,
				pgErr,
			},
			retryOnDeadlock: 5,
			wantErr:         true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			client := &Client{
				conn: &MockDBPool{sendBatchErrs: tt.sendBatchErrs},
				spec: &spec.Spec{RetryOnDeadlock: tt.retryOnDeadlock},
			}
			err := client.flushBatch(ctx, batch)
			if (err != nil) != tt.wantErr {
				t.Errorf("flushBatch() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
122+
123+
func TestConcurrentSyncsAgainstSameTable(t *testing.T) {
124+
const syncConcurrency = 10 // Lowered from 100 to 10 to work well with other tests
125+
const rounds = 99
126+
ctx := context.Background()
127+
group, _ := errgroup.WithContext(ctx)
128+
randomUUIDString := uuid.New().String()
129+
tableName := "k8s_core_namespaces_" + randomUUIDString
130+
131+
table := &schema.Table{
132+
Name: tableName,
133+
Columns: []schema.Column{
134+
{Name: "id", Type: arrow.BinaryTypes.String, NotNull: true},
135+
{Name: "name", Type: arrow.BinaryTypes.String, PrimaryKey: true},
136+
schema.CqSyncTimeColumn,
137+
},
138+
}
139+
// Create the table
140+
migratePlugin := plugin.NewPlugin("postgresql",
141+
internalPlugin.Version,
142+
New,
143+
plugin.WithJSONSchema(spec.JSONSchema),
144+
)
145+
s := &spec.Spec{ConnectionString: getTestConnection(), BatchSize: 1, RetryOnDeadlock: 5}
146+
b, err := json.Marshal(s)
147+
require.NoError(t, err)
148+
err = migratePlugin.Init(ctx, b, plugin.NewClientOptions{})
149+
require.NoError(t, err)
150+
migrateContext := context.Background()
151+
if err := migratePlugin.WriteAll(migrateContext, []message.WriteMessage{&message.WriteMigrateTable{Table: table}}); err != nil {
152+
t.Fatal(fmt.Errorf("failed to create table: %w", err))
153+
}
154+
155+
for range syncConcurrency {
156+
group.Go(func() error {
157+
// Simulate a sync job against the same table
158+
syncContext := context.Background()
159+
p := plugin.NewPlugin("postgresql",
160+
internalPlugin.Version,
161+
New,
162+
plugin.WithJSONSchema(spec.JSONSchema),
163+
)
164+
p.SetLogger(zerolog.New(zerolog.NewTestWriter(t)).Level(zerolog.WarnLevel))
165+
s := &spec.Spec{ConnectionString: getTestConnection()}
166+
b, err := json.Marshal(s)
167+
require.NoError(t, err)
168+
err = p.Init(syncContext, b, plugin.NewClientOptions{})
169+
require.NoError(t, err)
170+
171+
for range rounds {
172+
jobIndexAsString := fmt.Sprintf("%02d", 1)
173+
randomUUIDStringWithLastCharacterReplaced := randomUUIDString[:len(randomUUIDString)-len(jobIndexAsString)] + jobIndexAsString
174+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
175+
bldr.Field(0).(*array.StringBuilder).Append(uuid.MustParse(randomUUIDStringWithLastCharacterReplaced).String())
176+
bldr.Field(1).(*array.StringBuilder).Append("source")
177+
bldr.Field(2).(*array.TimestampBuilder).Append(arrow.Timestamp(time.Now().UnixMicro()))
178+
record := bldr.NewRecord()
179+
180+
if err := p.WriteAll(syncContext, []message.WriteMessage{&message.WriteInsert{
181+
Record: record,
182+
}}); err != nil {
183+
t.Fatal(fmt.Errorf("failed to insert record: %w", err))
184+
}
185+
}
186+
187+
return nil
188+
})
189+
}
190+
191+
require.NoError(t, group.Wait())
192+
193+
ch := make(chan arrow.Record)
194+
go func() {
195+
defer close(ch)
196+
err = migratePlugin.Read(ctx, table, ch)
197+
}()
198+
199+
numRows := 0
200+
for record := range ch {
201+
numRows += int(record.NumRows())
202+
}
203+
204+
require.Equal(t, 1, numRows)
205+
require.NoError(t, err)
206+
}

plugins/destination/postgresql/client/migrate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table, chan
139139
if err != nil {
140140
return err
141141
}
142+
default:
143+
continue
142144
}
143145
}
144146
return nil
@@ -167,6 +169,8 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool {
167169

168170
case schema.TableColumnChangeTypeMoveToCQOnly:
169171
cqMigration = true
172+
default:
173+
continue
170174
}
171175
}
172176

plugins/destination/postgresql/client/spec/schema.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/destination/postgresql/client/spec/spec.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type Spec struct {
3939

4040
// Option to create specific indexes to improve deletion performance
4141
CreatePerformanceIndexes bool `json:"create_performance_indexes,omitempty" jsonschema:"default=false"`
42+
43+
// Number of times to retry a transaction if a deadlock is detected by Postgres.
44+
RetryOnDeadlock int64 `json:"retry_on_deadlock,omitempty" jsonschema:"default=0"`
4245
}
4346

4447
func (s *Spec) SetDefaults() {

plugins/destination/postgresql/docs/_licenses.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ The following tools / packages are used in this plugin:
77
| Name | License |
88
|------|---------|
99
| github.com/adrg/xdg | MIT |
10-
| github.com/apache/arrow/go/v13 | Apache-2.0 |
1110
| github.com/apache/arrow-go/v18 | Apache-2.0 |
11+
| github.com/apache/arrow/go/v13 | Apache-2.0 |
1212
| github.com/apapsch/go-jsonmerge/v2 | MIT |
1313
| github.com/aws/aws-sdk-go-v2 | Apache-2.0 |
1414
| github.com/aws/aws-sdk-go-v2/config | Apache-2.0 |
@@ -29,8 +29,9 @@ The following tools / packages are used in this plugin:
2929
| github.com/aws/smithy-go/internal/sync/singleflight | BSD-3-Clause |
3030
| github.com/bahlo/generic-list-go | BSD-3-Clause |
3131
| github.com/buger/jsonparser | MIT |
32-
| github.com/cenkalti/backoff/v4 | MIT |
32+
| github.com/cenkalti/backoff/v5 | MIT |
3333
| github.com/cloudquery/cloudquery-api-go | MPL-2.0 |
34+
| github.com/cloudquery/codegen/jsonschema/docs | MPL-2.0 |
3435
| github.com/cloudquery/plugin-pb-go | MPL-2.0 |
3536
| github.com/cloudquery/plugin-sdk/v2/internal/glob | MIT |
3637
| github.com/cloudquery/plugin-sdk/v2/schema | MIT |
@@ -50,7 +51,6 @@ The following tools / packages are used in this plugin:
5051
| github.com/hashicorp/go-cleanhttp | MPL-2.0 |
5152
| github.com/hashicorp/go-retryablehttp | MPL-2.0 |
5253
| github.com/hashicorp/go-uuid | MPL-2.0 |
53-
| github.com/huandu/xstrings | MIT |
5454
| github.com/invopop/jsonschema | MIT |
5555
| github.com/jackc/pgpassfile | MIT |
5656
| github.com/jackc/pgservicefile | MIT |
@@ -74,13 +74,16 @@ The following tools / packages are used in this plugin:
7474
| github.com/pierrec/lz4/v4 | BSD-3-Clause |
7575
| github.com/pmezard/go-difflib/difflib | BSD-3-Clause |
7676
| github.com/rs/zerolog | MIT |
77+
| github.com/samber/lo | MIT |
7778
| github.com/santhosh-tekuri/jsonschema/v6 | Apache-2.0 |
7879
| github.com/spf13/cobra | Apache-2.0 |
7980
| github.com/spf13/pflag | BSD-3-Clause |
81+
| github.com/stoewer/go-strcase | MIT |
8082
| github.com/stretchr/testify | MIT |
8183
| github.com/thoas/go-funk | MIT |
8284
| github.com/wk8/go-ordered-map/v2 | Apache-2.0 |
8385
| github.com/zeebo/xxh3 | BSD-2-Clause |
86+
| go.opentelemetry.io/auto/sdk | Apache-2.0 |
8487
| go.opentelemetry.io/otel | Apache-2.0 |
8588
| go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp | Apache-2.0 |
8689
| go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp | Apache-2.0 |

plugins/destination/postgresql/docs/overview.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ This is the (nested) spec used by the PostgreSQL destination Plugin.
6666

6767
Creates indexes on tables that help with performance when using `write_mode: overwrite-delete-stale`.
6868

69+
- `retry_on_deadlock` (`integer`) (optional) (default: `0`)
70+
71+
Number of times to retry a transaction if a deadlock is detected by Postgres (Postgres error code `40P01`).
72+
6973
### Verbose logging for debug
7074

7175
The PostgreSQL destination can be run in debug mode.

0 commit comments

Comments
 (0)