@marianogappa
Contributor

Adds support for transformations in the CLI.

Transformers are declared with a new transformer kind and attached to a destination through the new transformers property of the destination spec, e.g.:

---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "postgresql"
  spec:
---
kind: transformer # new transformer kind
spec:
  name: "reverser"
  path: "localhost:7777"
  registry: "grpc"
  spec:
---
kind: destination
spec:
  name: "postgresql"
  path: "cloudquery/postgresql"
  registry: "cloudquery"
  version: "v8.0.7"
  write_mode: "overwrite-delete-stale"
  migrate_mode: forced
  transformers: # new transformer field
    - "reverser" # example transformer

  spec:
    connection_string: "postgresql://user:@localhost:5432/xkcd?sslmode=disable"

Multiple transformers are supported on a per-destination basis. Thus, each destination now constructs a transformer pipeline:

[Screenshot of the transformer pipeline, 2024-07-23 at 12:10:04]

This transformer pipeline is not some accidental complexity. I'd love it if we could just for-loop and apply transformations sequentially, but we decided to make transformers asynchronous, so a transformation is not some function we can call: it's a goroutine that has recv & send semantics. I've abstracted the pipeline as simply as I could.

Note that, even if we'd just allow a single transformer per destination as an MVP, we'd still need a goroutine for it (and thus for the source Recv as well, which was a blocking for { ... } up until now). So this is as close to MVP as I can make it.
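To make the recv/send shape described above concrete, here is a minimal sketch of such a pipeline, not the code from this PR. It assumes records are plain strings rather than Arrow records, and the names Record, Transformer, reverser and runPipeline are hypothetical. Each stage runs in its own goroutine, and a stage closes its output channel only after its input is drained, so the tail of the stream is not lost.

```go
package main

import "fmt"

// Record stands in for a real record; a string keeps the sketch small (assumption).
type Record = string

// Transformer is a hypothetical stage with recv & send semantics:
// it reads from in until in is closed and writes results to out.
type Transformer func(in <-chan Record, out chan<- Record)

// reverser mimics the test "reverser" transformer by reversing each string.
func reverser(in <-chan Record, out chan<- Record) {
	for r := range in {
		runes := []rune(r)
		for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
			runes[i], runes[j] = runes[j], runes[i]
		}
		out <- string(runes)
	}
}

// runPipeline feeds source through the stages in order and collects what
// reaches the end, which is where a destination would write.
func runPipeline(source []Record, stages ...Transformer) []Record {
	first := make(chan Record)
	var in <-chan Record = first

	// The source also needs its own goroutine so that it does not block the pipeline.
	go func() {
		defer close(first)
		for _, r := range source {
			first <- r
		}
	}()

	for _, stage := range stages {
		out := make(chan Record)
		go func(s Transformer, in <-chan Record, out chan<- Record) {
			defer close(out) // close downstream only after upstream is drained
			s(in, out)
		}(stage, in, out)
		in = out
	}

	var got []Record
	for r := range in {
		got = append(got, r)
	}
	return got
}

func main() {
	fmt.Println(runPipeline([]Record{"xkcd", "abc"}, reverser))           // [dckx cba]
	fmt.Println(runPipeline([]Record{"xkcd", "abc"}, reverser, reverser)) // [xkcd abc]
}
```

With no stages the records pass straight from the source to the collector, which corresponds to a sync without transformers.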

Current limitations

  • Since the managed plugin system doesn't know about transformations yet, transformers can only be loaded through the grpc registry at the moment.
  • There's no concept of per-table transformers at the moment, so if a transformer is added to a destination, it'd sniff through every record processed by that destination. Rationale is to minimise MVP.
  • I haven't added the Transformer kind to plugin-pb-go in managed plugin yet. There's a TODO in the code where Destination is used instead, but I think at the moment it doesn't affect anything.

Testing
In order to test this, you'll need a transformer. I've created https://github.com/cloudquery/cloudquery-private/tree/mariano/test-transformations, which adds a test "reverser" transformer. Start it with go run . serve, and you should be good to test a sync with the YAML from above.

My testing

Syncs should continue to work as before if no transformations are added. The number of resources should not change between syncs (a change would suggest that the channel-closing logic is losing the tail of the stream).

Screenshot 2024-07-23 at 12 30 59

Screenshot 2024-07-23 at 12 31 56

Screenshot 2024-07-23 at 12 30 38

Adding a reverser transformer reverses the strings:

Screenshot 2024-07-23 at 12 39 29

Screenshot 2024-07-23 at 12 37 26

I've also tried adding two reversers in sequence, which produces the same output as a regular sync (which tests that the pipeline system works). Note that currently you can't add the same transformer twice (we use a map, which elides all but one instance), so I hacked it to try this. We might want to allow it.
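The map limitation mentioned above can be illustrated with a small sketch. The PR only says that a map is used; the helpers byName and ordered below are hypothetical and assume the map is keyed by transformer name. A slice-based list would be one way to keep repeated entries and their order, if that is ever wanted.

```go
package main

import "fmt"

// byName is a hypothetical map-based registry: a second entry with the same
// name overwrites the first, so duplicates collapse into one.
func byName(names []string) map[string]int {
	m := make(map[string]int)
	for i, n := range names {
		m[n] = i
	}
	return m
}

// ordered is a hypothetical slice-based alternative that keeps duplicates
// and preserves the configured order.
func ordered(names []string) []string {
	return append([]string(nil), names...)
}

func main() {
	cfg := []string{"reverser", "reverser"}
	fmt.Println(len(byName(cfg)))  // 1
	fmt.Println(len(ordered(cfg))) // 2
}
```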

Member

@erezrokah left a comment

Looks great 🚀 Added a few minor comments. Waiting for the migrate-related changes (see cloudquery/plugin-pb#24), then I'll do another pass.

cli/cmd/sync.go Outdated
Comment on lines 321 to 327
versions, err := transformer.Versions(ctx)
if err != nil {
	return fmt.Errorf("failed to get transformer versions: %w", err)
}
if !slices.Contains(versions, 3) {
	return fmt.Errorf("transformer plugin %[1]s does not support CloudQuery protocol version 3, required by the %[2]s source plugin. Please upgrade to a newer version of the %[1]s transformer plugin", transformer.Name(), source.Name)
}
Member

I don't think this can ever happen for transformer plugins, they are always written with protocol version 3

Contributor Author

Removed check.

cli/cmd/sync.go Outdated
Comment on lines 328 to 329
destWarnings := specReader.GetTransformerWarningsByName(source.Name)
for field, msg := range destWarnings {
Member

Suggested change
- destWarnings := specReader.GetTransformerWarningsByName(source.Name)
- for field, msg := range destWarnings {
+ transformerWarnings := specReader.GetTransformerWarningsByName(source.Name)
+ for field, msg := range transformerWarnings {

Contributor Author

Fixed.

Member

Let's also add a similar validation as in

return fmt.Errorf("source %s references unknown destination %s", source.Name, destination)
to check if someone references a transformer that doesn't exist.

Contributor Author

Added check.
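For readers following the thread, a check of this kind might look roughly like the sketch below. The actual code added to the PR is not shown in this conversation; the destinationSpec type, the validateTransformerRefs function and the exact error wording are assumptions modelled on the source/destination message quoted above.

```go
package main

import "fmt"

// destinationSpec is a hypothetical, trimmed-down stand-in for the real
// destination spec: only the fields needed for the check are included.
type destinationSpec struct {
	Name         string
	Transformers []string
}

// validateTransformerRefs returns an error for the first destination that
// references a transformer name not present in known.
func validateTransformerRefs(destinations []destinationSpec, known map[string]bool) error {
	for _, d := range destinations {
		for _, t := range d.Transformers {
			if !known[t] {
				// Wording is an assumption, mirroring the existing destination check.
				return fmt.Errorf("destination %s references unknown transformer %s", d.Name, t)
			}
		}
	}
	return nil
}

func main() {
	known := map[string]bool{"reverser": true}
	ok := []destinationSpec{{Name: "postgresql", Transformers: []string{"reverser"}}}
	bad := []destinationSpec{{Name: "postgresql", Transformers: []string{"missing"}}}

	fmt.Println(validateTransformerRefs(ok, known))  // <nil>
	fmt.Println(validateTransformerRefs(bad, known)) // destination postgresql references unknown transformer missing
}
```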

Comment on lines 237 to 241
pbClient, err := transformerPbClient.Transform(ctx)
if err != nil {
	return err
}
transformClientsByDestination[name] = append(transformClientsByDestination[name], pbClient)
Member

Suggested change
- pbClient, err := transformerPbClient.Transform(ctx)
- if err != nil {
- 	return err
- }
- transformClientsByDestination[name] = append(transformClientsByDestination[name], pbClient)
+ transformClient, err := transformerPbClient.Transform(ctx)
+ if err != nil {
+ 	return err
+ }
+ transformClientsByDestination[name] = append(transformClientsByDestination[name], transformClient)

Contributor Author

Renamed.

@marianogappa requested a review from erezrokah July 31, 2024 12:31
@marianogappa marked this pull request as ready for review August 1, 2024 14:48
@marianogappa requested a review from a team August 1, 2024 14:48
@marianogappa added the automerge label (Automatically merge once required checks pass) Aug 1, 2024
@kodiakhq bot merged commit 2af93ed into main Aug 1, 2024
@kodiakhq bot deleted the mariano/transformations-support branch August 1, 2024 15:37
@kodiakhq bot pushed a commit that referenced this pull request Aug 5, 2024
🤖 I have created a release *beep* *boop*
---


## [6.3.0](cli-v6.2.0...cli-v6.3.0) (2024-08-05)


### Features

* Implement transformations support. ([#18669](#18669)) ([2af93ed](2af93ed))
* Support table name changes on basic transformer. ([#18833](#18833)) ([67d3701](67d3701))


### Bug Fixes

* **deps:** Update module github.com/cloudquery/cloudquery-api-go to v1.12.6 ([#18828](#18828)) ([3a48c89](3a48c89))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.1 ([#18830](#18830)) ([605c202](605c202))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.0 ([#18839](#18839)) ([6b57bca](6b57bca))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.1 ([#18852](#18852)) ([4320340](4320340))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

Labels

area/cli automerge Automatically merge once required checks pass
