-
Notifications
You must be signed in to change notification settings - Fork 547
feat!: Add DeleteRecord handling to Clickhouse destination #20772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
22e9b38
wip
maaarcelino f3795bb
enable DeleteRecord tests for Postgres
maaarcelino 98fdfbe
add logic and tests for deleting
maaarcelino 171e4bd
rename test
maaarcelino b5d4cc2
Merge remote-tracking branch 'origin/main' into feat/clickhouse-delet…
maaarcelino 8125693
revert batch_size
maaarcelino 5182861
fix tests
maaarcelino 4c5b009
lint
maaarcelino 9f19b15
refactor
maaarcelino 58ea994
bump CH version in CI
maaarcelino 8a38d61
bump sdk
maaarcelino 3394ef8
fix test
maaarcelino 21971c0
Merge remote-tracking branch 'origin/main' into feat/clickhouse-delet…
maaarcelino a80c000
bump CH to 24.8.1
maaarcelino 57c375b
Merge remote-tracking branch 'origin/main' into feat/clickhouse-delet…
maaarcelino d5d6928
remove test
maaarcelino File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "context" | ||
| "reflect" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/typeconv/ch/values" | ||
| "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/util" | ||
| "github.com/cloudquery/plugin-sdk/v4/message" | ||
| ) | ||
|
|
||
| func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteRecords) error { | ||
| if len(messages) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| for _, msg := range messages { | ||
| sql := generateDelete(msg.DeleteRecord) | ||
| params, err := extractPredicateValues(msg.DeleteRecord.WhereClause) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err = c.conn.Exec(ctx, sql, params...); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func extractPredicateValues(where message.PredicateGroups) ([]any, error) { | ||
| predicateCount := 0 | ||
| for _, predicateGroup := range where { | ||
| predicateCount += len(predicateGroup.Predicates) | ||
| } | ||
| results := make([]any, predicateCount) | ||
| counter := 0 | ||
| for _, predicateGroup := range where { | ||
| for _, predicate := range predicateGroup.Predicates { | ||
| col := predicate.Record.Column(0) | ||
| primitiveValues, err := values.FromArray(col) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| unpacked := unpackArray(primitiveValues) | ||
| results[counter] = unpacked[0] | ||
| counter++ | ||
| } | ||
| } | ||
| return results, nil | ||
| } | ||
|
|
||
| func unpackArray(s any) []any { | ||
| v := reflect.ValueOf(s) | ||
| r := make([]any, v.Len()) | ||
| for i := range v.Len() { | ||
| r[i] = v.Index(i).Interface() | ||
| } | ||
| return r | ||
| } | ||
|
|
||
| func generateDelete(msg message.DeleteRecord) string { | ||
| var sb strings.Builder | ||
|
|
||
| sb.WriteString("DELETE FROM ") | ||
maaarcelino marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sb.WriteString(util.SanitizeID(msg.TableName)) | ||
| sb.WriteString(" WHERE ") | ||
| if len(msg.WhereClause) == 0 { | ||
| sb.WriteString("1") | ||
| } else { | ||
| counter := 1 | ||
| for i, predicateGroup := range msg.WhereClause { | ||
| if len(predicateGroup.Predicates) == 0 { | ||
| continue | ||
| } | ||
| sb.WriteString("(") | ||
| for i, predicate := range predicateGroup.Predicates { | ||
| if i > 0 { | ||
| sb.WriteString(" ") | ||
| sb.WriteString(predicateGroup.GroupingType) | ||
| sb.WriteString(" ") | ||
| } | ||
| sb.WriteString(util.SanitizeID(predicate.Column)) | ||
| sb.WriteString(" = $") | ||
| sb.WriteString(strconv.Itoa(counter)) | ||
| counter++ | ||
| } | ||
| sb.WriteString(")") | ||
| if i < len(msg.WhereClause)-1 { | ||
| sb.WriteString(" AND ") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return sb.String() | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "math/rand" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/apache/arrow-go/v18/arrow" | ||
| "github.com/apache/arrow-go/v18/arrow/array" | ||
| "github.com/apache/arrow-go/v18/arrow/memory" | ||
| "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v6/client/spec" | ||
| "github.com/cloudquery/plugin-sdk/v4/message" | ||
| "github.com/cloudquery/plugin-sdk/v4/plugin" | ||
| "github.com/cloudquery/plugin-sdk/v4/schema" | ||
| "github.com/rs/zerolog" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestDelete(t *testing.T) { | ||
| testCases := []struct { | ||
| name string | ||
| insertValues []string | ||
| deleteValues []string | ||
| deleteAll bool | ||
| expectedCount int | ||
| }{ | ||
| { | ||
| name: "delete single record", | ||
| insertValues: []string{"foo", "bar"}, | ||
| deleteValues: []string{"foo"}, | ||
| expectedCount: 1, | ||
| }, | ||
| { | ||
| name: "delete both records", | ||
| insertValues: []string{"foo", "bar"}, | ||
| deleteValues: []string{"foo", "bar"}, | ||
| expectedCount: 0, | ||
| }, | ||
| { | ||
| name: "delete none", | ||
| insertValues: []string{"foo"}, | ||
| deleteValues: []string{"bar"}, | ||
| expectedCount: 1, | ||
| }, | ||
| { | ||
| name: "delete all records", | ||
| insertValues: []string{"foo", "bar"}, | ||
| deleteAll: true, | ||
| expectedCount: 0, | ||
| }, | ||
| } | ||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| r := require.New(t) | ||
| ctx := context.Background() | ||
| client := withPluginClient(ctx, r) | ||
|
|
||
| table := createTestTable() | ||
| r.NoError(client.MigrateTables(ctx, message.WriteMigrateTables{{Table: table}})) | ||
|
|
||
| writeInserts := createInsertMessages(tc.insertValues, table) | ||
| r.NoError(client.WriteTableBatch(ctx, "", writeInserts)) | ||
|
|
||
| writeDeletes := createDeleteMessages(tc.deleteAll, table, tc.deleteValues) | ||
| r.NoError(client.DeleteRecord(ctx, writeDeletes)) | ||
|
|
||
| count, err := countAllRows(ctx, client, table) | ||
| r.NoError(err) | ||
| r.EqualValues(tc.expectedCount, count, "unexpected amount of items after delete with match") | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func countAllRows(ctx context.Context, client *Client, table *schema.Table) (int64, error) { | ||
| var err error | ||
| ch := make(chan arrow.Record) | ||
| go func() { | ||
| defer close(ch) | ||
| err = client.Read(ctx, table, ch) | ||
| }() | ||
| count := int64(0) | ||
| for record := range ch { | ||
| count += record.NumRows() | ||
| } | ||
| return count, err | ||
| } | ||
|
|
||
| func withPluginClient(ctx context.Context, r *require.Assertions) *Client { | ||
| s := &spec.Spec{ConnectionString: getTestConnection()} | ||
| b, err := json.Marshal(s) | ||
| r.NoError(err) | ||
| c, err := New(ctx, zerolog.Nop(), b, plugin.NewClientOptions{}) | ||
| r.NoError(err) | ||
| return c.(*Client) | ||
| } | ||
|
|
||
| func valueToArrowRecord(tableName string, value string) arrow.Record { | ||
| bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ | ||
| Name: tableName, | ||
| Columns: schema.ColumnList{ | ||
| schema.Column{Name: "id", Type: arrow.BinaryTypes.String}, | ||
| }, | ||
| }).ToArrowSchema()) | ||
| bldrDeleteMatch.Field(0).(*array.StringBuilder).Append(value) | ||
| deleteValue := bldrDeleteMatch.NewRecord() | ||
| return deleteValue | ||
| } | ||
|
|
||
| func createDeleteMessages(deleteAll bool, table *schema.Table, deleteValues []string) message.WriteDeleteRecords { | ||
| writeDeletes := message.WriteDeleteRecords{} | ||
|
|
||
| if deleteAll { | ||
| msg := message.WriteDeleteRecord{ | ||
| DeleteRecord: message.DeleteRecord{ | ||
| TableName: table.Name, | ||
| }, | ||
| } | ||
| return append(writeDeletes, &msg) | ||
| } | ||
| for _, deleteValue := range deleteValues { | ||
| msg := message.WriteDeleteRecord{ | ||
| DeleteRecord: message.DeleteRecord{ | ||
| TableName: table.Name, | ||
| WhereClause: message.PredicateGroups{ | ||
| { | ||
| GroupingType: "AND", | ||
| Predicates: []message.Predicate{ | ||
| { | ||
| Operator: "eq", | ||
| Column: "id", | ||
| Record: valueToArrowRecord(table.Name, deleteValue), | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
| writeDeletes = append(writeDeletes, &msg) | ||
| } | ||
| return writeDeletes | ||
| } | ||
|
|
||
| func createInsertMessages(values []string, table *schema.Table) message.WriteInserts { | ||
| const sourceName = "source-test" | ||
| writeInserts := message.WriteInserts{} | ||
| for _, insertValue := range values { | ||
| bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) | ||
| bldr.Field(0).(*array.StringBuilder).Append(insertValue) | ||
| bldr.Field(1).(*array.StringBuilder).Append(sourceName) | ||
| bldr.Field(2).(*array.TimestampBuilder).AppendTime(time.Now()) | ||
| record := bldr.NewRecord() | ||
| writeInserts = append(writeInserts, &message.WriteInsert{Record: record}) | ||
| } | ||
| return writeInserts | ||
| } | ||
|
|
||
| func createTestTable() *schema.Table { | ||
| tableName := fmt.Sprintf("cq_delete_test_%d_%04d", time.Now().UnixNano(), rand.Intn(1000)) | ||
| table := &schema.Table{ | ||
| Name: tableName, | ||
| Columns: schema.ColumnList{ | ||
| schema.Column{Name: "id", Type: arrow.BinaryTypes.String, PrimaryKey: true, NotNull: true}, | ||
| schema.CqSourceNameColumn, | ||
| schema.CqSyncTimeColumn, | ||
| }, | ||
| } | ||
| return table | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to be able to use some kind of prepared statement or batching here, but CH does not support this for
DELETEstatements. Batching is only supported forINSERTqueries.