Skip to content

Commit 268daf1

Browse files
committed
fix(adapter): clear _pending_enqueued_at on teardown + cancellation (#33)
The Kimi adapter maintains _pending_enqueued_at as a parallel TTL dict alongside the base class's _pending_messages and _active_sessions (gateway/platforms/base.py). The base clears the latter two during cancel_background_tasks (base.py:2553-2554), but our subclass's parallel dict is untouched — entries leak across reconnects since the gateway reuses the same adapter instance (gateway/run.py:2725-2729 calls cancel_background_tasks then disconnect on the same instance, and the adapter is then re-used for the next connect). Three layered guarantees close the leak: 1. cancel_background_tasks override mirrors the base's clear() — this is the primary fix. super() runs the drain (await asyncio.gather over _background_tasks) and clears base state; our override then clears _pending_enqueued_at as a final sweep. Order matters: super() first, our clear() last. Reversed, an in-flight handler's finally block could re-insert a key after our clear during the gather. With this order, every handler's finally has already run and self-popped via the "if session_key not in _pending_messages" guard. 2. disconnect() also clears as defense-in-depth for direct- disconnect call sites that bypass cancel_background_tasks (gateway/run.py:_safe_adapter_disconnect, the error-recovery branch in connect()). Documented limitation: those direct paths don't clear base _pending_messages or _active_sessions either — pre-existing behaviour, out of scope. 3. handle_message wraps the post-super cleanup in try/finally so any unexpected exception from super (most relevantly a CancelledError propagating up from base.handle_message) doesn't skip the cleanup. The cleanup guard is deterministic under cancellation because base's _pending_messages writes happen synchronously with no await between the write and the function return — so the guard observes the slot in one of two known states: empty (pop) or owned by a follow-up (preserve fresh ts). Plus connect() clears _pending_enqueued_at at session start as belt-and-braces against any future code path that bypasses the teardown machinery entirely (e.g. partial-init reuse). Cheap and idempotent. Three-way review feedback applied: - Codex caught that my original docstring overstated the consequence (a stale-only timestamp can't actually evict a fresh message — the TTL guard at line ~1247 first does _pending_messages.get and bails if None). Reworded as "memory hygiene, not correctness". - Claude caught that the try/finally docstring rationale was misdirected (cancel_background_tasks cancels _background_tasks, not direct handle_message callers). Reframed as "any unexpected super() exception". - Kimi audited other parallel dicts (_last_message_id_per_room, _probe_msg_id_room_counts) and confirmed they don't need clearing — the first persists by design as a replay-dedup anchor; the second is debug-only and gated behind isEnabledFor(DEBUG). Regression tests (tests/test_kimi.py PendingEnqueuedAtCleanupTests): - test_cancel_background_tasks_clears_pending_enqueued_at: seeds the dict, calls the override, asserts empty. - test_disconnect_clears_pending_enqueued_at: seeds the dict, calls disconnect with WS/HTTP/lock teardown patched, asserts empty. - test_connect_clears_pending_enqueued_at: seeds a "stale" entry, calls connect with GetMe forced to fail, asserts the entry was cleared at session start (before the GetMe failure). - test_handle_message_cleanup_runs_on_cancellation: patches super().handle_message to raise CancelledError, asserts the try/ finally pops the timestamp via the guard. No production behavior change for healthy connect/disconnect cycles — this only matters for reconnects, error-recovery, and the rare per-task cancellation outside of full adapter teardown.
1 parent 6f3bccb commit 268daf1

2 files changed

Lines changed: 194 additions & 7 deletions

File tree

kimi/kimi_adapter.py

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,15 @@ async def connect(self) -> bool:
10951095
self._startup_ts = time.time()
10961096
self._http_session = aiohttp.ClientSession()
10971097

1098+
# Belt-and-braces sweep of parallel TTL state at the start of
1099+
# every connect cycle. Standard teardown paths (disconnect,
1100+
# cancel_background_tasks) already clear this, but partial-init
1101+
# failures or future code paths that bypass them could leave
1102+
# stale entries from a prior session — and the gateway reuses
1103+
# this same adapter instance on reconnect. Clearing here
1104+
# guarantees every connect starts from a known-empty state.
1105+
self._pending_enqueued_at.clear()
1106+
10981107
# Fetch bot identity once — needed to filter self-authored group messages.
10991108
try:
11001109
me = await self._rpc_unary("GetMe", {})
@@ -1161,6 +1170,63 @@ async def disconnect(self) -> None:
11611170
await self._cleanup_http()
11621171
self._release_platform_lock()
11631172

1173+
# Defense-in-depth: clear our parallel TTL state. The gateway's
1174+
# standard shutdown path calls cancel_background_tasks() before
1175+
# disconnect() (gateway/run.py:2725-2729), and that override
1176+
# already clears _pending_enqueued_at via super(). But other
1177+
# call sites — gateway/run.py:_safe_adapter_disconnect at
1178+
# ~line 953 + line 1145, plus the error-recovery branch at
1179+
# ~line 1110-1112 in connect() — call disconnect() directly
1180+
# without a prior drain. Clearing here ensures
1181+
# _pending_enqueued_at never outlives the connection,
1182+
# regardless of teardown path.
1183+
#
1184+
# Known limitation (out of scope for this fix): direct-
1185+
# disconnect paths don't clear the base class's
1186+
# ``_pending_messages`` or ``_active_sessions`` either — those
1187+
# only get cleaned up via cancel_background_tasks(). If a
1188+
# future code path reuses an adapter after a direct disconnect
1189+
# with real pending messages, those would also need clearing.
1190+
# The pre-existing behaviour is unchanged by this commit.
1191+
self._pending_enqueued_at.clear()
1192+
1193+
async def cancel_background_tasks(self) -> None:
1194+
"""Mirror base behaviour for our parallel TTL state.
1195+
1196+
BasePlatformAdapter.cancel_background_tasks (gateway/platforms/
1197+
base.py:2553-2554) clears ``_pending_messages`` and
1198+
``_active_sessions`` at the end of its drain. Our subclass
1199+
maintains a parallel ``_pending_enqueued_at`` dict that is
1200+
only meaningful while the corresponding ``_pending_messages``
1201+
slot is live; once base clears its state, our timestamps are
1202+
orphaned. Without this override they leak across reconnects
1203+
(the gateway typically reuses the adapter instance).
1204+
1205+
Correctness note: a stale-only timestamp is benign — the TTL
1206+
guard in ``handle_message`` keys off ``_pending_messages.get
1207+
(session_key)`` first (see line ~1247) and bails if no slot
1208+
exists, so a phantom ``_pending_enqueued_at`` entry can't
1209+
evict a real later message. The leak is a memory-hygiene
1210+
issue, not a correctness one — relevant for long-running pi
1211+
deployments that reconnect repeatedly over weeks.
1212+
1213+
Order: ``super()`` first, then our ``clear()``. Reversed,
1214+
an in-flight handler whose ``finally`` block runs during the
1215+
drain's ``await asyncio.gather`` could re-insert a key after
1216+
our clear, leaving us with a single stray entry per drain.
1217+
With this order, the base awaits all such handlers to
1218+
completion (their ``finally`` blocks see ``_pending_messages``
1219+
empty and pop their own timestamp via the guard), so our
1220+
clear is a final sweep over a known-empty dict.
1221+
1222+
Other parallel dicts on this adapter (``_last_message_id_per_
1223+
room`` for replay dedup, ``_probe_msg_id_room_counts`` for
1224+
debug counters) intentionally persist across reconnects or
1225+
carry no semantic state — they're not in scope here.
1226+
"""
1227+
await super().cancel_background_tasks()
1228+
self._pending_enqueued_at.clear()
1229+
11641230
async def _cleanup_http(self) -> None:
11651231
if self._http_session is not None:
11661232
try:
@@ -1247,13 +1313,32 @@ async def handle_message(self, event: MessageEvent) -> None: # type: ignore[ove
12471313
# with what super() puts in _pending_messages.
12481314
self._pending_enqueued_at[session_key] = now
12491315

1250-
await super().handle_message(event)
1251-
1252-
# Clean up timestamp when the session finishes (slot consumed or
1253-
# not needed). Guard: only drop if the slot itself is gone, so a
1254-
# rapidly-arriving follow-up doesn't race-clear a fresh timestamp.
1255-
if session_key not in self._pending_messages:
1256-
self._pending_enqueued_at.pop(session_key, None)
1316+
try:
1317+
await super().handle_message(event)
1318+
finally:
1319+
# Clean up timestamp when the session finishes (slot consumed
1320+
# or not needed). Wrapped in finally so any unexpected
1321+
# exception from super() — most relevantly a CancelledError
1322+
# propagating up from base.handle_message itself — doesn't
1323+
# skip the cleanup and leak a timestamp into a future
1324+
# invocation. (Note: gateway/run.py's task-drain at
1325+
# cancel_background_tasks cancels ``_background_tasks`` —
1326+
# the spawned ``_process_message_background`` workers —
1327+
# not direct ``handle_message`` callers, so the
1328+
# cancellation pressure here is from other paths.)
1329+
#
1330+
# Why the guard is deterministic under cancellation: base
1331+
# writes to ``_pending_messages[session_key]`` happen
1332+
# synchronously with no ``await`` between the write and
1333+
# the function return (see gateway/platforms/base.py
1334+
# interrupt-queue path). So by the time our ``finally``
1335+
# observes ``_pending_messages``, the slot is in one of
1336+
# two known states: empty (no follow-up landed → safe to
1337+
# pop) or owned by a follow-up (write completed before
1338+
# our await unwound → preserve the fresh timestamp the
1339+
# follow-up's own pre-super block set).
1340+
if session_key not in self._pending_messages:
1341+
self._pending_enqueued_at.pop(session_key, None)
12571342

12581343
# Public send / platform-surface overrides
12591344
# ──────────────────────────────────────────────────────────────────────

tests/test_kimi.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3187,6 +3187,108 @@ async def test_3a_4_ttl_enabled_evicts_expired_pending(self):
31873187
self.assertEqual(len(overwrite_warnings), 0, "Eviction should not also fire a drop warning")
31883188

31893189

3190+
# ═══════════════════════════════════════════════════════════════════════════════
3191+
# Issue #33: _pending_enqueued_at cleanup across teardown paths
3192+
#
3193+
# The Kimi adapter maintains `_pending_enqueued_at` as a parallel TTL dict
3194+
# alongside the base class's `_pending_messages` and `_active_sessions`. The
3195+
# base clears the latter two during `cancel_background_tasks`; without parallel
3196+
# clears in our subclass, the TTL dict leaks across reconnects (the gateway
3197+
# reuses the adapter instance). The fix layers three guarantees: (1) the
3198+
# `cancel_background_tasks` override mirrors the base's clear; (2) `disconnect`
3199+
# also clears for direct-disconnect paths that bypass cancel_background_tasks
3200+
# (gateway/run.py:_safe_adapter_disconnect, error-recovery in connect()); (3)
3201+
# `handle_message`'s post-super cleanup is wrapped in `try/finally` so any
3202+
# unexpected exception from super doesn't leak a stamped timestamp.
3203+
# ═══════════════════════════════════════════════════════════════════════════════
3204+
3205+
class PendingEnqueuedAtCleanupTests(unittest.IsolatedAsyncioTestCase):
3206+
"""Issue #33: _pending_enqueued_at must be cleared across teardown paths."""
3207+
3208+
async def test_cancel_background_tasks_clears_pending_enqueued_at(self):
3209+
"""cancel_background_tasks override must mirror the base's clear()
3210+
behaviour for our parallel TTL state."""
3211+
adapter = KimiAdapter(_cfg())
3212+
adapter._pending_enqueued_at["dm:test:abc"] = 1.0
3213+
adapter._pending_enqueued_at["room:xyz"] = 2.0
3214+
# super().cancel_background_tasks() walks _background_tasks (empty
3215+
# on a fresh adapter) and clears the base's parallel dicts; our
3216+
# override should clear _pending_enqueued_at on top of that.
3217+
await adapter.cancel_background_tasks()
3218+
self.assertEqual(
3219+
adapter._pending_enqueued_at, {},
3220+
"cancel_background_tasks should clear _pending_enqueued_at"
3221+
)
3222+
3223+
async def test_disconnect_clears_pending_enqueued_at(self):
3224+
"""disconnect() must clear _pending_enqueued_at as defense-in-depth
3225+
for direct-disconnect call sites that bypass cancel_background_tasks."""
3226+
adapter = KimiAdapter(_cfg())
3227+
adapter._pending_enqueued_at["dm:test:abc"] = 1.0
3228+
adapter._pending_enqueued_at["room:xyz"] = 2.0
3229+
# Patch out network/lock teardown — the test only cares about the
3230+
# parallel-state clear; the rest is unrelated infrastructure.
3231+
with patch.object(adapter, "_cleanup_http", new=AsyncMock()), \
3232+
patch.object(adapter, "_release_platform_lock"):
3233+
await adapter.disconnect()
3234+
self.assertEqual(
3235+
adapter._pending_enqueued_at, {},
3236+
"disconnect should clear _pending_enqueued_at"
3237+
)
3238+
3239+
async def test_connect_clears_pending_enqueued_at(self):
3240+
"""connect() must clear stale TTL state as a belt-and-braces sweep
3241+
before establishing a new session — protects against any path that
3242+
reuses the adapter without going through disconnect first."""
3243+
adapter = KimiAdapter(_cfg())
3244+
adapter._pending_enqueued_at["dm:stale:xyz"] = 99.0 # leftover from prior session
3245+
3246+
# connect() reaches the clear() before any network IO. Make connect
3247+
# short-circuit at GetMe so we don't have to mock the whole WS stack.
3248+
from kimi_adapter import KimiAuthError
3249+
with patch.object(adapter, "_acquire_platform_lock", return_value=True), \
3250+
patch.object(adapter, "_rpc_unary", new=AsyncMock(side_effect=KimiAuthError("test"))), \
3251+
patch.object(adapter, "_cleanup_http", new=AsyncMock()), \
3252+
patch.object(adapter, "_release_platform_lock"):
3253+
# Returns False because GetMe raises; the clear() ran before that.
3254+
result = await adapter.connect()
3255+
self.assertFalse(result, "connect() should fail when GetMe raises")
3256+
self.assertEqual(
3257+
adapter._pending_enqueued_at, {},
3258+
"connect should clear stale TTL state at session start"
3259+
)
3260+
3261+
async def test_handle_message_cleanup_runs_on_cancellation(self):
3262+
"""try/finally ensures the post-super cleanup runs on CancelledError,
3263+
so a per-task cancellation outside of full adapter teardown doesn't
3264+
leak a stamped timestamp into the next handler invocation."""
3265+
adapter = KimiAdapter(_cfg())
3266+
event = _make_message_event("test message")
3267+
session_key = _compute_session_key(adapter, event)
3268+
3269+
# Simulate an active session so the override stamps _pending_enqueued_at.
3270+
adapter._active_sessions[session_key] = asyncio.Event()
3271+
3272+
# Patch super().handle_message to raise CancelledError mid-await, AFTER
3273+
# the override stamped its timestamp. _pending_messages is left empty
3274+
# (the mock does no enqueueing), so the cleanup guard's "if session_key
3275+
# not in _pending_messages" branch should fire.
3276+
with patch.object(
3277+
adapter.__class__.__bases__[0],
3278+
"handle_message",
3279+
new=AsyncMock(side_effect=asyncio.CancelledError),
3280+
):
3281+
with self.assertRaises(asyncio.CancelledError):
3282+
await adapter.handle_message(event)
3283+
3284+
self.assertNotIn(
3285+
session_key,
3286+
adapter._pending_enqueued_at,
3287+
"try/finally should pop the timestamp on CancelledError when the "
3288+
"pending slot was never populated",
3289+
)
3290+
3291+
31903292
# ═══════════════════════════════════════════════════════════════════════════════
31913293
# Lift 3b: output_mode flag
31923294
# ═══════════════════════════════════════════════════════════════════════════════

0 commit comments

Comments
 (0)