feat: Add CQ type system to support multiple destinations#320
feat: Add CQ type system to support multiple destinations#320yevgenypats merged 18 commits intomainfrom
Conversation
erezrokah
left a comment
There was a problem hiding this comment.
Added a few comments and will continue to review tomorrow 📖
| eg := &errgroup.Group{} | ||
| // given most destination plugins writing in batch we are using a worker pool to write in parallel | ||
| // it might not generalize well and we might need to move it to each destination plugin implementation. | ||
| for i := 0; i < writeWorkers; i++ { |
There was a problem hiding this comment.
Since writeWorkers is always 1 maybe we can remove this for now?
There was a problem hiding this comment.
yeah we can. I was playing with that, wasn't sure what it should be and how it generalize to most plugins.
Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com>
cqtypes/inet.go
Outdated
| // Network address family is dependent on server socket.h value for AF_INET. | ||
| // In practice, all platforms appear to have the same value. See | ||
| // src/include/utils/inet.h for more information. | ||
| const ( | ||
| // defaultAFInet = 2 | ||
| // defaultAFInet6 = 3 | ||
| ) |
There was a problem hiding this comment.
Looks like this constant block is now unused 🤔
There was a problem hiding this comment.
yes. removed. it is used in pgx to encode to postgresql binary format but we don't need it as we just need the decode part and not from postgresql
internal/servers/destinations.go
Outdated
|
|
||
| eg := errgroup.Group{} | ||
| eg.Go(func() error { | ||
| return s.Plugin.Write(context.Background(), tables, sourceName, syncTime, resources) |
There was a problem hiding this comment.
Just trying to understand: why is this write now using context.Background() instead of msg.Context() ?
There was a problem hiding this comment.
Good catch. I think just got confused in the context maze
| panic(columnName + " column not found") | ||
| } | ||
| if err := r.data[index].Set(value); err != nil { | ||
| panic(fmt.Errorf("failed to set column %s: %w", columnName, err)) |
There was a problem hiding this comment.
Should this be a panic? It seems like if we set a column and it fails, we should return an error but otherwise leave it empty.
There was a problem hiding this comment.
Yeah it should panic because it mean what the user provided is of incorrect value and defined the wrong column. We don't have a way of knowing about this (in sentry and in logs) other then panic as all other errors are classified as error. But I do think it's a panic in any case.
Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com>
Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com>
🤖 I have created a release *beep* *boop* --- ## [0.13.16](v0.13.15...v0.13.16) (2022-10-31) ### Features * Add CQ type system to support multiple destinations ([#320](#320)) ([d3b24a0](d3b24a0)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This is instead of #3176 SDK PRs: cloudquery/plugin-sdk#318 cloudquery/plugin-sdk#320 Previous related CloudQuery PRs: #3286 Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
This is instead of #3176 SDK PRs: cloudquery/plugin-sdk#318 cloudquery/plugin-sdk#320 Previous related CloudQuery PRs: #3286 Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
This is instead of cloudquery/cloudquery#3176 SDK PRs: cloudquery/plugin-sdk#318 cloudquery/plugin-sdk#320 Previous related CloudQuery PRs: cloudquery/cloudquery#3286 Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
This add a type system to CloudQuery SDK.
This is mandatory to support multiple destinations.
Also, this found quite a few bugs where we were sending random stuff over the wire without any validation apart from maybe when we were hitting the database and then failing batches all together.
In CloudQuery type system I used heavily https://github.com/jackc/pgtype and kept the license and the copyright in it's own package
cqtype.This is a continue of this PR #298 where I split it into this one and #318