Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dest_clickhouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
working-directory: plugins/destination/clickhouse
services:
clickhouse:
image: clickhouse/clickhouse-server:22.1.2
image: clickhouse/clickhouse-server:24.8.1
env:
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: ${{ env.DB_PASSWORD }}
Expand Down
3 changes: 1 addition & 2 deletions plugins/destination/clickhouse/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type Client struct {
writer *batchwriter.BatchWriter
plugin.UnimplementedSource
batchwriter.UnimplementedDeleteStale
batchwriter.UnimplementedDeleteRecord
}

var _ plugin.Client = (*Client)(nil)
Expand Down Expand Up @@ -71,7 +70,7 @@ func New(_ context.Context, logger zerolog.Logger, specBytes []byte, _ plugin.Ne
return nil, fmt.Errorf("failed to verify server version %w", err)
}

minVer := proto.Version{Major: 22, Minor: 1, Patch: 2}
minVer := proto.Version{Major: 24, Minor: 8, Patch: 1}
if !proto.CheckMinVersion(minVer, ver.Version) {
defer conn.Close()
return nil, fmt.Errorf("server version is %s, minimum version supported is %s", ver.Version, minVer)
Expand Down
32 changes: 19 additions & 13 deletions plugins/destination/clickhouse/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,13 @@ func getTestConnection() string {
}

func TestPlugin(t *testing.T) {
ctx := context.Background()
p := plugin.NewPlugin("clickhouse",
internalPlugin.Version,
New,
plugin.WithJSONSchema(spec.JSONSchema),
)
s := &spec.Spec{ConnectionString: getTestConnection()}
b, err := json.Marshal(s)
require.NoError(t, err)
require.NoError(t, p.Init(ctx, b, plugin.NewClientOptions{}))
p := initPlugin(t)

plugin.TestWriterSuiteRunner(t,
p,
plugin.WriterTestSuiteTests{
SkipUpsert: true,
SkipDeleteStale: true,
SkipDeleteRecord: true,
SkipUpsert: true,
SkipDeleteStale: true,
SafeMigrations: plugin.SafeMigrations{
AddColumn: true,
RemoveColumn: true,
Expand All @@ -67,6 +57,22 @@ func TestPlugin(t *testing.T) {
)
}

func initPlugin(t *testing.T) *plugin.Plugin {
ctx := context.Background()
p := plugin.NewPlugin("clickhouse",
internalPlugin.Version,
New,
plugin.WithJSONSchema(spec.JSONSchema),
)
s := &spec.Spec{
ConnectionString: getTestConnection(),
}
b, err := json.Marshal(s)
require.NoError(t, err)
require.NoError(t, p.Init(ctx, b, plugin.NewClientOptions{}))
return p
}

func TestMigrateCQClientIDColumnWhenSortKeyIsAlreadySet(t *testing.T) {
ctx := context.Background()
p := plugin.NewPlugin("clickhouse",
Expand Down
99 changes: 99 additions & 0 deletions plugins/destination/clickhouse/client/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package client

import (
"context"
"reflect"
"strconv"
"strings"

"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/typeconv/ch/values"
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/util"
"github.com/cloudquery/plugin-sdk/v4/message"
)

func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteRecords) error {
if len(messages) == 0 {
return nil
}

for _, msg := range messages {
sql := generateDelete(msg.DeleteRecord)
params, err := extractPredicateValues(msg.DeleteRecord.WhereClause)
if err != nil {
return err
}

if err = c.conn.Exec(ctx, sql, params...); err != nil {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to be able to use some kind of prepared statement or batching here, but CH does not support this for DELETE statements. Batching is only supported for INSERT queries.

return err
}
}

return nil
}

func extractPredicateValues(where message.PredicateGroups) ([]any, error) {
predicateCount := 0
for _, predicateGroup := range where {
predicateCount += len(predicateGroup.Predicates)
}
results := make([]any, predicateCount)
counter := 0
for _, predicateGroup := range where {
for _, predicate := range predicateGroup.Predicates {
col := predicate.Record.Column(0)
primitiveValues, err := values.FromArray(col)
if err != nil {
return nil, err
}
unpacked := unpackArray(primitiveValues)
results[counter] = unpacked[0]
counter++
}
}
return results, nil
}

func unpackArray(s any) []any {
v := reflect.ValueOf(s)
r := make([]any, v.Len())
for i := range v.Len() {
r[i] = v.Index(i).Interface()
}
return r
}

func generateDelete(msg message.DeleteRecord) string {
var sb strings.Builder

sb.WriteString("DELETE FROM ")
sb.WriteString(util.SanitizeID(msg.TableName))
sb.WriteString(" WHERE ")
if len(msg.WhereClause) == 0 {
sb.WriteString("1")
} else {
counter := 1
for i, predicateGroup := range msg.WhereClause {
if len(predicateGroup.Predicates) == 0 {
continue
}
sb.WriteString("(")
for i, predicate := range predicateGroup.Predicates {
if i > 0 {
sb.WriteString(" ")
sb.WriteString(predicateGroup.GroupingType)
sb.WriteString(" ")
}
sb.WriteString(util.SanitizeID(predicate.Column))
sb.WriteString(" = $")
sb.WriteString(strconv.Itoa(counter))
counter++
}
sb.WriteString(")")
if i < len(msg.WhereClause)-1 {
sb.WriteString(" AND ")
}
}
}

return sb.String()
}
171 changes: 171 additions & 0 deletions plugins/destination/clickhouse/client/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package client

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/client/spec"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

func TestDelete(t *testing.T) {
testCases := []struct {
name string
insertValues []string
deleteValues []string
deleteAll bool
expectedCount int
}{
{
name: "delete single record",
insertValues: []string{"foo", "bar"},
deleteValues: []string{"foo"},
expectedCount: 1,
},
{
name: "delete both records",
insertValues: []string{"foo", "bar"},
deleteValues: []string{"foo", "bar"},
expectedCount: 0,
},
{
name: "delete none",
insertValues: []string{"foo"},
deleteValues: []string{"bar"},
expectedCount: 1,
},
{
name: "delete all records",
insertValues: []string{"foo", "bar"},
deleteAll: true,
expectedCount: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := require.New(t)
ctx := context.Background()
client := withPluginClient(ctx, r)

table := createTestTable()
r.NoError(client.MigrateTables(ctx, message.WriteMigrateTables{{Table: table}}))

writeInserts := createInsertMessages(tc.insertValues, table)
r.NoError(client.WriteTableBatch(ctx, "", writeInserts))

writeDeletes := createDeleteMessages(tc.deleteAll, table, tc.deleteValues)
r.NoError(client.DeleteRecord(ctx, writeDeletes))

count, err := countAllRows(ctx, client, table)
r.NoError(err)
r.EqualValues(tc.expectedCount, count, "unexpected amount of items after delete with match")
})
}
}

func countAllRows(ctx context.Context, client *Client, table *schema.Table) (int64, error) {
var err error
ch := make(chan arrow.Record)
go func() {
defer close(ch)
err = client.Read(ctx, table, ch)
}()
count := int64(0)
for record := range ch {
count += record.NumRows()
}
return count, err
}

func withPluginClient(ctx context.Context, r *require.Assertions) *Client {
s := &spec.Spec{ConnectionString: getTestConnection()}
b, err := json.Marshal(s)
r.NoError(err)
c, err := New(ctx, zerolog.Nop(), b, plugin.NewClientOptions{})
r.NoError(err)
return c.(*Client)
}

func valueToArrowRecord(tableName string, value string) arrow.Record {
bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.BinaryTypes.String},
},
}).ToArrowSchema())
bldrDeleteMatch.Field(0).(*array.StringBuilder).Append(value)
deleteValue := bldrDeleteMatch.NewRecord()
return deleteValue
}

