refactor(persis): pluggable Backend/Collection interface with lazy migration #2194
Actionable comments posted: 17
🧹 Nitpick comments (3)
internal/persis/workerheartbeat/store_test.go (1)
79-104: ⚡ Quick win
Add a refresh-during-cleanup regression test.
Please add a test where a worker is listed as stale, then its heartbeat is refreshed before deletion, and assert that `DeleteStale` does not remove that worker.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/workerheartbeat/store_test.go` around lines 79 - 104, Add a regression test (e.g., TestDeleteStale_RefreshDuringCleanup) that creates a stale record using exec.WorkerHeartbeatRecord and s.Upsert, then simulates a concurrent refresh by calling s.Upsert again for that same WorkerID with an updated LastHeartbeatAt (now) before invoking s.DeleteStale with the cutoff; assert DeleteStale returns 0 deletions for that worker and that s.Get still finds the refreshed record, using the existing DeleteStale, Upsert, and Get methods to locate behavior.
internal/persis/session/store_test.go (1)
115-167: ⚡ Quick win
Add a concurrent message-write regression test.
Current tests cover single-writer behavior only. Add a concurrent `AddMessage` test (multiple goroutines writing to the same session) and assert no messages are lost.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/session/store_test.go` around lines 115 - 167, Add a new concurrent test (e.g., TestAddMessage_Concurrent) that uses newStore and newSession to create a session, then launches many goroutines that call s.AddMessage(ctx, "s1", &agent.Message{SequenceID: i, Type: agent.MessageTypeUser, Content: fmt.Sprintf("m%d", i)}) concurrently (use sync.WaitGroup), collect/require no errors from each goroutine, wait for completion, then call s.GetMessages(ctx, "s1") and assert the returned slice length equals the number of goroutines and that all expected contents or sequence IDs are present; also call s.GetLatestSequenceID(ctx, "s1") and assert it equals the max SequenceID to ensure no messages were lost.
internal/persis/watermark/store_test.go (1)
25-85: ⚡ Quick win
Add tests for migration and corrupt-state recovery paths.
`Load` has important branches for legacy versions (0/1/2), unknown versions, and decode failure fallback. Covering these explicitly would protect the migration behavior from regressions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/watermark/store_test.go` around lines 25 - 85, The tests only cover basic Save/Load and overwrite paths; add unit tests that exercise Load's migration and corrupt-state recovery branches by writing legacy and invalid payloads into the same backend store and asserting Load's behavior. Specifically, add tests (e.g., TestLoad_MigrateLegacyVersions, TestLoad_UnknownVersionRecovery, TestLoad_DecodeFailureFallback) that use newStore(t) or the store backend to seed stored bytes representing versions 0/1/2, an unknown version value, and a deliberately corrupted/invalid encoded blob, then call s.Load(ctx) and assert the expected migration result or recovery behavior (version normalization to scheduler.SchedulerStateVersion, DAG contents/LastTick for migrated cases, and the fallback/reset behavior for unknown or decode failures) to lock in the existing Load, Save, and migration logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/persis/apikey/store.go`:
- Around line 142-179: The read of existingRec/existingStored happens before
s.mu is acquired, leaving the byName index vulnerable to races; move the lock to
cover the entire read-modify-write path (i.e., acquire s.mu at the start of the
Update flow before calling s.col.Get and persis.Decode, perform the name
conflict check and call s.col.Put while holding the lock, then update s.byName
and release the lock) so that s.col.Get, the conflict check (the taken/id
comparison), the Put, and the subsequent delete/assignment to s.byName are all
serialized; alternatively, if you prefer a narrower change, keep the initial
read but re-check existingStored.Name and the byName conflict under s.mu after
locking before calling s.col.Put and before mutating s.byName to ensure no race
can produce stale byName entries.
- Around line 212-239: UpdateLastUsed performs an unsynchronized
read-modify-write against s.col (uses s.col.Get and s.col.Put) and can race with
Delete/Update; acquire the store mutex (s.mu) to serialize this path: after the
id == "" check, call s.mu.Lock() and defer s.mu.Unlock() so the Get -> Decode ->
encode -> Put sequence is atomic, ensuring you hold the lock while reading the
record and writing it back (Release only after s.col.Put completes) to prevent
clobbering or resurrecting deleted records.
In `@internal/persis/file/collection_test.go`:
- Around line 264-292: freshCollection is a package-global that gets reassigned
in TestFileCollection and TestMemoryCollection (both t.Parallel()), causing
races and flaky mixing; remove the shared global by making the factory local and
pass it into RunCollectionContract (or change RunCollectionContract to accept a
func(t *testing.T) persis.Collection), e.g. define a local freshCollection
closure inside TestFileCollection and inside TestMemoryCollection and call
RunCollectionContract(t, b.Collection("test"), freshCollection) (or update
RunCollectionContract signature to accept the factory) and update all callers to
use the passed-in factory instead of the package-level freshCollection.
In `@internal/persis/file/collection.go`:
- Around line 191-193: Sanitize and enforce a root-boundary check when mapping
IDs to disk paths: have filePath use idToRelPath(id) but then run filepath.Clean
and ensure the resulting joined path is not absolute and is contained inside
c.dir by using filepath.Rel(c.dir, joined) and rejecting if rel starts with ".."
or equals ".." (or returns an error); apply the same pattern to the other places
that join IDs/prefixes (the code around the blocks referenced at 273-287,
291-294) so no ".." or absolute paths can escape c.dir; replace any raw string
prefix checks (the check around line ~371) with a secure containment check using
filepath.Rel or by comparing path elements with filepath.Separator (e.g.,
compute rel, confirm it doesn't start with ".." and then use that for
prefix/sibling directory decisions).
- Around line 175-186: In Claim, don't swallow filesystem errors: when iterating
candidates, change the read/remove error handling so that if c.readFile(path) or
os.Remove(path) returns an error you check for os.IsNotExist and only continue
for not-found; for any other error return that error immediately instead of
continuing; after successful read and remove call
removeEmptyDirs(filepath.Dir(path), c.dir) and return fr.toRecord(), nil; only
return persis.ErrNotFound if every candidate either didn't exist (IsNotExist)
and none produced a record.
In `@internal/persis/secret/store.go`:
- Around line 209-250: The code computes oldRef/newRef before acquiring s.mu,
which allows a race where the stored record changes between Get and Lock and
leaves s.byRef inconsistent; to fix, move the critical ref computation and
existence-checks inside the mutex (i.e., acquire s.mu.Lock() immediately after
reading and decoding existingRec), then recompute
oldRef/refKey(existing.Secret.Workspace, existing.Secret.Ref) and
newRef/refKey(updated.Workspace, updated.Ref) while holding the lock, perform
the taken-check against s.byRef and apply delete/insert to s.byRef and update
existing.Secret only under the same lock so byRef remains synchronized with the
stored record for functions/methods manipulating s.byRef (e.g., Update/Delete
using sec.ID, existing, existingRec, s.col).
- Around line 285-313: WriteValue currently does a read-modify-write (s.col.Get
-> appendVersion -> s.col.Put) which causes a lost-update race under
concurrency; change it to an optimistic-transaction/retry pattern: read the
record (s.col.Get), compute the new storedRecord via appendVersion, then attempt
an atomic conditional update (compare-and-swap) against the original metadata
(e.g., UpdatedAt or record ID/ETag) instead of unconditional s.col.Put; if the
conditional Put/CompareAndSwap fails due to a concurrent update, re-read the
record and retry the appendVersion/conditional update loop a few times before
returning a conflict error. Ensure you reference the same storedRecord, rec
(Original persis.Record metadata like UpdatedAt/CreatedAt/ID), appendVersion and
s.col methods so the fix replaces the unconditional Put with an atomic
conditional update and retry logic.
- Around line 371-384: The loadByID function can return a storedRecord with a
nil Secret, leading to panics in callers like GetByID/GetCurrentVersion; update
loadByID (the function) to validate that the decoded storedRecord (sr) has a
non-nil Secret (sr.Secret) and return a clear not-found error (e.g.,
secret.ErrNotFound or fmt.Errorf("secret store: missing secret payload %q", id))
when it is nil instead of returning &sr, nil so callers that dereference
sr.Secret are protected.
In `@internal/persis/session/store.go`:
- Around line 188-227: UpdateSession currently does an unsynchronized
Get→mutate→Put causing lost updates under concurrency; wrap the entire
read-modify-write sequence in the store-level lock (or a dedicated per-session
lock) so the Get, mutation of storedSession, persis.Encode/Put and subsequent
in-memory updates (s.updatedAt and s.sortUserSessions) are atomic. Apply the
same locking pattern to AddMessage (and the other similar read-modify-write path
mentioned) so both methods acquire the lock before Get and release it only after
Put and in-memory state updates to prevent clobbered messages or stale metadata.
In `@internal/persis/testutil/memory.go`:
- Around line 168-174: copyRecord currently shallow-copies persis.Record but
leaves the ExpiresAt pointer aliased, allowing external mutations to leak;
modify copyRecord to deep-copy ExpiresAt as well: if r.ExpiresAt != nil allocate
a new value, copy the pointed value into it and assign it to cp.ExpiresAt
(analogous to how Data is duplicated) so the returned *persis.Record has its own
independent ExpiresAt.
In `@internal/persis/user/store.go`:
- Around line 189-247: The code fetches and decodes the existing record
(s.col.Get + persis.Decode and computation of oldOIDCKey) before acquiring
s.mu.Lock in the Update (and similarly in Delete), which can yield stale index
keys during concurrent writes; move the Get + persis.Decode + old key
computation (the calls to s.col.Get, persis.Decode and oidcKey for old values)
so they occur while holding s.mu.Lock (i.e., acquire s.mu.Lock before
fetching/decoding and computing oldOIDCKey) and keep the existing index-update
logic that modifies byUsername and byOIDCIdentity under the same lock; apply the
same change to the Delete code path (lines referenced in the review) to ensure
atomicity of read-modify-write on the indices.
In `@internal/persis/watermark/store.go`:
- Around line 72-76: In Store.Save, add a nil-check at the top to reject a nil
state and return a clear error instead of encoding it; specifically, in the
Store.Save(ctx context.Context, state *scheduler.SchedulerState) method validate
state != nil and return an error like "watermark store: nil state" (or similar)
before calling persis.Encode so you never persist a nil/empty payload that later
appears as corrupted in Load.
In `@internal/persis/webhook/store.go`:
- Around line 165-207: The existing code fetches and decodes the existing record
(calls to s.col.Get and persis.Decode producing existingStored) outside the
s.mu.Lock() that protects the byDAGName map, which can produce stale-byDAGName
races; move the Get + Decode (the existingRec and existingStored population) to
occur while holding s.mu.Lock() (i.e., call s.mu.Lock() before s.col.Get and
persis.Decode and keep the lock through the subsequent checks/Put and byDAGName
mutation), and apply the same change to the other similar block around the code
referenced at 217-236 so all reads/validation of existingStored and updates to
s.byDAGName happen under the same write lock.
- Around line 255-282: UpdateLastUsed performs an unlocked read-modify-write and
can be overwritten by concurrent Delete/Update calls; protect the whole
operation by serializing with the store's mutex (e.g., lock s.mu or the store's
existing sync.RWMutex) so that Get/Decode/modify/Encode/Put are executed under
the lock; acquire the write lock at the start of UpdateLastUsed and release it
after the Put (or use defer Unlock) to prevent stale overwrites or recreation
during concurrent mutations.
In `@internal/persis/workerheartbeat/store.go`:
- Around line 93-107: DeleteStale currently lists all records and then deletes
by WorkerID, opening a TOCTOU window between s.List and s.col.Delete where a
worker could refresh and be wrongly removed; change the logic to perform a
conditional/atomic delete based on the stored heartbeat timestamp or version
instead of a blind Delete. Update Store.DeleteStale to call a new or existing
conditional method on s.col (e.g.,
DeleteIfOlder/DeleteIfMatchTimestamp/DeleteIfVersion) passing r.WorkerID and
r.LastHeartbeatTime() (or version) so the backend only deletes when the stored
last-heartbeat is still <= before; if your storage backend lacks such an
operation, implement the check-and-delete inside a single transaction or loop
using Get+Delete in a single transactional API rather than separate List and
Delete calls. Ensure error handling still treats persis.ErrNotFound specially
and increments removed only when the conditional delete actually succeeded.
In `@refactor_persis_layer.html`:
- Around line 175-182: In the Mermaid HTML text nodes (e.g., the block
containing "RT --> FDR", "SC --> FQ & FP", "AG --> FSS & FA & FEV", etc.)
replace each raw arrow operator '-->' with the escaped form '--&gt;' (and any other
'>' in edge operators) so the '>' is HTML-escaped; apply the same replacement to
the other Mermaid block noted (the one around the 932-937 region) so the
spec-char-escape lint rule passes.
- Around line 290-291: The rendered inline comment text for the fields Since and
Until uses raw comparison operators (">=" and "<") which trigger
spec-char-escape; update the comment strings for Since (*time.Time // CreatedAt
>= Since) and Until (*time.Time // CreatedAt < Until) to use HTML entities (e.g.
&gt;= and &lt;) so the operators are escaped in the HTML-rendered code snippet
and no parser warnings occur.
---
Nitpick comments:
In `@internal/persis/session/store_test.go`:
- Around line 115-167: Add a new concurrent test (e.g.,
TestAddMessage_Concurrent) that uses newStore and newSession to create a
session, then launches many goroutines that call s.AddMessage(ctx, "s1",
&agent.Message{SequenceID: i, Type: agent.MessageTypeUser, Content:
fmt.Sprintf("m%d", i)}) concurrently (use sync.WaitGroup), collect/require no
errors from each goroutine, wait for completion, then call s.GetMessages(ctx,
"s1") and assert the returned slice length equals the number of goroutines and
that all expected contents or sequence IDs are present; also call
s.GetLatestSequenceID(ctx, "s1") and assert it equals the max SequenceID to
ensure no messages were lost.
In `@internal/persis/watermark/store_test.go`:
- Around line 25-85: The tests only cover basic Save/Load and overwrite paths;
add unit tests that exercise Load's migration and corrupt-state recovery
branches by writing legacy and invalid payloads into the same backend store and
asserting Load's behavior. Specifically, add tests (e.g.,
TestLoad_MigrateLegacyVersions, TestLoad_UnknownVersionRecovery,
TestLoad_DecodeFailureFallback) that use newStore(t) or the store backend to
seed stored bytes representing versions 0/1/2, an unknown version value, and a
deliberately corrupted/invalid encoded blob, then call s.Load(ctx) and assert
the expected migration result or recovery behavior (version normalization to
scheduler.SchedulerStateVersion, DAG contents/LastTick for migrated cases, and
the fallback/reset behavior for unknown or decode failures) to lock in the
existing Load, Save, and migration logic.
In `@internal/persis/workerheartbeat/store_test.go`:
- Around line 79-104: Add a regression test (e.g.,
TestDeleteStale_RefreshDuringCleanup) that creates a stale record using
exec.WorkerHeartbeatRecord and s.Upsert, then simulates a concurrent refresh by
calling s.Upsert again for that same WorkerID with an updated LastHeartbeatAt
(now) before invoking s.DeleteStale with the cutoff; assert DeleteStale returns
0 deletions for that worker and that s.Get still finds the refreshed record,
using the existing DeleteStale, Upsert, and Get methods to locate behavior.
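The root-boundary check requested above for `internal/persis/file/collection.go` could look roughly like the following. This is a minimal sketch, not the PR's code: `safeJoin` and `errEscapesRoot` are hypothetical names, and the real `filePath`/`idToRelPath` helpers may need additional handling.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// errEscapesRoot is a hypothetical sentinel error used only in this sketch.
var errEscapesRoot = errors.New("path escapes collection root")

// safeJoin joins rel onto root and rejects any result that is not
// contained inside root. The name and signature are assumptions.
func safeJoin(root, rel string) (string, error) {
	if filepath.IsAbs(rel) {
		return "", errEscapesRoot
	}
	joined := filepath.Join(root, rel) // Join cleans the result
	r, err := filepath.Rel(root, joined)
	if err != nil {
		return "", err
	}
	// Compare whole path elements so a name such as "..foo" stays valid.
	if r == ".." || strings.HasPrefix(r, ".."+string(filepath.Separator)) {
		return "", errEscapesRoot
	}
	return joined, nil
}

func main() {
	for _, id := range []string{"users/alice.json", "../etc/passwd", "a/../../b"} {
		p, err := safeJoin("/var/data/col", id)
		fmt.Printf("%q -> %q, err=%v\n", id, p, err)
	}
}
```

Comparing on path elements rather than on a raw string prefix is what the review asks for: a plain prefix test on `/var/data/col` would also accept a sibling directory such as `/var/data/col2`.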
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: feb12d73-5ad5-4863-b9b6-03e7f1d171b3
📒 Files selected for processing (48)
- internal/cmd/context.go
- internal/cmd/process/agent_stores.go
- internal/cmd/process/scheduler.go
- internal/cmd/process/server.go
- internal/engine/run.go
- internal/intg/distr/fixtures_test.go
- internal/intg/one_off_schedule_test.go
- internal/intg/queue/fixture_test.go
- internal/persis/apikey/store.go
- internal/persis/apikey/store_test.go
- internal/persis/backend.go
- internal/persis/codec.go
- internal/persis/errors.go
- internal/persis/file/backend.go
- internal/persis/file/collection.go
- internal/persis/file/collection_test.go
- internal/persis/fileapikey/store.go
- internal/persis/fileapikey/store_test.go
- internal/persis/filesecret/store.go
- internal/persis/filesecret/store_test.go
- internal/persis/fileuser/store.go
- internal/persis/fileuser/store_test.go
- internal/persis/filewatermark/store.go
- internal/persis/filewatermark/store_test.go
- internal/persis/filewebhook/store.go
- internal/persis/filewebhook/store_test.go
- internal/persis/secret/store.go
- internal/persis/secret/store_test.go
- internal/persis/session/store.go
- internal/persis/session/store_test.go
- internal/persis/testutil/memory.go
- internal/persis/user/store.go
- internal/persis/user/store_test.go
- internal/persis/watermark/store.go
- internal/persis/watermark/store_test.go
- internal/persis/webhook/store.go
- internal/persis/webhook/store_test.go
- internal/persis/workerheartbeat/store.go
- internal/persis/workerheartbeat/store_test.go
- internal/runtime/agent/agent_test.go
- internal/service/auth/service_test.go
- internal/service/auth/webhook_test.go
- internal/service/frontend/api/v1/secrets_test.go
- internal/service/frontend/server.go
- internal/service/frontend/server_test.go
- internal/service/worker/remote_handler.go
- internal/test/helper.go
- refactor_persis_layer.html
💤 Files with no reviewable changes (10)
- internal/persis/filewebhook/store.go
- internal/persis/fileuser/store_test.go
- internal/persis/filesecret/store_test.go
- internal/persis/filewebhook/store_test.go
- internal/persis/filewatermark/store_test.go
- internal/persis/fileapikey/store.go
- internal/persis/filesecret/store.go
- internal/persis/fileuser/store.go
- internal/persis/filewatermark/store.go
- internal/persis/fileapikey/store_test.go
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidAPIKeyID
	}
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrAPIKeyNotFound
		}
		return err
	}
	var stored auth.APIKeyForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("apikey store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
UpdateLastUsed is an unsynchronized read-modify-write and can clobber/delete-resurrect state.
This path bypasses mu, so it can race with Delete/Update and write back stale payloads after another writer has already changed or removed the record.
🔧 Suggested fix (serialize with store mutex)
 func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
 	if id == "" {
 		return auth.ErrInvalidAPIKeyID
 	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	rec, err := s.col.Get(ctx, id)
 	if err != nil {
 		if errors.Is(err, persis.ErrNotFound) {
 			return auth.ErrAPIKeyNotFound
 		}
 		return err
 	}
@@
 	return s.col.Put(ctx, &persis.Record{
 		ID:        rec.ID,
 		Data:      data,
 		Encoding:  enc,
 		CreatedAt: rec.CreatedAt,
 		UpdatedAt: now,
 	})
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidAPIKeyID
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrAPIKeyNotFound
		}
		return err
	}
	var stored auth.APIKeyForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("apikey store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/apikey/store.go` around lines 212 - 239, UpdateLastUsed
performs an unsynchronized read-modify-write against s.col (uses s.col.Get and
s.col.Put) and can race with Delete/Update; acquire the store mutex (s.mu) to
serialize this path: after the id == "" check, call s.mu.Lock() and defer
s.mu.Unlock() so the Get -> Decode -> encode -> Put sequence is atomic, ensuring
you hold the lock while reading the record and writing it back (Release only
after s.col.Put completes) to prevent clobbering or resurrecting deleted
records.
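To illustrate why holding the mutex across the whole read-modify-write matters, here is a minimal in-memory sketch. Everything in it is hypothetical: `memStore`, `newMemStore`, `Has`, and `errNotFound` stand in for the real `Store`, `s.col`, and `auth.ErrAPIKeyNotFound`, and the map replaces the persisted collection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNotFound is a hypothetical stand-in for auth.ErrAPIKeyNotFound.
var errNotFound = errors.New("not found")

// memStore is a hypothetical stand-in for the real Store; the map plays
// the role of s.col and holds id -> LastUsedAt.
type memStore struct {
	mu   sync.Mutex
	recs map[string]time.Time
}

func newMemStore(ids ...string) *memStore {
	s := &memStore{recs: make(map[string]time.Time)}
	for _, id := range ids {
		s.recs[id] = time.Time{}
	}
	return s
}

func (s *memStore) Delete(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.recs, id)
}

func (s *memStore) Has(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.recs[id]
	return ok
}

// UpdateLastUsed holds mu across the read and the write, so a concurrent
// Delete runs either entirely before or entirely after it.
func (s *memStore) UpdateLastUsed(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.recs[id]; !ok {
		return errNotFound
	}
	s.recs[id] = time.Now().UTC()
	return nil
}

func main() {
	s := newMemStore("k1")
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); _ = s.UpdateLastUsed("k1") }()
	go func() { defer wg.Done(); s.Delete("k1") }()
	wg.Wait()
	fmt.Println("record exists after delete:", s.Has("k1"))
}
```

Whichever goroutine wins the lock, the record is gone at the end: either the update lands first and is then deleted, or the delete lands first and the update returns `errNotFound` instead of writing the record back. Without the lock, the update could read the record, lose the CPU to `Delete`, and then resurrect it with its `Put`.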
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidWebhookID
	}
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrWebhookNotFound
		}
		return err
	}
	var stored auth.WebhookForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("webhook store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
Serialize UpdateLastUsed with other writers to prevent stale overwrite/recreation.
This method does unlocked read-modify-write and can race with Delete/Update, writing stale state back after a concurrent mutation.
🔧 Suggested fix (serialize with store mutex)
 func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
 	if id == "" {
 		return auth.ErrInvalidWebhookID
 	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	rec, err := s.col.Get(ctx, id)
 	if err != nil {
 		if errors.Is(err, persis.ErrNotFound) {
 			return auth.ErrWebhookNotFound
 		}
 		return err
 	}
@@
 	return s.col.Put(ctx, &persis.Record{
 		ID:        rec.ID,
 		Data:      data,
 		Encoding:  enc,
 		CreatedAt: rec.CreatedAt,
 		UpdatedAt: now,
 	})
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidWebhookID
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrWebhookNotFound
		}
		return err
	}
	var stored auth.WebhookForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("webhook store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/webhook/store.go` around lines 255 - 282, UpdateLastUsed
performs an unlocked read-modify-write and can be overwritten by concurrent
Delete/Update calls; protect the whole operation by serializing with the store's
mutex (e.g., lock s.mu or the store's existing sync.RWMutex) so that
Get/Decode/modify/Encode/Put are executed under the lock; acquire the write lock
at the start of UpdateLastUsed and release it after the Put (or use defer
Unlock) to prevent stale overwrites or recreation during concurrent mutations.
func (s *Store) DeleteStale(ctx context.Context, before time.Time) (int, error) {
	records, err := s.List(ctx)
	if err != nil {
		return 0, err
	}
	removed := 0
	for _, r := range records {
		if r.LastHeartbeatTime().After(before) {
			continue
		}
		if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) {
			return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err)
		}
		removed++
	}
Close the stale-delete TOCTOU window.
DeleteStale can delete a worker that was refreshed after List but before Delete. This can incorrectly remove active workers under concurrent heartbeat updates.
Safer pattern
 for _, r := range records {
 	if r.LastHeartbeatTime().After(before) {
 		continue
 	}
+	current, err := s.Get(ctx, r.WorkerID)
+	if err != nil {
+		if errors.Is(err, exec.ErrWorkerHeartbeatNotFound) {
+			continue
+		}
+		return removed, err
+	}
+	if current.LastHeartbeatTime().After(before) {
+		continue
+	}
 	if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) {
 		return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err)
 	}
 	removed++
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/workerheartbeat/store.go` around lines 93 - 107, DeleteStale
currently lists all records and then deletes by WorkerID, opening a TOCTOU
window between s.List and s.col.Delete where a worker could refresh and be
wrongly removed; change the logic to perform a conditional/atomic delete based
on the stored heartbeat timestamp or version instead of a blind Delete. Update
Store.DeleteStale to call a new or existing conditional method on s.col (e.g.,
DeleteIfOlder/DeleteIfMatchTimestamp/DeleteIfVersion) passing r.WorkerID and
r.LastHeartbeatTime() (or version) so the backend only deletes when the stored
last-heartbeat is still <= before; if your storage backend lacks such an
operation, implement the check-and-delete inside a single transaction or loop
using Get+Delete in a single transactional API rather than separate List and
Delete calls. Ensure error handling still treats persis.ErrNotFound specially
and increments removed only when the conditional delete actually succeeded.
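The conditional delete described in the prompt above can be sketched against an in-memory map. This is an illustration only: `heartbeats`, `DeleteIfNotAfter`, and `replayRace` are hypothetical names, and the real `persis.Collection` may not expose a conditional delete at all.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeats is a hypothetical in-memory stand-in for the heartbeat
// collection; it maps workerID -> last heartbeat time.
type heartbeats struct {
	mu   sync.Mutex
	last map[string]time.Time
}

func (h *heartbeats) Upsert(id string, at time.Time) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.last[id] = at
}

// DeleteIfNotAfter (hypothetical name) removes id only when its stored
// heartbeat is still not after cutoff. The check and the delete happen
// under one lock, so a refresh cannot slip in between them.
func (h *heartbeats) DeleteIfNotAfter(id string, cutoff time.Time) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	at, ok := h.last[id]
	if !ok || at.After(cutoff) {
		return false
	}
	delete(h.last, id)
	return true
}

// replayRace replays the sequence from the review: both workers look
// stale when listed, then w1 refreshes before cleanup reaches it.
func replayRace() (refreshedRemoved, staleRemoved bool) {
	now := time.Now()
	cutoff := now.Add(-time.Minute)
	h := &heartbeats{last: make(map[string]time.Time)}
	h.Upsert("w1", now.Add(-time.Hour))
	h.Upsert("w2", now.Add(-time.Hour))
	h.Upsert("w1", now) // refresh arrives after the listing
	return h.DeleteIfNotAfter("w1", cutoff), h.DeleteIfNotAfter("w2", cutoff)
}

func main() {
	refreshed, stale := replayRace()
	fmt.Println("refreshed worker removed:", refreshed)
	fmt.Println("stale worker removed:", stale)
}
```

Note that the Get-then-Delete pattern suggested in the diff narrows the window but does not close it, because a refresh can still land between the `Get` and the `Delete`. Only a check and delete performed atomically by the backend, as in this sketch, removes the race.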
RT --> FDR
SC --> FQ & FP
CO --> FD
AU --> FU & FS & FK
WH --> FW
AG --> FSS & FA & FEV

FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --> F
Escape Mermaid edge operators in HTML text nodes to satisfy lint.
The `-->` in these Mermaid blocks is currently raw HTML text and is flagged by spec-char-escape. Escape the `>` as `&gt;` so lint passes consistently.
🔧 Proposed fix
- RT --> FDR
- SC --> FQ & FP
- CO --> FD
- AU --> FU & FS & FK
- WH --> FW
- AG --> FSS & FA & FEV
-
- FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --> F
+ RT --&gt; FDR
+ SC --&gt; FQ & FP
+ CO --&gt; FD
+ AU --&gt; FU & FS & FK
+ WH --&gt; FW
+ AG --&gt; FSS & FA & FEV
+
+ FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --&gt; F
@@
- A["Step 1\nDefine interfaces\npersis/backend.go\npersis/codec.go"] -->
- B["Step 2\nFileBackend\npersis/file/backend.go\n(no behavior change)"] -->
- C["Step 3\nPort simple stores\nwebhook, apikey,\nuser, secret"] -->
- D["Step 4\nPort complex stores\nqueue, proc,\ndistributed"] -->
- E["Step 5\nPort dagrun store\n(most complex)"] -->
- F["Step 6\nRemove file*/ packages\nSimplify context.go"] -->
+ A["Step 1\nDefine interfaces\npersis/backend.go\npersis/codec.go"] --&gt;
+ B["Step 2\nFileBackend\npersis/file/backend.go\n(no behavior change)"] --&gt;
+ C["Step 3\nPort simple stores\nwebhook, apikey,\nuser, secret"] --&gt;
+ D["Step 4\nPort complex stores\nqueue, proc,\ndistributed"] --&gt;
+ E["Step 5\nPort dagrun store\n(most complex)"] --&gt;
+ F["Step 6\nRemove file*/ packages\nSimplify context.go"] --&gt;
Also applies to: 932-937
🧰 Tools
🪛 HTMLHint (1.9.2)
[error] 175-175: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 176-176: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 177-177: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 178-178: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 179-179: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 180-180: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 182-182: Special characters must be escaped : [ > ].
(spec-char-escape)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@refactor_persis_layer.html` around lines 175 - 182, In the Mermaid HTML text
nodes (e.g., the block containing "RT --> FDR", "SC --> FQ & FP", "AG --> FSS &
FA & FEV", etc.) replace each raw arrow operator '-->' with escaped form
'--&gt;' (and any other '>' in edge operators) so the '>' is HTML-escaped; apply
the same replacement to the other Mermaid block noted (the one around the
932-937 region) so the spec-char-escape lint rule passes.
| Since *<span class="ty">time.Time</span> <span class="cm">// CreatedAt >= Since</span> | ||
| Until *<span class="ty">time.Time</span> <span class="cm">// CreatedAt < Until</span> |
Escape comparison operators inside the rendered code comment text.
The inline code comment text contains raw >= / < and triggers spec-char-escape; use entities to avoid parser ambiguity in HTML-rendered code snippets.
🔧 Proposed fix
- Since *<span class="ty">time.Time</span> <span class="cm">// CreatedAt >= Since</span>
- Until *<span class="ty">time.Time</span> <span class="cm">// CreatedAt < Until</span>
+ Since *<span class="ty">time.Time</span> <span class="cm">// CreatedAt &gt;= Since</span>
+ Until *<span class="ty">time.Time</span> <span class="cm">// CreatedAt &lt; Until</span>
🧰 Tools
🪛 HTMLHint (1.9.2)
[error] 290-290: Special characters must be escaped : [ > ].
(spec-char-escape)
[error] 291-291: Special characters must be escaped : [ < ].
(spec-char-escape)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@refactor_persis_layer.html` around lines 290 - 291, The rendered inline
comment text for the fields Since and Until uses raw comparison operators (">="
and "<") which trigger spec-char-escape; update the comment strings for Since
(*time.Time // CreatedAt >= Since) and Until (*time.Time // CreatedAt < Until)
to use HTML entities (e.g. &gt;= and &lt;) so the operators are escaped in the
HTML-rendered code snippet and no parser warnings occur.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ee0528cb41
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if workerID == "" { | ||
| return nil, exec.ErrWorkerHeartbeatNotFound | ||
| } | ||
| rec, err := s.col.Get(ctx, workerID) |
Resolve heartbeats by legacy record ID during reads
This lookup assumes the collection key is the plain workerID, but pre-refactor heartbeat files were keyed as encodeKey(workerID) (see internal/persis/filedistributed/worker_heartbeat_store.go). After upgrading, existing heartbeats are still present but cannot be read here, so Get returns ErrWorkerHeartbeatNotFound until each worker writes a new-format record. In that window, stale-run repair can treat a live remote worker as missing and incorrectly fail active distributed attempts.
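One way to keep legacy heartbeats readable is a fallback lookup, sketched below. The hashing scheme of the old `encodeKey` is not shown in this thread, so `legacyKey` (hex-encoded SHA-256) is an assumption, and the map stands in for the collection.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// legacyKey is an assumed stand-in for the old encodeKey helper:
// the hex-encoded SHA-256 digest of the worker ID.
func legacyKey(workerID string) string {
	sum := sha256.Sum256([]byte(workerID))
	return hex.EncodeToString(sum[:])
}

// lookup tries the plain worker ID first and falls back to the legacy key.
func lookup(records map[string]string, workerID string) (string, bool) {
	if v, ok := records[workerID]; ok {
		return v, true
	}
	v, ok := records[legacyKey(workerID)]
	return v, ok
}

func main() {
	// Only a pre-upgrade record exists for this worker.
	records := map[string]string{legacyKey("worker-1"): "legacy heartbeat"}
	v, ok := lookup(records, "worker-1")
	fmt.Println(v, ok)
}
```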
| if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) { | ||
| return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err) |
Delete stale heartbeat files using stored record key
For migrated data, the on-disk record ID is still the legacy hashed filename while WorkerID in the payload is the human-readable ID. Deleting by r.WorkerID misses those legacy records, so stale entries are never removed (yet removed++ still increments), which leaves permanently stale workers visible in GetWorkers/metrics and can double-count workers after new-format upserts.
| // ─── helpers ───────────────────────────────────────────────────────────────── | ||
|
|
||
| func (s *Store) load(ctx context.Context, id string) (*storedSession, error) { | ||
| rec, err := s.col.Get(ctx, id) |
Preserve legacy session key mapping when loading records
Legacy session files were stored under <userID>/<sessionID>.json, so after envelope wrapping their collection ID is userID/sessionID. The rebuilt index stores ss.ID (just sessionID), but subsequent reads call Get with that bare ID, so migrated sessions are indexed yet unreadable and get skipped by ListSessions/ListSubSessions as not found. This effectively hides pre-upgrade agent session history until each session is recreated in the new layout.
| } | ||
| now := time.Now().UTC() | ||
| return s.col.Put(ctx, &persis.Record{ | ||
| ID: record.WorkerID, |
Reject unsafe worker IDs before using them as record IDs
This persists WorkerID directly as a file-backed record key, and file.Collection maps record IDs to filesystem paths. Unlike the previous implementation (which hashed IDs), a worker ID containing path separators or .. can write outside the intended workers namespace on heartbeat upsert. Because WorkerId comes from RPC input and is only checked for emptiness, a malicious or misconfigured worker can trigger path traversal in coordinator storage.
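A conservative validation along these lines could run before the ID is used as a record key. This is a sketch only: `validateWorkerID` and `errUnsafeWorkerID` are hypothetical names, and the rule set (no separators, no NUL, no `..`) is an assumption about what counts as unsafe.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errUnsafeWorkerID is a hypothetical sentinel for rejected IDs.
var errUnsafeWorkerID = errors.New("worker heartbeat store: unsafe worker ID")

// validateWorkerID rejects IDs that could escape the collection directory
// when a file-backed collection maps record IDs to paths.
func validateWorkerID(id string) error {
	switch {
	case id == "":
		return errUnsafeWorkerID
	case strings.ContainsAny(id, "/\\\x00"): // path separators or NUL
		return errUnsafeWorkerID
	case strings.Contains(id, ".."): // parent-directory references
		return errUnsafeWorkerID
	}
	return nil
}

func main() {
	for _, id := range []string{"worker-1", "../etc/passwd", "a/b", ""} {
		fmt.Printf("%q -> %v\n", id, validateWorkerID(id))
	}
}
```

Rejecting every `..` substring is stricter than necessary, which seems an acceptable trade for IDs that arrive over RPC.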
| } | ||
|
|
||
| func (s *Store) rebuildIndex(ctx context.Context) error { | ||
| page, err := s.col.List(ctx, persis.ListQuery{}) |
Drain paginated collection results when rebuilding indexes
Index rebuild reads only one page (ListQuery{} once) and ignores NextCursor, but the backend contract allows Limit=0 to use a backend default page size. Any backend that paginates by default will load only a subset of users into byUsername/byOIDCIdentity, causing false not found/duplicate behavior after startup even though records exist in storage.
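Draining pages could look roughly like the following. The types here are simplified stand-ins, not the real persis definitions: the thread names `ListQuery`, `Limit`, and `NextCursor`, while the `Cursor` field, the `Lister` interface, and `pagedBackend` are assumptions made so the sketch is runnable.

```go
package main

import (
	"fmt"
	"strconv"
)

// Simplified stand-ins for the persis types (assumed shapes).
type Record struct{ ID string }
type ListQuery struct {
	Cursor string
	Limit  int
}
type Page struct {
	Records    []Record
	NextCursor string
}
type Lister interface {
	List(q ListQuery) (Page, error)
}

// listAll follows NextCursor until the backend reports no further page.
func listAll(l Lister) ([]Record, error) {
	var all []Record
	q := ListQuery{}
	for {
		page, err := l.List(q)
		if err != nil {
			return nil, err
		}
		all = append(all, page.Records...)
		if page.NextCursor == "" {
			return all, nil
		}
		q.Cursor = page.NextCursor
	}
}

// pagedBackend is a fake backend that returns at most size records per call;
// its cursor is a decimal offset into ids.
type pagedBackend struct {
	ids  []string
	size int
}

func (b pagedBackend) List(q ListQuery) (Page, error) {
	start := 0
	if q.Cursor != "" {
		n, err := strconv.Atoi(q.Cursor)
		if err != nil {
			return Page{}, err
		}
		start = n
	}
	if start > len(b.ids) {
		start = len(b.ids)
	}
	end := start + b.size
	if end > len(b.ids) {
		end = len(b.ids)
	}
	var p Page
	for _, id := range b.ids[start:end] {
		p.Records = append(p.Records, Record{ID: id})
	}
	if end < len(b.ids) {
		p.NextCursor = strconv.Itoa(end)
	}
	return p, nil
}

func main() {
	b := pagedBackend{ids: []string{"u1", "u2", "u3", "u4", "u5"}, size: 2}
	recs, err := listAll(b)
	fmt.Println(len(recs), err)
}
```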
| existingRec, err := s.col.Get(ctx, user.ID) | ||
| if err != nil { |
Serialize update read/modify/write under the same user lock
This update path reads the persisted record before acquiring s.mu, so a concurrent Delete can remove index entries in between and then this Put recreates the record without restoring unchanged username/OIDC index keys. That leaves storage and in-memory indexes inconsistent (e.g., GetByUsername fails for an existing user), a race that did not exist in the previous fully locked file-store implementation.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8efcdd8bc2
| func (s *SecretStore) WriteValue(ctx context.Context, id string, input secret.WriteValueInput) (*secret.Secret, error) { | ||
| rec, err := s.col.Get(ctx, id) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return nil, secret.ErrNotFound |
Serialize secret value rotations before writing new versions
WriteValue now does an unlocked read/modify/write cycle (Get → decode → appendVersion → Put), so concurrent rotations of the same secret can both read the same CurrentVersion and each write back version+1, dropping one update and corrupting version history/metadata. The previous file-backed store held the store mutex across this whole sequence, so this regression can surface whenever two writes hit the same secret close together.
| rec, err := s.col.Get(ctx, id) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return auth.ErrAPIKeyNotFound | ||
| } |
Prevent UpdateLastUsed from overwriting concurrent API-key edits
UpdateLastUsed reads the full record and writes it back without synchronizing with Update, so a normal auth request that bumps LastUsedAt can race with an admin update and re-persist stale role/scope metadata from its earlier snapshot. Because UpdateLastUsed is invoked on successful key use in auth/service.go, this can silently revert recent API-key configuration changes under load.
| existingRec, err := s.col.Get(ctx, sec.ID) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return secret.ErrNotFound | ||
| } |
Keep secret ref index consistent across concurrent update/delete
Update fetches and decodes the existing secret before taking s.mu, so a concurrent Delete can remove the byRef entry in between. If the update keeps the same ref (oldRef == newRef), the subsequent Put recreates the record but never restores the in-memory ref index, making GetByRef return not found even though the secret exists on disk.
| rec, err := s.col.Get(ctx, id) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return auth.ErrWebhookNotFound | ||
| } |
Serialize webhook last-used writes with webhook mutations
UpdateLastUsed performs an unlocked read/modify/write of the full webhook record, so webhook invocations can race with admin mutations (Update) and write back stale auth fields (token hash, auth mode, HMAC settings) from an older snapshot. Since ValidateWebhookToken/ValidateWebhookHMAC call this on successful requests, production traffic can silently undo recent webhook configuration changes.
| existingRec, err := s.col.Get(ctx, key.ID) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return auth.ErrAPIKeyNotFound | ||
| } |
Restore API-key name index on update/delete races
Update reads existingRec before acquiring s.mu, so a concurrent Delete can remove the byName entry first; when the update then rewrites the same-name key, the existingStored.Name != key.Name block is skipped and the name index is never restored. After this race the key exists in storage but is absent from byName, which allows creating duplicate API-key names that should be rejected.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| s.mu.Lock() |
Drain paginated API-key pages when rebuilding index
rebuildIndex calls List once with ListQuery{} and ignores NextCursor, but the persistence contract allows backend-default pagination when Limit is zero. On paginated backends this rebuilds byName from only the first page, so after restart some existing keys are missing from uniqueness checks and duplicate names can be created.
| existingRec, err := s.col.Get(ctx, webhook.ID) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return auth.ErrWebhookNotFound | ||
| } |
Restore DAG-name index on webhook update/delete races
Update loads the existing webhook before taking s.mu, so a concurrent Delete can remove byDAGName and the record in between. If the update keeps the same DAG name, it writes the webhook back but does not reinsert byDAGName (because the rename branch is skipped), leaving GetByDAGName/uniqueness checks inconsistent with stored data.
…acts Define the storage seam for Dagu's control-plane persistence layer. Backend + Collection (8 methods total) is the sole interface any future database driver must implement; all domain stores become thin adapters over Collection, with opaque id+blob Records — no per-domain schema. Also adds Encode/Decode[T] codec helpers and ErrNotFound/ErrConflict sentinel errors. Includes refactor_persis_layer.html design document.
FileBackend (persis/file/) implements Backend+Collection on the local filesystem: each collection is a subdirectory, each record is a .json file, "/" in IDs maps to path separators. Atomic writes via temp+rename, CompareAndSwap via collection mutex, Claim via sorted-filename dequeue. MemoryBackend (persis/testutil/) is a thread-safe in-memory backend for unit tests — no filesystem required to test any store adapter. Shared contract test (collection_test.go) exercises all 6 Collection methods against both backends, confirming behavioural parity.
Replaces filewebhook's 472-line file-I/O implementation with a 200-line adapter that serializes auth.WebhookForStorage as a JSON blob into any persis.Collection. HMAC secret encryption stays in the adapter layer. DAG-name secondary index rebuilt from collection on startup. 16 tests via MemoryBackend — no filesystem dependencies.
…ection apikey.Store: flat CRUD with byName secondary index. 13 tests. user.Store: three secondary indices (byUsername, byOIDCIdentity, count), handles OIDC identity collision detection and cross-index atomicity. 17 tests. All tests use MemoryBackend — no filesystem dependencies.
Implements secret.Store using a persis.Collection backend. Keeps byRef in-memory index (workspace\x00ref → secretID) rebuilt on startup. Encrypted version history stays in the data blob; encryptor injected at construction.
watermark: single-record store (Load/Save) with version migration. session: implements agent.SessionStore with byUser/byParent/updatedAt indices, maxPerUser cleanup, and sub-session tracking.
Simple Upsert/Get/List/DeleteStale over a Collection keyed by workerID. No secondary indices needed; DeleteStale filters by LastHeartbeatTime.
…le* packages
Replace fileuser, fileapikey, filewebhook, filesecret, filewatermark, and filedistributed.NewWorkerHeartbeatStore with the new persis.Collection-backed adapters across all production wiring and tests, then delete the five old packages.
Key changes:
- server.go: use file.NewCollection(UsersDir/APIKeysDir/WebhooksDir) so paths match the configured dirs rather than DataDir+name, preserving backward compat
- file/backend.go: add NewCollection(dir) for lazy-creation collections (no eager MkdirAll), used where the parent dir may not exist yet
- file/collection.go: detect legacy flat-JSON format in readFile and wrap it in the new envelope for transparent lazy migration
- All callers (agent_stores.go, scheduler.go, process/server.go, engine/run.go, remote_handler.go, context.go, test/helper.go) updated to use new stores
- Test files switched to testutil.NewMemoryBackend() for unit tests and file.New/watermark.New for integration tests
… package Move apikey, user, webhook, secret, watermark, session, workerheartbeat adapters from seven one-file packages into a single persis/store package, following Temporal's pattern of one package per backend rather than one package per store type. Renames: Store→XStore, New→NewXStore in each file. All callers updated to import persis/store and use the new names. No logic changes.
…or prefixes
- Drop unused `collector *telemetry.Collector` param from `initBuiltinAuthService`
- Update all call sites (server.go + 7 test call sites)
- Prefix watermarkMigrateState errors with "watermark store:" for consistency
force-pushed the branch from 4094ea7 to fe0341d
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fe0341d687
| _ = s.col.Put(ctx, &persis.Record{ | ||
| ID: rec.ID, | ||
| Data: data, | ||
| Encoding: enc, | ||
| CreatedAt: rec.CreatedAt, | ||
| UpdatedAt: now, | ||
| }) |
Serialize ResolveValue before persisting secret snapshot
ResolveValue performs an unlocked read/modify/write of the entire secret record, then writes that stale snapshot back with Put. If a rotation (WriteValue) or metadata update commits between this Get and Put, the later Put from ResolveValue can overwrite the newer record and drop the new version/current-version metadata. This is a regression from the previous file-backed store, which held a store mutex across these operations, and it can corrupt secret history under concurrent reads and writes.
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return sr.Secret.Clone(), nil |
Validate decoded secret payload before dereferencing
If a stored secret record is valid JSON but missing the secret object (for example from manual edits or partially corrupted legacy data), GetByID dereferences sr.Secret unconditionally and panics instead of returning an error. The previous file-backed implementation explicitly rejected records with nil secret payloads, so this regression can turn recoverable bad data into a process crash path.
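The guard being asked for is small. In the sketch below, `storedRecord`, `Secret`, and `decodeSecret` are simplified, hypothetical shapes; only the idea of checking the decoded pointer before use comes from the comment.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Secret and storedRecord are simplified, hypothetical shapes.
type Secret struct {
	Name string `json:"name"`
}

type storedRecord struct {
	Secret *Secret `json:"secret"`
}

// decodeSecret returns an error instead of panicking when the record is
// valid JSON but carries no secret object.
func decodeSecret(data []byte) (*Secret, error) {
	var sr storedRecord
	if err := json.Unmarshal(data, &sr); err != nil {
		return nil, fmt.Errorf("secret store: decode: %w", err)
	}
	if sr.Secret == nil {
		return nil, errors.New("secret store: record has no secret payload")
	}
	return sr.Secret, nil
}

func main() {
	for _, raw := range []string{`{"secret":{"name":"db"}}`, `{"secret":null}`, `{}`} {
		sec, err := decodeSecret([]byte(raw))
		fmt.Println(sec, err)
	}
}
```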
| rec, err := s.col.Get(ctx, id) | ||
| if err != nil { | ||
| if errors.Is(err, persis.ErrNotFound) { | ||
| return auth.ErrUserNotFound | ||
| } |
Guard user count updates against concurrent deletes
This method reads the record before acquiring s.mu and later decrements s.count unconditionally, so two concurrent Delete calls for the same user can both pass the pre-lock Get; the second col.Delete is a no-op but still decrements the counter. That leaves CountUsers inaccurate (potentially negative) and reports success on a delete that should have been not found.
| return errors.New("session store: message cannot be nil") | ||
| } | ||
|
|
||
| rec, err := s.col.Get(ctx, sessionID) |
Serialize session message writes with session deletion
AddMessage does an unlocked read/modify/write cycle (Get → append → Put) before touching in-memory indexes, so a concurrent DeleteSession can remove the session in between and then this Put recreates it from a stale snapshot. Because the delete path already removed byUser/byParent entries, the resurrected session can exist on disk but disappear from ListSessions/ListSubSessions, causing inconsistent session visibility under concurrent traffic.
Concurrency fixes (serialize full read-modify-write under store mutex):
- apikey: Update, Delete, UpdateLastUsed
- secret: Update, Delete, WriteValue; nil guard in loadByID
- session: UpdateSession, AddMessage (lock moved before col.Get)
- user: Update, Delete
- webhook: Update, Delete, UpdateLastUsed
- watermark: nil guard in Save
- workerheartbeat: TOCTOU fix in DeleteStale (re-read before delete); reject unsafe worker IDs containing path-traversal characters
Collection fixes:
- file/collection.go: filePath now validates path stays within root (traversal prevention); Claim propagates real I/O errors instead of silently swallowing them
- file/collection_test.go: remove shared-global freshCollection to fix parallel test race; pass factory as parameter to RunCollectionContract
Memory backend:
- testutil/memory.go: deep-copy ExpiresAt pointer in copyRecord
Regression tests:
- watermark_test: TestLoad_MigratesLegacyVersions (v0/1/2), UnknownVersion, CorruptData
- workerheartbeat_test: TestDeleteStale_RefreshDuringCleanup
- session_test: TestAddMessage_Concurrent (20 goroutines)
- codec.go: add explicit EncodingProto case in Decode switch (exhaustive lint)
- file/collection.go: suppress G304 gosec on readFile (path validated by filePath root check); use _ = for cleanup-path Close/Remove calls (errcheck lint)
- file/collection_test.go: rename unused t param to _ in memory freshCollection closure (revive lint)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0857ca1c7b
| } | ||
|
|
||
| func (s *SecretStore) appendVersion(sr *secretStoredRecord, input secret.WriteValueInput) error { | ||
| if sr.Secret.ProviderType != secret.ProviderDaguManaged { |
Guard nil secret payload before dereferencing in version writes
If a stored record decodes successfully but has secret: null (for example from manual edits or partially corrupted legacy data), WriteValue reaches appendVersion and dereferences sr.Secret unconditionally, which panics the process instead of returning a recoverable error. The previous file-backed implementation rejected nil secret metadata at load time, so this introduces a crash path for malformed persisted records.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Drain all pages when rebuilding the secret ref index
rebuildIndex reads only one ListQuery{} page and ignores NextCursor, but the collection contract allows Limit=0 to use a backend default page size. On paginated backends this leaves byRef incomplete after startup, so GetByRef can return not found for existing secrets and duplicate ref conflicts can be missed in create/update flows.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Iterate all pages when rebuilding session indexes
rebuildIndex consumes only the first List page and ignores NextCursor, while the persistence contract permits backend-default pagination when Limit is zero. With a paginated backend, sessions beyond page one are never inserted into byUser/byParent/updatedAt, so ListSessions and ListSubSessions can silently miss existing sessions after restart.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Drain webhook pages while rebuilding DAG-name index
Index rebuild reads a single ListQuery{} page and does not follow NextCursor. On backends that paginate by default, byDAGName is only partially reconstructed, which makes existing webhooks undiscoverable by DAG name and can allow duplicate DAG-name creations that should be rejected.
|
|
||
| // List returns all heartbeat records. | ||
| func (s *WorkerHeartbeatStore) List(ctx context.Context) ([]exec.WorkerHeartbeatRecord, error) { | ||
| page, err := s.col.List(ctx, persis.ListQuery{}) |
List and purge all heartbeat pages before stale cleanup
List fetches just one collection page and ignores pagination cursors, and DeleteStale relies on that method. On paginated backends, stale-heartbeat cleanup processes only the first page, leaving older stale worker records undeleted and making List callers observe an incomplete worker set.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Paginate through all API keys in List
APIKeyStore.List is documented to return all keys, but it performs a single ListQuery{} call and ignores NextCursor. On a backend with default pagination, callers receive only the first page, which can hide existing keys from administrative views and tooling that relies on full key enumeration.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Paginate through all users in List
UserStore.List promises all users but reads only one page from the collection and never follows NextCursor. With a paginated backend, user listings become incomplete after the first page, which can break user-management workflows that depend on full traversal.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Paginate through all webhooks in List
WebhookStore.List performs a one-shot ListQuery{} and ignores pagination cursors even though the interface contract is to return all webhooks. On paginated backends this silently truncates results to page one, so existing webhooks can disappear from list-based management operations.
| page, err := s.col.List(ctx, persis.ListQuery{}) | ||
| if err != nil { |
Paginate through all secrets in List
SecretStore.List currently reads a single page and does not follow NextCursor, despite being defined as returning all secrets (optionally workspace-filtered). On paginated backends this yields incomplete secret inventories, which can hide valid entries from operators and API consumers.
| if err := persis.Decode(rec, &sr); err != nil { | ||
| return "", nil, fmt.Errorf("secret store: decode for ResolveValue: %w", err) | ||
| } | ||
| if sr.Secret.Status == secret.StatusDisabled { |
Check for missing secret payload before status read
ResolveValue decodes the stored record and immediately accesses sr.Secret.Status without validating that sr.Secret is non-nil. If a record is syntactically valid JSON but missing the nested secret object, this path panics instead of returning an error, turning recoverable data corruption into a process crash.
Pagination:
- Add listAll() helper that drains all pages via NextCursor
- Replace single List() call with listAll() in rebuildIndex and List methods across all stores (apikey, webhook, secret, session, user, workerheartbeat)
secret.ResolveValue:
- Hold store mutex over full read-modify-write to prevent overwriting concurrent WriteValue/Update commits
- Add nil guard for sr.Secret before dereferencing
WorkerHeartbeatStore backward compatibility:
- Add SHA-256 fallback in Get() for legacy records keyed by encodeKey(workerID)
- Fix DeleteStale to iterate col.List directly using actual record IDs (rec.ID) instead of r.WorkerID, fixing stale-delete for legacy hashed-filename records
SessionStore legacy key resolution:
- Add collectionID map (logical session ID → actual collection key)
- rebuildIndex populates mapping when rec.ID != ss.ID (legacy userID/sessionID)
- loadSession, deleteLockedCtx, UpdateSession, AddMessage all resolve the collection key before col.Get/Put/Delete so pre-upgrade sessions are readable and writable without requiring data migration
@coderabbitai full review
✅ Actions performed
Full review triggered.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0f6ca5a1b3
| return s.col.Put(ctx, &persis.Record{ | ||
| ID: record.WorkerID, | ||
| Data: data, | ||
| Encoding: enc, | ||
| CreatedAt: now, | ||
| UpdatedAt: now, | ||
| }) |
Delete legacy heartbeat key after successful upsert
During migration, Upsert writes the new record under record.WorkerID but leaves any legacy SHA-256-keyed record in place, so one worker can exist twice until stale cleanup runs. Because both GetWorkers (internal/service/coordinator/handler.go) and telemetry collection (internal/cmn/telemetry/collector.go) consume List() without deduping by worker ID, this can temporarily double-count workers and emit duplicate per-worker metrics series right after upgrade.
| // Try new-format key (plain workerID) first. | ||
| r, err := s.getByCollectionID(ctx, workerID) | ||
| if err == nil { | ||
| return r, nil |
Read freshest heartbeat when legacy and new keys coexist
The read path returns immediately on the plain workerID key and only consults the legacy SHA-256 key when the new key is missing. During rolling upgrades where old and new coordinators may write different key formats for the same worker, this can return a stale plain-key heartbeat even when a fresher legacy heartbeat exists, and RepairStaleDistributedRun uses this Get result to decide whether to fail active attempts (internal/runtime/distributed_stale_run.go).
| _ = s.col.Put(ctx, &persis.Record{ | ||
| ID: rec.ID, | ||
| Data: data, | ||
| Encoding: enc, | ||
| CreatedAt: rec.CreatedAt, |
Return errors when ResolveValue metadata write fails
ResolveValue now ignores the result of col.Put after updating LastResolvedAt/UpdatedAt, so storage write failures are silently dropped while the call still returns plaintext and success. This regresses behavior from the previous file-backed store (which returned write errors) and can leave secret usage metadata/audit fields permanently stale under transient or persistent backend failures.
…rror
- WorkerHeartbeatStore.Get: when both plain-workerID and legacy SHA-256 keys coexist during migration, compare LastHeartbeatTime and return the fresher record instead of blindly preferring the new-format key
- SecretStore.ResolveValue: propagate the col.Put error instead of silently discarding it with `_ = s.col.Put(...)`
No logic changes — pure gofmt whitespace/import formatting.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 145ca2238c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
    s.mu.Lock()
    defer s.mu.Unlock()

    if err := s.col.Put(ctx, &persis.Record{
        ID: sess.ID,
Reject duplicate session IDs before creating records
CreateSession writes sess.ID with col.Put unconditionally, so a duplicate ID silently overwrites the existing stored session instead of failing create-as-new. Because the method then only appends new index entries (byUser/updatedAt/byParent) and never removes the old owner’s mappings, a duplicate create can leave indexes inconsistent and return corrupted cross-user session lists; the previous file-backed store explicitly rejected existing IDs.
    return &fileRecord{
        ID:       r.ID,
        Encoding: r.Encoding,
        Data:     json.RawMessage(r.Data),
Preserve opaque record bytes in file envelope
The file backend stores Record.Data as json.RawMessage, which requires the bytes to already be valid JSON. For non-JSON payloads (e.g., any future EncodingProto/binary record), json.Marshal(fileRecord) fails with an invalid-character error, so Collection.Put cannot persist the record despite the backend interface describing Data as an opaque blob.
    oldRef := secretRefKey(existing.Secret.Workspace, existing.Secret.Ref)
    newRef := secretRefKey(updated.Workspace, updated.Ref)
Validate stored secret payload before computing old ref
Update decodes the stored record and immediately dereferences existing.Secret to build oldRef. If persisted data is syntactically valid JSON but has "secret": null (manual edits, partial corruption, or bad legacy data), this path panics instead of returning an error, turning a recoverable record problem into a process crash.
    if _, exists := s.byName[key.Name]; exists {
        return auth.ErrAPIKeyAlreadyExists
    }
    if err := s.col.Put(ctx, &persis.Record{
        ID: key.ID,
Reject duplicate API-key IDs before write
Create checks only byName and then writes key.ID with Put, so reusing an existing ID silently overwrites the prior key record. Because the old name entry is never removed in this path, the in-memory byName index can keep stale mappings and diverge from persisted data, breaking uniqueness checks and name-based lookups.
    rk := secretRefKey(stored.Workspace, stored.Ref)
    if _, exists := s.byRef[rk]; exists {
        return secret.ErrAlreadyExists
Enforce secret ID uniqueness during create
Create validates workspace/ref uniqueness but never checks whether sec.ID already exists before writing. Reusing an existing secret ID overwrites the stored record and leaves the previous ref key in byRef, so GetByRef/conflict checks can return inconsistent results for old and new refs.
- session.go: reject duplicate session IDs in CreateSession; add both in-memory index check and collection.Get check before Put
- apikey.go: reject duplicate API-key IDs in Create (checked byName already; now also check by ID via col.Get)
- secret.go Create: reject duplicate secret IDs (col.Get check before Put)
- secret.go Update: nil-guard existing.Secret before dereferencing for oldRef computation; return descriptive error on corrupt record
- file/collection.go: handle non-JSON (opaque/proto) record data; base64-encode non-JSON payloads as a JSON string so the file envelope always marshals cleanly; fix CompareAndSwap to compare decoded bytes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 59fbb5a574
    if _, exists := s.byUsername[user.Username]; exists {
        return auth.ErrUserAlreadyExists
    }
    if user.OIDCIssuer != "" && user.OIDCSubject != "" {
Reject duplicate user IDs in create path
Create no longer verifies whether user.ID already exists before Put, so a caller reusing an ID can overwrite an existing user record instead of getting ErrUserAlreadyExists. In that case, the old username/OIDC index entries are not removed and count is incremented, leaving lookup/index state inconsistent with persisted data. The previous fileuser implementation explicitly rejected duplicate IDs, so this is a behavioral regression.
    if _, exists := s.byDAGName[webhook.DAGName]; exists {
        return auth.ErrWebhookAlreadyExists
    }
    if err := s.col.Put(ctx, &persis.Record{
        ID: webhook.ID,
Prevent webhook ID overwrite during create
Create checks only DAG-name uniqueness and then writes by webhook.ID without confirming the ID is unused. If an existing ID is reused with a different DAG name, the stored record is overwritten while the old DAG-name index entry is left behind, which can corrupt DAG-name lookups and uniqueness checks. The prior filewebhook store guarded against duplicate IDs, so this is a regression in create semantics.
        return nil
    }
Preserve distinct record IDs when deriving file paths
idToRelPath uses filepath.Join(strings.Split(id, "/")...), which normalizes path segments (.., duplicate slashes, leading slash) before appending .json. As a result, distinct logical IDs like "b" and "a/../b" can resolve to the same on-disk file and overwrite each other, breaking key uniqueness guarantees for any collection that accepts such IDs. This is a data-corruption risk introduced by the new shared file backend path mapping.
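The collision is easy to reproduce. In the sketch below `joinedPath` reproduces the mapping as described in the comment (it is not copied from the repository), and `validID` is one hypothetical guard that rejects IDs with empty, `.`, or `..` segments.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// joinedPath reproduces the mapping described above.
func joinedPath(id string) string {
	return filepath.Join(strings.Split(id, "/")...) + ".json"
}

// validID is a hypothetical guard: it accepts only IDs whose "/"-separated
// segments are all non-empty and are neither "." nor "..".
func validID(id string) bool {
	if id == "" {
		return false
	}
	for _, seg := range strings.Split(id, "/") {
		if seg == "" || seg == "." || seg == ".." {
			return false
		}
	}
	return true
}

func main() {
	// filepath.Join cleans its result, so distinct IDs share a file.
	fmt.Println(joinedPath("b") == joinedPath("a/../b"))
	fmt.Println(joinedPath("a/b") == joinedPath("a//b"))

	fmt.Println(validID("b"), validID("a/../b"), validID("/b"), validID("a//b"))
}
```

Rejecting such IDs changes which IDs a collection accepts; an encoding that maps every ID to a distinct safe name is the alternative when existing IDs must keep working.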
    if strings.ContainsAny(record.WorkerID, `/\`) || strings.Contains(record.WorkerID, "..") {
        return fmt.Errorf("worker heartbeat store: workerID %q contains unsafe characters", record.WorkerID)
Keep worker heartbeat IDs backward-compatible
This new guard rejects WorkerID values containing /, \, or .., but worker.id is user-configurable and the previous filedistributed store accepted arbitrary IDs by hashing them for file keys. After upgrading, deployments using IDs like region/a will fail heartbeat persistence (Heartbeat returns internal error), causing workers to appear unhealthy/offline even though they were valid before.
6a78639 to db5ff31
store.WatermarkStore writes state.json via persis.Collection.Put, which
wraps the payload in a file envelope: {id, encoding, data: {actual state}}.
waitForSchedulerDAGRegistered was reading the flat legacy format and
checking state.dags directly, which is always undefined in the envelope
format. Now falls back to parsed.data when parsed.dags is absent.
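The same fallback, expressed in Go as a sketch: the test helper itself is not shown in this thread and is probably not Go. `state` models only the `dags` key, `envelope` follows the `{id, encoding, data}` shape from the description, and `readState` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is a simplified stand-in; only the "dags" key is modeled.
type state struct {
	DAGs map[string]json.RawMessage `json:"dags"`
}

// envelope models the file envelope {id, encoding, data: {...}}.
type envelope struct {
	ID       string          `json:"id"`
	Encoding string          `json:"encoding"`
	Data     json.RawMessage `json:"data"`
}

// readState accepts either the flat legacy layout or the envelope layout.
func readState(raw []byte) (state, error) {
	var flat state
	if err := json.Unmarshal(raw, &flat); err != nil {
		return state{}, err
	}
	if flat.DAGs != nil {
		return flat, nil // legacy flat file
	}
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return state{}, err
	}
	if len(env.Data) == 0 {
		return state{}, nil // neither layout carries any DAGs
	}
	var inner state
	if err := json.Unmarshal(env.Data, &inner); err != nil {
		return state{}, err
	}
	return inner, nil
}

func main() {
	flat := []byte(`{"dags":{"a":{}}}`)
	wrapped := []byte(`{"id":"state","encoding":"json","data":{"dags":{"a":{},"b":{}}}}`)
	s1, _ := readState(flat)
	s2, _ := readState(wrapped)
	fmt.Println(len(s1.DAGs), len(s2.DAGs))
}
```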
Summary
- `persis.Backend` / `persis.Collection` / `persis.Record` interface contracts so persistence can be swapped without touching call sites
- `FileBackend` (one JSON file per record, atomic writes) and `MemoryBackend` (in-memory, for unit tests)
- `file.NewCollection(dir)` for lazy-creation collections (no eager `MkdirAll`), safe where the parent path may not exist yet
- `file/collection.go`: old flat-JSON files transparently wrapped in new envelope, enabling lazy on-write migration with no bulk migration needed
- `persis.Collection` seam: `user`, `apikey`, `webhook`, `secret`, `watermark`, `session`, `workerheartbeat`
- (`server.go`, `agent_stores.go`, `scheduler.go`, `process/server.go`, `engine/run.go`, `remote_handler.go`, `context.go`, `test/helper.go`)
- `fileuser`, `fileapikey`, `filewebhook`, `filesecret`, `filewatermark` packages (net −4677 lines)
- `testutil.MemoryBackend` for unit tests; integration tests use real `file.New`

Test plan
- `go build ./...` clean
- `go test ./internal/service/auth/... ./internal/service/frontend/... ./internal/persis/... ./internal/engine/... ./internal/cmd/... ./internal/runtime/agent/... ./internal/intg/queue/...` all green
- `readFile`; rewritten in new envelope format on next write

Summary by CodeRabbit
Tests
Chores