feat: Add Metrics and improve scheduler with DFS #318
Conversation
erezrokah
left a comment
This looks good. I added a few comments, but I don't think any of them are blocking.
	EndTime time.Time
}

func (s *TableClientMetrics) Equal(other *TableClientMetrics) bool {
Is there a reason not to use https://pkg.go.dev/gotest.tools/assert#DeepEqual in tests? Or https://github.com/google/go-cmp?
If we need the Equal signature we can wrap a call to go-cmp
Same for the other Equal.
If the reason is to skip StartTime and EndTime looks like we can do it via google/go-cmp#143 (comment)
DeepEqual is super slow and can have unpredictable results depending on what you are trying to achieve, so we want to define `Equal` for every type. DeepEqual is usually used for testing purposes when you don't control, or don't have access to, some external struct.
My concern is that someone adds a field to the struct and then forgets to add it to the Equal.
i.e. We'll have to maintain this implementation. Not sure the performance hit is something that can really slow us down.
Where is the Equal function used?
We use it in tests, but what is the issue with maintaining this, versus using a library that is meant for comparing structs that are not under the author's control?
If you look at SourceMetrics, the Equal function is not that simple (it compares the map), but it ensures it will work, and I can also add tests to make sure we don't forget to update it.
If it's only used in the tests I would say we don't need it as a part of the struct and just do the equality in the test (via library or a helper function).
Again this is not blocking for the PR, just seems there's already existing code we can use to compare structs in tests, so we don't need to re-implement it
+1 to what @erezrokah said; not a blocker, but I'd vote for go-cmp in tests rather than maintain equality operators if we don't need them in our (non-test) code.
Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
🤖 I have created a release *beep* *boop*

## [0.13.15](v0.13.14...v0.13.15) (2022-10-30)

### Features

* Add Metrics and improve scheduler with DFS ([#318](#318)) ([2d7a83b](2d7a83b))

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Should be merged after this SDK PR cloudquery/plugin-sdk#318
hermanschaaf
left a comment
I read through, and it all looks good to me; the concurrency also looks good, though I'd like to tweak it a bit further once we have benchmarks in place in the coming weeks 👍
	defer wg.Done()
	defer p.tableSem.Release(1)
nit: I think the ordering of the defers needs to be swapped here to match the order of the Acquire and wg.Add operations (since defers get executed in reverse order).
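A small self-contained illustration of the LIFO point (the function and step names are illustrative, not the real plugin code): with the defers swapped, wg.Done runs before the semaphore release, unwinding in the reverse of the acquisition order.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveTable records the order of acquisition and cleanup steps.
// Defers run LIFO, so to unwind in the reverse of the acquisition
// order (Acquire, then wg.Add), the Release defer must be written
// first so that wg.Done executes first.
func resolveTable() (order []string) {
	order = append(order, "tableSem.Acquire", "wg.Add")
	defer func() { order = append(order, "tableSem.Release") }() // runs last
	defer func() { order = append(order, "wg.Done") }()          // runs first
	order = append(order, "resolve table")
	return
}

func main() {
	fmt.Println(strings.Join(resolveTable(), " -> "))
	// tableSem.Acquire -> wg.Add -> resolve table -> wg.Done -> tableSem.Release
}
```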
	p.metrics.initWithClients(table, clients)
	for _, client := range clients {
		client := client
		if err := p.tableSem.Acquire(ctx, 1); err != nil {
Just a passing comment, but since the semaphore is acquired inside the clients loop, technically the tableSem is more like a tableClientSem - so if you have multiple accounts and set table concurrency to 1, only one account will be resolved at a time. Not necessarily a bad thing, but maybe just worth keeping in mind / documenting.
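A stdlib-only sketch of the behavior being described, using a buffered channel in place of the weighted semaphore and hypothetical account names: because the acquire happens inside the clients loop, capacity 1 serializes the clients of a single table, which is why the name tableClientSem fits better.

```go
package main

import (
	"fmt"
	"sync"
)

// resolveTableClients sketches the loop under review. The semaphore is
// acquired per (table, client) pair rather than per table: with
// concurrency 1, the clients below are resolved strictly one at a time.
func resolveTableClients(clients []string, concurrency int) []string {
	sem := make(chan struct{}, concurrency) // stand-in for the weighted semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	var resolved []string
	for _, client := range clients {
		client := client  // capture loop variable (pre-Go 1.22 idiom)
		sem <- struct{}{} // acquired per client, not per table
		wg.Add(1)
		go func() {
			defer func() { <-sem }() // release runs last (LIFO)
			defer wg.Done()
			mu.Lock()
			resolved = append(resolved, client)
			mu.Unlock()
		}()
	}
	wg.Wait()
	return resolved
}

func main() {
	// Hypothetical accounts; the real code iterates multiplexed clients.
	got := resolveTableClients([]string{"account-a", "account-b", "account-c"}, 1)
	fmt.Println(len(got)) // all three resolved, just serially
}
```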
This adds a type system to the CloudQuery SDK. This is mandatory to support multiple destinations. It also uncovered quite a few bugs where we were sending random data over the wire without any validation, apart from maybe when we hit the database and then failed batches altogether. For the CloudQuery type system I relied heavily on https://github.com/jackc/pgtype and kept the license and copyright in its own package `cqtype`. This is a continuation of PR #298, which I split into this one and #318.

Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com>
This is instead of #3176.

SDK PRs: cloudquery/plugin-sdk#318, cloudquery/plugin-sdk#320

Previous related CloudQuery PRs: #3286

Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com>
Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
Trying to split PR #298 into smaller bits.
Metrics:
Scheduler:
…`schema` to `plugins`, so I added that in the same PR. Currently the user will specify only one variable, `concurrency`, and the scheduler will decide how to split it between levels. For simplicity I kept it the same way as before, with concurrency only for the first level. Concurrent DFS will make sure there are no deadlocks and that memory is always kept at O(goroutines) and O(h) (where h is the tree height).
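The scheduling shape described above can be sketched with the stdlib only (a buffered channel stands in for the semaphore; table names and helper functions are hypothetical, not the SDK's API): top-level tables fan out up to `concurrency` goroutines, and each subtree is walked depth-first on its own goroutine, so live state stays O(goroutines) across branches and O(h) down any one branch.

```go
package main

import (
	"fmt"
	"sync"
)

// Table is a hypothetical stand-in for a schema table with child relations.
type Table struct {
	Name      string
	Relations []*Table
}

// resolve walks one table's subtree depth-first on a single goroutine,
// so per-branch memory is O(h), where h is the tree height.
func resolve(t *Table, out chan<- string) {
	out <- t.Name
	for _, rel := range t.Relations {
		resolve(rel, out) // DFS: finish the subtree before the next sibling
	}
}

// resolveAll fans out top-level tables, bounded by the single
// user-facing concurrency knob, so live state stays O(goroutines).
func resolveAll(tables []*Table, concurrency int) []string {
	sem := make(chan struct{}, concurrency) // stand-in for the real semaphore
	out := make(chan string, 64)            // large enough for this sketch
	var wg sync.WaitGroup
	for _, t := range tables {
		t := t
		sem <- struct{}{}
		wg.Add(1)
		go func() {
			defer func() { <-sem }()
			defer wg.Done()
			resolve(t, out)
		}()
	}
	wg.Wait()
	close(out)
	var names []string
	for name := range out {
		names = append(names, name)
	}
	return names
}

func main() {
	tables := []*Table{
		{Name: "parent_a", Relations: []*Table{{Name: "child_a1"}}},
		{Name: "parent_b"},
	}
	fmt.Println(len(resolveAll(tables, 2))) // 3 tables resolved
}
```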