fix(agent): identity-based session-DB flush — stop positional assistant drops (#43936)#45277
fix(agent): identity-based session-DB flush — stop positional assistant drops (#43936)#45277Willhong wants to merge 1 commit into
Conversation
…nt drops (NousResearch#43936) The session-DB flush in `_flush_messages_to_session_db` was position-based (`messages[max(start_idx, _last_flushed_db_idx):]`). It assumes `messages == conversation_history + this turn's new messages`, which breaks two ways, both reproduced live: 1. Overlapping turns on the cached agent corrupt the shared `_last_flushed_db_idx` (it indexes the turn-local `messages` array) → the earlier completed turn flushes an empty slice and its delivered assistant row is never written. 2. `repair_message_sequence` compacts `messages` in place below `len(conversation_history)`, so `start_idx > len(messages)` permanently → self-reinforcing drop loop. The merged NousResearch#44837 fix (NousResearch#45260) clamps `_last_flushed_db_idx` to `len(messages)` but leaves `start_idx = len(conversation_history)` unbounded, so shape 2 is still an empty slice. No index arithmetic survives the two arrays diverging. Replace the positional slice with identity-based dedup: stamp each persisted dict with `_db_persisted`, recognize this turn's `conversation_history` entries by object identity (`id()`) and stamp instead of re-write. A message is written exactly once regardless of how the arrays shift; the NousResearch#860 and NousResearch#31507 guards hold by construction. `_db_persisted` is underscore-prefixed and already stripped from API payloads. The compression split clears the stamps so the surviving messages re-write into the new session row. `_last_flushed_db_idx` is kept updated for legacy readers but no longer decides what is written. Complementary to NousResearch#43962 (backfills delivered text after an interrupt in turn_finalizer.py); this prevents the drop at the flush layer and also covers the non-interrupt repair-shrink path. Regression: tests/run_agent/test_identity_flush.py (5, incl. an explicit test that the NousResearch#44837 clamp still drops where identity flush persists) + test_message_sequence_repair, test_860_dedup, test_compression_persistence all GREEN.
7a1b036 to
fc868f4
Compare
|
Verification: Code Review — Clean ✅ Reviewed the identity-based session-DB flush implementation. Key checks:
No dead variables, no guard-block leaks, no exception-ordering issues. Well-documented with inline comments explaining the design decisions. |
What does this PR do?
Fixes assistant messages that are delivered to the chat but silently dropped
from
state.db, so the next turn replays a user-only backlog and loses context(#43936).
The root cause is that
_flush_messages_to_session_db(inrun_agent.py) wasposition-based:
This assumes
messages == conversation_history + this turn's new messages, soslicing at
len(conversation_history)isolates the new tail. That assumptionbreaks in two real, independently-reproduced ways:
Overlapping turns corrupt the shared cursor.
_last_flushed_db_idxliveson the cached agent instance, but it indexes the turn-local
messagesarray that every turn rebuilds. When a second inbound message interrupts an
in-flight turn on the same cached agent (the exact repro in [Bug]: State.db drops assistant messages on interrupt; .jsonl session log no longer available as fallback #43936 — "send
another message before the agent finishes"), the later turn advances
_last_flushed_db_idxpast the earlier turn'slen(messages). The earlier,now-completed turn's final flush then computes
flush_from > len(messages),the slice is empty, and a fully-generated, already-delivered assistant
response is never written.
conversation_historyends up longer thanmessages. Once a drop haspoisoned the transcript (consecutive-user / orphan-tool rows), the next
turn's
repair_message_sequencemerges/drops those rows and shrinksmessagesbelowlen(conversation_history). Nowstart_idx > len(messages)permanently, every later flush slices past the end, and the session
enters a self-reinforcing drop loop. No interrupt is needed to keep it going.
No index arithmetic survives the two arrays diverging, so this replaces the
positional slice with identity-based dedup.
The fix is the minimal flush-layer change, not the issue's suggested ".jsonl
fallback restore" — restoring a second log would paper over the drop instead of
preventing it, and the same
repair-shrink path (trigger #2) would still desyncboth stores.
Related Issue
Fixes #43936
Related to #44837 (the repair-compaction drop), whose merged fix #45260 reduced
but did not eliminate the drop — see the note above; this PR rebases on #45260
and supersedes its positional clamp.
Complementary to (not overlapping with) #43962, which backfills delivered text
after an interrupt in
turn_finalizer.py. This PR prevents the drop at theflush layer, which also covers the non-interrupt trigger #2 that a finalizer
backfill cannot reach. The two can land independently.
Type of Change
Changes Made
run_agent.py—_flush_messages_to_session_dbnow stamps each persisteddict with
_db_persistedafter its DB write, and recognizes this turn'sconversation_historyentries by object identity (id()), stamping theminstead of re-writing. A message is written exactly once regardless of how the
arrays shift.
_db_persistedis underscore-prefixed and already stripped fromAPI payloads (
chat_completion_helpers.py,anthropic_adapter.py; the CodexResponses adapter builds fresh items), so it never reaches a provider.
_last_flushed_db_idxis kept updated for legacy readers (cli.pyresume/branch) but no longer decides what gets written.
agent/conversation_compression.py— the compression session split clears the_db_persistedstamps from the surviving messages so they re-write into thenew session row.
tests/run_agent/test_identity_flush.py— new regression suite (4 cases).tests/run_agent/test_compression_persistence.py—test_flush_with_stale_history_loses_messagespreviously asserted the bugcondition (0 rows persisted); it now asserts the fix (both messages persist
despite a stale, longer
conversation_history).How to Test
Reproduction (matches #43936):
state.db(gap; next turn replays a user-only backlog).After: the assistant row is persisted; no gap.
Automated:
tests/run_agent/test_identity_flush.pyencodes both triggers:test_overlap_corrupted_cursor_does_not_drop_assistant(trigger 1)test_repair_shrunk_messages_still_persist_new_turn(trigger 2)test_repeated_flush_same_turn_writes_once(bug: SQLite session transcript accumulates duplicate messages (3-4x token inflation) #860 dedup still holds)test_history_dicts_not_rewritten(no duplication of history rows)Verified live on the gateway: a 99-second, 6-API-call multi-tool turn with two
overlapping inbound messages (the exact shape that dropped a 1096-char response
before this change) persisted every assistant/tool row, and the
FLUSH-IDENTITYlog line fired on thelen_conv > len_messagesdivergence withassistant_written=True.Checklist
Code
fix(agent): …)scripts/run_tests.sh tests/run_agent/and all tests passDocumentation & Housekeeping
cli-config.yaml.example— N/A (no config keys added)CONTRIBUTING.md/AGENTS.md— N/A (no architecture/workflow change)bookkeeping + the existing SQLite path; no OS-specific behavior
Risk / rollout
Behavior-neutral on normal sequential turns: a fresh message dict is never in
_hist_idsand carries no stamp, so it is written exactly as before. The onlybehavior change is on the divergence paths, which previously dropped data.