@@ -6,10 +6,12 @@ import (
66 "sync"
77 "time"
88
9+ "github.com/cloudquery/plugin-sdk/helpers"
910 "github.com/cloudquery/plugin-sdk/schema"
1011 "github.com/cloudquery/plugin-sdk/specs"
1112 "github.com/rs/zerolog"
1213 "github.com/thoas/go-funk"
14+ "golang.org/x/sync/semaphore"
1315)
1416
1517type SourceNewExecutionClientFunc func (context.Context , zerolog.Logger , specs.Source ) (schema.ClientMeta , error )
@@ -33,7 +35,9 @@ type SourcePlugin struct {
3335
3436type SourceOption func (* SourcePlugin )
3537
36- const minGoRoutines = 5
38+ const (
39+ defaultConcurrency = 500000
40+ )
3741
3842func WithSourceExampleConfig (exampleConfig string ) SourceOption {
3943 return func (p * SourcePlugin ) {
@@ -126,25 +130,20 @@ func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *
126130 }
127131
128132 // limiter used to limit the amount of resources fetched concurrently
129- maxGoroutines := spec .MaxGoRoutines
130- if maxGoroutines < minGoRoutines {
131- maxGoroutines = minGoRoutines
133+ concurrency := spec .Concurrency
134+ if concurrency == 0 {
135+ concurrency = defaultConcurrency
132136 }
133- p .logger .Info ().Uint64 ("max_goroutines" , maxGoroutines ).Msg ("starting fetch" )
134-
135- // goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(maxGoroutines))
136-
137- w := sync.WaitGroup {}
137+ p .logger .Info ().Uint64 ("concurrency" , concurrency ).Msg ("starting fetch" )
138+ goroutinesSem := semaphore .NewWeighted (helpers .Uint64ToInt64 (concurrency ))
139+ wg := sync.WaitGroup {}
138140 totalResources := 0
139141 startTime := time .Now ()
140142 tableNames , err := p .interpolateAllResources (spec .Tables )
141143 if err != nil {
142144 return err
143145 }
144146
145- // this is the same fetchtime for all resources
146- fetchTime := time .Now ()
147-
148147 for _ , table := range p .tables {
149148 table := table
150149 if funk .ContainsString (spec .SkipTables , table .Name ) || ! funk .ContainsString (tableNames , table .Name ) {
@@ -155,39 +154,26 @@ func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *
155154 if table .Multiplex != nil {
156155 clients = table .Multiplex (c )
157156 }
158- // we call this here because we dont know when the following goroutine will be called and we do want an order
159- // of table by table
160- // totalClients := len(clients)
161- // newN, err := helpers.TryAcquireMax(ctx, goroutinesSem, int64(totalClients))
162- // if err != nil {
163- // p.logger.Error().Err(err).Msg("failed to TryAcquireMax semaphore. exiting")
164- // break
165- // }
166- // goroutinesSem.TryAcquire()
167- w .Add (1 )
168- go func () {
169- defer w .Done ()
170- wg := sync.WaitGroup {}
171- p .logger .Info ().Str ("table" , table .Name ).Msg ("fetch start" )
172- tableStartTime := time .Now ()
173- totalTableResources := 0
174- for _ , client := range clients {
175- client := client
176-
177- // i := i
178- wg .Add (1 )
179- go func () {
180- defer wg .Done ()
181- // defer goroutinesSem.Release(1)
182- totalTableResources += table .Resolve (ctx , client , fetchTime , nil , res )
183- }()
157+ for _ , client := range clients {
158+ client := client
159+ wg .Add (1 )
160+ if err := goroutinesSem .Acquire (ctx , 1 ); err != nil {
161+ // This means context was cancelled
162+ return err
184163 }
185- wg .Wait ()
186- totalResources += totalTableResources
187- p .logger .Info ().Str ("table" , table .Name ).Int ("total_resources" , totalTableResources ).TimeDiff ("duration" , time .Now (), tableStartTime ).Msg ("fetch table finished" )
188- }()
164+ go func () {
165+ defer wg .Done ()
166+ defer goroutinesSem .Release (1 )
167+ // TODO: prob introduce client.Identify() to be used in logs
168+ tableStartTime := time .Now ()
169+ p .logger .Info ().Str ("table" , table .Name ).Msg ("fetch start" )
170+ totalTableResources := table .Resolve (ctx , client , startTime , nil , res )
171+ totalResources += totalTableResources
172+ p .logger .Info ().Str ("table" , table .Name ).Int ("total_resources" , totalTableResources ).TimeDiff ("duration" , time .Now (), tableStartTime ).Msg ("fetch table finished" )
173+ }()
174+ }
189175 }
190- w .Wait ()
176+ wg .Wait ()
191177 p .logger .Info ().Int ("total_resources" , totalResources ).TimeDiff ("duration" , time .Now (), startTime ).Msg ("fetch finished" )
192178 return nil
193179}
0 commit comments