feat: Implement RandomQueue scheduler strategy #1914
erezrokah
left a comment
Code looks great, I'm going to test it a bit with a few plugins, also with OTEL enabled
//
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
// - If workers are still active, call `Wait()` to block until state changes.
type activeWorkSignal struct {
Nice 🚀 Much better than what I did before 🙈
scheduler/strategy.go
Outdated
StrategyDFS: "dfs",
StrategyRoundRobin: "round-robin",
StrategyShuffle: "shuffle",
StrategyRandomQueue: "random-queue",
Maybe also call it `shuffle-queue` or just `queue`, since we already have `shuffle`, which means random.
Renamed to `shuffle-queue`.
tableMetrics.Duration.Store(&duration)
tableMetrics.OtelEndTime(ctx, endTime)
if parent == nil {
logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished")
OK, now I remember what I removed here. We used to log a message here with the metrics of all relations after a parent finished.
Now we don't do that, since reaching this code doesn't mean all relations finished syncing; it only means the parent finished pushing all the new work units into the queue.
See plugin-sdk/scheduler/scheduler_dfs.go, line 125 in 2977ae3.
I don't think it's a blocker, and we can add logs later on. But it's still something to consider.
Seems there's a flaky test, which can be seen in a neighboring PR: https://github.com/cloudquery/plugin-sdk/actions/runs/11166226858/job/31039731830?pr=1920
🤖 I have created a release *beep* *boop*

---

## [4.65.0](v4.64.1...v4.65.0) (2024-10-04)

### Features

* Implement RandomQueue scheduler strategy ([#1914](#1914)) ([af8ac87](af8ac87))

### Bug Fixes

* Revert "fix: Error handling in StreamingBatchWriter" ([#1918](#1918)) ([38b4bfd](38b4bfd))
* **tests:** WriterTestSuite.handleNulls should not overwrite columns ([#1920](#1920)) ([08e18e2](08e18e2))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This PR implements a new Scheduler Strategy based on a Concurrent Random Queue. It is based on @erezrokah's Priority Queue Scheduler Strategy.
How does it work
This is hopefully a much simpler scheduling strategy. It doesn't have any semaphores; it just uses the existing concurrency setting.
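As a rough illustration of the queue this section describes, here is a minimal sketch, not the SDK's actual implementation. The names `randomQueue`, `drain`, and `runDemo`, and the table names, are hypothetical. The `active` counter and condition variable are an assumed stand-in for the idle-detection role that `activeWorkSignal` plays in the reviewed code: a worker that finds the queue empty exits only when no other worker is active, and otherwise waits for state to change.

```go
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
)

// randomQueue is a hypothetical stand-in for the work queue: Push appends,
// Pull removes a uniformly random element.
type randomQueue[T any] struct {
	mu    sync.Mutex
	items []T
}

func (q *randomQueue[T]) Push(item T) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
}

// Pull removes and returns a random element; ok is false if the queue is empty.
func (q *randomQueue[T]) Pull() (item T, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return item, false
	}
	i := rand.Intn(len(q.items))
	item = q.items[i]
	last := len(q.items) - 1
	q.items[i] = q.items[last] // swap-remove keeps Pull O(1)
	q.items = q.items[:last]
	return item, true
}

// drain runs `concurrency` workers until the queue is empty and no worker is
// active. process handles one work unit and returns any child work units.
func drain[T any](q *randomQueue[T], concurrency int, process func(T) []T) {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	active := 0 // number of workers currently processing a unit
	var wg sync.WaitGroup
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				mu.Lock()
				item, ok := q.Pull()
				for !ok && active > 0 {
					cond.Wait() // queue empty, but active workers may still push
					item, ok = q.Pull()
				}
				if !ok { // queue empty and nobody active: all work is done
					mu.Unlock()
					return
				}
				active++
				mu.Unlock()

				for _, child := range process(item) {
					q.Push(child)
				}

				mu.Lock()
				active--
				mu.Unlock()
				cond.Broadcast() // wake waiters: new work may exist, or we are idle
			}
		}()
	}
	wg.Wait()
}

// runDemo syncs three made-up parent tables with two relations each and
// returns how many work units were processed.
func runDemo() int {
	q := &randomQueue[string]{}
	for _, t := range []string{"table_a", "table_b", "table_c"} {
		q.Push(t)
	}
	var mu sync.Mutex
	processed := 0
	drain(q, 4, func(unit string) []string {
		mu.Lock()
		processed++
		mu.Unlock()
		if strings.Contains(unit, "/") {
			return nil // relations have no children in this demo
		}
		return []string{unit + "/rel_1", unit + "/rel_2"}
	})
	return processed
}

func main() {
	fmt.Println("processed work units:", runDemo()) // 3 parents + 6 relations = 9
}
```

Because `Pull` picks a random index, relations pushed by one parent are interleaved with work from other tables instead of being processed back to back.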
Table resolvers (and their relations) get Pushed into a work queue, and `concurrency` workers `Pull` from this queue, but they pull a random element from it.

Why it should work better
The key benefit of this strategy is this:
`int32` range, all relation API calls are evenly spread throughout the sync, thus optimally minimising rate limits!

Does it work better?
Still working on results. Notably AWS & Azure yield mixed results; still have to look into why.
GCP
Before
UPDATE: GCP is moving to Round Robin strategy, and it's much faster with this strategy:
After
Result: 76.22% reduction in time, or 3.21 times faster.
Result against Round Robin: 15% reduction in time, or 0.18 times faster (probably within margin of error)
BigQuery
Before
After
Result: 32.28% reduction in time, or 0.48 times faster
SentinelOne
Before (it was already quite fast due to a previously merged improvement)
After
Result: 46.67% reduction in time, or 0.875 times faster
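The "times faster" figures in the results above appear to follow the convention (old time / new time) − 1, so a reduction of r% maps to 1/(1 − r/100) − 1. A small sketch of that conversion, with `timesFaster` being a hypothetical helper name and the convention itself an inference from the reported numbers:

```go
package main

import "fmt"

// timesFaster converts a percentage reduction in sync time into
// "times faster", defined here as (old time / new time) - 1.
func timesFaster(reductionPct float64) float64 {
	return 1/(1-reductionPct/100) - 1
}

func main() {
	// Reduction percentages reported for GCP, GCP vs Round Robin,
	// BigQuery, and SentinelOne.
	for _, r := range []float64{76.22, 15, 32.28, 46.67} {
		fmt.Printf("%.2f%% reduction -> %.3f times faster\n", r, timesFaster(r))
	}
}
```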
How to test
`go.mod` replace for sdk: `replace github.com/cloudquery/plugin-sdk/v4 => github.com/cloudquery/plugin-sdk/v4 v4.63.1-0.20241002131015-243705c940c6` (check last commit on this PR)
`scheduler.StrategyRandomQueue`.

How scary is it to merge