Skip to content

Commit dcb5d26

Browse files
authored
feat: Separate Queued Tables from In Progress Tables (#920)
#### Summary When users limit the concurrency of their sync, tables that have not been started will appear to be "in-progress" even though they are waiting to start... This PR separates `queued` tables from `in-progress` by defining `in-progress` tables as those that have no `end-time`, but do have a non-zero start time. `queued` tables are those tables that have a zero start and end time
1 parent 72e1d49 commit dcb5d26

3 files changed

Lines changed: 64 additions & 4 deletions

File tree

plugins/source/metrics.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,9 @@ func (s *Metrics) InProgressTables() []string {
172172
for _, clientMetrics := range tableMetrics {
173173
clientMetrics.mutex.Lock()
174174
endTime := clientMetrics.endTime
175+
startTime := clientMetrics.startTime
175176
clientMetrics.mutex.Unlock()
176-
if endTime.IsZero() {
177+
if endTime.IsZero() && !startTime.IsZero() {
177178
inProgressTables = append(inProgressTables, table)
178179
break
179180
}
@@ -184,3 +185,23 @@ func (s *Metrics) InProgressTables() []string {
184185

185186
return inProgressTables
186187
}
188+
189+
func (s *Metrics) QueuedTables() []string {
190+
var queuedTables []string
191+
192+
for table, tableMetrics := range s.TableClient {
193+
for _, clientMetrics := range tableMetrics {
194+
clientMetrics.mutex.Lock()
195+
startTime := clientMetrics.startTime
196+
endTime := clientMetrics.endTime
197+
clientMetrics.mutex.Unlock()
198+
if startTime.IsZero() && endTime.IsZero() {
199+
queuedTables = append(queuedTables, table)
200+
break
201+
}
202+
}
203+
}
204+
205+
slices.Sort(queuedTables)
206+
return queuedTables
207+
}

plugins/source/metrics_test.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,44 @@ func TestInProgressTables(t *testing.T) {
7171
Panics: 3,
7272
startTime: time.Now(),
7373
}
74-
74+
s.TableClient["test_table_running3"] = make(map[string]*TableClientMetrics)
75+
s.TableClient["test_table_running3"]["testExecutionClient"] = &TableClientMetrics{}
7576
assert.ElementsMatch(t, []string{"test_table_running1", "test_table_running2"}, s.InProgressTables())
7677
}
7778

79+
func TestQueuedTables(t *testing.T) {
80+
s := &Metrics{
81+
TableClient: make(map[string]map[string]*TableClientMetrics),
82+
}
83+
s.TableClient["test_table_done"] = make(map[string]*TableClientMetrics)
84+
s.TableClient["test_table_done"]["testExecutionClient"] = &TableClientMetrics{
85+
Resources: 1,
86+
Errors: 2,
87+
Panics: 3,
88+
startTime: time.Now(),
89+
endTime: time.Now().Add(time.Second),
90+
}
91+
92+
s.TableClient["test_table_running1"] = make(map[string]*TableClientMetrics)
93+
s.TableClient["test_table_running1"]["testExecutionClient"] = &TableClientMetrics{
94+
Resources: 1,
95+
Errors: 2,
96+
Panics: 3,
97+
startTime: time.Now(),
98+
}
99+
100+
s.TableClient["test_table_running2"] = make(map[string]*TableClientMetrics)
101+
s.TableClient["test_table_running2"]["testExecutionClient"] = &TableClientMetrics{
102+
Resources: 1,
103+
Errors: 2,
104+
Panics: 3,
105+
startTime: time.Now(),
106+
}
107+
s.TableClient["test_table_running3"] = make(map[string]*TableClientMetrics)
108+
s.TableClient["test_table_running3"]["testExecutionClient"] = &TableClientMetrics{}
109+
assert.ElementsMatch(t, []string{"test_table_running3"}, s.QueuedTables())
110+
}
111+
78112
type MockClientMeta struct {
79113
}
80114

plugins/source/scheduler.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,17 +146,22 @@ func (p *Plugin) periodicMetricLogger(ctx context.Context, wg *sync.WaitGroup) {
146146
return
147147
case <-ticker.C:
148148
inProgressTables := p.metrics.InProgressTables()
149-
149+
queuedTables := p.metrics.QueuedTables()
150150
logLine := p.logger.Info().
151151
Uint64("total_resources", p.metrics.TotalResourcesAtomic()).
152152
Uint64("total_errors", p.metrics.TotalErrorsAtomic()).
153153
Uint64("total_panics", p.metrics.TotalPanicsAtomic()).
154-
Int("num_in_progress_tables", len(inProgressTables))
154+
Int("num_in_progress_tables", len(inProgressTables)).
155+
Int("num_queued_tables", len(queuedTables))
155156

156157
if len(inProgressTables) <= periodicMetricLoggerLogTablesLimit {
157158
logLine.Strs("in_progress_tables", inProgressTables)
158159
}
159160

161+
if len(queuedTables) <= periodicMetricLoggerLogTablesLimit {
162+
logLine.Strs("queued_tables", queuedTables)
163+
}
164+
160165
logLine.Msg("Sync in progress")
161166
}
162167
}

0 commit comments

Comments
 (0)