Skip to content

Commit 65141eb

Browse files
committed
load historic task states from db
1 parent ea4a59e commit 65141eb

15 files changed

Lines changed: 439 additions & 282 deletions

File tree

pkg/coordinator/coordinator.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ type Coordinator struct {
5656

5757
testRunMap map[uint64]types.Test
5858
testQueue []types.TestRunner
59-
testHistory []types.Test
6059
testRegistryMutex sync.RWMutex
6160
testNotificationChan chan bool
6261
}
@@ -78,7 +77,6 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Co
7877
testDescriptors: map[string]testDescriptorEntry{},
7978
testRunMap: map[uint64]types.Test{},
8079
testQueue: []types.TestRunner{},
81-
testHistory: []types.Test{},
8280
testNotificationChan: make(chan bool, 1),
8381
}
8482
}
@@ -530,13 +528,7 @@ func (c *Coordinator) GetTestQueue() []types.Test {
530528
}
531529

532530
func (c *Coordinator) GetTestHistory() []types.Test {
533-
c.testRegistryMutex.RLock()
534-
defer c.testRegistryMutex.RUnlock()
535-
536-
tests := make([]types.Test, len(c.testHistory))
537-
copy(tests, c.testHistory)
538-
539-
return tests
531+
return nil
540532
}
541533

542534
func (c *Coordinator) startMetrics() error {
@@ -614,7 +606,6 @@ runLoop:
614606
if len(c.testQueue) > 0 {
615607
nextTest = c.testQueue[0]
616608
c.testQueue = c.testQueue[1:]
617-
c.testHistory = append(c.testHistory, nextTest)
618609
}
619610
c.testRegistryMutex.Unlock()
620611

@@ -787,21 +778,7 @@ func (c *Coordinator) runTestCleanup(ctx context.Context) {
787778
}
788779

789780
func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) {
790-
c.testRegistryMutex.Lock()
791-
defer c.testRegistryMutex.Unlock()
792-
793-
cleanedHistory := []types.Test{}
794-
795-
for _, test := range c.testHistory {
796-
if test.Status() != types.TestStatusPending && test.StartTime().Add(retentionTime).Compare(time.Now()) == -1 {
797-
test.Logger().Infof("cleanup test")
798-
continue
799-
}
800-
801-
cleanedHistory = append(cleanedHistory, test)
802-
}
803-
804-
c.testHistory = cleanedHistory
781+
// TODO: clean db
805782
}
806783

807784
func (c *Coordinator) runEpochGC(ctx context.Context) {

pkg/coordinator/db/schema/pgsql/20240913135112_init.sql

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS public."test_runs"
3030
"config" TEXT NOT NULL,
3131
"start_time" BIGINT NOT NULL,
3232
"stop_time" BIGINT NOT NULL,
33+
"timeout" INTEGER NOT NULL,
3334
"status" VARCHAR(16) NOT NULL,
3435
CONSTRAINT "test_runs_pkey" PRIMARY KEY ("run_id")
3536
);
@@ -41,15 +42,13 @@ CREATE TABLE IF NOT EXISTS public."task_states"
4142
"parent_task" INTEGER NOT NULL,
4243
"name" VARCHAR(128) NOT NULL,
4344
"title" TEXT NOT NULL,
45+
"ref_id" VARCHAR(128) NOT NULL,
4446
"timeout" INTEGER NOT NULL,
4547
"ifcond" TEXT NOT NULL,
46-
"is_cleanup" BOOLEAN NOT NULL,
47-
"is_started" BOOLEAN NOT NULL,
48-
"is_running" BOOLEAN NOT NULL,
49-
"is_skipped" BOOLEAN NOT NULL,
50-
"is_timeout" BOOLEAN NOT NULL,
48+
"run_flags" INTEGER NOT NULL,
5149
"start_time" BIGINT NOT NULL,
5250
"stop_time" BIGINT NOT NULL,
51+
"scope_owner" INTEGER NOT NULL,
5352
"task_config" TEXT NOT NULL,
5453
"task_status" TEXT NOT NULL,
5554
"task_result" INTEGER NOT NULL,

