refactor(persis): pluggable Backend/Collection interface with lazy migration #2194

Merged
yohamta0 merged 18 commits into main from refactor/persis-backend-interface
May 23, 2026

Conversation

@yohamta0 yohamta0 commented May 23, 2026

Collaborator

Summary

  • Introduces persis.Backend / persis.Collection / persis.Record interface contracts so persistence can be swapped without touching call sites
  • Implements FileBackend (one JSON file per record, atomic writes) and MemoryBackend (in-memory, for unit tests)
  • Adds file.NewCollection(dir) for lazy-creation collections (no eager MkdirAll), safe where the parent path may not exist yet
  • Adds legacy-format detection in file/collection.go: old flat-JSON files are transparently wrapped in the new envelope on read and rewritten on the next write, so no bulk migration is needed
  • Ports seven store types to the new persis.Collection seam: user, apikey, webhook, secret, watermark, session, workerheartbeat
  • Wires new adapters into all production callers (server.go, agent_stores.go, scheduler.go, process/server.go, engine/run.go, remote_handler.go, context.go, test/helper.go)
  • Deletes fileuser, fileapikey, filewebhook, filesecret, filewatermark packages (net −4677 lines)
  • Test files switched to testutil.MemoryBackend for unit tests; integration tests use real file.New

Test plan

  • go build ./... clean
  • go test ./internal/service/auth/... ./internal/service/frontend/... ./internal/persis/... ./internal/engine/... ./internal/cmd/... ./internal/runtime/agent/... ./internal/intg/queue/... all green
  • Existing data files read transparently via legacy-format detection in readFile; rewritten in new envelope format on next write
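
The lazy migration in the last bullet can be sketched roughly as follows. The envelope keys (`id`, `encoding`, `data`) and the `decodeFile` helper are hypothetical; the page only states that legacy flat-JSON files are detected on read and wrapped in a new envelope, so the detection heuristic here is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical on-disk format; the real field names are not shown in the PR.
type envelope struct {
	ID       string          `json:"id"`
	Encoding string          `json:"encoding"`
	Data     json.RawMessage `json:"data"`
}

// decodeFile parses raw file bytes. If the bytes are not already an envelope,
// it wraps them as the payload and reports legacy=true so the caller knows the
// file will be upgraded on its next write.
// Assumed heuristic: an envelope is a JSON object with both "data" and
// "encoding" keys; a legacy document that happens to use both keys would be
// misclassified.
func decodeFile(id string, raw []byte) (env envelope, legacy bool, err error) {
	var probe map[string]json.RawMessage
	if err = json.Unmarshal(raw, &probe); err != nil {
		return envelope{}, false, fmt.Errorf("decode %q: %w", id, err)
	}
	_, hasData := probe["data"]
	_, hasEncoding := probe["encoding"]
	if hasData && hasEncoding {
		if err = json.Unmarshal(raw, &env); err != nil {
			return envelope{}, false, fmt.Errorf("decode envelope %q: %w", id, err)
		}
		return env, false, nil
	}
	// Legacy flat JSON: keep the original bytes untouched as the payload.
	payload := append(json.RawMessage(nil), raw...)
	return envelope{ID: id, Encoding: "json", Data: payload}, true, nil
}

func main() {
	old := []byte(`{"username":"alice"}`)
	env, legacy, err := decodeFile("u1", old)
	fmt.Println(string(env.Data), legacy, err)

	// Re-encoding the envelope is what a later write would persist.
	upgraded, _ := json.Marshal(env)
	_, legacy, err = decodeFile("u1", upgraded)
	fmt.Println(legacy, err)
}
```

Nothing is rewritten at read time in this sketch; the file changes format only when the record is next saved, which matches the "lazy on-write" wording above.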

Summary by CodeRabbit

  • Tests

    • Added comprehensive test coverage for persistence stores and backend collection implementations.
  • Chores

    • Updated internal persistence layer architecture and store initialization across services to use a unified backend abstraction.
    • Restructured persistent data storage implementation for API keys, secrets, users, webhooks, and worker heartbeats.

coderabbitai Bot commented May 23, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 847c7609-eb78-45c7-a30a-fa84e2922c5b

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 17

🧹 Nitpick comments (3)
internal/persis/workerheartbeat/store_test.go (1)

79-104: ⚡ Quick win

Add a refresh-during-cleanup regression test.

Please add a test where a worker is listed as stale, then heartbeat is refreshed before deletion, and assert DeleteStale does not remove that worker.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/workerheartbeat/store_test.go` around lines 79 - 104, Add a
regression test (e.g., TestDeleteStale_RefreshDuringCleanup) that creates a
stale record using exec.WorkerHeartbeatRecord and s.Upsert, then simulates a
concurrent refresh by calling s.Upsert again for that same WorkerID with an
updated LastHeartbeatAt (now) before invoking s.DeleteStale with the cutoff;
assert DeleteStale returns 0 deletions for that worker and that s.Get still
finds the refreshed record, using the existing DeleteStale, Upsert, and Get
methods to locate behavior.

internal/persis/session/store_test.go (1)

115-167: ⚡ Quick win

Add a concurrent message-write regression test.

Current tests cover single-writer behavior only. Add a concurrent AddMessage test (multiple goroutines writing to the same session) and assert no messages are lost.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/session/store_test.go` around lines 115 - 167, Add a new
concurrent test (e.g., TestAddMessage_Concurrent) that uses newStore and
newSession to create a session, then launches many goroutines that call
s.AddMessage(ctx, "s1", &agent.Message{SequenceID: i, Type:
agent.MessageTypeUser, Content: fmt.Sprintf("m%d", i)}) concurrently (use
sync.WaitGroup), collect/require no errors from each goroutine, wait for
completion, then call s.GetMessages(ctx, "s1") and assert the returned slice
length equals the number of goroutines and that all expected contents or
sequence IDs are present; also call s.GetLatestSequenceID(ctx, "s1") and assert
it equals the max SequenceID to ensure no messages were lost.

internal/persis/watermark/store_test.go (1)

25-85: ⚡ Quick win

Add tests for migration and corrupt-state recovery paths.

Load has important branches for legacy versions (0/1/2), unknown versions, and decode failure fallback. Covering these explicitly would protect the migration behavior from regressions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/watermark/store_test.go` around lines 25 - 85, The tests only
cover basic Save/Load and overwrite paths; add unit tests that exercise Load's
migration and corrupt-state recovery branches by writing legacy and invalid
payloads into the same backend store and asserting Load's behavior.
Specifically, add tests (e.g., TestLoad_MigrateLegacyVersions,
TestLoad_UnknownVersionRecovery, TestLoad_DecodeFailureFallback) that use
newStore(t) or the store backend to seed stored bytes representing versions
0/1/2, an unknown version value, and a deliberately corrupted/invalid encoded
blob, then call s.Load(ctx) and assert the expected migration result or recovery
behavior (version normalization to scheduler.SchedulerStateVersion, DAG
contents/LastTick for migrated cases, and the fallback/reset behavior for
unknown or decode failures) to lock in the existing Load, Save, and migration
logic.

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/persis/apikey/store.go`:
- Around line 142-179: The read of existingRec/existingStored happens before
s.mu is acquired, leaving the byName index vulnerable to races; move the lock to
cover the entire read-modify-write path (i.e., acquire s.mu at the start of the
Update flow before calling s.col.Get and persis.Decode, perform the name
conflict check and call s.col.Put while holding the lock, then update s.byName
and release the lock) so that s.col.Get, the conflict check (the taken/id
comparison), the Put, and the subsequent delete/assignment to s.byName are all
serialized; alternatively, if you prefer a narrower change, keep the initial
read but re-check existingStored.Name and the byName conflict under s.mu after
locking before calling s.col.Put and before mutating s.byName to ensure no race
can produce stale byName entries.
- Around line 212-239: UpdateLastUsed performs an unsynchronized
read-modify-write against s.col (uses s.col.Get and s.col.Put) and can race with
Delete/Update; acquire the store mutex (s.mu) to serialize this path: after the
id == "" check, call s.mu.Lock() and defer s.mu.Unlock() so the Get -> Decode ->
encode -> Put sequence is atomic, ensuring you hold the lock while reading the
record and writing it back (Release only after s.col.Put completes) to prevent
clobbering or resurrecting deleted records.

In `@internal/persis/file/collection_test.go`:
- Around line 264-292: freshCollection is a package-global that gets reassigned
in TestFileCollection and TestMemoryCollection (both t.Parallel()), causing
races and flaky mixing; remove the shared global by making the factory local and
pass it into RunCollectionContract (or change RunCollectionContract to accept a
func(t *testing.T) persis.Collection), e.g. define a local freshCollection
closure inside TestFileCollection and inside TestMemoryCollection and call
RunCollectionContract(t, b.Collection("test"), freshCollection) (or update
RunCollectionContract signature to accept the factory) and update all callers to
use the passed-in factory instead of the package-level freshCollection.

In `@internal/persis/file/collection.go`:
- Around line 191-193: Sanitize and enforce a root-boundary check when mapping
IDs to disk paths: have filePath use idToRelPath(id) but then run filepath.Clean
and ensure the resulting joined path is not absolute and is contained inside
c.dir by using filepath.Rel(c.dir, joined) and rejecting if rel starts with ".."
or equals ".." (or returns an error); apply the same pattern to the other places
that join IDs/prefixes (the code around the blocks referenced at 273-287,
291-294) so no ".." or absolute paths can escape c.dir; replace any raw string
prefix checks (the check around line ~371) with a secure containment check using
filepath.Rel or by comparing path elements with filepath.Separator (e.g.,
compute rel, confirm it doesn't start with ".." and then use that for
prefix/sibling directory decisions).
- Around line 175-186: In Claim, don't swallow filesystem errors: when iterating
candidates, change the read/remove error handling so that if c.readFile(path) or
os.Remove(path) returns an error you check for os.IsNotExist and only continue
for not-found; for any other error return that error immediately instead of
continuing; after successful read and remove call
removeEmptyDirs(filepath.Dir(path), c.dir) and return fr.toRecord(), nil; only
return persis.ErrNotFound if every candidate either didn't exist (IsNotExist)
and none produced a record.

In `@internal/persis/secret/store.go`:
- Around line 209-250: The code computes oldRef/newRef before acquiring s.mu,
which allows a race where the stored record changes between Get and Lock and
leaves s.byRef inconsistent; to fix, move the critical ref computation and
existence-checks inside the mutex (i.e., acquire s.mu.Lock() immediately after
reading and decoding existingRec), then recompute
oldRef/refKey(existing.Secret.Workspace, existing.Secret.Ref) and
newRef/refKey(updated.Workspace, updated.Ref) while holding the lock, perform
the taken-check against s.byRef and apply delete/insert to s.byRef and update
existing.Secret only under the same lock so byRef remains synchronized with the
stored record for functions/methods manipulating s.byRef (e.g., Update/Delete
using sec.ID, existing, existingRec, s.col).
- Around line 285-313: WriteValue currently does a read-modify-write (s.col.Get
-> appendVersion -> s.col.Put) which causes a lost-update race under
concurrency; change it to an optimistic-transaction/retry pattern: read the
record (s.col.Get), compute the new storedRecord via appendVersion, then attempt
an atomic conditional update (compare-and-swap) against the original metadata
(e.g., UpdatedAt or record ID/ETag) instead of unconditional s.col.Put; if the
conditional Put/CompareAndSwap fails due to a concurrent update, re-read the
record and retry the appendVersion/conditional update loop a few times before
returning a conflict error. Ensure you reference the same storedRecord, rec
(Original persis.Record metadata like UpdatedAt/CreatedAt/ID), appendVersion and
s.col methods so the fix replaces the unconditional Put with an atomic
conditional update and retry logic.
- Around line 371-384: The loadByID function can return a storedRecord with a
nil Secret, leading to panics in callers like GetByID/GetCurrentVersion; update
loadByID (the function) to validate that the decoded storedRecord (sr) has a
non-nil Secret (sr.Secret) and return a clear not-found error (e.g.,
secret.ErrNotFound or fmt.Errorf("secret store: missing secret payload %q", id))
when it is nil instead of returning &sr, nil so callers that dereference
sr.Secret are protected.

In `@internal/persis/session/store.go`:
- Around line 188-227: UpdateSession currently does an unsynchronized
Get→mutate→Put causing lost updates under concurrency; wrap the entire
read-modify-write sequence in the store-level lock (or a dedicated per-session
lock) so the Get, mutation of storedSession, persis.Encode/Put and subsequent
in-memory updates (s.updatedAt and s.sortUserSessions) are atomic. Apply the
same locking pattern to AddMessage (and the other similar read-modify-write path
mentioned) so both methods acquire the lock before Get and release it only after
Put and in-memory state updates to prevent clobbered messages or stale metadata.

In `@internal/persis/testutil/memory.go`:
- Around line 168-174: copyRecord currently shallow-copies persis.Record but
leaves the ExpiresAt pointer aliased, allowing external mutations to leak;
modify copyRecord to deep-copy ExpiresAt as well: if r.ExpiresAt != nil allocate
a new value, copy the pointed value into it and assign it to cp.ExpiresAt
(analogous to how Data is duplicated) so the returned *persis.Record has its own
independent ExpiresAt.

In `@internal/persis/user/store.go`:
- Around line 189-247: The code fetches and decodes the existing record
(s.col.Get + persis.Decode and computation of oldOIDCKey) before acquiring
s.mu.Lock in the Update (and similarly in Delete), which can yield stale index
keys during concurrent writes; move the Get + persis.Decode + old key
computation (the calls to s.col.Get, persis.Decode and oidcKey for old values)
so they occur while holding s.mu.Lock (i.e., acquire s.mu.Lock before
fetching/decoding and computing oldOIDCKey) and keep the existing index-update
logic that modifies byUsername and byOIDCIdentity under the same lock; apply the
same change to the Delete code path (lines referenced in the review) to ensure
atomicity of read-modify-write on the indices.

In `@internal/persis/watermark/store.go`:
- Around line 72-76: In Store.Save, add a nil-check at the top to reject a nil
state and return a clear error instead of encoding it; specifically, in the
Store.Save(ctx context.Context, state *scheduler.SchedulerState) method validate
state != nil and return an error like "watermark store: nil state" (or similar)
before calling persis.Encode so you never persist a nil/empty payload that later
appears as corrupted in Load.

In `@internal/persis/webhook/store.go`:
- Around line 165-207: The existing code fetches and decodes the existing record
(calls to s.col.Get and persis.Decode producing existingStored) outside the
s.mu.Lock() that protects the byDAGName map, which can produce stale-byDAGName
races; move the Get + Decode (the existingRec and existingStored population) to
occur while holding s.mu.Lock() (i.e., call s.mu.Lock() before s.col.Get and
persis.Decode and keep the lock through the subsequent checks/Put and byDAGName
mutation), and apply the same change to the other similar block around the code
referenced at 217-236 so all reads/validation of existingStored and updates to
s.byDAGName happen under the same write lock.
- Around line 255-282: UpdateLastUsed performs an unlocked read-modify-write and
can be overwritten by concurrent Delete/Update calls; protect the whole
operation by serializing with the store's mutex (e.g., lock s.mu or the store's
existing sync.RWMutex) so that Get/Decode/modify/Encode/Put are executed under
the lock; acquire the write lock at the start of UpdateLastUsed and release it
after the Put (or use defer Unlock) to prevent stale overwrites or recreation
during concurrent mutations.

In `@internal/persis/workerheartbeat/store.go`:
- Around line 93-107: DeleteStale currently lists all records and then deletes
by WorkerID, opening a TOCTOU window between s.List and s.col.Delete where a
worker could refresh and be wrongly removed; change the logic to perform a
conditional/atomic delete based on the stored heartbeat timestamp or version
instead of a blind Delete. Update Store.DeleteStale to call a new or existing
conditional method on s.col (e.g.,
DeleteIfOlder/DeleteIfMatchTimestamp/DeleteIfVersion) passing r.WorkerID and
r.LastHeartbeatTime() (or version) so the backend only deletes when the stored
last-heartbeat is still <= before; if your storage backend lacks such an
operation, implement the check-and-delete inside a single transaction or loop
using Get+Delete in a single transactional API rather than separate List and
Delete calls. Ensure error handling still treats persis.ErrNotFound specially
and increments removed only when the conditional delete actually succeeded.

In `@refactor_persis_layer.html`:
- Around line 175-182: In the Mermaid HTML text nodes (e.g., the block
containing "RT --> FDR", "SC --> FQ & FP", "AG --> FSS & FA & FEV", etc.)
replace each raw arrow operator '-->' with escaped form '--&gt;' (and any other
'>' in edge operators) so the '>' is HTML-escaped; apply the same replacement to
the other Mermaid block noted (the one around the 932-937 region) so the
spec-char-escape lint rule passes.
- Around line 290-291: The rendered inline comment text for the fields Since and
Until uses raw comparison operators (">=" and "<") which trigger
spec-char-escape; update the comment strings for Since (*time.Time // CreatedAt
>= Since) and Until (*time.Time // CreatedAt < Until) to use HTML entities (e.g.
&gt;= and &lt;) so the operators are escaped in the HTML-rendered code snippet
and no parser warnings occur.

---

Nitpick comments:
In `@internal/persis/session/store_test.go`:
- Around line 115-167: Add a new concurrent test (e.g.,
TestAddMessage_Concurrent) that uses newStore and newSession to create a
session, then launches many goroutines that call s.AddMessage(ctx, "s1",
&agent.Message{SequenceID: i, Type: agent.MessageTypeUser, Content:
fmt.Sprintf("m%d", i)}) concurrently (use sync.WaitGroup), collect/require no
errors from each goroutine, wait for completion, then call s.GetMessages(ctx,
"s1") and assert the returned slice length equals the number of goroutines and
that all expected contents or sequence IDs are present; also call
s.GetLatestSequenceID(ctx, "s1") and assert it equals the max SequenceID to
ensure no messages were lost.

In `@internal/persis/watermark/store_test.go`:
- Around line 25-85: The tests only cover basic Save/Load and overwrite paths;
add unit tests that exercise Load's migration and corrupt-state recovery
branches by writing legacy and invalid payloads into the same backend store and
asserting Load's behavior. Specifically, add tests (e.g.,
TestLoad_MigrateLegacyVersions, TestLoad_UnknownVersionRecovery,
TestLoad_DecodeFailureFallback) that use newStore(t) or the store backend to
seed stored bytes representing versions 0/1/2, an unknown version value, and a
deliberately corrupted/invalid encoded blob, then call s.Load(ctx) and assert
the expected migration result or recovery behavior (version normalization to
scheduler.SchedulerStateVersion, DAG contents/LastTick for migrated cases, and
the fallback/reset behavior for unknown or decode failures) to lock in the
existing Load, Save, and migration logic.

In `@internal/persis/workerheartbeat/store_test.go`:
- Around line 79-104: Add a regression test (e.g.,
TestDeleteStale_RefreshDuringCleanup) that creates a stale record using
exec.WorkerHeartbeatRecord and s.Upsert, then simulates a concurrent refresh by
calling s.Upsert again for that same WorkerID with an updated LastHeartbeatAt
(now) before invoking s.DeleteStale with the cutoff; assert DeleteStale returns
0 deletions for that worker and that s.Get still finds the refreshed record,
using the existing DeleteStale, Upsert, and Get methods to locate behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: feb12d73-5ad5-4863-b9b6-03e7f1d171b3

📥 Commits

Reviewing files that changed from the base of the PR and between b8ad9ae and ee0528c.

📒 Files selected for processing (48)
  • internal/cmd/context.go
  • internal/cmd/process/agent_stores.go
  • internal/cmd/process/scheduler.go
  • internal/cmd/process/server.go
  • internal/engine/run.go
  • internal/intg/distr/fixtures_test.go
  • internal/intg/one_off_schedule_test.go
  • internal/intg/queue/fixture_test.go
  • internal/persis/apikey/store.go
  • internal/persis/apikey/store_test.go
  • internal/persis/backend.go
  • internal/persis/codec.go
  • internal/persis/errors.go
  • internal/persis/file/backend.go
  • internal/persis/file/collection.go
  • internal/persis/file/collection_test.go
  • internal/persis/fileapikey/store.go
  • internal/persis/fileapikey/store_test.go
  • internal/persis/filesecret/store.go
  • internal/persis/filesecret/store_test.go
  • internal/persis/fileuser/store.go
  • internal/persis/fileuser/store_test.go
  • internal/persis/filewatermark/store.go
  • internal/persis/filewatermark/store_test.go
  • internal/persis/filewebhook/store.go
  • internal/persis/filewebhook/store_test.go
  • internal/persis/secret/store.go
  • internal/persis/secret/store_test.go
  • internal/persis/session/store.go
  • internal/persis/session/store_test.go
  • internal/persis/testutil/memory.go
  • internal/persis/user/store.go
  • internal/persis/user/store_test.go
  • internal/persis/watermark/store.go
  • internal/persis/watermark/store_test.go
  • internal/persis/webhook/store.go
  • internal/persis/webhook/store_test.go
  • internal/persis/workerheartbeat/store.go
  • internal/persis/workerheartbeat/store_test.go
  • internal/runtime/agent/agent_test.go
  • internal/service/auth/service_test.go
  • internal/service/auth/webhook_test.go
  • internal/service/frontend/api/v1/secrets_test.go
  • internal/service/frontend/server.go
  • internal/service/frontend/server_test.go
  • internal/service/worker/remote_handler.go
  • internal/test/helper.go
  • refactor_persis_layer.html
💤 Files with no reviewable changes (10)
  • internal/persis/filewebhook/store.go
  • internal/persis/fileuser/store_test.go
  • internal/persis/filesecret/store_test.go
  • internal/persis/filewebhook/store_test.go
  • internal/persis/filewatermark/store_test.go
  • internal/persis/fileapikey/store.go
  • internal/persis/filesecret/store.go
  • internal/persis/fileuser/store.go
  • internal/persis/filewatermark/store.go
  • internal/persis/fileapikey/store_test.go

Comment thread internal/persis/store/apikey.go
Comment thread internal/persis/apikey/store.go Outdated
Comment on lines +212 to +239
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidAPIKeyID
	}
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrAPIKeyNotFound
		}
		return err
	}
	var stored auth.APIKeyForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("apikey store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

UpdateLastUsed is an unsynchronized read-modify-write and can clobber concurrent updates or resurrect deleted records.

This path bypasses mu, so it can race with Delete/Update and write back stale payloads after another writer has already changed or removed the record.

🔧 Suggested fix (serialize with store mutex)
 func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
 	if id == "" {
 		return auth.ErrInvalidAPIKeyID
 	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	rec, err := s.col.Get(ctx, id)
 	if err != nil {
 		if errors.Is(err, persis.ErrNotFound) {
 			return auth.ErrAPIKeyNotFound
 		}
 		return err
 	}
@@
 	return s.col.Put(ctx, &persis.Record{
 		ID:        rec.ID,
 		Data:      data,
 		Encoding:  enc,
 		CreatedAt: rec.CreatedAt,
 		UpdatedAt: now,
 	})
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidAPIKeyID
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrAPIKeyNotFound
		}
		return err
	}
	var stored auth.APIKeyForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("apikey store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/apikey/store.go` around lines 212 - 239, UpdateLastUsed
performs an unsynchronized read-modify-write against s.col (uses s.col.Get and
s.col.Put) and can race with Delete/Update; acquire the store mutex (s.mu) to
serialize this path: after the id == "" check, call s.mu.Lock() and defer
s.mu.Unlock() so the Get -> Decode -> encode -> Put sequence is atomic, ensuring
you hold the lock while reading the record and writing it back (Release only
after s.col.Put completes) to prevent clobbering or resurrecting deleted
records.

Comment thread internal/persis/file/collection_test.go Outdated
Comment thread internal/persis/file/collection.go
Comment thread internal/persis/file/collection.go Outdated
Comment thread internal/persis/store/webhook.go
Comment thread internal/persis/webhook/store.go Outdated
Comment on lines +255 to +282
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidWebhookID
	}
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrWebhookNotFound
		}
		return err
	}
	var stored auth.WebhookForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("webhook store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Serialize UpdateLastUsed with other writers to prevent stale overwrite/recreation.

This method does unlocked read-modify-write and can race with Delete/Update, writing stale state back after a concurrent mutation.

🔧 Suggested fix (serialize with store mutex)
 func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
 	if id == "" {
 		return auth.ErrInvalidWebhookID
 	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	rec, err := s.col.Get(ctx, id)
 	if err != nil {
 		if errors.Is(err, persis.ErrNotFound) {
 			return auth.ErrWebhookNotFound
 		}
 		return err
 	}
@@
 	return s.col.Put(ctx, &persis.Record{
 		ID:        rec.ID,
 		Data:      data,
 		Encoding:  enc,
 		CreatedAt: rec.CreatedAt,
 		UpdatedAt: now,
 	})
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (s *Store) UpdateLastUsed(ctx context.Context, id string) error {
	if id == "" {
		return auth.ErrInvalidWebhookID
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, err := s.col.Get(ctx, id)
	if err != nil {
		if errors.Is(err, persis.ErrNotFound) {
			return auth.ErrWebhookNotFound
		}
		return err
	}
	var stored auth.WebhookForStorage
	if err := persis.Decode(rec, &stored); err != nil {
		return fmt.Errorf("webhook store: decode for UpdateLastUsed: %w", err)
	}
	now := time.Now().UTC()
	stored.LastUsedAt = &now
	data, enc, err := persis.Encode(stored)
	if err != nil {
		return err
	}
	return s.col.Put(ctx, &persis.Record{
		ID:        rec.ID,
		Data:      data,
		Encoding:  enc,
		CreatedAt: rec.CreatedAt,
		UpdatedAt: now,
	})
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/webhook/store.go` around lines 255 - 282, UpdateLastUsed
performs an unlocked read-modify-write and can be overwritten by concurrent
Delete/Update calls; protect the whole operation by serializing with the store's
mutex (e.g., lock s.mu or the store's existing sync.RWMutex) so that
Get/Decode/modify/Encode/Put are executed under the lock; acquire the write lock
at the start of UpdateLastUsed and release it after the Put (or use defer
Unlock) to prevent stale overwrites or recreation during concurrent mutations.

Comment on lines +93 to +107
func (s *Store) DeleteStale(ctx context.Context, before time.Time) (int, error) {
	records, err := s.List(ctx)
	if err != nil {
		return 0, err
	}
	removed := 0
	for _, r := range records {
		if r.LastHeartbeatTime().After(before) {
			continue
		}
		if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) {
			return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err)
		}
		removed++
	}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Close the stale-delete TOCTOU window.

DeleteStale can delete a worker that was refreshed after List but before Delete. This can incorrectly remove active workers under concurrent heartbeat updates.

Safer pattern
 for _, r := range records {
 	if r.LastHeartbeatTime().After(before) {
 		continue
 	}
+	current, err := s.Get(ctx, r.WorkerID)
+	if err != nil {
+		if errors.Is(err, exec.ErrWorkerHeartbeatNotFound) {
+			continue
+		}
+		return removed, err
+	}
+	if current.LastHeartbeatTime().After(before) {
+		continue
+	}
 	if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) {
 		return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err)
 	}
 	removed++
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/persis/workerheartbeat/store.go` around lines 93 - 107, DeleteStale
currently lists all records and then deletes by WorkerID, opening a TOCTOU
window between s.List and s.col.Delete where a worker could refresh and be
wrongly removed; change the logic to perform a conditional/atomic delete based
on the stored heartbeat timestamp or version instead of a blind Delete. Update
Store.DeleteStale to call a new or existing conditional method on s.col (e.g.,
DeleteIfOlder/DeleteIfMatchTimestamp/DeleteIfVersion) passing r.WorkerID and
r.LastHeartbeatTime() (or version) so the backend only deletes when the stored
last-heartbeat is still <= before; if your storage backend lacks such an
operation, implement the check-and-delete inside a single transaction or loop
using Get+Delete in a single transactional API rather than separate List and
Delete calls. Ensure error handling still treats persis.ErrNotFound specially
and increments removed only when the conditional delete actually succeeded.
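As a rough illustration of the check-and-delete idea described above, the sketch below uses a hypothetical in-memory `heartbeatStore`, not the PR's `WorkerHeartbeatStore`. The names `upsert`, `deleteIfStale`, and `cleanupDemo` are invented for this example, and it assumes a single-process store guarded by one mutex.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeatStore is a hypothetical in-memory stand-in, not the PR's store.
type heartbeatStore struct {
	mu   sync.Mutex
	last map[string]time.Time // workerID -> last heartbeat time
}

// upsert records a fresh heartbeat for workerID.
func (s *heartbeatStore) upsert(workerID string, at time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.last[workerID] = at
}

// deleteIfStale removes workerID only when its stored heartbeat is still at or
// before the cutoff. The check and the delete share one critical section, so a
// refresh cannot land between them.
func (s *heartbeatStore) deleteIfStale(workerID string, before time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	t, ok := s.last[workerID]
	if !ok || t.After(before) {
		return false
	}
	delete(s.last, workerID)
	return true
}

// cleanupDemo simulates a worker refreshing after it was listed as stale.
func cleanupDemo() (removedW1, removedW2 bool) {
	cutoff := time.Date(2026, 5, 23, 12, 0, 0, 0, time.UTC)
	s := &heartbeatStore{last: map[string]time.Time{
		"w1": cutoff.Add(-time.Minute),
		"w2": cutoff.Add(-time.Minute),
	}}
	s.upsert("w2", cutoff.Add(time.Minute)) // refresh arrives before cleanup reaches w2
	return s.deleteIfStale("w1", cutoff), s.deleteIfStale("w2", cutoff)
}

func main() {
	r1, r2 := cleanupDemo()
	fmt.Println("w1 removed:", r1, "w2 removed:", r2)
}
```

A backend shared across processes would need the same condition enforced by the storage layer itself (for example a compare-and-swap or a transaction), since an in-process mutex only covers callers in one process.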

Comment on lines +175 to +182
RT --> FDR
SC --> FQ & FP
CO --> FD
AU --> FU & FS & FK
WH --> FW
AG --> FSS & FA & FEV

FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --> F

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Escape Mermaid edge operators in HTML text nodes to satisfy lint.

--> in these Mermaid blocks is currently raw HTML text and is flagged by spec-char-escape. Escape the > as &gt; so lint passes consistently.

🔧 Proposed fix
-  RT --> FDR
-  SC --> FQ & FP
-  CO --> FD
-  AU --> FU & FS & FK
-  WH --> FW
-  AG --> FSS & FA & FEV
-
-  FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --> F
+  RT --&gt; FDR
+  SC --&gt; FQ & FP
+  CO --&gt; FD
+  AU --&gt; FU & FS & FK
+  WH --&gt; FW
+  AG --&gt; FSS & FA & FEV
+
+  FDR & FQ & FP & FD & FU & FS & FW & FA & FK & FSS & FEV & FM --&gt; F
-  A["Step 1\nDefine interfaces\npersis/backend.go\npersis/codec.go"] -->
-  B["Step 2\nFileBackend\npersis/file/backend.go\n(no behavior change)"] -->
-  C["Step 3\nPort simple stores\nwebhook, apikey,\nuser, secret"] -->
-  D["Step 4\nPort complex stores\nqueue, proc,\ndistributed"] -->
-  E["Step 5\nPort dagrun store\n(most complex)"] -->
-  F["Step 6\nRemove file*/ packages\nSimplify context.go"] -->
+  A["Step 1\nDefine interfaces\npersis/backend.go\npersis/codec.go"] --&gt;
+  B["Step 2\nFileBackend\npersis/file/backend.go\n(no behavior change)"] --&gt;
+  C["Step 3\nPort simple stores\nwebhook, apikey,\nuser, secret"] --&gt;
+  D["Step 4\nPort complex stores\nqueue, proc,\ndistributed"] --&gt;
+  E["Step 5\nPort dagrun store\n(most complex)"] --&gt;
+  F["Step 6\nRemove file*/ packages\nSimplify context.go"] --&gt;

Also applies to: 932-937

🧰 Tools
🪛 HTMLHint (1.9.2)

[error] 175-175: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 176-176: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 177-177: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 178-178: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 179-179: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 180-180: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 182-182: Special characters must be escaped : [ > ]. (spec-char-escape)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@refactor_persis_layer.html` around lines 175 - 182, In the Mermaid HTML text
nodes (e.g., the block containing "RT --> FDR", "SC --> FQ & FP", "AG --> FSS &
FA & FEV", etc.) replace each raw arrow operator '-->' with escaped form
'--&gt;' (and any other '>' in edge operators) so the '>' is HTML-escaped; apply
the same replacement to the other Mermaid block noted (the one around the
932-937 region) so the spec-char-escape lint rule passes.
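If the HTML design document were generated rather than hand-edited, the escaping could be done programmatically. The following is a minimal sketch using Go's standard `html.EscapeString`; the helper name `escapeMermaidLine` is hypothetical. Note that it also escapes `&` as `&amp;`, which goes slightly beyond the proposed fix; a browser should decode both entities back before Mermaid reads the node text.

```go
package main

import (
	"fmt"
	"html"
)

// escapeMermaidLine HTML-escapes one line of Mermaid source so it can be
// placed in an HTML text node without tripping spec-char-escape.
func escapeMermaidLine(line string) string {
	return html.EscapeString(line)
}

func main() {
	for _, line := range []string{"RT --> FDR", "SC --> FQ & FP"} {
		fmt.Println(escapeMermaidLine(line))
	}
}
```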

Comment on lines +290 to +291
Since *<span class="ty">time.Time</span> <span class="cm">// CreatedAt >= Since</span>
Until *<span class="ty">time.Time</span> <span class="cm">// CreatedAt < Until</span>

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Escape comparison operators inside the rendered code comment text.

The inline code comment text contains raw >= / < and triggers spec-char-escape; use entities to avoid parser ambiguity in HTML-rendered code snippets.

🔧 Proposed fix
-    Since  *<span class="ty">time.Time</span> <span class="cm">// CreatedAt >= Since</span>
-    Until  *<span class="ty">time.Time</span> <span class="cm">// CreatedAt < Until</span>
+    Since  *<span class="ty">time.Time</span> <span class="cm">// CreatedAt &gt;= Since</span>
+    Until  *<span class="ty">time.Time</span> <span class="cm">// CreatedAt &lt; Until</span>
🧰 Tools
🪛 HTMLHint (1.9.2)

[error] 290-290: Special characters must be escaped : [ > ]. (spec-char-escape)
[error] 291-291: Special characters must be escaped : [ < ]. (spec-char-escape)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@refactor_persis_layer.html` around lines 290 - 291, The rendered inline
comment text for the fields Since and Until uses raw comparison operators (">="
and "<") which trigger spec-char-escape; update the comment strings for Since
(*time.Time // CreatedAt >= Since) and Until (*time.Time // CreatedAt < Until)
to use HTML entities (e.g. &gt;= and &lt;) so the operators are escaped in the
HTML-rendered code snippet and no parser warnings occur.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ee0528cb41

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

if workerID == "" {
return nil, exec.ErrWorkerHeartbeatNotFound
}
rec, err := s.col.Get(ctx, workerID)

P1: Resolve heartbeats by legacy record ID during reads

This lookup assumes the collection key is the plain workerID, but pre-refactor heartbeat files were keyed as encodeKey(workerID) (see internal/persis/filedistributed/worker_heartbeat_store.go). After upgrading, existing heartbeats are still present but cannot be read here, so Get returns ErrWorkerHeartbeatNotFound until each worker writes a new-format record. In that window, stale-run repair can treat a live remote worker as missing and incorrectly fail active distributed attempts.
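One way to bridge that window is a read path that falls back to the legacy key. The sketch below is illustrative only: `legacyKey` is a hypothetical stand-in for the pre-refactor `encodeKey` (whose real encoding is not shown in this review; SHA-256 hex is an assumption), and a plain map stands in for the collection.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// legacyKey is a hypothetical stand-in for the old encodeKey function.
// The real encoding may differ; SHA-256 hex is assumed for illustration.
func legacyKey(workerID string) string {
	sum := sha256.Sum256([]byte(workerID))
	return hex.EncodeToString(sum[:])
}

// getWithLegacyFallback looks up the plain worker ID first and then the
// legacy-encoded key, so records written before the upgrade stay readable.
func getWithLegacyFallback(records map[string]string, workerID string) (string, bool) {
	if v, ok := records[workerID]; ok {
		return v, true
	}
	v, ok := records[legacyKey(workerID)]
	return v, ok
}

func main() {
	records := map[string]string{
		legacyKey("worker-1"): "legacy heartbeat",
		"worker-2":            "new-format heartbeat",
	}
	for _, id := range []string{"worker-1", "worker-2", "worker-3"} {
		v, ok := getWithLegacyFallback(records, id)
		fmt.Println(id, ok, v)
	}
}
```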

Comment on lines +103 to +104
if err := s.col.Delete(ctx, r.WorkerID); err != nil && !errors.Is(err, persis.ErrNotFound) {
return removed, fmt.Errorf("worker heartbeat store: delete %q: %w", r.WorkerID, err)

P2: Delete stale heartbeat files using stored record key

For migrated data, the on-disk record ID is still the legacy hashed filename while WorkerID in the payload is the human-readable ID. Deleting by r.WorkerID misses those legacy records, so stale entries are never removed (yet removed++ still increments), which leaves permanently stale workers visible in GetWorkers/metrics and can double-count workers after new-format upserts.

Comment thread internal/persis/store/session.go Outdated
// ─── helpers ─────────────────────────────────────────────────────────────────

func (s *Store) load(ctx context.Context, id string) (*storedSession, error) {
rec, err := s.col.Get(ctx, id)

P1: Preserve legacy session key mapping when loading records

Legacy session files were stored under <userID>/<sessionID>.json, so after envelope wrapping their collection ID is userID/sessionID. The rebuilt index stores ss.ID (just sessionID), but subsequent reads call Get with that bare ID, so migrated sessions are indexed yet unreadable and get skipped by ListSessions/ListSubSessions as not found. This effectively hides pre-upgrade agent session history until each session is recreated in the new layout.

}
now := time.Now().UTC()
return s.col.Put(ctx, &persis.Record{
ID: record.WorkerID,

P1: Reject unsafe worker IDs before using them as record IDs

This persists WorkerID directly as a file-backed record key, and file.Collection maps record IDs to filesystem paths. Unlike the previous implementation (which hashed IDs), a worker ID containing path separators or .. can write outside the intended workers namespace on heartbeat upsert. Because WorkerId comes from RPC input and is only checked for emptiness, a malicious or misconfigured worker can trigger path traversal in coordinator storage.
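A minimal sketch of such a guard, assuming worker IDs are meant to be a single path element. The function name `isSafeRecordID` and the exact rejection rules are assumptions for illustration, not the validation the PR ultimately adopted.

```go
package main

import (
	"fmt"
	"strings"
)

// isSafeRecordID reports whether id can be used as a single path element.
// It rejects empty IDs, the special names "." and "..", and any ID that
// contains a path separator (either style) or a NUL byte.
func isSafeRecordID(id string) bool {
	if id == "" || id == "." || id == ".." {
		return false
	}
	return !strings.ContainsAny(id, "/\\\x00")
}

func main() {
	for _, id := range []string{"worker-1", "../etc/passwd", "..", "a/b", ""} {
		fmt.Printf("%q safe=%v\n", id, isSafeRecordID(id))
	}
}
```

Checking both separator styles regardless of the host OS keeps the rule identical on every platform, which matters when worker IDs arrive over RPC from machines the coordinator does not control.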

Comment thread internal/persis/store/user.go Outdated
}

func (s *Store) rebuildIndex(ctx context.Context) error {
page, err := s.col.List(ctx, persis.ListQuery{})

P2: Drain paginated collection results when rebuilding indexes

Index rebuild reads only one page (ListQuery{} once) and ignores NextCursor, but the backend contract allows Limit=0 to use a backend default page size. Any backend that paginates by default will load only a subset of users into byUsername/byOIDCIdentity, causing false not found/duplicate behavior after startup even though records exist in storage.
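The drain loop being asked for might look roughly like the sketch below. The `page` and `lister` types are simplified, hypothetical stand-ins for the PR's list result and `Collection.List`; only the `NextCursor` name comes from the review text, and passing the cursor as a plain string is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// page is a simplified, hypothetical list result.
type page struct {
	IDs        []string
	NextCursor string // empty when there are no more pages
}

// lister fetches one page starting at cursor ("" means the first page).
type lister func(cursor string) (page, error)

// listAll follows NextCursor until the backend reports no further pages.
func listAll(list lister) ([]string, error) {
	var all []string
	cursor := ""
	for {
		p, err := list(cursor)
		if err != nil {
			return nil, err
		}
		all = append(all, p.IDs...)
		if p.NextCursor == "" {
			return all, nil
		}
		cursor = p.NextCursor
	}
}

// fakeBackend paginates ids with a fixed page size, using the next offset as cursor.
func fakeBackend(ids []string, pageSize int) lister {
	return func(cursor string) (page, error) {
		start := 0
		if cursor != "" {
			n, err := strconv.Atoi(cursor)
			if err != nil {
				return page{}, err
			}
			start = n
		}
		end := start + pageSize
		if end >= len(ids) {
			return page{IDs: ids[start:]}, nil
		}
		return page{IDs: ids[start:end], NextCursor: strconv.Itoa(end)}, nil
	}
}

// drainDemo returns how many IDs are collected from a five-record backend
// that serves two records per page.
func drainDemo() int {
	all, err := listAll(fakeBackend([]string{"a", "b", "c", "d", "e"}, 2))
	if err != nil {
		return -1
	}
	return len(all)
}

func main() {
	fmt.Println("records collected:", drainDemo())
}
```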

Comment on lines +189 to +190
existingRec, err := s.col.Get(ctx, user.ID)
if err != nil {

P2: Serialize update read/modify/write under the same user lock

This update path reads the persisted record before acquiring s.mu, so a concurrent Delete can remove index entries in between and then this Put recreates the record without restoring unchanged username/OIDC index keys. That leaves storage and in-memory indexes inconsistent (e.g., GetByUsername fails for an existing user), a race that did not exist in the previous fully locked file-store implementation.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8efcdd8bc2

Comment on lines +283 to +287
func (s *SecretStore) WriteValue(ctx context.Context, id string, input secret.WriteValueInput) (*secret.Secret, error) {
rec, err := s.col.Get(ctx, id)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return nil, secret.ErrNotFound

P1: Serialize secret value rotations before writing new versions

WriteValue now does an unlocked read/modify/write cycle (Get → decode → appendVersion → Put), so concurrent rotations of the same secret can both read the same CurrentVersion and each write back version+1, dropping one update and corrupting version history/metadata. The previous file-backed store held the store mutex across this whole sequence, so this regression can surface whenever two writes hit the same secret close together.
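To make the lost-update hazard concrete, here is a small sketch in which every rotation runs the full read-modify-write under one mutex. The `secretStore` and `secretRecord` types are simplified, hypothetical stand-ins (no encryption, no persistence); only the `CurrentVersion` name comes from the review.

```go
package main

import (
	"fmt"
	"sync"
)

// secretRecord is a simplified, hypothetical stand-in for the stored secret.
type secretRecord struct {
	CurrentVersion int
	Versions       []string
}

type secretStore struct {
	mu  sync.Mutex
	rec secretRecord
}

// writeValue appends a new version. Reading CurrentVersion, appending, and
// writing back all happen under the lock, so two rotations cannot both
// observe the same version number.
func (s *secretStore) writeValue(value string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	next := s.rec.CurrentVersion + 1
	s.rec.Versions = append(s.rec.Versions, value)
	s.rec.CurrentVersion = next
	return next
}

// rotateConcurrently runs n rotations in parallel and returns the final
// version number and the number of stored versions.
func rotateConcurrently(n int) (int, int) {
	s := &secretStore{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.writeValue(fmt.Sprintf("value-%d", i))
		}(i)
	}
	wg.Wait()
	return s.rec.CurrentVersion, len(s.rec.Versions)
}

func main() {
	version, stored := rotateConcurrently(50)
	fmt.Println("final version:", version, "stored versions:", stored)
}
```

Without the lock, two goroutines could both compute the same `next`, and the final version would fall short of the number of rotations.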

Comment on lines +216 to +220
rec, err := s.col.Get(ctx, id)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return auth.ErrAPIKeyNotFound
}

P1: Prevent UpdateLastUsed from overwriting concurrent API-key edits

UpdateLastUsed reads the full record and writes it back without synchronizing with Update, so a normal auth request that bumps LastUsedAt can race with an admin update and re-persist stale role/scope metadata from its earlier snapshot. Because UpdateLastUsed is invoked on successful key use in auth/service.go, this can silently revert recent API-key configuration changes under load.

Comment on lines +208 to +212
existingRec, err := s.col.Get(ctx, sec.ID)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return secret.ErrNotFound
}

P1: Keep secret ref index consistent across concurrent update/delete

Update fetches and decodes the existing secret before taking s.mu, so a concurrent Delete can remove the byRef entry in between. If the update keeps the same ref (oldRef == newRef), the subsequent Put recreates the record but never restores the in-memory ref index, making GetByRef return not found even though the secret exists on disk.

Comment on lines +258 to +262
rec, err := s.col.Get(ctx, id)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return auth.ErrWebhookNotFound
}

P1: Serialize webhook last-used writes with webhook mutations

UpdateLastUsed performs an unlocked read/modify/write of the full webhook record, so webhook invocations can race with admin mutations (Update) and write back stale auth fields (token hash, auth mode, HMAC settings) from an older snapshot. Since ValidateWebhookToken/ValidateWebhookHMAC call this on successful requests, production traffic can silently undo recent webhook configuration changes.

Comment on lines +142 to +146
existingRec, err := s.col.Get(ctx, key.ID)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return auth.ErrAPIKeyNotFound
}

P2: Restore API-key name index on update/delete races

Update reads existingRec before acquiring s.mu, so a concurrent Delete can remove the byName entry first; when the update then rewrites the same-name key, the existingStored.Name != key.Name block is skipped and the name index is never restored. After this race the key exists in storage but is absent from byName, which allows creating duplicate API-key names that should be rejected.

Comment thread internal/persis/store/apikey.go Outdated
Comment on lines +43 to +47
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {
return err
}
s.mu.Lock()

P2: Drain paginated API-key pages when rebuilding index

rebuildIndex calls List once with ListQuery{} and ignores NextCursor, but the persistence contract allows backend-default pagination when Limit is zero. On paginated backends this rebuilds byName from only the first page, so after restart some existing keys are missing from uniqueness checks and duplicate names can be created.

Comment on lines +164 to +168
existingRec, err := s.col.Get(ctx, webhook.ID)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return auth.ErrWebhookNotFound
}

P2: Restore DAG-name index on webhook update/delete races

Update loads the existing webhook before taking s.mu, so a concurrent Delete can remove byDAGName and the record in between. If the update keeps the same DAG name, it writes the webhook back but does not reinsert byDAGName (because the rename branch is skipped), leaving GetByDAGName/uniqueness checks inconsistent with stored data.

yohamta0 added 10 commits May 23, 2026 22:50
…acts

Define the storage seam for Dagu's control-plane persistence layer.
Backend + Collection (8 methods total) is the sole interface any future
database driver must implement; all domain stores become thin adapters
over Collection, with opaque id+blob Records — no per-domain schema.

Also adds Encode/Decode[T] codec helpers and ErrNotFound/ErrConflict
sentinel errors. Includes refactor_persis_layer.html design document.

FileBackend (persis/file/) implements Backend+Collection on the local
filesystem: each collection is a subdirectory, each record is a .json
file, "/" in IDs maps to path separators. Atomic writes via temp+rename,
CompareAndSwap via collection mutex, Claim via sorted-filename dequeue.
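The temp+rename technique mentioned above can be sketched as follows. This is a generic illustration, not the PR's `file.Collection` code; `writeFileAtomic` and `atomicWriteDemo` are hypothetical names, and the atomicity of the final rename is assumed to hold on POSIX filesystems when the temp file lives in the same directory as the target.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temp file next to path and then renames it
// over path, so readers see either the old content or the new, never a
// partially written file.
func writeFileAtomic(path string, data []byte) (err error) {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			_ = os.Remove(tmp.Name()) // best-effort cleanup on failure
		}
	}()
	if _, err = tmp.Write(data); err != nil {
		_ = tmp.Close()
		return err
	}
	if err = tmp.Sync(); err != nil {
		_ = tmp.Close()
		return err
	}
	if err = tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// atomicWriteDemo overwrites the same record twice and returns the final content.
func atomicWriteDemo() (string, error) {
	dir, err := os.MkdirTemp("", "atomic-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "record.json")
	for _, v := range []string{"v1", "v2"} {
		if err := writeFileAtomic(path, []byte(v)); err != nil {
			return "", err
		}
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	content, err := atomicWriteDemo()
	fmt.Println(content, err)
}
```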

MemoryBackend (persis/testutil/) is a thread-safe in-memory backend for
unit tests — no filesystem required to test any store adapter.

Shared contract test (collection_test.go) exercises all 6 Collection
methods against both backends, confirming behavioural parity.

Replaces filewebhook's 472-line file-I/O implementation with a 200-line
adapter that serializes auth.WebhookForStorage as a JSON blob into any
persis.Collection. HMAC secret encryption stays in the adapter layer.
DAG-name secondary index rebuilt from collection on startup.

16 tests via MemoryBackend — no filesystem dependencies.

…ection

apikey.Store: flat CRUD with byName secondary index. 13 tests.
user.Store: three secondary indices (byUsername, byOIDCIdentity, count),
handles OIDC identity collision detection and cross-index atomicity. 17 tests.

All tests use MemoryBackend — no filesystem dependencies.

Implements secret.Store using a persis.Collection backend.
Keeps byRef in-memory index (workspace\x00ref → secretID) rebuilt on startup.
Encrypted version history stays in the data blob; encryptor injected at construction.

watermark: single-record store (Load/Save) with version migration.
session: implements agent.SessionStore with byUser/byParent/updatedAt
indices, maxPerUser cleanup, and sub-session tracking.

Simple Upsert/Get/List/DeleteStale over a Collection keyed by workerID.
No secondary indices needed; DeleteStale filters by LastHeartbeatTime.

…le* packages

Replace fileuser, fileapikey, filewebhook, filesecret, filewatermark, and
filedistributed.NewWorkerHeartbeatStore with the new persis.Collection-backed
adapters across all production wiring and tests, then delete the five old packages.

Key changes:
- server.go: use file.NewCollection(UsersDir/APIKeysDir/WebhooksDir) so paths
  match the configured dirs rather than DataDir+name, preserving backward compat
- file/backend.go: add NewCollection(dir) for lazy-creation collections (no
  eager MkdirAll), used where the parent dir may not exist yet
- file/collection.go: detect legacy flat-JSON format in readFile and wrap it in
  the new envelope for transparent lazy migration
- All callers (agent_stores.go, scheduler.go, process/server.go, engine/run.go,
  remote_handler.go, context.go, test/helper.go) updated to use new stores
- Test files switched to testutil.NewMemoryBackend() for unit tests and
  file.New/watermark.New for integration tests
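The legacy-format detection listed in the key changes above could work along these lines. The `envelope` field names, the `"json"` encoding label, and the rule "both `data` and `encoding` keys present means new format" are all assumptions made for illustration; the PR's actual envelope layout is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical on-disk shape; the real field names may differ.
type envelope struct {
	ID       string          `json:"id"`
	Encoding string          `json:"encoding"`
	Data     json.RawMessage `json:"data"`
}

// decodeRecord returns raw as an envelope. Legacy flat JSON, recognised by
// the absence of the envelope keys, is wrapped so the whole document becomes
// the payload. migrated tells the caller the file is still in the old format.
func decodeRecord(id string, raw []byte) (env envelope, migrated bool, err error) {
	var probe map[string]json.RawMessage
	if err = json.Unmarshal(raw, &probe); err != nil {
		return envelope{}, false, fmt.Errorf("decode %q: %w", id, err)
	}
	_, hasData := probe["data"]
	_, hasEncoding := probe["encoding"]
	if hasData && hasEncoding {
		err = json.Unmarshal(raw, &env)
		return env, false, err
	}
	return envelope{ID: id, Encoding: "json", Data: json.RawMessage(raw)}, true, nil
}

func main() {
	legacy := []byte(`{"username":"alice"}`)
	env, migrated, err := decodeRecord("u1", legacy)
	fmt.Println(string(env.Data), migrated, err)
}
```

A key-presence check like this would misclassify a legacy document that happens to contain both keys, so a real implementation would likely want a less ambiguous marker such as a dedicated version field.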
… package

Move apikey, user, webhook, secret, watermark, session, workerheartbeat
adapters from seven one-file packages into a single persis/store package,
following Temporal's pattern of one package per backend rather than one
package per store type.

Renames: Store→XStore, New→NewXStore in each file.
All callers updated to import persis/store and use the new names.
No logic changes.

…or prefixes

- Drop unused `collector *telemetry.Collector` param from `initBuiltinAuthService`
- Update all call sites (server.go + 7 test call sites)
- Prefix watermarkMigrateState errors with "watermark store:" for consistency
@yohamta0 yohamta0 force-pushed the refactor/persis-backend-interface branch from 4094ea7 to fe0341d Compare May 23, 2026 13:50

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fe0341d687

Comment thread internal/persis/store/secret.go Outdated
Comment on lines +358 to +364
_ = s.col.Put(ctx, &persis.Record{
ID: rec.ID,
Data: data,
Encoding: enc,
CreatedAt: rec.CreatedAt,
UpdatedAt: now,
})

P1: Serialize ResolveValue before persisting secret snapshot

ResolveValue performs an unlocked read/modify/write of the entire secret record, then writes that stale snapshot back with Put. If a rotation (WriteValue) or metadata update commits between this Get and Put, the later Put from ResolveValue can overwrite the newer record and drop the new version/current-version metadata. This is a regression from the previous file-backed store, which held a store mutex across these operations, and it can corrupt secret history under concurrent reads and writes.

if err != nil {
return nil, err
}
return sr.Secret.Clone(), nil

P2: Validate decoded secret payload before dereferencing

If a stored secret record is valid JSON but missing the secret object (for example from manual edits or partially corrupted legacy data), GetByID dereferences sr.Secret unconditionally and panics instead of returning an error. The previous file-backed implementation explicitly rejected records with nil secret payloads, so this regression can turn recoverable bad data into a process crash path.
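A nil guard of the kind requested here is short. In the sketch below, `storedRecord`, `secretMeta`, and `decodeSecret` are hypothetical, simplified names; only the idea of a JSON record whose `secret` object may be null comes from the review.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// secretMeta and storedRecord are simplified, hypothetical shapes.
type secretMeta struct {
	Name string `json:"name"`
}

type storedRecord struct {
	Secret *secretMeta `json:"secret"`
}

var errMissingSecret = errors.New("secret store: record has no secret payload")

// decodeSecret decodes a stored record and rejects payloads whose secret
// object is absent or null, returning an error instead of letting a later
// dereference panic.
func decodeSecret(data []byte) (*secretMeta, error) {
	var sr storedRecord
	if err := json.Unmarshal(data, &sr); err != nil {
		return nil, fmt.Errorf("secret store: decode: %w", err)
	}
	if sr.Secret == nil {
		return nil, errMissingSecret
	}
	return sr.Secret, nil
}

func main() {
	_, err := decodeSecret([]byte(`{"secret":null}`))
	fmt.Println("null payload:", err)
	meta, err := decodeSecret([]byte(`{"secret":{"name":"db-password"}}`))
	fmt.Println("valid payload:", meta.Name, err)
}
```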

Comment on lines +255 to +259
rec, err := s.col.Get(ctx, id)
if err != nil {
if errors.Is(err, persis.ErrNotFound) {
return auth.ErrUserNotFound
}

P2: Guard user count updates against concurrent deletes

This method reads the record before acquiring s.mu and later decrements s.count unconditionally, so two concurrent Delete calls for the same user can both pass the pre-lock Get; the second col.Delete is a no-op but still decrements the counter. That leaves CountUsers inaccurate (potentially negative) and reports success on a delete that should have been not found.
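The double-decrement can be avoided by moving the existence check inside the critical section. The sketch below uses a hypothetical in-memory `userStore` with an invented `remove` method; it is a simplified model of the pattern, not the PR's `UserStore`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errUserNotFound = errors.New("user not found")

// userStore is a hypothetical in-memory model with a cached count.
type userStore struct {
	mu    sync.Mutex
	users map[string]struct{}
	count int
}

// remove checks existence and decrements the counter under the same lock, so
// only the caller that actually deletes the record touches the count.
func (s *userStore) remove(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.users[id]; !ok {
		return errUserNotFound
	}
	delete(s.users, id)
	s.count--
	return nil
}

// concurrentDeleteDemo issues two deletes for the same user in parallel and
// returns the final count and the number of deletes that succeeded.
func concurrentDeleteDemo() (count, successes int) {
	s := &userStore{users: map[string]struct{}{"u1": {}}, count: 1}
	results := make(chan error, 2)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- s.remove("u1")
		}()
	}
	wg.Wait()
	close(results)
	for err := range results {
		if err == nil {
			successes++
		}
	}
	return s.count, successes
}

func main() {
	count, successes := concurrentDeleteDemo()
	fmt.Println("count:", count, "successful deletes:", successes)
}
```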

Comment thread internal/persis/store/session.go Outdated
return errors.New("session store: message cannot be nil")
}

rec, err := s.col.Get(ctx, sessionID)

P1: Serialize session message writes with session deletion

AddMessage does an unlocked read/modify/write cycle (Get → append → Put) before touching in-memory indexes, so a concurrent DeleteSession can remove the session in between and then this Put recreates it from a stale snapshot. Because the delete path already removed byUser/byParent entries, the resurrected session can exist on disk but disappear from ListSessions/ListSubSessions, causing inconsistent session visibility under concurrent traffic.

yohamta0 added 2 commits May 23, 2026 23:01
Concurrency fixes (serialize full read-modify-write under store mutex):
- apikey: Update, Delete, UpdateLastUsed
- secret: Update, Delete, WriteValue; nil guard in loadByID
- session: UpdateSession, AddMessage (lock moved before col.Get)
- user: Update, Delete
- webhook: Update, Delete, UpdateLastUsed
- watermark: nil guard in Save
- workerheartbeat: TOCTOU fix in DeleteStale (re-read before delete);
  reject unsafe worker IDs containing path-traversal characters

Collection fixes:
- file/collection.go: filePath now validates path stays within root (traversal prevention);
  Claim propagates real I/O errors instead of silently swallowing them
- file/collection_test.go: remove shared-global freshCollection to fix parallel test race;
  pass factory as parameter to RunCollectionContract
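A root-containment check like the one described for filePath can be sketched with `filepath.Rel`. This is an assumption about the general approach, not the PR's implementation; `resolveWithinRoot` is a hypothetical name, and the `.json` suffix and "/"-to-separator mapping follow the FileBackend description in the earlier commit message.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

var errEscapesRoot = errors.New("record path escapes collection root")

// resolveWithinRoot maps a record ID to a file path under root and rejects
// IDs whose cleaned path would land outside root.
func resolveWithinRoot(root, id string) (string, error) {
	p := filepath.Join(root, filepath.FromSlash(id)+".json")
	rel, err := filepath.Rel(root, p)
	if err != nil {
		return "", err
	}
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", errEscapesRoot
	}
	return p, nil
}

func main() {
	for _, id := range []string{"w1", "user1/sess1", "../../etc/passwd"} {
		p, err := resolveWithinRoot("/data/workers", id)
		fmt.Printf("%q -> %q, err=%v\n", id, p, err)
	}
}
```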

Memory backend:
- testutil/memory.go: deep-copy ExpiresAt pointer in copyRecord
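The aliasing problem behind this fix is easy to reproduce: a shallow struct copy shares the `ExpiresAt` pointer with the original. The sketch below shows a deep copy under assumed types; the `record` struct is a simplified stand-in for `persis.Record`, and `aliasDemo` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// record is a simplified stand-in for persis.Record.
type record struct {
	ID        string
	Data      []byte
	ExpiresAt *time.Time
}

// copyRecord returns a copy that shares no mutable memory with r: the byte
// slice is cloned and the ExpiresAt pointer targets a fresh time value.
func copyRecord(r *record) *record {
	if r == nil {
		return nil
	}
	cp := *r
	cp.Data = append([]byte(nil), r.Data...)
	if r.ExpiresAt != nil {
		t := *r.ExpiresAt
		cp.ExpiresAt = &t
	}
	return &cp
}

// aliasDemo mutates a copy and reports whether the original stayed unchanged.
func aliasDemo() bool {
	exp := time.Date(2026, 5, 23, 0, 0, 0, 0, time.UTC)
	orig := &record{ID: "r1", Data: []byte("abc"), ExpiresAt: &exp}
	cp := copyRecord(orig)
	*cp.ExpiresAt = cp.ExpiresAt.Add(time.Hour)
	cp.Data[0] = 'X'
	return orig.ExpiresAt.Equal(exp) && string(orig.Data) == "abc"
}

func main() {
	fmt.Println("original unchanged:", aliasDemo())
}
```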

Regression tests:
- watermark_test: TestLoad_MigratesLegacyVersions (v0/1/2), UnknownVersion, CorruptData
- workerheartbeat_test: TestDeleteStale_RefreshDuringCleanup
- session_test: TestAddMessage_Concurrent (20 goroutines)

- codec.go: add explicit EncodingProto case in Decode switch (exhaustive lint)
- file/collection.go: suppress G304 gosec on readFile (path validated by filePath root check);
  use _ = for cleanup-path Close/Remove calls (errcheck lint)
- file/collection_test.go: rename unused t param to _ in memory freshCollection closure (revive lint)

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0857ca1c7b

}

func (s *SecretStore) appendVersion(sr *secretStoredRecord, input secret.WriteValueInput) error {
if sr.Secret.ProviderType != secret.ProviderDaguManaged {

P2: Guard nil secret payload before dereferencing in version writes

If a stored record decodes successfully but has secret: null (for example from manual edits or partially corrupted legacy data), WriteValue reaches appendVersion and dereferences sr.Secret unconditionally, which panics the process instead of returning a recoverable error. The previous file-backed implementation rejected nil secret metadata at load time, so this introduces a crash path for malformed persisted records.

Comment thread internal/persis/store/secret.go Outdated
Comment on lines +62 to +63
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {

P2: Drain all pages when rebuilding the secret ref index

rebuildIndex reads only one ListQuery{} page and ignores NextCursor, but the collection contract allows Limit=0 to use a backend default page size. On paginated backends this leaves byRef incomplete after startup, so GetByRef can return not found for existing secrets and duplicate ref conflicts can be missed in create/update flows.

Comment thread internal/persis/store/session.go Outdated
Comment on lines +92 to +93
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {

P2: Iterate all pages when rebuilding session indexes

rebuildIndex consumes only the first List page and ignores NextCursor, while the persistence contract permits backend-default pagination when Limit is zero. With a paginated backend, sessions beyond page one are never inserted into byUser/byParent/updatedAt, so ListSessions and ListSubSessions can silently miss existing sessions after restart.


Comment thread internal/persis/store/webhook.go Outdated
Comment on lines +45 to +46
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {


P2 Badge Drain webhook pages while rebuilding DAG-name index

Index rebuild reads a single ListQuery{} page and does not follow NextCursor. On backends that paginate by default, byDAGName is only partially reconstructed, which makes existing webhooks undiscoverable by DAG name and can allow duplicate DAG-name creations that should be rejected.



// List returns all heartbeat records.
func (s *WorkerHeartbeatStore) List(ctx context.Context) ([]exec.WorkerHeartbeatRecord, error) {
page, err := s.col.List(ctx, persis.ListQuery{})


P2 Badge List and purge all heartbeat pages before stale cleanup

List fetches just one collection page and ignores pagination cursors, and DeleteStale relies on that method. On paginated backends, stale-heartbeat cleanup processes only the first page, leaving older stale worker records undeleted and making List callers observe an incomplete worker set.


Comment thread internal/persis/store/apikey.go Outdated
Comment on lines +114 to +115
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {


P2 Badge Paginate through all API keys in List

APIKeyStore.List is documented to return all keys, but it performs a single ListQuery{} call and ignores NextCursor. On a backend with default pagination, callers receive only the first page, which can hide existing keys from administrative views and tooling that relies on full key enumeration.


Comment thread internal/persis/store/user.go Outdated
Comment on lines +160 to +161
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {


P2 Badge Paginate through all users in List

UserStore.List promises all users but reads only one page from the collection and never follows NextCursor. With a paginated backend, user listings become incomplete after the first page, which can break user-management workflows that depend on full traversal.


Comment thread internal/persis/store/webhook.go Outdated
Comment on lines +135 to +136
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {


P2 Badge Paginate through all webhooks in List

WebhookStore.List performs a one-shot ListQuery{} and ignores pagination cursors even though the interface contract is to return all webhooks. On paginated backends this silently truncates results to page one, so existing webhooks can disappear from list-based management operations.


Comment thread internal/persis/store/secret.go Outdated
Comment on lines +163 to +164
page, err := s.col.List(ctx, persis.ListQuery{})
if err != nil {


P2 Badge Paginate through all secrets in List

SecretStore.List currently reads a single page and does not follow NextCursor, despite being defined as returning all secrets (optionally workspace-filtered). On paginated backends this yields incomplete secret inventories, which can hide valid entries from operators and API consumers.


if err := persis.Decode(rec, &sr); err != nil {
return "", nil, fmt.Errorf("secret store: decode for ResolveValue: %w", err)
}
if sr.Secret.Status == secret.StatusDisabled {


P2 Badge Check for missing secret payload before status read

ResolveValue decodes the stored record and immediately accesses sr.Secret.Status without validating that sr.Secret is non-nil. If a record is syntactically valid JSON but missing the nested secret object, this path panics instead of returning an error, turning recoverable data corruption into a process crash.


Pagination:
- Add listAll() helper that drains all pages via NextCursor
- Replace single List() call with listAll() in rebuildIndex and List methods
  across all stores (apikey, webhook, secret, session, user, workerheartbeat)

secret.ResolveValue:
- Hold store mutex over full read-modify-write to prevent overwriting
  concurrent WriteValue/Update commits
- Add nil guard for sr.Secret before dereferencing

WorkerHeartbeatStore backward compatibility:
- Add SHA-256 fallback in Get() for legacy records keyed by encodeKey(workerID)
- Fix DeleteStale to iterate col.List directly using actual record IDs (rec.ID)
  instead of r.WorkerID, fixing stale-delete for legacy hashed-filename records

SessionStore legacy key resolution:
- Add collectionID map (logical session ID → actual collection key)
- rebuildIndex populates mapping when rec.ID != ss.ID (legacy userID/sessionID)
- loadSession, deleteLockedCtx, UpdateSession, AddMessage all resolve the
  collection key before col.Get/Put/Delete so pre-upgrade sessions are
  readable and writable without requiring data migration
@yohamta0

Collaborator Author

@coderabbitai full review

@coderabbitai

coderabbitai Bot commented May 23, 2026

✅ Actions performed

Full review triggered.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0f6ca5a1b3

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +48 to +54
return s.col.Put(ctx, &persis.Record{
ID: record.WorkerID,
Data: data,
Encoding: enc,
CreatedAt: now,
UpdatedAt: now,
})


P2 Badge Delete legacy heartbeat key after successful upsert

During migration, Upsert writes the new record under record.WorkerID but leaves any legacy SHA-256-keyed record in place, so one worker can exist twice until stale cleanup runs. Because both GetWorkers (internal/service/coordinator/handler.go) and telemetry collection (internal/cmn/telemetry/collector.go) consume List() without deduping by worker ID, this can temporarily double-count workers and emit duplicate per-worker metrics series right after upgrade.


Comment on lines +69 to +72
// Try new-format key (plain workerID) first.
r, err := s.getByCollectionID(ctx, workerID)
if err == nil {
return r, nil


P1 Badge Read freshest heartbeat when legacy and new keys coexist

The read path returns immediately on the plain workerID key and only consults the legacy SHA-256 key when the new key is missing. During rolling upgrades where old and new coordinators may write different key formats for the same worker, this can return a stale plain-key heartbeat even when a fresher legacy heartbeat exists, and RepairStaleDistributedRun uses this Get result to decide whether to fail active attempts (internal/runtime/distributed_stale_run.go).


Comment thread internal/persis/store/secret.go Outdated
Comment on lines +369 to +373
_ = s.col.Put(ctx, &persis.Record{
ID: rec.ID,
Data: data,
Encoding: enc,
CreatedAt: rec.CreatedAt,


P2 Badge Return errors when ResolveValue metadata write fails

ResolveValue now ignores the result of col.Put after updating LastResolvedAt/UpdatedAt, so storage write failures are silently dropped while the call still returns plaintext and success. This regresses behavior from the previous file-backed store (which returned write errors) and can leave secret usage metadata/audit fields permanently stale under transient or persistent backend failures.


yohamta0 added 2 commits May 24, 2026 00:01
…rror

- WorkerHeartbeatStore.Get: when both plain-workerID and legacy SHA-256
  keys coexist during migration, compare LastHeartbeatTime and return the
  fresher record instead of blindly preferring the new-format key
- SecretStore.ResolveValue: propagate the col.Put error instead of
  silently discarding it with `_ = s.col.Put(...)`

No logic changes; pure gofmt whitespace/import formatting.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 145ca2238c


Comment on lines +132 to +136
s.mu.Lock()
defer s.mu.Unlock()

if err := s.col.Put(ctx, &persis.Record{
ID: sess.ID,


P2 Badge Reject duplicate session IDs before creating records

CreateSession writes sess.ID with col.Put unconditionally, so a duplicate ID silently overwrites the existing stored session instead of failing create-as-new. Because the method then only appends new index entries (byUser/updatedAt/byParent) and never removes the old owner’s mappings, a duplicate create can leave indexes inconsistent and return corrupted cross-user session lists; the previous file-backed store explicitly rejected existing IDs.


Comment thread internal/persis/file/collection.go Outdated
Comment on lines +47 to +50
return &fileRecord{
ID: r.ID,
Encoding: r.Encoding,
Data: json.RawMessage(r.Data),


P2 Badge Preserve opaque record bytes in file envelope

The file backend stores Record.Data as json.RawMessage, which requires the bytes to already be valid JSON. For non-JSON payloads (e.g., any future EncodingProto/binary record), json.Marshal(fileRecord) fails with an invalid-character error, so Collection.Put cannot persist the record despite the backend interface describing Data as an opaque blob.


Comment on lines +223 to +224
oldRef := secretRefKey(existing.Secret.Workspace, existing.Secret.Ref)
newRef := secretRefKey(updated.Workspace, updated.Ref)


P2 Badge Validate stored secret payload before computing old ref

Update decodes the stored record and immediately dereferences existing.Secret to build oldRef. If persisted data is syntactically valid JSON but has "secret": null (manual edits, partial corruption, or bad legacy data), this path panics instead of returning an error, turning a recoverable record problem into a process crash.


Comment on lines +80 to +84
if _, exists := s.byName[key.Name]; exists {
return auth.ErrAPIKeyAlreadyExists
}
if err := s.col.Put(ctx, &persis.Record{
ID: key.ID,


P2 Badge Reject duplicate API-key IDs before write

Create checks only byName and then writes key.ID with Put, so reusing an existing ID silently overwrites the prior key record. Because the old name entry is never removed in this path, the in-memory byName index can keep stale mappings and diverge from persisted data, breaking uniqueness checks and name-based lookups.


Comment on lines +101 to +103
rk := secretRefKey(stored.Workspace, stored.Ref)
if _, exists := s.byRef[rk]; exists {
return secret.ErrAlreadyExists


P2 Badge Enforce secret ID uniqueness during create

Create validates workspace/ref uniqueness but never checks whether sec.ID already exists before writing. Reusing an existing secret ID overwrites the stored record and leaves the previous ref key in byRef, so GetByRef/conflict checks can return inconsistent results for old and new refs.


- session.go: reject duplicate session IDs in CreateSession; add both
  in-memory index check and collection.Get check before Put
- apikey.go: reject duplicate API-key IDs in Create (checked byName
  already; now also check by ID via col.Get)
- secret.go Create: reject duplicate secret IDs (col.Get check before Put)
- secret.go Update: nil-guard existing.Secret before dereferencing for
  oldRef computation; return descriptive error on corrupt record
- file/collection.go: handle non-JSON (opaque/proto) record data; base64-
  encode non-JSON payloads as a JSON string so the file envelope always
  marshals cleanly; fix CompareAndSwap to compare decoded bytes

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 59fbb5a574


Comment on lines +87 to +90
if _, exists := s.byUsername[user.Username]; exists {
return auth.ErrUserAlreadyExists
}
if user.OIDCIssuer != "" && user.OIDCSubject != "" {


P2 Badge Reject duplicate user IDs in create path

Create no longer verifies whether user.ID already exists before Put, so a caller reusing an ID can overwrite an existing user record instead of getting ErrUserAlreadyExists. In that case, the old username/OIDC index entries are not removed and count is incremented, leaving lookup/index state inconsistent with persisted data. The previous fileuser implementation explicitly rejected duplicate IDs, so this is a behavioral regression.


Comment on lines +86 to +90
if _, exists := s.byDAGName[webhook.DAGName]; exists {
return auth.ErrWebhookAlreadyExists
}
if err := s.col.Put(ctx, &persis.Record{
ID: webhook.ID,


P2 Badge Prevent webhook ID overwrite during create

Create checks only DAG-name uniqueness and then writes by webhook.ID without confirming the ID is unused. If an existing ID is reused with a different DAG name, the stored record is overwritten while the old DAG-name index entry is left behind, which can corrupt DAG-name lookups and uniqueness checks. The prior filewebhook store guarded against duplicate IDs, so this is a regression in create semantics.


Comment on lines +389 to +390
return nil
}


P2 Badge Preserve distinct record IDs when deriving file paths

idToRelPath uses filepath.Join(strings.Split(id, "/")...), which normalizes path segments (.., duplicate slashes, leading slash) before appending .json. As a result, distinct logical IDs like "b" and "a/../b" can resolve to the same on-disk file and overwrite each other, breaking key uniqueness guarantees for any collection that accepts such IDs. This is a data-corruption risk introduced by the new shared file backend path mapping.
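The collision is easy to reproduce, because `filepath.Join` cleans its result. The sketch below pairs a function shaped like the mapping described in the comment (`naiveRelPath`, a stand-in for `idToRelPath`) with one possible mitigation: rejecting IDs that contain empty, `.`, or `..` segments. `validateID` is a hypothetical helper, not something taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// naiveRelPath mirrors the mapping described in the review comment.
// filepath.Join cleans the path, so "..", empty segments and a leading
// slash are normalized away before ".json" is appended.
func naiveRelPath(id string) string {
	return filepath.Join(strings.Split(id, "/")...) + ".json"
}

// validateID rejects IDs whose segments would be altered by cleaning,
// so every accepted ID maps to a distinct file path.
func validateID(id string) error {
	if id == "" {
		return errors.New("invalid id: empty")
	}
	for _, seg := range strings.Split(id, "/") {
		if seg == "" || seg == "." || seg == ".." {
			return fmt.Errorf("invalid id %q: unsafe segment %q", id, seg)
		}
	}
	return nil
}

func main() {
	// Two distinct logical IDs collapse onto the same file.
	fmt.Println(naiveRelPath("b") == naiveRelPath("a/../b"))

	for _, id := range []string{"b", "a/b", "a/../b", "/b", "a//b"} {
		fmt.Printf("%-8q valid=%v\n", id, validateID(id) == nil)
	}
}
```

Rejecting such IDs is only one option. Hashing or escaping the ID before deriving the path would accept a wider range of IDs, at the cost of less readable file names.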


Comment on lines +37 to +38
if strings.ContainsAny(record.WorkerID, `/\`) || strings.Contains(record.WorkerID, "..") {
return fmt.Errorf("worker heartbeat store: workerID %q contains unsafe characters", record.WorkerID)


P2 Badge Keep worker heartbeat IDs backward-compatible

This new guard rejects WorkerID values containing /, \, or .., but worker.id is user-configurable and the previous filedistributed store accepted arbitrary IDs by hashing them for file keys. After upgrading, deployments using IDs like region/a will fail heartbeat persistence (Heartbeat returns internal error), causing workers to appear unhealthy/offline even though they were valid before.


@yohamta0 yohamta0 force-pushed the main branch 5 times, most recently from 6a78639 to db5ff31 May 23, 2026 16:49

store.WatermarkStore writes state.json via persis.Collection.Put, which
wraps the payload in a file envelope: {id, encoding, data: {actual state}}.
waitForSchedulerDAGRegistered was reading the flat legacy format and
checking state.dags directly, which is always undefined in the envelope
format. Now falls back to parsed.data when parsed.dags is absent.
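The fallback described in this commit message can be sketched in Go as follows. The helper the commit touches appears to live in test code and may be written in another language. The `state` and `envelope` types and the `parseState` function are hypothetical and only illustrate reading either layout.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is a hypothetical view of the watermark state; only the "dags"
// key mentioned in the commit message is modeled.
type state struct {
	DAGs map[string]json.RawMessage `json:"dags"`
}

// envelope models the {id, encoding, data} wrapper.
type envelope struct {
	ID       string          `json:"id"`
	Encoding string          `json:"encoding"`
	Data     json.RawMessage `json:"data"`
}

// parseState accepts the flat legacy layout and falls back to the
// envelope's data field when the top-level "dags" key is absent.
func parseState(raw []byte) (state, error) {
	var flat state
	if err := json.Unmarshal(raw, &flat); err != nil {
		return state{}, err
	}
	if flat.DAGs != nil {
		return flat, nil // legacy flat format
	}
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return state{}, err
	}
	if len(env.Data) == 0 {
		return flat, nil // neither layout carries DAG state
	}
	var inner state
	if err := json.Unmarshal(env.Data, &inner); err != nil {
		return state{}, err
	}
	return inner, nil
}

func main() {
	legacy := []byte(`{"dags": {"etl": {}}}`)
	wrapped := []byte(`{"id": "state", "encoding": "json", "data": {"dags": {"etl": {}}}}`)
	for _, raw := range [][]byte{legacy, wrapped} {
		st, err := parseState(raw)
		_, found := st.DAGs["etl"]
		fmt.Println(found, err)
	}
}
```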