Skip to content

Commit ede47a5

Browse files
karthikeyannteknium1
authored andcommitted
fix(gateway): pin Telegram DM-topic routing to user's current topic
Topic-mode DM replies were fragmenting one conversation across many sessions: a Reply on a message in another topic delivered Telegram's message_thread_id for *that* topic, and #3206's strip routed plain replies to the lobby. Both pulled the user away from their current session. Fix: when topic mode is on, rewrite source.thread_id to the user's most-recent binding if the inbound id is missing/General or not a known topic. Non-topic-mode users unchanged.
1 parent 470edfa commit ede47a5

3 files changed

Lines changed: 185 additions & 0 deletions

File tree

gateway/run.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1814,6 +1814,54 @@ def _record_telegram_topic_binding(
18141814
session_id=session_entry.session_id,
18151815
)
18161816

1817+
def _recover_telegram_topic_thread_id(
1818+
self,
1819+
source: SessionSource,
1820+
) -> Optional[str]:
1821+
"""Pin DM-topic routing to the user's last-active topic.
1822+
1823+
Telegram fragments topic-mode DMs two ways: a Reply on a message
1824+
in another topic delivers ``message_thread_id`` for *that* topic,
1825+
and ``_build_message_event`` strips the thread_id on plain replies
1826+
(#3206 — needed for non-topic users). Both route the user to the
1827+
wrong session. When topic mode is on, rewrite the thread_id to the
1828+
user's most-recent binding if the inbound id is missing/General or
1829+
not a known topic for this chat. Returns None to leave it alone.
1830+
"""
1831+
if (
1832+
source.platform != Platform.TELEGRAM
1833+
or source.chat_type != "dm"
1834+
or not source.chat_id
1835+
or not source.user_id
1836+
or not self._telegram_topic_mode_enabled(source)
1837+
):
1838+
return None
1839+
session_db = getattr(self, "_session_db", None)
1840+
if session_db is None:
1841+
return None
1842+
try:
1843+
bindings = session_db.list_telegram_topic_bindings_for_chat(
1844+
chat_id=str(source.chat_id),
1845+
)
1846+
except Exception:
1847+
logger.debug("topic-recover: read failed", exc_info=True)
1848+
return None
1849+
if not bindings:
1850+
return None
1851+
inbound = str(source.thread_id or "")
1852+
is_lobby = not inbound or inbound in self._TELEGRAM_GENERAL_TOPIC_IDS
1853+
known = {str(b.get("thread_id") or "") for b in bindings}
1854+
if not is_lobby and inbound in known:
1855+
return None
1856+
user_id = str(source.user_id)
1857+
for b in bindings: # newest-first
1858+
if str(b.get("user_id") or "") == user_id:
1859+
recovered = str(b.get("thread_id") or "")
1860+
if recovered and recovered != inbound:
1861+
return recovered
1862+
return None
1863+
return None
1864+
18171865
def _resolve_session_agent_runtime(
18181866
self,
18191867
*,
@@ -7498,6 +7546,21 @@ async def _handle_message_with_agent(self, event, source, _quick_key: str, run_g
74987546
)
74997547

75007548
# Get or create session
7549+
# Topic-mode DMs: rewrite a stale/foreign thread_id to the user's
7550+
# last-active topic so a cross-topic Reply or stripped plain reply
7551+
# doesn't fragment the conversation across sessions.
7552+
recovered = self._recover_telegram_topic_thread_id(source)
7553+
if recovered is not None:
7554+
logger.info(
7555+
"telegram topic recovery: chat=%s user=%s %r -> %s",
7556+
source.chat_id, source.user_id, source.thread_id, recovered,
7557+
)
7558+
source = dataclasses.replace(source, thread_id=recovered)
7559+
try:
7560+
event.source = source
7561+
except Exception:
7562+
pass
7563+
75017564
session_entry = self.session_store.get_or_create_session(source)
75027565
session_key = session_entry.session_key
75037566
self._cache_session_source(session_key, source)

hermes_state.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2831,6 +2831,27 @@ def get_telegram_topic_binding(
28312831
return None
28322832
return dict(row) if row else None
28332833

2834+
def list_telegram_topic_bindings_for_chat(
2835+
self,
2836+
*,
2837+
chat_id: str,
2838+
) -> List[Dict[str, Any]]:
2839+
"""All Telegram DM topic bindings for one chat, newest first.
2840+
2841+
Read-only; returns [] if the bindings table doesn't exist yet
2842+
(does not trigger the topic-mode migration).
2843+
"""
2844+
with self._lock:
2845+
try:
2846+
rows = self._conn.execute(
2847+
"SELECT * FROM telegram_dm_topic_bindings "
2848+
"WHERE chat_id = ? ORDER BY updated_at DESC",
2849+
(str(chat_id),),
2850+
).fetchall()
2851+
except sqlite3.OperationalError:
2852+
return []
2853+
return [dict(row) for row in rows]
2854+
28342855
def bind_telegram_topic(
28352856
self,
28362857
*,

tests/gateway/test_telegram_topic_mode.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,5 +1050,106 @@ async def test_topic_refuses_unauthorized_user(tmp_path, monkeypatch):
10501050
assert tables == set()
10511051

10521052

1053+
# ──────────────────────────────────────────────────────────────────────
1054+
# Cross-topic Reply leak / stripped-reply recovery
1055+
# ──────────────────────────────────────────────────────────────────────
1056+
1057+
1058+
def _seed_two_topic_bindings(session_db):
1059+
"""Create two topics for the same user in topic mode, oldest first."""
1060+
session_db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
1061+
# Seed two distinct sessions so the bind FK resolves.
1062+
session_db.create_session(
1063+
session_id="sess-A",
1064+
source="telegram",
1065+
user_id="208214988",
1066+
)
1067+
session_db.create_session(
1068+
session_id="sess-B",
1069+
source="telegram",
1070+
user_id="208214988",
1071+
)
1072+
# Old topic A first, then current topic B (so B is "most recent").
1073+
src_a = _make_source(thread_id="111")
1074+
session_db.bind_telegram_topic(
1075+
chat_id=src_a.chat_id,
1076+
thread_id=src_a.thread_id,
1077+
user_id=src_a.user_id,
1078+
session_key=build_session_key(src_a),
1079+
session_id="sess-A",
1080+
)
1081+
src_b = _make_source(thread_id="222")
1082+
session_db.bind_telegram_topic(
1083+
chat_id=src_b.chat_id,
1084+
thread_id=src_b.thread_id,
1085+
user_id=src_b.user_id,
1086+
session_key=build_session_key(src_b),
1087+
session_id="sess-B",
1088+
)
1089+
1090+
1091+
def test_recover_returns_none_for_known_topic(tmp_path):
1092+
db = SessionDB(db_path=tmp_path / "state.db")
1093+
_seed_two_topic_bindings(db)
1094+
runner = _make_runner(session_db=db)
1095+
1096+
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="222")) is None
1097+
1098+
1099+
def test_recover_rewrites_unknown_thread_id_to_most_recent(tmp_path):
1100+
# Cross-topic Reply leak: inbound thread_id is a Telegram-only id we never bound.
1101+
db = SessionDB(db_path=tmp_path / "state.db")
1102+
_seed_two_topic_bindings(db)
1103+
runner = _make_runner(session_db=db)
1104+
1105+
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id="9999")) == "222"
1106+
1107+
1108+
def test_recover_rewrites_lobby_thread_id_to_most_recent(tmp_path):
1109+
# Stripped plain reply: thread_id is None, topic mode is on.
1110+
db = SessionDB(db_path=tmp_path / "state.db")
1111+
_seed_two_topic_bindings(db)
1112+
runner = _make_runner(session_db=db)
1113+
1114+
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) == "222"
1115+
1116+
1117+
def test_recover_returns_none_when_topic_mode_disabled(tmp_path):
1118+
# Non-topic-mode DMs keep the existing strip-to-lobby behavior.
1119+
db = SessionDB(db_path=tmp_path / "state.db")
1120+
runner = _make_runner(session_db=db)
1121+
1122+
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
1123+
1124+
1125+
def test_recover_returns_none_when_no_bindings_yet(tmp_path):
1126+
db = SessionDB(db_path=tmp_path / "state.db")
1127+
db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
1128+
runner = _make_runner(session_db=db)
1129+
1130+
assert runner._recover_telegram_topic_thread_id(_make_source(thread_id=None)) is None
1131+
1132+
1133+
def test_list_telegram_topic_bindings_for_chat(tmp_path):
1134+
db = SessionDB(db_path=tmp_path / "state.db")
1135+
_seed_two_topic_bindings(db)
1136+
rows = db.list_telegram_topic_bindings_for_chat(chat_id="208214988")
1137+
assert [r["thread_id"] for r in rows] == ["222", "111"]
1138+
1139+
1140+
def test_list_telegram_topic_bindings_for_chat_no_table(tmp_path):
1141+
# Missing topic-mode tables → [] without auto-migrating.
1142+
db = SessionDB(db_path=tmp_path / "state.db")
1143+
assert db.list_telegram_topic_bindings_for_chat(chat_id="208214988") == []
1144+
tables = {
1145+
row[0]
1146+
for row in db._conn.execute(
1147+
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'telegram_dm%'"
1148+
).fetchall()
1149+
}
1150+
assert tables == set()
1151+
1152+
1153+
10531154

10541155

0 commit comments

Comments
 (0)