
@marianogappa (Contributor) commented on Jul 26, 2024

This adds a plugins/transformer/basic folder containing a basic transformer plugin that supports the following three transformations:

  • adding a literal string column
  • removing columns
  • obfuscating string columns
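To make the first transformation concrete, here is a minimal sketch of adding a literal string column. It assumes a simplified columnar record (Record and AddLiteralColumn are hypothetical names, not the plugin's API); the actual plugin operates on Arrow records.

```go
package main

import "fmt"

// Record is a hypothetical, simplified stand-in for an Arrow record:
// column names plus one slice of string values per column.
type Record struct {
	Names   []string
	Columns [][]string
}

// NumRows returns the row count, taken from the first column (0 if there are none).
func (r Record) NumRows() int {
	if len(r.Columns) == 0 {
		return 0
	}
	return len(r.Columns[0])
}

// AddLiteralColumn returns a copy of r with one extra column in which every
// row holds the same literal value. The input record is not modified.
func AddLiteralColumn(r Record, name, value string) Record {
	col := make([]string, r.NumRows())
	for i := range col {
		col[i] = value
	}
	names := append(append([]string{}, r.Names...), name)
	cols := append(append([][]string{}, r.Columns...), col)
	return Record{Names: names, Columns: cols}
}

func main() {
	r := Record{Names: []string{"title"}, Columns: [][]string{{"a", "b"}}}
	out := AddLiteralColumn(r, "source", "xkcd")
	fmt.Println(out.Names, out.Columns[1]) // [title source] [xkcd xkcd]
}
```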

Temporary limitation:
- Yesterday I discovered that schema changes (adding and removing columns) don't work yet, because the previous PR (which enables transformation support in sync_v3) doesn't transform the MigrateTable message. I'll fix that today. Obfuscation already works.

Because it's early days and we might reuse this behaviour across different transformers (and so might want to lift some of it into the SDK), I separated concerns into different packages and tested each one separately.

The design looks like this:

Screenshot 2024-07-26 at 11 18 11

And then for each transformer within this plugin:

Screenshot 2024-07-26 at 11 18 19

Given this spec:

---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "postgresql"
  spec:
---
kind: transformer # new transformer kind
spec:
  name: "basic"
  path: "localhost:7777"
  registry: "grpc"
  spec:
    transformations:
      - kind: obfuscate_columns
        tables: ["*"]
        columns: ["safe_title", "title"]
---
kind: destination
spec:
  name: "postgresql"
  path: "cloudquery/postgresql"
  registry: "cloudquery"
  version: "v8.0.7"
  write_mode: "overwrite-delete-stale"
  migrate_mode: forced
  transformers: # new transformer field
    - "basic" # example transformer

  spec:
    connection_string: "postgresql://mariano.gappa:@localhost:5432/xkcd?sslmode=disable"

It runs successfully, in the usual elapsed time:

✓ Code/test-xkcd-postgres-sync  $ cli sync cloudquery-config                                                                
Loading spec(s) from cloudquery-config
Starting sync for: xkcd (cloudquery/xkcd@v1.0.6) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 2962, Errors: 0, Warnings: 0, Time: 15s

And it obfuscates the title and safe_title columns:

Screenshot 2024-07-26 at 11 27 00

UPDATE: all functionality works; here's an example spec:

---
kind: transformer # new transformer kind
spec:
  name: "basic"
  path: "localhost:7777"
  registry: "grpc"
  spec:
    transformations:
      - kind: obfuscate_columns
        tables: ["*"]
        columns: ["safe_title", "title"]
      - kind: remove_columns
        tables: ["*"]
        columns: ["transcript", "news"]
      - kind: add_column
        tables: ["*"]
        name: "source"
        value: "xkcd"
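The plugin's own spec block can be modelled with a couple of Go structs. This is a sketch, assuming the nested spec reaches the plugin as JSON and that the field names mirror the YAML keys above; TransformationSpec, Spec and ParseSpec are illustrative, not necessarily the plugin's exact types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TransformationSpec mirrors one entry of the transformations list.
// Field names and JSON tags are assumptions based on the YAML keys.
type TransformationSpec struct {
	Kind    string   `json:"kind"`
	Tables  []string `json:"tables"`
	Columns []string `json:"columns,omitempty"` // obfuscate_columns, remove_columns
	Name    string   `json:"name,omitempty"`    // add_column
	Value   string   `json:"value,omitempty"`   // add_column
}

// Spec is the transformer's own nested spec block.
type Spec struct {
	TransformationSpecs []TransformationSpec `json:"transformations"`
}

// ParseSpec decodes the nested spec, assuming it arrives as JSON.
func ParseSpec(data []byte) (Spec, error) {
	var s Spec
	err := json.Unmarshal(data, &s)
	return s, err
}

// exampleSpec is the YAML example above, rewritten as JSON.
const exampleSpec = `{"transformations": [
  {"kind": "obfuscate_columns", "tables": ["*"], "columns": ["safe_title", "title"]},
  {"kind": "remove_columns", "tables": ["*"], "columns": ["transcript", "news"]},
  {"kind": "add_column", "tables": ["*"], "name": "source", "value": "xkcd"}
]}`

func main() {
	s, err := ParseSpec([]byte(exampleSpec))
	if err != nil {
		panic(err)
	}
	for _, t := range s.TransformationSpecs {
		fmt.Printf("%s tables=%v columns=%v name=%q value=%q\n", t.Kind, t.Tables, t.Columns, t.Name, t.Value)
	}
}
```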

And the result (same elapsed time):

Screenshot 2024-07-26 at 15 49 53

@erezrokah (Member) left a comment


The plugin code looks great. Added a few comments.
For the migrate message bit we probably want it to be a protocol message and not record metadata.

Also, where does the plugin set cq:extension:message_type? I couldn't find it.

Regardless, I think the plugin's code (not the CLI part) can go into the main branch once the comments are resolved.

// Don't move this file to a different package, it's used by Go releaser to embed the version in the binary.
var (
Name = "basic"
Kind = "destination" // breaks otherwise
Member

Let's update this to "transformer", which is supported as of https://github.com/cloudquery/plugin-sdk/releases/tag/v4.54.0

Contributor Author

Fixed.

// This could either be an `Insert` or a `MigrateTable` message. To answer this,
// we look to see if the schema's metadata has a MetadataMessageTypeKey key
msgType := "insert"
if isMigrateTableRecord(record) {
Member

This should probably be part of the protocol, not part of the data.

Contributor Author

This change belongs to the parent PR. Let's discuss it there.

Contributor Author

For context, we changed the protocol after an offline discussion.
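The thread settles on carrying the message type in the protocol rather than in the record's schema metadata. A minimal sketch of what that buys, using hypothetical local types (not plugin-sdk's actual message package): the transformer branches on the message type directly instead of looking up a key such as cq:extension:message_type.

```go
package main

import "fmt"

// Message is a hypothetical sum type for what a transformer receives.
// The message kind is carried by the Go type itself (the protocol),
// so no record has to be inspected for a metadata key.
type Message interface{ isMessage() }

// InsertMessage carries rows for one table (rows simplified to strings).
type InsertMessage struct {
	Table string
	Rows  [][]string
}

// MigrateTableMessage carries a table's new column list.
type MigrateTableMessage struct {
	Table   string
	Columns []string
}

func (InsertMessage) isMessage()       {}
func (MigrateTableMessage) isMessage() {}

// Describe dispatches on the message kind with a type switch.
func Describe(m Message) string {
	switch msg := m.(type) {
	case InsertMessage:
		return fmt.Sprintf("insert %s (%d rows)", msg.Table, len(msg.Rows))
	case MigrateTableMessage:
		return fmt.Sprintf("migrate %s %v", msg.Table, msg.Columns)
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(Describe(InsertMessage{Table: "comics", Rows: [][]string{{"a"}, {"b"}}}))
	fmt.Println(Describe(MigrateTableMessage{Table: "comics", Columns: []string{"id", "source"}}))
}
```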

@@ -0,0 +1 @@
basic
Member

Maybe call it toolkit? Not sure what a better name would be.

Contributor Author

I'm not married to basic, but I chose a name that makes it clear advanced transformations will never be in scope because, as far as we know, we cannot add DataFusion within Go in a newer version. I'm not strongly opposed to toolkit, but MacGyver would laugh in our faces 🤷

Member

I think basic is okay for now. We can always change it later by releasing it under a different name.

}

func (s *Spec) Validate() error {
for _, t := range s.TransformationSpecs {
Member

Non-blocking: it would be nice to collect all errors and report them together, instead of only the first one each time. You can use errors.Join for that.

Contributor Author

Used errors.Join now.
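A sketch of collecting every validation problem with errors.Join (Go 1.20+), as suggested above. The specific checks (non-empty tables, required columns or name, known kinds) are assumptions for illustration, not the plugin's actual rules.

```go
package main

import (
	"errors"
	"fmt"
)

// TransformationSpec is a trimmed-down, hypothetical version of the spec type.
type TransformationSpec struct {
	Kind    string
	Tables  []string
	Columns []string
	Name    string
}

type Spec struct {
	TransformationSpecs []TransformationSpec
}

// Validate checks every transformation and returns all problems at once.
// errors.Join returns nil when it is given no non-nil errors.
func (s *Spec) Validate() error {
	var errs []error
	for i, t := range s.TransformationSpecs {
		if len(t.Tables) == 0 {
			errs = append(errs, fmt.Errorf("transformation %d (%s): tables must not be empty", i, t.Kind))
		}
		switch t.Kind {
		case "obfuscate_columns", "remove_columns":
			if len(t.Columns) == 0 {
				errs = append(errs, fmt.Errorf("transformation %d (%s): columns must not be empty", i, t.Kind))
			}
		case "add_column":
			if t.Name == "" {
				errs = append(errs, fmt.Errorf("transformation %d (%s): name is required", i, t.Kind))
			}
		default:
			errs = append(errs, fmt.Errorf("transformation %d: unknown kind %q", i, t.Kind))
		}
	}
	return errors.Join(errs...)
}

func main() {
	s := &Spec{TransformationSpecs: []TransformationSpec{
		{Kind: "remove_columns"},
		{Kind: "bogus", Tables: []string{"*"}},
	}}
	// Prints all three problems, one per line.
	fmt.Println(s.Validate())
}
```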

func NewFromSpec(sp spec.TransformationSpec) (*Transformer, error) {
tr := &Transformer{matcher: tablematcher.New(sp.Tables)}

fns := map[string]TransformationFn{
Member

Would a switch/case be more suitable here?

Contributor Author

Used switch.
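A minimal sketch of a switch-based constructor, assuming a simplified map-based row instead of Arrow records. path.Match stands in for the plugin's tablematcher package, whose exact matching rules may differ; only remove_columns is implemented here, and all type names are illustrative.

```go
package main

import (
	"fmt"
	"path"
)

// TransformationSpec is a trimmed-down, hypothetical version of the spec type.
type TransformationSpec struct {
	Kind    string
	Tables  []string
	Columns []string
}

// Row is a simplified record row: column name -> value.
type Row map[string]string

// Transformer pairs a table filter with the function chosen for its kind.
type Transformer struct {
	tables []string
	fn     func(Row)
}

// Matches reports whether tableName matches any configured glob pattern.
func (t *Transformer) Matches(tableName string) bool {
	for _, pattern := range t.tables {
		if ok, _ := path.Match(pattern, tableName); ok {
			return true
		}
	}
	return false
}

// Apply runs the selected transformation on a row in place.
func (t *Transformer) Apply(r Row) { t.fn(r) }

// NewFromSpec selects the transformation with a switch rather than a map of
// functions; unknown kinds become an error. Other kinds are omitted here.
func NewFromSpec(sp TransformationSpec) (*Transformer, error) {
	tr := &Transformer{tables: sp.Tables}
	switch sp.Kind {
	case "remove_columns":
		tr.fn = func(r Row) {
			for _, c := range sp.Columns {
				delete(r, c)
			}
		}
	default:
		return nil, fmt.Errorf("unknown transformation kind %q", sp.Kind)
	}
	return tr, nil
}

func main() {
	tr, err := NewFromSpec(TransformationSpec{Kind: "remove_columns", Tables: []string{"*"}, Columns: []string{"news"}})
	if err != nil {
		panic(err)
	}
	r := Row{"title": "a", "news": "b"}
	if tr.Matches("xkcd_comics") {
		tr.Apply(r)
	}
	fmt.Println(r) // map[title:a]
}
```

Arguably the switch keeps each kind's setup in one place and funnels unknown kinds into a single default error, which a map lookup would need an extra existence check for.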

Comment on lines 121 to 122
mem := memory.NewGoAllocator()
bld := array.NewStringBuilder(mem)
Member

Suggested change
- mem := memory.NewGoAllocator()
- bld := array.NewStringBuilder(mem)
+ bld := array.NewStringBuilder(memory.DefaultAllocator)

The same change applies everywhere else and should give the same result.

Contributor Author

TIL. Updated everywhere.

return bld.NewStringArray()
}

func (*RecordUpdater) _obfuscateColumn(column arrow.Array) arrow.Array {
Member

What's the reason for _obfuscateColumn and not obfuscateColumn?

Contributor Author

Originally, RecordUpdater had ObfuscateColumn rather than ObfuscateColumns, hence the name clash. I've removed the _ now.
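For illustration, a column-level obfuscation helper without the underscore prefix. Hashing each non-null value with SHA-256 is an assumed scheme (the PR does not say how values are obfuscated), and StringColumn is a hypothetical stand-in for an Arrow string array with a validity bitmap.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// StringColumn is a hypothetical stand-in for an Arrow string array:
// Values[i] is only meaningful when Valid[i] is true (false means NULL).
// Values and Valid are assumed to have the same length.
type StringColumn struct {
	Values []string
	Valid  []bool
}

// obfuscateColumn returns a new column in which every non-null value is
// replaced by the hex-encoded SHA-256 digest of the original. Nulls stay null.
func obfuscateColumn(col StringColumn) StringColumn {
	n := len(col.Values)
	out := StringColumn{Values: make([]string, n), Valid: make([]bool, n)}
	for i, v := range col.Values {
		if !col.Valid[i] {
			continue // NULL in, NULL out
		}
		sum := sha256.Sum256([]byte(v))
		out.Values[i] = hex.EncodeToString(sum[:])
		out.Valid[i] = true
	}
	return out
}

func main() {
	out := obfuscateColumn(StringColumn{Values: []string{"abc", ""}, Valid: []bool{true, false}})
	fmt.Println(len(out.Values[0]), out.Valid[1]) // 64 false
}
```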

@erezrokah marked this pull request as ready for review on July 30, 2024 12:00
@erezrokah requested review from a team and savme, and removed the request for a team, on July 30, 2024 12:00
@erezrokah (Member)

This can be merged directly to main, I believe.

Co-authored-by: Kemal <223029+disq@users.noreply.github.com>
@marianogappa requested a review from erezrokah on July 31, 2024 12:31
@erezrokah (Member) left a comment


Code looks good, but I think that to package and publish it we only need a docs directory with an overview.md file.

See example in https://github.com/cloudquery/cloudquery/tree/main/plugins/source/xkcd/docs

@marianogappa (Contributor Author)

@erezrokah Yep, I added sample docs and the package command works. I'm just having trouble publishing locally; some auth problem. I can't debug the failing SQL query because it uses variables that, I guess, only exist in the server's connection.

Comment on lines 36 to 37
path: "localhost:7777" # TODO change this when it's published
registry: "grpc" # TODO change this when it's published
Member

Suggested change
- path: "localhost:7777" # TODO change this when it's published
- registry: "grpc" # TODO change this when it's published
+ path: cloudquery/basic
+ version: VERSION_TRANSFORMER_BASIC

This is our convention: VERSION_TRANSFORMER_BASIC is replaced during publishing with the version being published, so the docs always show the latest version.
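The substitution amounts to a plain string replacement at publish time. A sketch, assuming the docs are processed as text; renderDocs is a hypothetical helper, not part of the actual publishing tooling.

```go
package main

import (
	"fmt"
	"strings"
)

// renderDocs swaps every occurrence of the version placeholder for the
// version being published. Hypothetical helper for illustration only.
func renderDocs(doc, version string) string {
	return strings.ReplaceAll(doc, "VERSION_TRANSFORMER_BASIC", version)
}

func main() {
	doc := "path: cloudquery/basic\nversion: VERSION_TRANSFORMER_BASIC\n"
	fmt.Print(renderDocs(doc, "v1.0.0"))
}
```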

@erezrokah (Member) left a comment


We have a convention of creating a _configuration.md file that can be embedded in other documents.

See https://github.com/cloudquery/cloudquery/blob/main/plugins/source/xkcd/docs/_configuration.md

The Cloud also uses the _configuration.md file, if it exists, for pages like https://hub.cloudquery.io/export-data/airtable/spangenberg_arrowflight#source-configuration

@hermanschaaf (Member) left a comment


Looks great! 🚀🚀🚀

}

oldRecord := r.record.Columns()
newColumns := make([]arrow.Array, 0, len(oldRecord)-1)
Member

Doesn't matter too much, but should the cap here be len(oldRecord)-len(colIndices)?

Contributor Author

Fixed.
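A sketch of index-based column removal with the tighter capacity suggested above, written generically over a slice. With Arrow records, the schema's fields would presumably need the same indices dropped so they stay aligned with the columns; removeColumns is a hypothetical helper.

```go
package main

import "fmt"

// removeColumns returns cols without the entries at colIndices, keeping order.
// The capacity is len(cols) minus the number of distinct indices dropped, as
// suggested in review. Assumes indices are in range; duplicates count once.
func removeColumns[T any](cols []T, colIndices []int) []T {
	drop := make(map[int]bool, len(colIndices))
	for _, i := range colIndices {
		drop[i] = true
	}
	capacity := len(cols) - len(drop)
	if capacity < 0 {
		capacity = 0
	}
	out := make([]T, 0, capacity)
	for i, c := range cols {
		if !drop[i] {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	names := []string{"title", "transcript", "news", "source"}
	out := removeColumns(names, []int{1, 2})
	fmt.Println(out, cap(out)) // [title source] 2
}
```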

return r.record, nil
}

func (r *RecordUpdater) ObfuscateColumns(columnNames []string) (arrow.Record, error) {
Member

This is great for now, but I can see how in the future we may also need to handle fields inside JSON columns, for example.

Contributor Author

I think before that we may need to obfuscate non-string fields too. We can implement anything we want; the question is where bespoke transformations end and DataFusion-based transformations begin.

@erezrokah (Member)

@marianogappa requested a review from a team on August 1, 2024 10:18
@cq-bot added the area/ci label on Aug 1, 2024
@marianogappa (Contributor Author)

Steps added

Base automatically changed from mariano/transformations-support to main on August 1, 2024 15:37
@cq-bot added the area/cli label on Aug 1, 2024
@cq-bot removed the area/cli label on Aug 1, 2024
@marianogappa added the area/new-transformer-plugin and automerge (Automatically merge once required checks pass) labels and removed the needs-area label on Aug 1, 2024
@kodiakhq (bot) merged commit 704102f into main on Aug 1, 2024
@kodiakhq (bot) deleted the mariano/transformations-support-transformer branch on August 1, 2024 15:51
kodiakhq (bot) pushed a commit that referenced this pull request on Aug 5, 2024
🤖 I have created a release *beep* *boop*
---


## 1.0.0 (2024-08-05)


### Features

* Implement basic transformer (add/remove/obfuscate columns) ([#18704](#18704)) ([704102f](704102f))
* Support table name changes on basic transformer. ([#18833](#18833)) ([67d3701](67d3701))


### Bug Fixes

* **deps:** Update module github.com/cloudquery/cloudquery-api-go to v1.12.0 ([#18448](#18448)) ([a5850e1](a5850e1))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v4 ([#18822](#18822)) ([bfd8765](bfd8765))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v4 ([#18842](#18842)) ([39beee4](39beee4))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.0 ([#18821](#18821)) ([ef364a2](ef364a2))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.1 ([#18830](#18830)) ([605c202](605c202))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.0 ([#18839](#18839)) ([6b57bca](6b57bca))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.1 ([#18852](#18852)) ([4320340](4320340))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

Labels

area/ci area/new-transformer-plugin automerge Automatically merge once required checks pass


6 participants