func createDeleteMessages(deleteAll bool, table *schema.Table, deleteValues []string) message.WriteDeleteRecords {
writeDeletes := message.WriteDeleteRecords{}

if deleteAll {
msg := message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
},
}
return append(writeDeletes, &msg)
}
for _, deleteValue := range deleteValues {
msg := message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: table.Name,
WhereClause: message.PredicateGroups{
{
GroupingType: "AND",
Predicates: []message.Predicate{
{
Operator: "eq",
Column: "id",
Record: valueToArrowRecord(table.Name, deleteValue),
},
},
},
},
},
}
writeDeletes = append(writeDeletes, &msg)
}
return writeDeletes
}

func createInsertMessages(values []string, table *schema.Table) message.WriteInserts {
const sourceName = "source-test"
writeInserts := message.WriteInserts{}
for _, insertValue := range values {
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
bldr.Field(0).(*array.StringBuilder).Append(insertValue)
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
bldr.Field(2).(*array.TimestampBuilder).AppendTime(time.Now())
record := bldr.NewRecord()
writeInserts = append(writeInserts, &message.WriteInsert{Record: record})
}
return writeInserts
}

func createTestTable() *schema.Table {
tableName := fmt.Sprintf("cq_delete_test_%d_%04d", time.Now().UnixNano(), rand.Intn(1000))
table := &schema.Table{
Name: tableName,
Columns: schema.ColumnList{
schema.Column{Name: "id", Type: arrow.BinaryTypes.String, PrimaryKey: true, NotNull: true},
schema.CqSourceNameColumn,
schema.CqSyncTimeColumn,
},
}
return table
}
3 changes: 1 addition & 2 deletions plugins/destination/postgresql/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestPgPlugin(t *testing.T) {
plugin.TestWriterSuiteRunner(t,
p,
plugin.WriterTestSuiteTests{
SkipDeleteRecord: true,
SafeMigrations: safeMigrations,
SafeMigrations: safeMigrations,
},
plugin.WithTestDataOptions(testOpts),
)
Expand Down
Loading