Skip to content

Commit 0a4de3d

Browse files
IWhatsskillclawsweeper[bot]Takhoffman
authored
[AI-assisted] fix(reply): wait for block replies before tools (#83722)
Summary: - The branch adds an abort-aware dispatcher-idle wait after successful same-channel and direct ACP block replies, plus regression tests and a changelog entry. - Reproducibility: yes. Current main source shows the same-channel block callback queues dispatcher delivery w ... spatcher idle, and the PR body supplies before/after diagnostic output for the tool-start ordering failure. Automerge notes: - PR branch already contained follow-up commit before automerge: [AI-assisted] fix(reply): wait for block replies before tools Validation: - ClawSweeper review passed for head 3257620. - Required merge gates passed before the squash merge. Prepared head SHA: 3257620 Review: #83722 (comment) Co-authored-by: JARVIS-Glasses <284122573+JARVIS-Glasses@users.noreply.github.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
1 parent eb7f3b7 commit 0a4de3d

8 files changed

Lines changed: 202 additions & 4 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
2424
### Fixes
2525

2626
- CLI/agents: allow `openclaw agent --session-key` to target explicit session keys, including agent-scoped legacy keys. (#85121) Thanks @Kaspre.
27+
- Auto-reply/ACP: wait for same-channel block reply delivery before starting tool work, while still honoring ACP dispatch aborts so stopped turns do not wait on slow channel sends. (#83722) Thanks @IWhatsskill.
2728
- Agents/subagents: surface blocked child-run completions as errors instead of successful subagent finishes. (#80886) Thanks @TurboTheTurtle.
2829
- Agents/Pi: treat accepted embedded `sessions_spawn` child-session handoffs as terminal progress so parent turns no longer report false non-deliverable failures. (#85054) Thanks @samzong.
2930
- WhatsApp: update Baileys to `7.0.0-rc13` and drop the obsolete logger type patch.

src/auto-reply/reply/dispatch-acp-delivery.test.ts

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { beforeEach, describe, expect, it, vi } from "vitest";
22
import type { OpenClawConfig } from "../../config/config.js";
33
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
4-
import type { ReplyDispatcher } from "./reply-dispatcher.js";
4+
import { createReplyDispatcher, type ReplyDispatcher } from "./reply-dispatcher.js";
55
import { buildTestCtx } from "./test-ctx.js";
66
import { createAcpTestConfig } from "./test-fixtures/acp-runtime.js";
77

@@ -240,6 +240,93 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
240240
expect(coordinator.getRoutedCounts().block).toBe(0);
241241
});
242242

243+
it("waits for direct block dispatcher delivery before resolving block delivery", async () => {
244+
const delivered: unknown[] = [];
245+
let releaseDelivery: (() => void) | undefined;
246+
let markDeliveryStarted: (() => void) | undefined;
247+
const deliveryStarted = new Promise<void>((resolve) => {
248+
markDeliveryStarted = resolve;
249+
});
250+
const deliveryGate = new Promise<void>((resolve) => {
251+
releaseDelivery = resolve;
252+
});
253+
const dispatcher = createReplyDispatcher({
254+
deliver: async (payload) => {
255+
delivered.push(payload);
256+
markDeliveryStarted?.();
257+
await deliveryGate;
258+
},
259+
});
260+
const coordinator = createAcpDispatchDeliveryCoordinator({
261+
cfg: createAcpTestConfig(),
262+
ctx: buildTestCtx({
263+
Provider: "visiblechat",
264+
Surface: "visiblechat",
265+
SessionKey: "agent:codex-acp:session-1",
266+
}),
267+
dispatcher,
268+
inboundAudio: false,
269+
shouldRouteToOriginating: false,
270+
});
271+
272+
let deliverySettled = false;
273+
const deliveryPromise = coordinator.deliver("block", { text: "hello" }, { skipTts: true });
274+
void deliveryPromise.then(() => {
275+
deliverySettled = true;
276+
});
277+
278+
await deliveryStarted;
279+
280+
expect(delivered).toEqual([{ text: "hello" }]);
281+
expect(deliverySettled).toBe(false);
282+
283+
releaseDelivery?.();
284+
await expect(deliveryPromise).resolves.toBe(true);
285+
expect(deliverySettled).toBe(true);
286+
});
287+
288+
it("stops waiting for direct block delivery when the ACP dispatch aborts", async () => {
289+
const delivered: unknown[] = [];
290+
const controller = new AbortController();
291+
let releaseDelivery: (() => void) | undefined;
292+
let markDeliveryStarted: (() => void) | undefined;
293+
const deliveryStarted = new Promise<void>((resolve) => {
294+
markDeliveryStarted = resolve;
295+
});
296+
const deliveryGate = new Promise<void>((resolve) => {
297+
releaseDelivery = resolve;
298+
});
299+
const dispatcher = createReplyDispatcher({
300+
deliver: async (payload) => {
301+
delivered.push(payload);
302+
markDeliveryStarted?.();
303+
await deliveryGate;
304+
},
305+
});
306+
const coordinator = createAcpDispatchDeliveryCoordinator({
307+
cfg: createAcpTestConfig(),
308+
ctx: buildTestCtx({
309+
Provider: "visiblechat",
310+
Surface: "visiblechat",
311+
SessionKey: "agent:codex-acp:session-1",
312+
}),
313+
dispatcher,
314+
inboundAudio: false,
315+
shouldRouteToOriginating: false,
316+
abortSignal: controller.signal,
317+
});
318+
319+
const deliveryPromise = coordinator.deliver("block", { text: "hello" }, { skipTts: true });
320+
await deliveryStarted;
321+
controller.abort();
322+
323+
await expect(deliveryPromise).resolves.toBe(true);
324+
expect(delivered).toEqual([{ text: "hello" }]);
325+
326+
releaseDelivery?.();
327+
await dispatcher.waitForIdle();
328+
});
329+
243330
it("strips split TTS directives from visible ACP block delivery", async () => {
244331
const dispatcher = createDispatcher();
245332
const coordinator = createAcpDispatchDeliveryCoordinator({

src/auto-reply/reply/dispatch-acp-delivery.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
1313
import { resolveConfiguredTtsMode, shouldCleanTtsDirectiveText } from "../../tts/tts-config.js";
1414
import type { FinalizedMsgContext } from "../templating.js";
1515
import type { ReplyPayload } from "../types.js";
16+
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
1617
import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js";
1718
import { resolveRoutedDeliveryThreadId } from "./routed-delivery-thread.js";
1819

@@ -191,6 +192,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
191192
originatingChannel?: string;
192193
originatingTo?: string;
193194
onReplyStart?: () => Promise<void> | void;
195+
abortSignal?: AbortSignal;
194196
}): AcpDispatchDeliveryCoordinator {
195197
const directChannel = normalizeOptionalLowercaseString(params.ctx.Provider ?? params.ctx.Surface);
196198
const routedChannel = normalizeOptionalLowercaseString(params.originatingChannel);
@@ -458,6 +460,9 @@ export function createAcpDispatchDeliveryCoordinator(params: {
458460
} else if (!delivered && tracksVisibleText) {
459461
state.failedVisibleTextDelivery = true;
460462
}
463+
if (kind === "block" && delivered) {
464+
await waitForReplyDispatcherIdle(params.dispatcher, params.abortSignal);
465+
}
461466
return delivered;
462467
};
463468

src/auto-reply/reply/dispatch-acp.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ export async function tryDispatchAcpReply(params: {
386386
originatingChannel: params.originatingChannel,
387387
originatingTo: params.originatingTo,
388388
onReplyStart: params.onReplyStart,
389+
abortSignal: params.abortSignal,
389390
});
390391

391392
const identityPendingBeforeTurn = isSessionIdentityPending(

src/auto-reply/reply/dispatch-from-config.test.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { createInternalHookEventPayload } from "../../test-utils/internal-hook-e
2727
import type { MsgContext } from "../templating.js";
2828
import { setReplyPayloadMetadata, type GetReplyOptions, type ReplyPayload } from "../types.js";
2929
import { PROVIDER_CONVERSATION_STATE_ERROR_USER_MESSAGE } from "./provider-request-error-classifier.js";
30-
import type { ReplyDispatcher } from "./reply-dispatcher.js";
30+
import { createReplyDispatcher, type ReplyDispatcher } from "./reply-dispatcher.js";
3131
import { buildTestCtx } from "./test-ctx.js";
3232

3333
type AbortResult = { handled: boolean; aborted: boolean; stoppedSubagents?: number };
@@ -4590,6 +4590,56 @@ describe("dispatchReplyFromConfig", () => {
45904590
expect(callOrder).toEqual(["queued:The answer is 42", "dispatch:The answer is 42"]);
45914591
});
45924592

4593+
it("waits for same-channel block dispatcher delivery before resolving block replies", async () => {
4594+
setNoAbort();
4595+
const ctx = buildTestCtx({ Provider: "whatsapp" });
4596+
const delivered: ReplyPayload[] = [];
4597+
let releaseDelivery: (() => void) | undefined;
4598+
let markDeliveryStarted: (() => void) | undefined;
4599+
const deliveryStarted = new Promise<void>((resolve) => {
4600+
markDeliveryStarted = resolve;
4601+
});
4602+
const deliveryGate = new Promise<void>((resolve) => {
4603+
releaseDelivery = resolve;
4604+
});
4605+
const dispatcher = createReplyDispatcher({
4606+
deliver: async (payload) => {
4607+
delivered.push(payload);
4608+
markDeliveryStarted?.();
4609+
await deliveryGate;
4610+
},
4611+
});
4612+
let blockReplySettled = false;
4613+
const replyResolver = async (
4614+
_ctx: MsgContext,
4615+
opts?: GetReplyOptions,
4616+
): Promise<ReplyPayload | undefined> => {
4617+
const blockReplyPromise = Promise.resolve(opts?.onBlockReply?.({ text: "before tool" })).then(
4618+
() => {
4619+
blockReplySettled = true;
4620+
},
4621+
);
4622+
4623+
await deliveryStarted;
4624+
4625+
expect(delivered).toEqual([{ text: "before tool" }]);
4626+
expect(blockReplySettled).toBe(false);
4627+
4628+
releaseDelivery?.();
4629+
await blockReplyPromise;
4630+
return undefined;
4631+
};
4632+
4633+
await dispatchReplyFromConfig({
4634+
ctx,
4635+
cfg: emptyConfig,
4636+
dispatcher,
4637+
replyResolver,
4638+
});
4639+
4640+
expect(blockReplySettled).toBe(true);
4641+
});
4642+
45934643
it("forwards payload metadata into onBlockReplyQueued context", async () => {
45944644
setNoAbort();
45954645
const dispatcher = createDispatcher();

src/auto-reply/reply/dispatch-from-config.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ import { resolveEffectiveReplyRoute } from "./effective-reply-route.js";
121121
import { withFullRuntimeReplyConfig } from "./get-reply-fast-path.js";
122122
import { claimInboundDedupe, commitInboundDedupe, releaseInboundDedupe } from "./inbound-dedupe.js";
123123
import { resolveOriginMessageProvider } from "./origin-routing.js";
124+
import { waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
124125
import type { ReplyDispatcher } from "./reply-dispatcher.types.js";
125126
import {
126127
createReplyOperation,
@@ -2066,7 +2067,10 @@ export async function dispatchReplyFromConfig(
20662067
await sendPayloadAsync(normalizedPayload, context?.abortSignal, false);
20672068
} else {
20682069
markInboundDedupeReplayUnsafe();
2069-
dispatcher.sendBlockReply(normalizedPayload);
2070+
const delivered = dispatcher.sendBlockReply(normalizedPayload);
2071+
if (delivered) {
2072+
await waitForReplyDispatcherIdle(dispatcher, context?.abortSignal);
2073+
}
20702074
}
20712075
};
20722076
return run();

src/auto-reply/reply/reply-dispatcher.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,30 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
254254
};
255255
}
256256

257+
export async function waitForReplyDispatcherIdle(
258+
dispatcher: Pick<ReplyDispatcher, "waitForIdle">,
259+
abortSignal?: AbortSignal,
260+
): Promise<void> {
261+
if (!abortSignal) {
262+
await dispatcher.waitForIdle();
263+
return;
264+
}
265+
if (abortSignal.aborted) {
266+
return;
267+
}
268+
let removeAbortListener: (() => void) | undefined;
269+
const aborted = new Promise<void>((resolve) => {
270+
const onAbort = () => resolve();
271+
abortSignal.addEventListener("abort", onAbort, { once: true });
272+
removeAbortListener = () => abortSignal.removeEventListener("abort", onAbort);
273+
});
274+
try {
275+
await Promise.race([dispatcher.waitForIdle(), aborted]);
276+
} finally {
277+
removeAbortListener?.();
278+
}
279+
}
280+
257281
export function createReplyDispatcherWithTyping(
258282
options: ReplyDispatcherWithTypingOptions,
259283
): ReplyDispatcherWithTypingResult {

src/auto-reply/reply/reply-flow.test.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { describe, expect, it, vi } from "vitest";
22
import type { OpenClawConfig } from "../../config/types.openclaw.js";
33
import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js";
4-
import { createReplyDispatcher } from "./reply-dispatcher.js";
4+
import { createReplyDispatcher, waitForReplyDispatcherIdle } from "./reply-dispatcher.js";
55
import { createReplyToModeFilter } from "./reply-threading.js";
66

77
type DeliverPayload = Parameters<Parameters<typeof createReplyDispatcher>[0]["deliver"]>[0];
@@ -217,6 +217,32 @@ describe("createReplyDispatcher", () => {
217217
});
218218
});
219219

220+
describe("waitForReplyDispatcherIdle", () => {
221+
it("returns when the abort signal fires before the dispatcher becomes idle", async () => {
222+
const controller = new AbortController();
223+
const waitForIdle = vi.fn(
224+
() =>
225+
new Promise<void>(() => {
226+
// Keep the dispatcher busy until the abort path wins.
227+
}),
228+
);
229+
230+
let settled = false;
231+
const waitPromise = waitForReplyDispatcherIdle({ waitForIdle }, controller.signal).then(() => {
232+
settled = true;
233+
});
234+
235+
await Promise.resolve();
236+
expect(settled).toBe(false);
237+
238+
controller.abort();
239+
await waitPromise;
240+
241+
expect(settled).toBe(true);
242+
expect(waitForIdle).toHaveBeenCalledTimes(1);
243+
});
244+
});
245+
220246
describe("createReplyToModeFilter", () => {
221247
it("handles off/all mode behavior for replyToId", () => {
222248
const cases: Array<{

0 commit comments

Comments
 (0)