@@ -15,18 +15,15 @@ import (
1515 pgxUUID "github.com/vgarvardt/pgx-google-uuid/v4"
1616)
1717
18- type Spec struct {
19- ConnectionString string `json:"connection_string,omitempty"`
20- PgxLogLevel LogLevel `json:"pgx_log_level,omitempty"`
21- }
22-
2318type Client struct {
2419 conn * pgxpool.Pool
2520 logger zerolog.Logger
2621 spec specs.Destination
2722 currentDatabaseName string
2823 currentSchemaName string
2924 pgType pgType
25+ batchSize int
26+ batch * pgx.Batch
3027}
3128
3229type pgTablePrimaryKeys struct {
@@ -73,12 +70,15 @@ const (
7370func New (ctx context.Context , logger zerolog.Logger , spec specs.Destination ) (plugins.DestinationClient , error ) {
7471 c := & Client {
7572 logger : logger .With ().Str ("module" , "pg-dest" ).Logger (),
73+ batch : & pgx.Batch {},
7674 }
7775 var specPostgreSql Spec
7876 c .spec = spec
7977 if err := spec .UnmarshalSpec (& specPostgreSql ); err != nil {
8078 return nil , fmt .Errorf ("failed to unmarshal postgresql spec: %w" , err )
8179 }
80+ specPostgreSql .SetDefaults ()
81+ c .batchSize = specPostgreSql .BatchSize
8282
8383 logLevel , err := pgx .LogLevelFromString (specPostgreSql .PgxLogLevel .String ())
8484 if err != nil {
@@ -117,14 +117,20 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (pl
117117}
118118
119119func (c * Client ) Close (ctx context.Context ) error {
120+ var err error
120121 if c .conn == nil {
121122 return fmt .Errorf ("client already closed or not initialized" )
122123 }
124+ if c .batch .Len () > 0 {
125+ br := c .conn .SendBatch (ctx , c .batch )
126+ err = br .Close ()
127+ c .batch = & pgx.Batch {}
128+ }
123129 if c .conn != nil {
124130 c .conn .Close ()
125131 c .conn = nil
126132 }
127- return nil
133+ return err
128134}
129135
130136func (c * Client ) currentDatabase (ctx context.Context ) (string , error ) {
0 commit comments