Skip to content

Commit 018edf4

Browse files
authored
fix: Fix number of writes reported by postgresql (#6757)
Because we didn't use the batch size, but rather the batch size limit when incrementing the counter for number of records written, the results may have been inaccurate in some cases.
1 parent 59c32dc commit 018edf4

1 file changed

Lines changed: 6 additions & 4 deletions

File tree

  • plugins/destination/postgresql/client

plugins/destination/postgresql/client/write.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *de
6969
sql = c.upsert(table)
7070
}
7171
batch.Queue(sql, r.Data...)
72-
if batch.Len() >= c.batchSize {
72+
batchSize := batch.Len()
73+
if batchSize >= c.batchSize {
7374
br := c.conn.SendBatch(ctx, batch)
7475
if err := br.Close(); err != nil {
7576
var pgErr *pgconn.PgError
@@ -79,12 +80,13 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *de
7980
}
8081
return fmt.Errorf("failed to execute batch with pgerror: %s: %w", pgErrToStr(pgErr), err)
8182
}
82-
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
83+
atomic.AddUint64(&c.metrics.Writes, uint64(batchSize))
8384
batch = &pgx.Batch{}
8485
}
8586
}
8687

87-
if batch.Len() > 0 {
88+
batchSize := batch.Len()
89+
if batchSize > 0 {
8890
br := c.conn.SendBatch(ctx, batch)
8991
if err := br.Close(); err != nil {
9092
var pgErr *pgconn.PgError
@@ -94,7 +96,7 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *de
9496
}
9597
return fmt.Errorf("failed to execute batch with pgerror: %s: %w", pgErrToStr(pgErr), err)
9698
}
97-
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
99+
atomic.AddUint64(&c.metrics.Writes, uint64(batchSize))
98100
}
99101

100102
return nil

0 commit comments

Comments
 (0)