Skip to content

Commit 80cfed7

Browse files
authored
fix: Improve PG Write Speeds (#2887)
#### Summary <!-- Explain what problem this PR addresses --> <!--
1 parent 2d3b885 commit 80cfed7

1 file changed

Lines changed: 24 additions & 6 deletions

File tree

  • plugins/destination/postgresql/client

plugins/destination/postgresql/client/write.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package client
33
import (
44
"context"
55
"fmt"
6+
"sort"
67
"strings"
78

89
"github.com/cloudquery/plugin-sdk/specs"
@@ -35,13 +36,22 @@ func insert(table string, data map[string]interface{}) (string, []interface{}) {
3536

3637
columns := make([]string, 0, len(data))
3738
values := make([]interface{}, 0, len(data))
38-
for c, v := range data {
39-
columns = append(columns, pgx.Identifier{c}.Sanitize())
40-
values = append(values, v)
39+
40+
// Sort the columns prior to adding data to columns and values arrays
41+
// Columns need to be in the same order so that the query can be cached during the statement preparation stage
42+
keys := make([]string, 0, len(data))
43+
for k := range data {
44+
keys = append(keys, k)
45+
}
46+
sort.Strings(keys)
47+
for _, key := range keys {
48+
columns = append(columns, pgx.Identifier{key}.Sanitize())
49+
values = append(values, data[key])
4150
}
4251
sb.WriteString("insert into ")
4352
sb.WriteString(table)
4453
sb.WriteString(" (")
54+
sort.Strings(columns)
4555
for i, c := range columns {
4656
sb.WriteString(c)
4757
// sb.WriteString("::" + SchemaTypeToPg())
@@ -67,14 +77,22 @@ func upsert(table string, data map[string]interface{}) (string, []interface{}) {
6777

6878
columns := make([]string, 0, len(data))
6979
values := make([]interface{}, 0, len(data))
70-
for c, v := range data {
71-
columns = append(columns, pgx.Identifier{c}.Sanitize())
72-
values = append(values, v)
80+
// Sort the columns prior to adding data to columns and values arrays
81+
// Columns need to be in the same order so that the query can be cached during the statement preparation stage
82+
keys := make([]string, 0, len(data))
83+
for k := range data {
84+
keys = append(keys, k)
85+
}
86+
sort.Strings(keys)
87+
for _, key := range keys {
88+
columns = append(columns, pgx.Identifier{key}.Sanitize())
89+
values = append(values, data[key])
7390
}
7491

7592
sb.WriteString("insert into ")
7693
sb.WriteString(table)
7794
sb.WriteString(" (")
95+
sort.Strings(columns)
7896
for i, c := range columns {
7997
sb.WriteString(c)
8098
// sb.WriteString("::" + SchemaTypeToPg())

0 commit comments

Comments
 (0)