pkg/coordinator/db/schema/sqlite/20240913135112_init.sql

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS "test_runs"
3030
"config" TEXT NOT NULL,
3131
"start_time" INTEGER NOT NULL,
3232
"stop_time" INTEGER NOT NULL,
33+
"timeout" INTEGER NOT NULL,
3334
"status" TEXT NOT NULL,
3435
CONSTRAINT "test_runs_pkey" PRIMARY KEY ("run_id")
3536
);
@@ -41,15 +42,13 @@ CREATE TABLE IF NOT EXISTS "task_states"
4142
"parent_task" INTEGER NOT NULL,
4243
"name" TEXT NOT NULL,
4344
"title" TEXT NOT NULL,
45+
"ref_id" TEXT NOT NULL,
4446
"timeout" INTEGER NOT NULL,
4547
"ifcond" TEXT NOT NULL,
46-
"is_cleanup" BOOLEAN NOT NULL,
47-
"is_started" BOOLEAN NOT NULL,
48-
"is_running" BOOLEAN NOT NULL,
49-
"is_skipped" BOOLEAN NOT NULL,
50-
"is_timeout" BOOLEAN NOT NULL,
48+
"run_flags" INTEGER NOT NULL,
5149
"start_time" INTEGER NOT NULL,
5250
"stop_time" INTEGER NOT NULL,
51+
"scope_owner" INTEGER NOT NULL,
5352
"task_config" TEXT NOT NULL,
5453
"task_status" TEXT NOT NULL,
5554
"task_result" INTEGER NOT NULL,

pkg/coordinator/db/task_states.go

Lines changed: 67 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,90 +7,80 @@ import (
77
"github.com/jmoiron/sqlx"
88
)
99

10-
/*
11-
CREATE TABLE IF NOT EXISTS public."task_states"
12-
(
13-
14-
"run_id" INTEGER NOT NULL,
15-
"task_id" INTEGER NOT NULL,
16-
"options" TEXT NOT NULL,
17-
"parent_task" INTEGER NOT NULL,
18-
"is_cleanup" BOOLEAN NOT NULL,
19-
"is_started" BOOLEAN NOT NULL,
20-
"is_running" BOOLEAN NOT NULL,
21-
"is_skipped" BOOLEAN NOT NULL,
22-
"is_timeout" BOOLEAN NOT NULL,
23-
"start_time" BIGINT NOT NULL,
24-
"stop_time" BIGINT NOT NULL,
25-
"task_config" TEXT NOT NULL,
26-
"task_status" TEXT NOT NULL,
27-
"task_result" INTEGER NOT NULL,
28-
CONSTRAINT "task_states_pkey" PRIMARY KEY ("run_id", "task_id")
29-
30-
);
31-
*/
3210
type TaskState struct {
3311
RunID int `db:"run_id"`
3412
TaskID int `db:"task_id"`
3513
ParentTask int `db:"parent_task"`
3614
Name string `db:"name"`
3715
Title string `db:"title"`
16+
RefID string `db:"ref_id"`
3817
Timeout int `db:"timeout"`
3918
IfCond string `db:"ifcond"`
40-
IsCleanup bool `db:"is_cleanup"`
41-
IsStarted bool `db:"is_started"`
42-
IsRunning bool `db:"is_running"`
43-
IsSkipped bool `db:"is_skipped"`
44-
IsTimeout bool `db:"is_timeout"`
19+
RunFlags uint32 `db:"run_flags"`
4520
StartTime int64 `db:"start_time"`
4621
StopTime int64 `db:"stop_time"`
22+
ScopeOwner int `db:"scope_owner"`
4723
TaskConfig string `db:"task_config"`
4824
TaskStatus string `db:"task_status"`
4925
TaskResult int `db:"task_result"`
5026
TaskError string `db:"task_error"`
5127
}
5228

