Skip to content

Commit ed16cef

Browse files
authored
feat(pg): Use pgx batch to improve performance (#2604)
Adds batching to improve performance over network + exposing this as configuration. Closes #2462
1 parent f5f9257 commit ed16cef

8 files changed

Lines changed: 48 additions & 13 deletions

File tree

plugins/destination/postgresql/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,7 @@ This is the top level spec used by the PostgreSQL destination Plugin.
1919

2020
Available: "error", "warn", "info", "debug", "trace"
2121
define if and in which level to log [`pgx`](https://github.com/jackc/pgx) call.
22+
23+
- `batch_size` (int) (optional, defaults to 1000)
24+
25+
Number of rows to insert in a single batch.

plugins/destination/postgresql/client/client.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,15 @@ import (
1515
pgxUUID "github.com/vgarvardt/pgx-google-uuid/v4"
1616
)
1717

18-
type Spec struct {
19-
ConnectionString string `json:"connection_string,omitempty"`
20-
PgxLogLevel LogLevel `json:"pgx_log_level,omitempty"`
21-
}
22-
2318
type Client struct {
2419
conn *pgxpool.Pool
2520
logger zerolog.Logger
2621
spec specs.Destination
2722
currentDatabaseName string
2823
currentSchemaName string
2924
pgType pgType
25+
batchSize int
26+
batch *pgx.Batch
3027
}
3128

3229
type pgTablePrimaryKeys struct {
@@ -73,12 +70,15 @@ const (
7370
func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (plugins.DestinationClient, error) {
7471
c := &Client{
7572
logger: logger.With().Str("module", "pg-dest").Logger(),
73+
batch: &pgx.Batch{},
7674
}
7775
var specPostgreSql Spec
7876
c.spec = spec
7977
if err := spec.UnmarshalSpec(&specPostgreSql); err != nil {
8078
return nil, fmt.Errorf("failed to unmarshal postgresql spec: %w", err)
8179
}
80+
specPostgreSql.SetDefaults()
81+
c.batchSize = specPostgreSql.BatchSize
8282

8383
logLevel, err := pgx.LogLevelFromString(specPostgreSql.PgxLogLevel.String())
8484
if err != nil {
@@ -117,14 +117,20 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (pl
117117
}
118118

119119
func (c *Client) Close(ctx context.Context) error {
120+
var err error
120121
if c.conn == nil {
121122
return fmt.Errorf("client already closed or not initialized")
122123
}
124+
if c.batch.Len() > 0 {
125+
br := c.conn.SendBatch(ctx, c.batch)
126+
err = br.Close()
127+
c.batch = &pgx.Batch{}
128+
}
123129
if c.conn != nil {
124130
c.conn.Close()
125131
c.conn = nil
126132
}
127-
return nil
133+
return err
128134
}
129135

130136
func (c *Client) currentDatabase(ctx context.Context) (string, error) {

plugins/destination/postgresql/client/deletestale.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/jackc/pgx/v4"
1010
)
1111

12-
func (p *Client) DeleteStale(ctx context.Context, table string, source string, syncTime time.Time) error {
12+
func (c *Client) DeleteStale(ctx context.Context, table string, source string, syncTime time.Time) error {
1313
var sb strings.Builder
1414
sb.WriteString("delete from ")
1515
sb.WriteString(pgx.Identifier{table}.Sanitize())
@@ -18,7 +18,7 @@ func (p *Client) DeleteStale(ctx context.Context, table string, source string, s
1818
sb.WriteString(" = $1 and ")
1919
sb.WriteString(schema.CqSyncTimeColumn.Name)
2020
sb.WriteString(" < $2")
21-
if _, err := p.conn.Exec(ctx, sb.String(), source, syncTime); err != nil {
21+
if _, err := c.conn.Exec(ctx, sb.String(), source, syncTime); err != nil {
2222
return err
2323
}
2424
return nil

plugins/destination/postgresql/client/deletestale_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestDeleteStale(t *testing.T) {
1515
WriteMode: specs.WriteModeOverwriteDeleteStale,
1616
Spec: &Spec{
1717
ConnectionString: getTestConnection(),
18+
BatchSize: 1,
1819
},
1920
})
2021
if err != nil {

plugins/destination/postgresql/client/migrate_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestMigrate(t *testing.T) {
1515
WriteMode: specs.WriteModeOverwriteDeleteStale,
1616
Spec: &Spec{
1717
ConnectionString: getTestConnection(),
18+
BatchSize: 1,
1819
},
1920
})
2021
if err != nil {
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package client
2+
3+
type Spec struct {
4+
ConnectionString string `json:"connection_string,omitempty"`
5+
PgxLogLevel LogLevel `json:"pgx_log_level,omitempty"`
6+
BatchSize int `json:"batch_size,omitempty"`
7+
}
8+
9+
const defaultBatchSize = 1000
10+
11+
func (s *Spec) SetDefaults() {
12+
if s.BatchSize <= 0 {
13+
s.BatchSize = defaultBatchSize
14+
}
15+
}

plugins/destination/postgresql/client/write.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@ import (
99
"github.com/jackc/pgx/v4"
1010
)
1111

12-
func (p *Client) Write(ctx context.Context, table string, data map[string]interface{}) error {
12+
func (c *Client) Write(ctx context.Context, table string, data map[string]interface{}) error {
1313
var sql string
1414
var values []interface{}
15-
if p.spec.WriteMode == specs.WriteModeAppend {
15+
16+
if c.spec.WriteMode == specs.WriteModeAppend {
1617
sql, values = insert(table, data)
1718
} else {
1819
sql, values = upsert(table, data)
1920
}
20-
_, err := p.conn.Exec(ctx, sql, values...)
21-
if err != nil {
22-
return fmt.Errorf("failed to insert data with sql '%s': %w", sql, err)
21+
c.batch.Queue(sql, values...)
22+
if c.batch.Len() >= c.batchSize {
23+
br := c.conn.SendBatch(ctx, c.batch)
24+
if err := br.Close(); err != nil {
25+
c.batch = &pgx.Batch{}
26+
return fmt.Errorf("failed to execute batch: %v", err)
27+
}
28+
c.batch = &pgx.Batch{}
2329
}
2430
return nil
2531
}

plugins/destination/postgresql/client/write_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func TestWriteOverwriteDeleteStale(t *testing.T) {
1818
WriteMode: specs.WriteModeOverwriteDeleteStale,
1919
Spec: &Spec{
2020
ConnectionString: getTestConnection(),
21+
BatchSize: 1,
2122
},
2223
})
2324
if err != nil {
@@ -73,6 +74,7 @@ func TestWriteAppend(t *testing.T) {
7374
WriteMode: specs.WriteModeAppend,
7475
Spec: &Spec{
7576
ConnectionString: getTestConnection(),
77+
BatchSize: 1,
7678
},
7779
})
7880
if err != nil {

0 commit comments

Comments
 (0)