Skip to content

Commit 04c7f49

Browse files
yevgenypatshermanschaaferezrokah
authored
fix: Bring concurrency back (#129)
* fix: Bring concurrency back * Update plugins/source.go Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> * change max_go_routines to concurrency * Update plugins/source.go Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com> * Update plugins/source.go Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com> * fix tests and lints Co-authored-by: Herman Schaaf <hermanschaaf@gmail.com> Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com>
1 parent d4bcdff commit 04c7f49

5 files changed

Lines changed: 42 additions & 58 deletions

File tree

plugins/source.go

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/cloudquery/plugin-sdk/helpers"
910
"github.com/cloudquery/plugin-sdk/schema"
1011
"github.com/cloudquery/plugin-sdk/specs"
1112
"github.com/rs/zerolog"
1213
"github.com/thoas/go-funk"
14+
"golang.org/x/sync/semaphore"
1315
)
1416

1517
type SourceNewExecutionClientFunc func(context.Context, zerolog.Logger, specs.Source) (schema.ClientMeta, error)
@@ -33,7 +35,9 @@ type SourcePlugin struct {
3335

3436
type SourceOption func(*SourcePlugin)
3537

36-
const minGoRoutines = 5
38+
const (
39+
defaultConcurrency = 500000
40+
)
3741

3842
func WithSourceExampleConfig(exampleConfig string) SourceOption {
3943
return func(p *SourcePlugin) {
@@ -126,25 +130,20 @@ func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *
126130
}
127131

128132
// limiter used to limit the amount of resources fetched concurrently
129-
maxGoroutines := spec.MaxGoRoutines
130-
if maxGoroutines < minGoRoutines {
131-
maxGoroutines = minGoRoutines
133+
concurrency := spec.Concurrency
134+
if concurrency == 0 {
135+
concurrency = defaultConcurrency
132136
}
133-
p.logger.Info().Uint64("max_goroutines", maxGoroutines).Msg("starting fetch")
134-
135-
// goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(maxGoroutines))
136-
137-
w := sync.WaitGroup{}
137+
p.logger.Info().Uint64("concurrency", concurrency).Msg("starting fetch")
138+
goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(concurrency))
139+
wg := sync.WaitGroup{}
138140
totalResources := 0
139141
startTime := time.Now()
140142
tableNames, err := p.interpolateAllResources(spec.Tables)
141143
if err != nil {
142144
return err
143145
}
144146

145-
// this is the same fetchtime for all resources
146-
fetchTime := time.Now()
147-
148147
for _, table := range p.tables {
149148
table := table
150149
if funk.ContainsString(spec.SkipTables, table.Name) || !funk.ContainsString(tableNames, table.Name) {
@@ -155,39 +154,26 @@ func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *
155154
if table.Multiplex != nil {
156155
clients = table.Multiplex(c)
157156
}
158-
// we call this here because we dont know when the following goroutine will be called and we do want an order
159-
// of table by table
160-
// totalClients := len(clients)
161-
// newN, err := helpers.TryAcquireMax(ctx, goroutinesSem, int64(totalClients))
162-
// if err != nil {
163-
// p.logger.Error().Err(err).Msg("failed to TryAcquireMax semaphore. exiting")
164-
// break
165-
// }
166-
// goroutinesSem.TryAcquire()
167-
w.Add(1)
168-
go func() {
169-
defer w.Done()
170-
wg := sync.WaitGroup{}
171-
p.logger.Info().Str("table", table.Name).Msg("fetch start")
172-
tableStartTime := time.Now()
173-
totalTableResources := 0
174-
for _, client := range clients {
175-
client := client
176-
177-
// i := i
178-
wg.Add(1)
179-
go func() {
180-
defer wg.Done()
181-
// defer goroutinesSem.Release(1)
182-
totalTableResources += table.Resolve(ctx, client, fetchTime, nil, res)
183-
}()
157+
for _, client := range clients {
158+
client := client
159+
wg.Add(1)
160+
if err := goroutinesSem.Acquire(ctx, 1); err != nil {
161+
// This means context was cancelled
162+
return err
184163
}
185-
wg.Wait()
186-
totalResources += totalTableResources
187-
p.logger.Info().Str("table", table.Name).Int("total_resources", totalTableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished")
188-
}()
164+
go func() {
165+
defer wg.Done()
166+
defer goroutinesSem.Release(1)
167+
// TODO: prob introduce client.Identify() to be used in logs
168+
tableStartTime := time.Now()
169+
p.logger.Info().Str("table", table.Name).Msg("fetch start")
170+
totalTableResources := table.Resolve(ctx, client, startTime, nil, res)
171+
totalResources += totalTableResources
172+
p.logger.Info().Str("table", table.Name).Int("total_resources", totalTableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished")
173+
}()
174+
}
189175
}
190-
w.Wait()
176+
wg.Wait()
191177
p.logger.Info().Int("total_resources", totalResources).TimeDiff("duration", time.Now(), startTime).Msg("fetch finished")
192178
return nil
193179
}

specs/source.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type Source struct {
2020
// For the gRPC registry the path will be the address of the gRPC server: host:port
2121
Path string `json:"path,omitempty"`
2222
// Registry can be github,local,grpc.
23-
Registry Registry `json:"registry,omitempty"`
24-
MaxGoRoutines uint64 `json:"max_goroutines,omitempty"`
23+
Registry Registry `json:"registry,omitempty"`
24+
Concurrency uint64 `json:"concurrency,omitempty"`
2525
// Tables to sync from the source plugin
2626
Tables []string `json:"tables,omitempty"`
2727
// SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables

specs/spec_reader_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88

99
var sources = map[string]Source{
1010
"aws.yml": {
11-
Name: "aws",
12-
Path: "aws",
13-
Version: "v1.0.0",
14-
MaxGoRoutines: 10,
15-
Registry: RegistryLocal,
11+
Name: "aws",
12+
Path: "aws",
13+
Version: "v1.0.0",
14+
Concurrency: 10,
15+
Registry: RegistryLocal,
1616
},
1717
}
1818

specs/spec_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ var testSpecs = map[string]Spec{
2020
"testdata/aws.yml": {
2121
Kind: KindSource,
2222
Spec: &Source{
23-
Name: "aws",
24-
Path: "aws",
25-
Version: "v1.0.0",
26-
MaxGoRoutines: 10,
27-
Registry: RegistryLocal,
23+
Name: "aws",
24+
Path: "aws",
25+
Version: "v1.0.0",
26+
Concurrency: 10,
27+
Registry: RegistryLocal,
2828
},
2929
},
3030
}

specs/testdata/aws.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,5 @@ kind: source
22
spec:
33
name: aws
44
version: v1.0.0
5-
max_goroutines: 10
5+
concurrency: 10
66
registry: local
7-
8-

0 commit comments

Comments
 (0)