feat: Implement transformations support. #18669
erezrokah
left a comment
Looks great 🚀 Added a few minor comments. Waiting for the migrate related changes and I'll do another pass cloudquery/plugin-pb#24
cli/cmd/sync.go
Outdated
    versions, err := transformer.Versions(ctx)
    if err != nil {
        return fmt.Errorf("failed to get transformer versions: %w", err)
    }
    if !slices.Contains(versions, 3) {
        return fmt.Errorf("transformer plugin %[1]s does not support CloudQuery protocol version 3, required by the %[2]s source plugin. Please upgrade to a newer version of the %[1]s transformer plugin", transformer.Name(), source.Name)
    }
I don't think this can ever happen for transformer plugins; they are always written with protocol version 3.
Removed check.
cli/cmd/sync.go
Outdated
    destWarnings := specReader.GetTransformerWarningsByName(source.Name)
    for field, msg := range destWarnings {
    - destWarnings := specReader.GetTransformerWarningsByName(source.Name)
    - for field, msg := range destWarnings {
    + transformerWarnings := specReader.GetTransformerWarningsByName(source.Name)
    + for field, msg := range transformerWarnings {
Fixed.
Let's also add a similar validation as in:

    return fmt.Errorf("source %s references unknown destination %s", source.Name, destination)
Added check.
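For readers following the thread, a check of this kind might look roughly like the sketch below. It is not the code added in this PR: `DestinationSpec`, its `Transformers` field, and `validateTransformerRefs` are hypothetical names chosen for illustration, and the error message simply mirrors the existing source/destination one.

```go
package main

import "fmt"

// DestinationSpec is a hypothetical, trimmed-down destination spec.
// The real spec type and field names in the CLI may differ.
type DestinationSpec struct {
	Name         string
	Transformers []string // names of transformers this destination references (assumed field)
}

// validateTransformerRefs returns an error for the first destination that
// references a transformer name not present in known.
func validateTransformerRefs(destinations []DestinationSpec, known map[string]bool) error {
	for _, d := range destinations {
		for _, t := range d.Transformers {
			if !known[t] {
				return fmt.Errorf("destination %s references unknown transformer %s", d.Name, t)
			}
		}
	}
	return nil
}

func main() {
	known := map[string]bool{"reverser": true}
	dests := []DestinationSpec{
		{Name: "postgresql", Transformers: []string{"reverser", "missing"}},
	}
	// Reports that "postgresql" references the unknown transformer "missing".
	fmt.Println(validateTransformerRefs(dests, known))
}
```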
cli/cmd/sync_v3.go
Outdated
    pbClient, err := transformerPbClient.Transform(ctx)
    if err != nil {
        return err
    }
    transformClientsByDestination[name] = append(transformClientsByDestination[name], pbClient)
    - pbClient, err := transformerPbClient.Transform(ctx)
    - if err != nil {
    -     return err
    - }
    - transformClientsByDestination[name] = append(transformClientsByDestination[name], pbClient)
    + transformClient, err := transformerPbClient.Transform(ctx)
    + if err != nil {
    +     return err
    + }
    + transformClientsByDestination[name] = append(transformClientsByDestination[name], transformClient)
Renamed.
🤖 I have created a release *beep* *boop*

---

## [6.3.0](cli-v6.2.0...cli-v6.3.0) (2024-08-05)

### Features

* Implement transformations support. ([#18669](#18669)) ([2af93ed](2af93ed))
* Support table name changes on basic transformer. ([#18833](#18833)) ([67d3701](67d3701))

### Bug Fixes

* **deps:** Update module github.com/cloudquery/cloudquery-api-go to v1.12.6 ([#18828](#18828)) ([3a48c89](3a48c89))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.57.1 ([#18830](#18830)) ([605c202](605c202))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.0 ([#18839](#18839)) ([6b57bca](6b57bca))
* **deps:** Update module github.com/cloudquery/plugin-sdk/v4 to v4.58.1 ([#18852](#18852)) ([4320340](4320340))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Adds support for transformations in the CLI.
Transformations is a new property of the destination spec, e.g.:
Multiple transformers are supported on a per-destination basis. Thus, each destination now constructs a transformer pipeline:
This transformer pipeline is not accidental complexity. I'd love it if we could just for-loop and apply transformations sequentially, but we decided to make transformers asynchronous, so a transformation is not a function we can call: it's a goroutine that has `recv` & `send` semantics. I've abstracted the pipeline as simply as I could.

Note that, even if we only allowed a single transformer per destination as an MVP, we'd still need a goroutine for it (and thus for the source `Recv` as well, which was a blocking `for { ... }` up until now). So this is as close to MVP as I can make it.

Current limitations

- `grpc` at the moment.
- `Transformer` kind to `plugin-pb-go` in managed plugin yet. There's a `TODO` in the code where `Destination` is used instead, but I think at the moment it doesn't affect anything.

Testing

In order to test this, you'll need a transformer. I've created https://github.com/cloudquery/cloudquery-private/tree/mariano/test-transformations, which adds a test "reverser" transformer that you can `go run . serve`, and you should be good to test a sync with the yaml from above.

My testing

- Syncs should continue to work as before if no transformations are added. The number of resources should not change between syncs (in case the channel closing logic would lead to losing tails).
- Adding a reverser transformer reverses the strings.
- I've also tried adding two reversers in sequence, which results in a regular sync (which tests that the pipeline system works). Note that currently you can't add the same transformer twice (we use a `map`, which elides all but one instance), so I hacked it to try this. We might want to allow it.
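
To make the pipeline shape concrete, here is a minimal, self-contained sketch of chaining asynchronous stages with channels, one goroutine per stage. It is not the code in this PR: `Stage`, `reverser`, and `runPipeline` are hypothetical names, plain strings stand in for records, and Go channels stand in for the gRPC streams. It only illustrates why each transformer (and the source) needs its own goroutine, and why two reversers in sequence should behave like a regular sync.

```go
package main

import (
	"fmt"
	"sync"
)

// Stage is a hypothetical stand-in for one asynchronous transformer: it
// receives records on in and sends results on out until in is closed.
type Stage func(in <-chan string, out chan<- string)

// reverser mimics the test "reverser" transformer by reversing each string.
func reverser(in <-chan string, out chan<- string) {
	for s := range in {
		r := []rune(s)
		for i, j := 0, len(r)-1; i < j; i, j = i+1, j-1 {
			r[i], r[j] = r[j], r[i]
		}
		out <- string(r)
	}
}

// runPipeline chains the stages with unbuffered channels and returns every
// record that reaches the end of the chain, in order.
func runPipeline(records []string, stages ...Stage) []string {
	var wg sync.WaitGroup
	src := make(chan string)
	var in <-chan string = src
	for _, stage := range stages {
		out := make(chan string)
		wg.Add(1)
		go func(st Stage, in <-chan string, out chan<- string) {
			defer wg.Done()
			// Close only after the stage has drained its input, so the
			// tail of the stream is not lost downstream.
			defer close(out)
			st(in, out)
		}(stage, in, out)
		in = out
	}
	// The source also runs in its own goroutine instead of a blocking loop.
	go func() {
		defer close(src)
		for _, r := range records {
			src <- r
		}
	}()
	var got []string
	for r := range in {
		got = append(got, r)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(runPipeline([]string{"abc", "xyz"}, reverser))           // prints [cba zyx]
	fmt.Println(runPipeline([]string{"abc", "xyz"}, reverser, reverser)) // prints [abc xyz]
}
```

Because each stage closes its output only after its input has been drained, the number of records coming out matches the number going in, which is the property the resource-count check above is guarding.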