Commit aadbde9
feat: Resolve table relations in parallel (#416)
This changes the resolver for table relations to work concurrently, like parent tables. To do so safely and without the risk of deadlock, we instantiate one semaphore per depth level. (The size of the semaphore is decreased logarithmically for each depth.) This change will only improve performance for tables with child relations.

To compare the performance before and after, I used the benchmarks in #415.

## Before

```
goos: darwin
goarch: arm64
pkg: github.com/cloudquery/plugin-sdk/plugins
BenchmarkDefaultConcurrency-8                     1    11957 resources/s    12626 targetResources/s     73597280 B/op    1032425 allocs/op
BenchmarkTablesWithChildrenDefaultConcurrency-8   1    545.9 resources/s    40606 targetResources/s    461622584 B/op    6690105 allocs/op
PASS
ok      github.com/cloudquery/plugin-sdk/plugins    150.596s
```

## After

```
goos: darwin
goarch: arm64
pkg: github.com/cloudquery/plugin-sdk/plugins
BenchmarkDefaultConcurrency-8                     1    11373 resources/s    12626 targetResources/s     73692608 B/op    1033614 allocs/op
BenchmarkTablesWithChildrenDefaultConcurrency-8   1    30162 resources/s    40606 targetResources/s    464285672 B/op    6697508 allocs/op
PASS
ok      github.com/cloudquery/plugin-sdk/plugins    6.241s
```

## Analysis

This change focuses on the `BenchmarkTablesWithChildrenDefaultConcurrency` case. The change improves `resources/s` from 545.9 to 30162, an improvement of about 55x. Memory and allocs are mostly the same. The small downward change in `resources/s` in the `BenchmarkDefaultConcurrency` case is likely due to the few additional allocs needed.

Closes #358
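The deadlock risk is worth spelling out. If all depths shared one semaphore, every permit could end up held by resolvers that are blocked waiting for their own child relations to acquire that same semaphore, so nothing would ever release. With one semaphore per depth, a resolver holding a permit for depth d only ever waits on the depth d+1 semaphore, the wait graph stays acyclic, and the deepest level always drains first. A minimal, self-contained sketch of this pattern (not the SDK's code; the names, fan-out, and sizes are made up):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// walk mimics the resolver shape: a worker at depth d holds a permit
// from sems[d] while its children block only on sems[d+1]. Because the
// deepest level never waits on anything, it always drains and releases
// permits back up the chain; there is no cycle in who waits on whom.
// With a single shared semaphore this same shape can deadlock: all
// permits get held by workers that are themselves waiting for permits.
func walk(ctx context.Context, sems []*semaphore.Weighted, depth, fanout int) {
	if depth >= len(sems) {
		return
	}
	var wg sync.WaitGroup
	for i := 0; i < fanout; i++ {
		if err := sems[depth].Acquire(ctx, 1); err != nil {
			break // context cancelled
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer sems[depth].Release(1)
			walk(ctx, sems, depth+1, fanout)
		}()
	}
	wg.Wait()
}

func main() {
	const maxDepth = 4
	sems := make([]*semaphore.Weighted, maxDepth)
	for i := range sems {
		sems[i] = semaphore.NewWeighted(2) // deliberately tiny permit counts
	}
	walk(context.Background(), sems, 0, 3)
	fmt.Println("finished without deadlock")
}
```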
1 parent c6655f9 commit aadbde9

2 files changed: 37 additions & 18 deletions

plugins/source.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -32,7 +32,7 @@ type SourcePlugin struct {
 	// resourceSem is a semaphore that limits the number of concurrent resources being fetched
 	resourceSem *semaphore.Weighted
 	// tableSem is a semaphore that limits the number of concurrent tables being fetched
-	tableSem *semaphore.Weighted
+	tableSems []*semaphore.Weighted
 	// maxDepth is the max depth of tables
 	maxDepth uint64
 	// caser
@@ -101,7 +101,6 @@ func NewSourcePlugin(name string, version string, tables []*schema.Table, newExe
 	if p.maxDepth > maxAllowedDepth {
 		panic(fmt.Errorf("max depth of tables is %d, max allowed is %d", p.maxDepth, maxAllowedDepth))
 	}
-
 	return &p
 }
 
```

plugins/source_scheduler_dfs.go

Lines changed: 36 additions & 16 deletions
```diff
@@ -23,18 +23,17 @@ const (
 )
 
 func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client schema.ClientMeta, tables schema.Tables, resolvedResources chan<- *schema.Resource) {
-	// current DFS supports only parallelization for top level tables and resources.
-	// it is possible to extend support for multiple levels but this require benchmarking to find a good fit on how to split
-	// gorourtines for each level efficiently.
 	// This is very similar to the concurrent web crawler problem with some minor changes.
-	// We are using DFS to make sure memory usage is capped at O(h) where h is the height of the tree/subchilds.
-	tableConcurrency := spec.Concurrency / minResourceConcurrency
-	if tableConcurrency < minTableConcurrency {
-		tableConcurrency = minTableConcurrency
-	}
+	// We are using DFS to make sure memory usage is capped at O(h) where h is the height of the tree.
+	tableConcurrency := max(spec.Concurrency/minResourceConcurrency, minTableConcurrency)
 	resourceConcurrency := tableConcurrency * minResourceConcurrency
 
-	p.tableSem = semaphore.NewWeighted(int64(tableConcurrency))
+	p.tableSems = make([]*semaphore.Weighted, p.maxDepth)
+	for i := uint64(0); i < p.maxDepth; i++ {
+		p.tableSems[i] = semaphore.NewWeighted(int64(tableConcurrency))
+		// reduce table concurrency logarithmically for every depth level
+		tableConcurrency = max(tableConcurrency/2, minTableConcurrency)
+	}
 	p.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency))
 
 	var wg sync.WaitGroup
```
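To make the sizing loop concrete: the permit count halves at every level and is floored at `minTableConcurrency`. A standalone sketch of that arithmetic (the starting value of 64, the floor of 1, and the depth of 8 are illustration values, not the SDK's constants):

```go
package main

import "fmt"

// Same helper the commit adds; duplicated here so the sketch is
// self-contained (Go gained built-in max only later, in 1.21).
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	const minTableConcurrency = 1 // illustration value, not the SDK's constant
	tableConcurrency := uint64(64)
	for depth := uint64(0); depth < 8; depth++ {
		fmt.Printf("depth %d: %d permits\n", depth, tableConcurrency)
		tableConcurrency = max(tableConcurrency/2, minTableConcurrency)
	}
	// prints 64, 32, 16, 8, 4, 2, 1, 1
}
```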
```diff
@@ -52,25 +51,25 @@ func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client sc
 		p.metrics.initWithClients(table, clients)
 		for _, client := range clients {
 			client := client
-			if err := p.tableSem.Acquire(ctx, 1); err != nil {
+			if err := p.tableSems[0].Acquire(ctx, 1); err != nil {
 				// This means context was cancelled
 				wg.Wait()
 				return
 			}
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				defer p.tableSem.Release(1)
+				defer p.tableSems[0].Release(1)
 				// not checking for error here as nothing much todo.
 				// the error is logged and this happens when context is cancelled
-				p.resolveTableDfs(ctx, tables, table, client, nil, resolvedResources)
+				p.resolveTableDfs(ctx, tables, table, client, nil, resolvedResources, 0)
 			}()
 		}
 	}
 	wg.Wait()
 }
 
-func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource) {
+func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) {
 	clientName := client.ID()
 	logger := p.logger.With().Str("table", table.Name).Str("client", clientName).Logger()
 	logger.Info().Msg("table resolver started")
```
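Note the ordering in this hunk: the permit is acquired before `wg.Add(1)` and the `go` statement, so the spawning loop itself blocks when depth 0 is saturated instead of creating goroutines unboundedly, and since `Acquire` only returns an error once the context is cancelled, the error branch simply drains the WaitGroup and returns.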
```diff
@@ -97,14 +96,14 @@ func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables sc
 	}()
 
 	for r := range res {
-		p.resolveResourcesDfs(ctx, allIncludedTables, table, client, parent, r, resolvedResources)
+		p.resolveResourcesDfs(ctx, allIncludedTables, table, client, parent, r, resolvedResources, depth)
 	}
 
 	// we don't need any waitgroups here because we are waiting for the channel to close
 	logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Msg("fetch table finished")
 }
 
-func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource) {
+func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource, depth int) {
 	resourcesSlice := helpers.InterfaceSlice(resources)
 	if len(resourcesSlice) == 0 {
 		return
```
```diff
@@ -136,17 +135,31 @@ func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTable
 		wg.Wait()
 	}()
 
+	var wg sync.WaitGroup
 	for resource := range resourcesChan {
+		resource := resource
 		resolvedResources <- resource
 		for _, relation := range resource.Table.Relations {
+			relation := relation
 			if allIncludedTables.GetTopLevel(relation.Name) == nil {
 				// this indicates that child table is skipped by user config,
 				// so we should not sync it
 				continue
 			}
-			p.resolveTableDfs(ctx, allIncludedTables, relation, client, resource, resolvedResources)
+			if err := p.tableSems[depth].Acquire(ctx, 1); err != nil {
+				// This means context was cancelled
+				wg.Wait()
+				return
+			}
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer p.tableSems[depth].Release(1)
+				p.resolveTableDfs(ctx, allIncludedTables, relation, client, resource, resolvedResources, depth+1)
+			}()
 		}
 	}
+	wg.Wait()
 }
 
 func (p *SourcePlugin) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item interface{}) *schema.Resource {
```
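The `resource := resource` and `relation := relation` re-declarations in this hunk matter: before Go 1.22, a range loop reused a single loop variable across iterations, so every goroutine launched from the loop could observe the final value unless the variable was shadowed per iteration. A tiny self-contained demonstration of the idiom (hypothetical values):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, v := range []string{"a", "b", "c"} {
		v := v // without this, pre-1.22 Go may print "c" three times
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(v)
		}()
	}
	wg.Wait()
}
```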
```diff
@@ -209,3 +222,10 @@ func (p *SourcePlugin) resolveColumn(ctx context.Context, logger zerolog.Logger,
 		}
 	}
 }
+
+func max(a, b uint64) uint64 {
+	if a > b {
+		return a
+	}
+	return b
+}
```
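One note on the `max` helper introduced at the end of the file: at the time of this change Go's standard library had no integer `max`, and the built-in generic `min`/`max` only arrived later, in Go 1.21, hence the small local `uint64` helper.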
