Fix concurrency races in session manager and stabilize service lifecycles (#24)
lxowalle merged 3 commits into sipeed:main
Conversation
Pull request overview
This PR addresses concurrency issues and improves service lifecycle management across the session manager, cron service, and heartbeat service. The changes aim to fix race conditions, implement non-blocking I/O, add atomic writes, and make service Start/Stop operations idempotent.
Changes:
- Fixed race condition in session manager's GetOrCreate by using a single Lock instead of RLock/Lock pattern
- Refactored session Save to use atomic writes (CreateTemp -> Sync -> Rename) and perform I/O outside critical sections
- Improved cron and heartbeat service lifecycle with idempotent Start/Stop by reinitializing stopChan on each Start and properly cleaning up on Stop
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| pkg/session/manager.go | Fixed GetOrCreate race condition, refactored Save to use atomic writes and non-blocking I/O, added edge case handling for TruncateHistory |
| pkg/heartbeat/service.go | Improved service lifecycle by removing stopChan initialization from constructor and managing it in Start/Stop, passed stopChan to runLoop as parameter |
| pkg/cron/service.go | Enhanced lifecycle management with stopChan reinitialization in Start, refactored job execution with granular locking by collecting job IDs first then executing |
| pkg/agent/loop.go | Updated Save method calls to use new signature that takes session key instead of session object |
```diff
 if hs.stopChan != nil {
 	return nil
 }

 if !hs.enabled {
 	return fmt.Errorf("heartbeat service is disabled")
 }

-go hs.runLoop()
+hs.stopChan = make(chan struct{})
+go hs.runLoop(hs.stopChan)
```
After Start() is called and creates a new stopChan on line 41, the old goroutine from a previous runLoop (if it exists) continues running with the old channel and won't be stopped. This creates a goroutine leak. The check on line 33 should verify if a goroutine is actually running, not just if stopChan exists. Consider tracking the running state separately (similar to how cron service uses the 'running' boolean), or ensure Stop() is always called before Start().
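One way to address this is the reviewer's suggestion of tracking the running state explicitly. A minimal sketch, assuming a simplified service struct (the field names and mutex here are illustrative, not the PR's actual code): Start refuses to spawn a second goroutine while one is live, and Stop closes the channel exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// HeartbeatService sketch: `running` records whether a runLoop goroutine
// is live, independently of whether stopChan happens to be non-nil.
type HeartbeatService struct {
	mu       sync.Mutex
	running  bool
	stopChan chan struct{}
}

func (hs *HeartbeatService) Start() error {
	hs.mu.Lock()
	defer hs.mu.Unlock()
	if hs.running {
		return nil // a runLoop is already live: do not leak a second one
	}
	hs.stopChan = make(chan struct{})
	hs.running = true
	go hs.runLoop(hs.stopChan)
	return nil
}

func (hs *HeartbeatService) Stop() {
	hs.mu.Lock()
	defer hs.mu.Unlock()
	if !hs.running {
		return // idempotent: no double close of stopChan
	}
	close(hs.stopChan)
	hs.running = false
}

// runLoop receives its stop channel as a parameter, so a later Start
// cannot swap the channel out from under it.
func (hs *HeartbeatService) runLoop(stop <-chan struct{}) {
	<-stop // placeholder for the real loop body
}

func main() {
	hs := &HeartbeatService{}
	hs.Start()
	hs.Start() // no-op: already running
	hs.Stop()
	hs.Stop() // no-op: already stopped
	fmt.Println("ok")
}
```

With this shape, repeated Start/Stop toggles are safe and each runLoop is paired with exactly one channel close.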
```go
if len(stored.Messages) > 0 {
	snapshot.Messages = make([]providers.Message, len(stored.Messages))
	copy(snapshot.Messages, stored.Messages)
```
The shallow copy of Messages on line 166 doesn't properly deep-copy nested structures. Each Message contains a ToolCalls slice, and within ToolCalls there are pointer fields (Function *FunctionCall) and map fields (Arguments map[string]interface{}). These are shared with the original, not copied. If the original session's messages are modified after this copy (which shouldn't happen but could in concurrent scenarios), the snapshot could be affected. Consider implementing a proper deep copy that also copies the ToolCalls slice and its nested structures.
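A deep copy along the lines the reviewer describes could look like the sketch below. The type shapes (`Message`, `ToolCall`, `FunctionCall`) are assumptions standing in for the real `providers` package definitions; only the copying pattern is the point.

```go
package main

import "fmt"

// Assumed stand-ins for the providers package types named in the review.
type FunctionCall struct {
	Name      string
	Arguments map[string]interface{}
}

type ToolCall struct {
	ID       string
	Function *FunctionCall
}

type Message struct {
	Role      string
	Content   string
	ToolCalls []ToolCall
}

// deepCopyMessages copies the outer slice, each ToolCalls slice, each
// *FunctionCall pointee, and each Arguments map, so the snapshot shares
// no mutable state with the original.
func deepCopyMessages(src []Message) []Message {
	out := make([]Message, len(src))
	for i, m := range src {
		out[i] = m
		if len(m.ToolCalls) == 0 {
			continue
		}
		out[i].ToolCalls = make([]ToolCall, len(m.ToolCalls))
		for j, tc := range m.ToolCalls {
			out[i].ToolCalls[j] = tc
			if tc.Function == nil {
				continue
			}
			fc := *tc.Function // copy the pointed-to struct
			if fc.Arguments != nil {
				args := make(map[string]interface{}, len(fc.Arguments))
				for k, v := range fc.Arguments {
					args[k] = v
				}
				fc.Arguments = args
			}
			out[i].ToolCalls[j].Function = &fc
		}
	}
	return out
}

func main() {
	orig := []Message{{Role: "assistant", ToolCalls: []ToolCall{{
		ID:       "1",
		Function: &FunctionCall{Name: "f", Arguments: map[string]interface{}{"x": 1}},
	}}}}
	snap := deepCopyMessages(orig)
	orig[0].ToolCalls[0].Function.Arguments["x"] = 2
	fmt.Println(snap[0].ToolCalls[0].Function.Arguments["x"]) // snapshot unaffected: 1
}
```

Note this still copies `interface{}` values shallowly; if Arguments can hold nested maps or slices, those levels would need the same treatment.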
pkg/cron/service.go (outdated)

```go
cs.mu.Lock()
defer cs.mu.Unlock()
cs.saveStore()
```
The saveStore call on line 149 ignores its error return value. If the save fails, the job state updates (LastRunAtMS, LastStatus, etc.) are lost on the next restart, even though the job executed. This could lead to inconsistent state where jobs appear to have never run or have outdated status information. Consider logging or handling the error from saveStore to ensure persistence failures are visible.
Suggested change:

```diff
-cs.saveStore()
+if err := cs.saveStore(); err != nil {
+	fmt.Fprintf(os.Stderr, "cron: failed to save store: %v\n", err)
+}
```
pkg/session/manager.go (outdated)

```go
data, err := json.MarshalIndent(session, "", " ")
sessionPath := filepath.Join(sm.storage, key+".json")
tmpFile, err := os.CreateTemp(sm.storage, key+".*.tmp")
```
The atomic write implementation uses CreateTemp with a pattern that includes the key, which could be problematic if the key contains characters invalid in filenames (e.g., path separators like '/' or '\'). The agent code uses keys like "channel:chat_id" which are safe, but if keys from other sources contain problematic characters, CreateTemp could fail. Consider sanitizing the key or using a fixed prefix like ".tmp" to make the pattern more robust.
Suggested change:

```diff
-tmpFile, err := os.CreateTemp(sm.storage, key+".*.tmp")
+tmpFile, err := os.CreateTemp(sm.storage, "session-*.tmp")
```
```go
if keepLast <= 0 {
	session.Messages = []providers.Message{}
	session.Updated = time.Now()
	return
}
```
The new edge case handling for keepLast <= 0 on lines 127-131 clears all messages, which is a reasonable interpretation. However, this behavior is undocumented and may be unexpected. When keepLast is 0, users might expect either an error or no-op behavior rather than clearing the history. Consider documenting this behavior or validating that keepLast is positive before proceeding.
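Documenting the contract directly on the method is one low-cost option. A minimal sketch, assuming a simplified `Session` (the real one holds `providers.Message` values and an `Updated` timestamp):

```go
package main

import "fmt"

// Session stand-in for the sketch; the real type lives in pkg/session.
type Session struct {
	Messages []string
}

// TruncateHistory keeps at most the last keepLast messages.
// keepLast <= 0 clears the history entirely (documented rather than
// left as an implicit edge case).
func (s *Session) TruncateHistory(keepLast int) {
	if keepLast <= 0 {
		s.Messages = nil
		return
	}
	if len(s.Messages) <= keepLast {
		return
	}
	s.Messages = s.Messages[len(s.Messages)-keepLast:]
}

func main() {
	s := &Session{Messages: []string{"a", "b", "c"}}
	s.TruncateHistory(2)
	fmt.Println(s.Messages) // [b c]
	s.TruncateHistory(0)
	fmt.Println(len(s.Messages)) // 0
}
```

Making the clearing behavior part of the doc comment means callers passing 0 get what the signature promises, not a surprise.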
```go
for i := range cs.store.Jobs {
	job := &cs.store.Jobs[i]
	if job.ID == jobID {
		jobCopy := *job
```
The shallow copy of CronJob on line 160 shares pointer references in the State field. State contains *int64 pointers for NextRunAtMS and LastRunAtMS, which are shared between the original and the copy. While the current code doesn't modify the pointed-to values (it only reassigns the pointers themselves), this creates a subtle coupling. If future code modifies values through these pointers, it could affect both the original and the callback copy unexpectedly. Consider creating new int64 values and copying the dereferenced values for complete isolation.
Suggested change:

```diff
 jobCopy := *job
+// Deep-copy pointer fields in State to avoid sharing underlying int64 values.
+if job.State.NextRunAtMS != nil {
+	v := *job.State.NextRunAtMS
+	jobCopy.State.NextRunAtMS = &v
+}
+if job.State.LastRunAtMS != nil {
+	v := *job.State.LastRunAtMS
+	jobCopy.State.LastRunAtMS = &v
+}
```
```go
	return
}

if len(session.Messages) <= keepLast {
```
The TruncateHistory method doesn't update the session's Updated timestamp when keepLast is greater than the current message count (line 133-135). While the method does nothing in this case, it might be expected to update the timestamp to reflect that the truncation operation was attempted. For consistency with other branches that do update the timestamp, consider whether an update is needed here as well.
Suggested change:

```diff
 if len(session.Messages) <= keepLast {
+	session.Updated = time.Now()
```
```go
var job *CronJob
for i := range cs.store.Jobs {
	if cs.store.Jobs[i].ID == jobID {
		job = &cs.store.Jobs[i]
		break
	}
}
if job == nil {
	return
}
```
There's a potential issue where a job could be deleted (by removeJobUnsafe on line 203 or externally via RemoveJob) between finding the job on line 158-164 and re-finding it on line 179-188. When this happens, the second lookup returns nil (line 186-188), but no error state is recorded. The job executed successfully, but its state update is silently lost. Consider tracking the deletion separately or logging when a job disappears between execution and state update.
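The logging option the reviewer mentions can be sketched as below. This is a simplified stand-in (struct shapes and the `recordResult` helper are hypothetical): the state write-back re-finds the job under the lock and logs when it has vanished, so the lost update is at least visible.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// Hypothetical trimmed-down versions of the cron service types.
type CronJob struct {
	ID         string
	LastStatus string
}

type CronService struct {
	mu   sync.Mutex
	jobs []CronJob
}

// recordResult writes execution state back after a job has run outside
// the lock. It reports whether the job still existed; if the job was
// removed while executing, the dropped update is logged instead of
// disappearing silently.
func (cs *CronService) recordResult(jobID, status string) bool {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	for i := range cs.jobs {
		if cs.jobs[i].ID == jobID {
			cs.jobs[i].LastStatus = status
			return true
		}
	}
	log.Printf("cron: job %s removed during execution; dropping state update", jobID)
	return false
}

func main() {
	cs := &CronService{jobs: []CronJob{{ID: "a"}}}
	fmt.Println(cs.recordResult("a", "ok"))   // job still present: state updated
	fmt.Println(cs.recordResult("gone", "ok")) // job removed meanwhile: logged
}
```

Returning the boolean also gives callers a hook to surface the condition in metrics if that ever matters.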
```go
if err != nil {
	return err
}
```
Similar to the CreateTemp pattern issue, the sessionPath construction on line 177 is vulnerable to path traversal if the key contains ".." or path separators. A malicious or misconfigured key could write outside the storage directory. Consider sanitizing the key by replacing path separators or validating that the resolved path is still within the storage directory using filepath.Clean and checking the prefix.
Suggested change:

```go
// Validate key to prevent path traversal via sessionPath and temporary file names.
if key == "" || key != filepath.Base(key) || key == "." || key == ".." {
	return os.ErrInvalid
}
```
Hi @Esubaalew, thanks for your PR. I encountered some issues: there were too many conflicts when merging this PR. Could you please test it on the latest code and submit again?
Hey @lxowalle I just did, thanks.
Thanks for the pr!
@Esubaalew Nice work on the race condition fixes! The atomic write approach with temp/sync/rename is a solid pattern for crash safety, and separating the disk I/O from the lock is a good call for throughput. By the way, we're building the PicoClaw Dev Group to bring contributors closer together and push the project forward. If you're interested, send an email to ***@***.*** with the subject [Join PicoClaw Dev Group] + your GitHub account and we'll get you a Discord invite!
Thanks, I’ve just sent the email.
This PR addresses concurrency and session-handling issues in the session manager, cron, and heartbeat services. The first fix targets a race condition where two concurrent requests could create two different sessions for the same key.
Key Changes
Race fix: changed GetOrCreate to use a single Lock during the check-and-set phase, preventing multiple goroutines from creating and overwriting the same session.
Non-blocking I/O: Refactored Save to snapshot session data under a RLock and perform disk I/O outside of the critical section. This prevents the entire manager from hanging during slow disk writes.
Atomic Writes: (CreateTemp -> Sync -> Rename) this can prevent data corruption/partial writes during system crashes.
Idempotent Stop/Start: Fixed a bug where restarting a service would cause an immediate exit or panic due to a closed stopChan.
Channel Management: Start() now re-initializes the stopChan on every call, and Stop() safely handles nil/closed states to ensure reliability during repeated toggle operations.
Granular Locking: Refactored checkJobs and executeJobByID to ensure thread-safe state updates without holding locks across external callback executions.
Most of the issues fixed here are not urgent; they are latent problems that could plausibly cause bugs under the right conditions.