29+
type TaskStateIndex struct {
30+
TaskID int `db:"task_id"`
31+
ParentTask int `db:"parent_task"`
32+
RunFlags uint32 `db:"run_flags"`
33+
}
34+
35+
var (
36+
TaskRunFlagCleanup uint32 = 0x01
37+
TaskRunFlagStarted uint32 = 0x02
38+
TaskRunFlagRunning uint32 = 0x04
39+
TaskRunFlagSkipped uint32 = 0x08
40+
TaskRunFlagTimeout uint32 = 0x10
41+
)
42+
43+
// InsertTaskState inserts a task state into the database.
5344
func (db *Database) InsertTaskState(tx *sqlx.Tx, state *TaskState) error {
5445
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
5546
EnginePgsql: `
5647
INSERT INTO task_states (
57-
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
58-
start_time, stop_time, task_config, task_status, task_result, task_error
59-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
48+
run_id, task_id, parent_task, name, title, ref_id, timeout, ifcond, run_flags,
49+
start_time, stop_time, scope_owner, task_config, task_status, task_result, task_error
50+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
6051
ON CONFLICT (run_id, task_id) DO UPDATE SET
6152
parent_task = excluded.parent_task,
6253
name = excluded.name,
6354
title = excluded.title,
55+
ref_id = excluded.ref_id,
6456
timeout = excluded.timeout,
6557
ifcond = excluded.ifcond,
66-
is_cleanup = excluded.is_cleanup,
67-
is_started = excluded.is_started,
68-
is_running = excluded.is_running,
69-
is_skipped = excluded.is_skipped,
70-
is_timeout = excluded.is_timeout,
58+
run_flags = excluded.run_flags,
7159
start_time = excluded.start_time,
7260
stop_time = excluded.stop_time,
61+
scope_owner = excluded.scope_owner,
7362
task_config = excluded.task_config,
7463
task_status = excluded.task_status,
7564
task_result = excluded.task_result,
7665
task_error = excluded.task_error`,
7766
EngineSqlite: `
7867
INSERT OR REPLACE INTO task_states (
79-
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
80-
start_time, stop_time, task_config, task_status, task_result, task_error
81-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)`,
68+
run_id, task_id, parent_task, name, title, ref_id, timeout, ifcond, run_flags,
69+
start_time, stop_time, scope_owner, task_config, task_status, task_result, task_error
70+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`,
8271
}),
83-
state.RunID, state.TaskID, state.ParentTask, state.Name, state.Title, state.Timeout, state.IfCond, state.IsCleanup, state.IsStarted, state.IsRunning,
84-
state.IsSkipped, state.IsTimeout, state.StartTime, state.StopTime, state.TaskConfig, state.TaskStatus,
85-
state.TaskResult, state.TaskError)
72+
state.RunID, state.TaskID, state.ParentTask, state.Name, state.Title, state.RefID, state.Timeout,
73+
state.IfCond, state.RunFlags, state.StartTime, state.StopTime, state.ScopeOwner, state.TaskConfig,
74+
state.TaskStatus, state.TaskResult, state.TaskError)
8675
if err != nil {
8776
return err
8877
}
8978

9079
return nil
9180
}
9281

93-
func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields []string) error {
82+
// UpdateTaskStateStatus updates the status fields of a task state.
83+
func (db *Database) UpdateTaskStateStatus(tx *sqlx.Tx, state *TaskState, updateFields []string) error {
9484
var sql strings.Builder
9585

9686
args := []any{}
@@ -106,18 +96,9 @@ func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields
10696
case "title":
10797
fmt.Fprintf(&sql, `title = $%v`, len(args)+1)
10898
args = append(args, state.Title)
109-
case "is_started":
110-
fmt.Fprintf(&sql, `is_started = $%v`, len(args)+1)
111-
args = append(args, state.IsStarted)
112-
case "is_running":
113-
fmt.Fprintf(&sql, `is_running = $%v`, len(args)+1)
114-
args = append(args, state.IsRunning)
115-
case "is_skipped":
116-
fmt.Fprintf(&sql, `is_skipped = $%v`, len(args)+1)
117-
args = append(args, state.IsSkipped)
118-
case "is_timeout":
119-
fmt.Fprintf(&sql, `is_timeout = $%v`, len(args)+1)
120-
args = append(args, state.IsTimeout)
99+
case "run_flags":
100+
fmt.Fprintf(&sql, `run_flags = $%v`, len(args)+1)
101+
args = append(args, state.RunFlags)
121102
case "start_time":
122103
fmt.Fprintf(&sql, `start_time = $%v`, len(args)+1)
123104
args = append(args, state.StartTime)
@@ -151,3 +132,35 @@ func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields
151132

152133
return nil
153134
}
135+
136+
// GetTaskStateIndex returns the task index for a given test run.
137+
func (db *Database) GetTaskStateIndex(runID int) ([]*TaskStateIndex, error) {
138+
var states []*TaskStateIndex
139+
140+
err := db.reader.Select(&states, `
141+
SELECT task_id, parent_task, run_flags
142+
FROM task_states
143+
WHERE run_id = $1
144+
ORDER BY task_id ASC`,
145+
runID)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
return states, nil
151+
}
152+
153+
// GetTaskStateByTaskID returns a task state by task ID.
154+
func (db *Database) GetTaskStateByTaskID(runID, taskID int) (*TaskState, error) {
155+
var state TaskState
156+
157+
err := db.reader.Get(&state, `
158+
SELECT * FROM task_states
159+
WHERE run_id = $1 AND task_id = $2`,
160+
runID, taskID)
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
return &state, nil
166+
}

pkg/coordinator/db/test_configs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type TestConfig struct {
1313
ScheduleCronYaml string `db:"schedule_cron_yaml"`
1414
}
1515

16+
// InsertTestConfig inserts a test config into the database.
1617
func (db *Database) InsertTestConfig(tx *sqlx.Tx, config *TestConfig) error {
1718
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
1819
EnginePgsql: `
@@ -41,6 +42,7 @@ func (db *Database) InsertTestConfig(tx *sqlx.Tx, config *TestConfig) error {
4142
return nil
4243
}
4344

45+
// GetTestConfigs returns all test configs.
4446
func (db *Database) GetTestConfigs() ([]*TestConfig, error) {
4547
var configs []*TestConfig
4648

pkg/coordinator/db/test_runs.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,39 @@ type TestRun struct {
1212
Config string `db:"config"`
1313
StartTime int64 `db:"start_time"`
1414
StopTime int64 `db:"stop_time"`
15+
Timeout int32 `db:"timeout"`
1516
Status string `db:"status"`
1617
}
1718

19+
// InsertTestRun inserts a test run into the database.
1820
func (db *Database) InsertTestRun(tx *sqlx.Tx, run *TestRun) error {
1921
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
2022
EnginePgsql: `
2123
INSERT INTO test_runs (
22-
run_id, test_id, name, source, config, start_time, stop_time, status
23-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
24+
run_id, test_id, name, source, config, start_time, stop_time, timeout, status
25+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
2426
ON CONFLICT (run_id) DO UPDATE SET
2527
test_id = excluded.test_id,
2628
name = excluded.name,
2729
source = excluded.source,
2830
start_time = excluded.start_time,
2931
stop_time = excluded.stop_time,
32+
timeout = excluded.timeout,
3033
status = excluded.status`,
3134
EngineSqlite: `
3235
INSERT OR REPLACE INTO test_runs (
33-
run_id, test_id, name, source, config, start_time, stop_time, status
34-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
36+
run_id, test_id, name, source, config, start_time, stop_time, timeout, status
37+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
3538
}),
36-
run.RunID, run.TestID, run.Name, run.Source, run.Config, run.StartTime, run.StopTime, run.Status)
39+
run.RunID, run.TestID, run.Name, run.Source, run.Config, run.StartTime, run.StopTime, run.Timeout, run.Status)
3740
if err != nil {
3841
return err
3942
}
4043

4144
return nil
4245
}
4346

47+
// UpdateTestRunStatus updates the status fields of a test run.
4448
func (db *Database) UpdateTestRunStatus(tx *sqlx.Tx, run *TestRun) error {
4549
_, err := tx.Exec(`
4650
UPDATE test_runs
@@ -53,3 +57,18 @@ func (db *Database) UpdateTestRunStatus(tx *sqlx.Tx, run *TestRun) error {
5357

5458
return nil
5559
}
60+
61+
// GetTestRunByRunID returns a test run by run ID.
62+
func (db *Database) GetTestRunByRunID(runID int) (*TestRun, error) {
63+
var run TestRun
64+
65+
err := db.reader.Get(&run, `
66+
SELECT * FROM test_runs
67+
WHERE run_id = $1`,
68+
runID)
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
return &run, nil
74+
}

0 commit comments

Comments
 (0)