Skip to content

Commit f1cb9f2

Browse files
authored
fix(slack): keep DM thread turns out of active steering
Keep Slack direct-message sessions stable while tracking routed Slack thread ids on active reply operations. Different top-level Slack DM threads from the same sender no longer steer into or block each other, while ordinary same-thread follow-ups and non-Slack direct-message behavior keep their existing semantics. Verification: - `git diff --check origin/main...FETCH_HEAD` - `/Users/steipete/Projects/agent-scripts/skills/autoreview/scripts/autoreview --mode branch --base origin/main --output /tmp/pr85904-autoreview.txt --json-output /tmp/pr85904-autoreview.json` - GitHub CI green for head `6703e166545bcb96c1a50de93a42446212cca9a7`, including Real behavior proof and auto-reply reply routing/dispatch shards. Co-authored-by: guanbear <123guan@gmail.com>
1 parent 667393b commit f1cb9f2

10 files changed

Lines changed: 417 additions & 6 deletions

extensions/slack/src/monitor/message-handler/prepare.thread-session-key.test.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,11 @@ describe("thread-level session keys", () => {
490490
expect(sessionKey).not.toContain(":thread:");
491491
});
492492

493-
it("keeps top-level DMs on the direct session when replyToMode=all", () => {
493+
it("keeps top-level DMs on the stable DM session when replyToMode=all", () => {
494494
const ctx = buildCtx({ replyToMode: "all", dmScope: "per-channel-peer" });
495495
const account = buildAccount("all");
496496

497-
const routing = resolveSlackRoutingContext({
497+
const first = resolveSlackRoutingContext({
498498
ctx,
499499
account,
500500
message: {
@@ -509,9 +509,26 @@ describe("thread-level session keys", () => {
509509
isRoom: false,
510510
isRoomish: false,
511511
});
512+
const second = resolveSlackRoutingContext({
513+
ctx,
514+
account,
515+
message: {
516+
channel: "D456",
517+
channel_type: "im",
518+
user: "U3",
519+
text: "second dm message",
520+
ts: "1770408531.000000",
521+
} as SlackMessageEvent,
522+
isDirectMessage: true,
523+
isGroupDm: false,
524+
isRoom: false,
525+
isRoomish: false,
526+
});
512527

513-
expect(routing.sessionKey).toBe("agent:main:slack:direct:u3");
514-
expect(routing.threadContext.messageThreadId).toBe("1770408530.000000");
528+
expect(first.sessionKey).toBe("agent:main:slack:direct:u3");
529+
expect(second.sessionKey).toBe("agent:main:slack:direct:u3");
530+
expect(first.threadContext.messageThreadId).toBe("1770408530.000000");
531+
expect(second.threadContext.messageThreadId).toBe("1770408531.000000");
515532
});
516533

517534
it("routes DM thread replies to the main DM session, not a thread-scoped session", () => {

src/auto-reply/reply/agent-runner.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ import { createReplyMediaContext } from "./reply-media-paths.js";
106106
import { replyRunRegistry, type ReplyOperation } from "./reply-run-registry.js";
107107
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
108108
import { admitReplyTurn, resolveReplyTurnKind } from "./reply-turn-admission.js";
109+
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
109110
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
110111
import { resolveSourceReplyVisibilityPolicy } from "./source-reply-delivery-mode.js";
111112
import { createTypingSignaler } from "./typing-mode.js";
@@ -1335,6 +1336,10 @@ export async function runReplyAgent(params: {
13351336
: null;
13361337

13371338
const replySessionKey = sessionKey ?? followupRun.run.sessionKey;
1339+
const replyRouteThreadId = resolveRoutedDeliveryThreadId({
1340+
ctx: sessionCtx,
1341+
sessionKey: replySessionKey,
1342+
});
13381343
let replyOperation: ReplyOperation;
13391344
if (providedReplyOperation) {
13401345
replyOperation = providedReplyOperation;
@@ -1345,6 +1350,7 @@ export async function runReplyAgent(params: {
13451350
sessionKey: replySessionKey ?? "",
13461351
kind: replyTurnKind,
13471352
resetTriggered: effectiveResetTriggered,
1353+
routeThreadId: replyRouteThreadId,
13481354
upstreamAbortSignal: opts?.abortSignal,
13491355
});
13501356
if (admission.status === "skipped") {

src/auto-reply/reply/dispatch-from-config.test.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,126 @@ describe("dispatchReplyFromConfig", () => {
11351135
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
11361136
});
11371137

1138+
it("records routed Slack thread id on dispatch-owned reply operations", async () => {
1139+
setNoAbort();
1140+
const cfg = emptyConfig;
1141+
const dispatcher = createDispatcher();
1142+
const ctx = buildTestCtx({
1143+
Provider: "slack",
1144+
Surface: "slack",
1145+
OriginatingChannel: "slack",
1146+
OriginatingTo: "user:U1",
1147+
ChatType: "direct",
1148+
SessionKey: "agent:main:slack:direct:U1",
1149+
MessageThreadId: "501.000",
1150+
});
1151+
const replyResolver = vi.fn(async (_ctx: MsgContext, opts?: GetReplyOptions) => {
1152+
const operation = (
1153+
opts as { replyOperation?: { routeThreadId?: string | number } } | undefined
1154+
)?.replyOperation;
1155+
expect(operation?.routeThreadId).toBe("501.000");
1156+
return { text: "hi" } satisfies ReplyPayload;
1157+
});
1158+
1159+
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
1160+
1161+
expect(replyResolver).toHaveBeenCalledTimes(1);
1162+
});
1163+
1164+
it("lets a different Slack DM routed thread reach reply resolution while another thread is active", async () => {
1165+
setNoAbort();
1166+
const sessionKey = "agent:main:slack:direct:U1";
1167+
const activeOperation = createReplyOperation({
1168+
sessionKey,
1169+
sessionId: "active-session",
1170+
resetTriggered: false,
1171+
routeThreadId: "500.000",
1172+
});
1173+
activeOperation.setPhase("running");
1174+
const dispatcher = createDispatcher();
1175+
const replyResolver = vi.fn(async () => ({ text: "thread B reply" }) satisfies ReplyPayload);
1176+
1177+
try {
1178+
const resultPromise = dispatchReplyFromConfig({
1179+
ctx: buildTestCtx({
1180+
Provider: "slack",
1181+
Surface: "slack",
1182+
OriginatingChannel: "slack",
1183+
OriginatingTo: "user:U1",
1184+
ChatType: "direct",
1185+
SessionKey: sessionKey,
1186+
MessageThreadId: "501.000",
1187+
BodyForAgent: "second top-level DM",
1188+
}),
1189+
cfg: emptyConfig,
1190+
dispatcher,
1191+
replyResolver,
1192+
});
1193+
1194+
const result = await Promise.race([
1195+
resultPromise,
1196+
new Promise<"timed-out">((resolve) => setTimeout(() => resolve("timed-out"), 1_000)),
1197+
]);
1198+
if (result === "timed-out") {
1199+
activeOperation.complete();
1200+
await resultPromise;
1201+
throw new Error("Slack routed thread was blocked by the active reply operation");
1202+
}
1203+
1204+
expect(result).toMatchObject({
1205+
queuedFinal: true,
1206+
counts: { tool: 0, block: 0, final: 0 },
1207+
});
1208+
expect(replyResolver).toHaveBeenCalledTimes(1);
1209+
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
1210+
} finally {
1211+
activeOperation.complete();
1212+
}
1213+
});
1214+
1215+
it("keeps non-Slack routed direct turns behind the active reply operation", async () => {
1216+
setNoAbort();
1217+
const sessionKey = "agent:main:telegram:direct:1";
1218+
const activeOperation = createReplyOperation({
1219+
sessionKey,
1220+
sessionId: "active-session",
1221+
resetTriggered: false,
1222+
routeThreadId: "500.000",
1223+
});
1224+
activeOperation.setPhase("running");
1225+
const dispatcher = createDispatcher();
1226+
const replyResolver = vi.fn(async () => ({ text: "telegram reply" }) satisfies ReplyPayload);
1227+
1228+
const resultPromise = dispatchReplyFromConfig({
1229+
ctx: buildTestCtx({
1230+
Provider: "telegram",
1231+
Surface: "telegram",
1232+
OriginatingChannel: "telegram",
1233+
OriginatingTo: "user:1",
1234+
ChatType: "direct",
1235+
SessionKey: sessionKey,
1236+
MessageThreadId: "501.000",
1237+
BodyForAgent: "second telegram direct turn",
1238+
}),
1239+
cfg: emptyConfig,
1240+
dispatcher,
1241+
replyResolver,
1242+
});
1243+
1244+
try {
1245+
const result = await Promise.race([
1246+
resultPromise,
1247+
new Promise<"blocked">((resolve) => setTimeout(() => resolve("blocked"), 1_000)),
1248+
]);
1249+
1250+
expect(result).toBe("blocked");
1251+
expect(replyResolver).not.toHaveBeenCalled();
1252+
} finally {
1253+
activeOperation.complete();
1254+
await resultPromise;
1255+
}
1256+
});
1257+
11381258
it("routes when OriginatingChannel differs from Provider", async () => {
11391259
setNoAbort();
11401260
mocks.routeReply.mockClear();

src/auto-reply/reply/dispatch-from-config.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,44 @@ function composeAbortSignals(...signals: Array<AbortSignal | undefined>): AbortS
192192
return controller.signal;
193193
}
194194

195+
function routeThreadIdsDiffer(
196+
left: string | number | undefined,
197+
right: string | number | undefined,
198+
): boolean {
199+
if (left === undefined || right === undefined) {
200+
return false;
201+
}
202+
return String(left) !== String(right);
203+
}
204+
205+
function isSlackDirectRoutedThreadTurn(
206+
ctx: Pick<
207+
FinalizedMsgContext,
208+
"ChatType" | "MessageThreadId" | "OriginatingChannel" | "Provider" | "Surface" | "TransportThreadId"
209+
>,
210+
): boolean {
211+
if (normalizeChatType(ctx.ChatType) !== "direct") {
212+
return false;
213+
}
214+
if (ctx.MessageThreadId == null && ctx.TransportThreadId == null) {
215+
return false;
216+
}
217+
return [ctx.Provider, ctx.Surface, ctx.OriginatingChannel].some(
218+
(value) => normalizeOptionalString(value)?.toLowerCase() === "slack",
219+
);
220+
}
221+
222+
function shouldLetSlackRoutedThreadBypassBusyReplyOperation(params: {
223+
activeOperation?: ReplyOperation;
224+
ctx: FinalizedMsgContext;
225+
routeThreadId?: string | number;
226+
}): boolean {
227+
return (
228+
isSlackDirectRoutedThreadTurn(params.ctx) &&
229+
routeThreadIdsDiffer(params.activeOperation?.routeThreadId, params.routeThreadId)
230+
);
231+
}
232+
195233
const routeReplyRuntimeLoader = createLazyImportLoader(() => import("./route-reply.runtime.js"));
196234
const getReplyFromConfigRuntimeLoader = createLazyImportLoader(
197235
() => import("./get-reply-from-config.runtime.js"),
@@ -1178,19 +1216,40 @@ export async function dispatchReplyFromConfig(
11781216
crypto.randomUUID();
11791217
const replyTurnKind = resolveReplyTurnKind(params.replyOptions);
11801218
const allowActivePreDispatch = phase === "pre_dispatch" && replyTurnKind === "visible";
1219+
const allowSlackRoutedThreadBypass =
1220+
phase === "dispatch" &&
1221+
shouldLetSlackRoutedThreadBypassBusyReplyOperation({
1222+
activeOperation: replyRunRegistry.get(dispatchOperationSessionKey),
1223+
ctx,
1224+
routeThreadId,
1225+
});
11811226
const admission = await admitReplyTurn({
11821227
sessionKey: dispatchOperationSessionKey,
11831228
sessionId: operationSessionId,
11841229
kind: replyTurnKind,
11851230
resetTriggered: false,
1231+
routeThreadId,
11861232
upstreamAbortSignal: params.replyOptions?.abortSignal,
1187-
waitForActive: !allowActivePreDispatch,
1233+
waitForActive: !allowActivePreDispatch && !allowSlackRoutedThreadBypass,
11881234
});
11891235
if (admission.status === "skipped") {
11901236
if (allowActivePreDispatch && admission.reason === "active-run") {
11911237
preDispatchAbortOperation = admission.activeOperation;
11921238
return { status: "ready" };
11931239
}
1240+
if (
1241+
admission.reason === "active-run" &&
1242+
shouldLetSlackRoutedThreadBypassBusyReplyOperation({
1243+
activeOperation: admission.activeOperation,
1244+
ctx,
1245+
routeThreadId,
1246+
})
1247+
) {
1248+
logVerbose(
1249+
`dispatch-from-config: allowing Slack routed thread ${routeThreadId} while ${dispatchOperationSessionKey} has an active reply operation in another Slack thread`,
1250+
);
1251+
return { status: "ready" };
1252+
}
11941253
dispatchAbortOperation = admission.activeOperation;
11951254
logVerbose(
11961255
`dispatch-from-config: skipped reply operation admission for ${dispatchOperationSessionKey}; reason=${admission.reason}`,

src/auto-reply/reply/followup-runner.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1637,6 +1637,33 @@ describe("createFollowupRunner runtime config", () => {
16371637
});
16381638

16391639
describe("createFollowupRunner progress forwarding", () => {
1640+
it("records queued thread id on follow-up reply operations", async () => {
1641+
const queued = createQueuedRun({
1642+
originatingChannel: "slack",
1643+
originatingTo: "user:U1",
1644+
originatingThreadId: "501.000",
1645+
run: {
1646+
messageProvider: "slack",
1647+
},
1648+
});
1649+
runEmbeddedAgentMock.mockImplementationOnce(
1650+
async (args: { replyOperation?: { routeThreadId?: string | number } }) => {
1651+
expect(args.replyOperation?.routeThreadId).toBe("501.000");
1652+
return { payloads: [], meta: { agentMeta: {} } };
1653+
},
1654+
);
1655+
1656+
const runner = createFollowupRunner({
1657+
typing: createMockTypingController(),
1658+
typingMode: "instant",
1659+
defaultModel: "claude",
1660+
});
1661+
1662+
await runner(queued);
1663+
1664+
expect(runEmbeddedAgentMock).toHaveBeenCalledTimes(1);
1665+
});
1666+
16401667
it("forwards queued follow-up tool progress and verbose tool result payloads", async () => {
16411668
const onToolStart = vi.fn(async () => {});
16421669
const queued = createQueuedRun({

src/auto-reply/reply/followup-runner.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,7 @@ export function createFollowupRunner(params: {
507507
sessionKey: replySessionKey ?? "",
508508
kind: "queued_followup",
509509
resetTriggered: false,
510+
routeThreadId: queued.originatingThreadId,
510511
upstreamAbortSignal: queued.abortSignal,
511512
});
512513
if (admission.status === "skipped") {

0 commit comments

Comments
 (0)