Skip to content

Commit f265a94

Browse files
yevgenypatshermanschaaferezrokah
authored
feat: Migrate cli, plugins and destinations to new type system (#3323)
This is instead of #3176 SDK PRs: cloudquery/plugin-sdk#318 cloudquery/plugin-sdk#320 Previous related CloudQuery PRs: #3286 Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
1 parent 5af977f commit f265a94

File tree

73 files changed

+497
-861
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+497
-861
lines changed

plugins/destination/postgresql/client/client.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ import (
77

88
"github.com/cloudquery/plugin-sdk/plugins"
99
"github.com/cloudquery/plugin-sdk/specs"
10-
"github.com/jackc/pgtype"
1110
"github.com/jackc/pgx/v4"
1211
"github.com/jackc/pgx/v4/log/zerologadapter"
1312
"github.com/jackc/pgx/v4/pgxpool"
1413
"github.com/rs/zerolog"
15-
pgxUUID "github.com/vgarvardt/pgx-google-uuid/v4"
1614
)
1715

1816
type Client struct {
@@ -22,9 +20,8 @@ type Client struct {
2220
currentDatabaseName string
2321
currentSchemaName string
2422
pgType pgType
25-
batchSize int
26-
batch *pgx.Batch
2723
metrics plugins.DestinationMetrics
24+
batchSize int
2825
}
2926

3027
type pgTablePrimaryKeys struct {
@@ -71,7 +68,6 @@ const (
7168
func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (plugins.DestinationClient, error) {
7269
c := &Client{
7370
logger: logger.With().Str("module", "pg-dest").Logger(),
74-
batch: &pgx.Batch{},
7571
}
7672
var specPostgreSql Spec
7773
c.spec = spec
@@ -92,7 +88,6 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (pl
9288
return nil, fmt.Errorf("failed to parse connection string %w", err)
9389
}
9490
pgxConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
95-
conn.ConnInfo().RegisterDataType(pgtype.DataType{Value: &pgxUUID.UUID{}, Name: "uuid", OID: pgtype.UUIDOID})
9691
return nil
9792
}
9893
l := zerologadapter.NewLogger(c.logger)
@@ -117,20 +112,11 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (pl
117112
return c, nil
118113
}
119114

120-
func (c *Client) Metrics() plugins.DestinationMetrics {
121-
return c.metrics
122-
}
123-
124115
func (c *Client) Close(ctx context.Context) error {
125116
var err error
126117
if c.conn == nil {
127118
return fmt.Errorf("client already closed or not initialized")
128119
}
129-
if c.batch.Len() > 0 {
130-
br := c.conn.SendBatch(ctx, c.batch)
131-
err = br.Close()
132-
c.batch = &pgx.Batch{}
133-
}
134120
if c.conn != nil {
135121
c.conn.Close()
136122
c.conn = nil

plugins/destination/postgresql/client/client_test.go

Lines changed: 16 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -6,126 +6,11 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/cloudquery/plugin-sdk/schema"
9+
"github.com/cloudquery/plugin-sdk/plugins"
1010
"github.com/cloudquery/plugin-sdk/specs"
11-
"github.com/google/uuid"
1211
"github.com/rs/zerolog"
1312
)
1413

15-
func getTestData() map[string]interface{} {
16-
// because data is sent over the wire encoded in json we need to use strings, numbers, objects, arrays, booleans and nulls
17-
// to test everything correctly
18-
return map[string]interface{}{
19-
"_cq_id": uuid.New().String(),
20-
"_cq_parent_id": nil,
21-
"_cq_source_name": "test_source",
22-
"_cq_sync_time": "2022-09-02T20:57:55.449238",
23-
"id": uuid.New().String(),
24-
"bool_column": true,
25-
"int_column": float64(3),
26-
"float_column": float64(3),
27-
"uuid_column": "9a6011b7-c5ee-4b55-95a6-37ce5e02a5a0",
28-
"string_column": "test",
29-
"string_array_column": []interface{}{"test", "test2"},
30-
"int_array_column": []interface{}{float64(1), float64(2), float64(3)},
31-
"timestamp_column": "2019-01-01T00:00:00",
32-
"interval_column": "01:02:03",
33-
"json_column": map[string]interface{}{"1": float64(1), "test": "test"},
34-
"uuid_array_column": []interface{}{"1a6011b7-c5ee-4b55-95a6-37ce5e02a5a0", "9a6011b7-c5ee-4b55-95a6-37ce5e02a5a0"},
35-
"inet_column": "1.1.1.1",
36-
"inet_array_column": []interface{}{"8.8.8.8/0"},
37-
"cidr_column": "0.0.0.0/24",
38-
"cidr_array_column": []interface{}{"0.0.0.0/24", "0.0.0.0/16"},
39-
"mac_addr_column": "00:00:00:00:00:ab",
40-
}
41-
}
42-
43-
func getTestTable() *schema.Table {
44-
return &schema.Table{
45-
Name: "simple_table",
46-
Columns: []schema.Column{
47-
schema.CqIDColumn,
48-
schema.CqParentIDColumn,
49-
schema.CqSyncTimeColumn,
50-
schema.CqSourceNameColumn,
51-
{
52-
Name: "id",
53-
Type: schema.TypeUUID,
54-
CreationOptions: schema.ColumnCreationOptions{
55-
PrimaryKey: true,
56-
},
57-
},
58-
{
59-
Name: "bool_column",
60-
Type: schema.TypeBool,
61-
CreationOptions: schema.ColumnCreationOptions{
62-
PrimaryKey: true,
63-
},
64-
},
65-
{
66-
Name: "int_column",
67-
Type: schema.TypeInt,
68-
},
69-
{
70-
Name: "float_column",
71-
Type: schema.TypeFloat,
72-
},
73-
{
74-
Name: "uuid_column",
75-
Type: schema.TypeUUID,
76-
},
77-
{
78-
Name: "string_column",
79-
Type: schema.TypeString,
80-
},
81-
{
82-
Name: "string_array_column",
83-
Type: schema.TypeStringArray,
84-
},
85-
{
86-
Name: "int_array_column",
87-
Type: schema.TypeIntArray,
88-
},
89-
{
90-
Name: "timestamp_column",
91-
Type: schema.TypeTimestamp,
92-
},
93-
{
94-
Name: "interval_column",
95-
Type: schema.TypeTimeInterval,
96-
},
97-
{
98-
Name: "json_column",
99-
Type: schema.TypeJSON,
100-
},
101-
{
102-
Name: "uuid_array_column",
103-
Type: schema.TypeUUIDArray,
104-
},
105-
{
106-
Name: "inet_column",
107-
Type: schema.TypeInet,
108-
},
109-
{
110-
Name: "inet_array_column",
111-
Type: schema.TypeInetArray,
112-
},
113-
{
114-
Name: "cidr_column",
115-
Type: schema.TypeCIDR,
116-
},
117-
{
118-
Name: "cidr_array_column",
119-
Type: schema.TypeCIDRArray,
120-
},
121-
{
122-
Name: "mac_addr_column",
123-
Type: schema.TypeMacAddr,
124-
},
125-
},
126-
}
127-
}
128-
12914
func getTestLogger(t *testing.T) zerolog.Logger {
13015
t.Helper()
13116
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
@@ -167,3 +52,18 @@ func TestInitialize(t *testing.T) {
16752
t.Fatal("expected error when closing a closed client second time")
16853
}
16954
}
55+
56+
func TestPgPlugin(t *testing.T) {
57+
ctx := context.Background()
58+
p := plugins.NewDestinationPlugin("postgresql", "development", New)
59+
60+
if err := plugins.DestinationPluginTestHelper(ctx, p, getTestLogger(t), specs.Destination{
61+
WriteMode: specs.WriteModeAppend,
62+
Spec: Spec{
63+
ConnectionString: getTestConnection(),
64+
PgxLogLevel: LogLevelTrace,
65+
},
66+
}); err != nil {
67+
t.Fatal(err)
68+
}
69+
}

plugins/destination/postgresql/client/deletestale.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,30 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67
"time"
78

89
"github.com/cloudquery/plugin-sdk/schema"
910
"github.com/jackc/pgx/v4"
1011
)
1112

12-
func (c *Client) DeleteStale(ctx context.Context, table string, source string, syncTime time.Time) error {
13-
var sb strings.Builder
14-
sb.WriteString("delete from ")
15-
sb.WriteString(pgx.Identifier{table}.Sanitize())
16-
sb.WriteString(" where ")
17-
sb.WriteString(schema.CqSourceNameColumn.Name)
18-
sb.WriteString(" = $1 and ")
19-
sb.WriteString(schema.CqSyncTimeColumn.Name)
20-
sb.WriteString(" < $2")
21-
if _, err := c.conn.Exec(ctx, sb.String(), source, syncTime); err != nil {
22-
return err
13+
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
14+
batch := &pgx.Batch{}
15+
for _, table := range tables {
16+
var sb strings.Builder
17+
sb.WriteString("delete from ")
18+
sb.WriteString(pgx.Identifier{table.Name}.Sanitize())
19+
sb.WriteString(" where ")
20+
sb.WriteString(schema.CqSourceNameColumn.Name)
21+
sb.WriteString(" = $1 and ")
22+
sb.WriteString(schema.CqSyncTimeColumn.Name)
23+
sb.WriteString(" < $2")
24+
batch.Queue(sb.String(), source, syncTime)
25+
}
26+
br := c.conn.SendBatch(ctx, batch)
27+
if err := br.Close(); err != nil {
28+
return fmt.Errorf("failed to execute batch: %w", err)
2329
}
2430
return nil
2531
}

plugins/destination/postgresql/client/deletestale_test.go

Lines changed: 0 additions & 82 deletions
This file was deleted.

plugins/destination/postgresql/client/drop.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)