-
Notifications
You must be signed in to change notification settings - Fork 547
feat: Implement basic transformer (add/remove/obfuscate columns) #18704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
erezrokah
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The plugin code looks great. Added a few comments.
For the migrate message bit we probably want it to be a protocol message and not record metadata.
Also where does the plugin sets the cq:extension:message_type I couldn't find it.
Regardless, I think the plugin's code (not the CLI part) can go into the main branch once the comments are resolved
| // Don't move this file to a different package, it's used by Go releaser to embed the version in the binary. | ||
| var ( | ||
| Name = "basic" | ||
| Kind = "destination" // breaks otherwise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's update this to transformer supported in https://github.com/cloudquery/plugin-sdk/releases/tag/v4.54.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
cli/cmd/sync_v3.go
Outdated
| // This could either be an `Insert` or a `MigrateTable` message. To answer this, | ||
| // we look to see if the schema's metadata has a MetadataMessageTypeKey key | ||
| msgType := "insert" | ||
| if isMigrateTableRecord(record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be part of the protocol not part of the data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change belongs to the parent PR. Let's discuss it there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For context, we changed the protocol after an offline discussion.
| @@ -0,0 +1 @@ | |||
| basic | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call it toolkit? Not sure what's a better name for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not married to basic, but I chose a name that clarified that advanced transformations would never be in scope, because as we know it so far we cannot add Datafusion within Go in a newer version. I'm not strongly opposed to toolkit but MacGyver would laugh in our face 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think basic is okay for now. We can always change it later by releasing it under a different name
| } | ||
|
|
||
| func (s *Spec) Validate() error { | ||
| for _, t := range s.TransformationSpecs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non blocking, it would be nice to collect all errors and report them instead of only the first one each time.
You can use errors.Join for that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used errors.Join now.
| func NewFromSpec(sp spec.TransformationSpec) (*Transformer, error) { | ||
| tr := &Transformer{matcher: tablematcher.New(sp.Tables)} | ||
|
|
||
| fns := map[string]TransformationFn{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would a switch/case be more suitable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used switch.
| mem := memory.NewGoAllocator() | ||
| bld := array.NewStringBuilder(mem) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| mem := memory.NewGoAllocator() | |
| bld := array.NewStringBuilder(mem) | |
| bld := array.NewStringBuilder(memory.DefaultAllocator) |
And everywhere else should give the same result
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL. Updated everywhere.
| return bld.NewStringArray() | ||
| } | ||
|
|
||
| func (*RecordUpdater) _obfuscateColumn(column arrow.Array) arrow.Array { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason for _obfuscateColumn and not obfuscateColumn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally RecordUpdater had ObfuscateColumn rather than ObfuscateColumns. Name clash. But I took out the _ now.
|
This can be merged directly to |
Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
Co-authored-by: Kemal <223029+disq@users.noreply.github.com>
erezrokah
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good but I think that in order to package/publish it we only need a docs directory with an overview.md file.
See example in https://github.com/cloudquery/cloudquery/tree/main/plugins/source/xkcd/docs
|
@erezrokah yep added a sample docs and the package command works. I'm just having trouble with publishing to local; some auth problem. I can't debug the failing SQL query because the SQL uses some variables that I guess only exist in the server's connection. |
| path: "localhost:7777" # TODO change this when it's published | ||
| registry: "grpc" # TODO change this when it's published |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| path: "localhost:7777" # TODO change this when it's published | |
| registry: "grpc" # TODO change this when it's published | |
| path: cloudquery/basic | |
| version: VERSION_TRANSFORMER_BASIC |
This is our convention, the VERSION_TRANSFORMER_BASIC will be replaced during published with the version being published so the docs always have the latest version.
erezrokah
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a convention to create _configuration.md file that can be embedded in other documents.
See https://github.com/cloudquery/cloudquery/blob/main/plugins/source/xkcd/docs/_configuration.md
Also
| :configuration |
The Cloud also uses this _configuration.md file if exists for pages like https://hub.cloudquery.io/export-data/airtable/spangenberg_arrowflight#source-configuration
hermanschaaf
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! 🚀🚀🚀
| } | ||
|
|
||
| oldRecord := r.record.Columns() | ||
| newColumns := make([]arrow.Array, 0, len(oldRecord)-1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't matter too much, but should the cap here be len(oldRecord)-len(colIndices) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| return r.record, nil | ||
| } | ||
|
|
||
| func (r *RecordUpdater) ObfuscateColumns(columnNames []string) (arrow.Record, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great for now, but I can see how in the future we may also need to handle fields inside JSON columns, for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think before that we might need to obfuscate non-string fields too. And we can implement anything we want; the question is where is the line where bespoke transformations end and Datafusion-based transformations start.
|
Just realized this PR is missing the steps from https://github.com/cloudquery/cloudquery/blob/cf45e5a2d6225fccce9248919f1c280e5db286b6/contributing/adding_a_new_plugin_to_cq_monorepo.md |
|
Steps added |
🤖 I have created a release *beep* *boop* --- ## 1.0.0 (2024-08-05) ### Features * Implement basic transformer (add/remove/obfuscate columns) ([#18704](#18704)) ([704102f](704102f)) * Support table name changes on basic transformer. ([#18833](#18833)) ([67d3701](67d3701)) ### Bug Fixes * **deps:** Update module github.com/cloudquery/cloudquery-api-go to v1.12.0 ([#18448](#18448)) ([a5850e1](a5850e1)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v4 ([#18822](#18822)) ([bfd8765](bfd8765)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v4 ([#18842](#18842)) ([39beee4](39beee4)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.0 ([#18821](#18821)) ([ef364a2](ef364a2)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.1 ([#18830](#18830)) ([605c202](605c202)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.0 ([#18839](#18839)) ([6b57bca](6b57bca)) * **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.1 ([#18852](#18852)) ([4320340](4320340)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This adds a
plugins/transformer/basicfolder with a basic transformer plugin that enables the following 3 transformations:Temporary limitation:- I discovered yesterday that schema changes don't work due to the previous PR (which enables transformation support in sync_v3) not transforming theMigrateTablemessage. I'll fix that today. Obfuscation works, though.Because this is early days and we might end up reusing behaviour on different transformers (which means we might want to lift some of this to the sdk), I separated concerns into different packages and tested them separately.
The design looks like this:
And then for each transformer within this plugin:
Given this spec:
It runs successfully in normal elapsed time:
✓ Code/test-xkcd-postgres-sync $ cli sync cloudquery-config Loading spec(s) from cloudquery-config Starting sync for: xkcd (cloudquery/xkcd@v1.0.6) -> [postgresql (cloudquery/postgresql@v8.0.7)] Sync completed successfully. Resources: 2962, Errors: 0, Warnings: 0, Time: 15sAnd obfuscates
title&safe_titlecolumns:UPDATE: all functionality works; here's an example spec:
And the result (same elapsed time):