Skip to content

Commit bbf932f

Browse files
committed
fix(channels): preserve observe-only turn compatibility
1 parent 7a2bb2f commit bbf932f

7 files changed

Lines changed: 132 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
3131

3232
### Fixes
3333

34+
- Channels/groups: preserve observe-only turn suppression for prepared dispatch paths and restore deprecated channel turn runtime aliases, so passive observer/group flows stay silent while older plugins keep compiling. Thanks @vincentkoc.
3435
- Feishu/Bitable: clean up newly created placeholder rows whose fields contain only default empty values while preserving meaningful link, attachment, user, number, boolean, and location values during create-app cleanup. (#73920) Carries forward #40602. Thanks @boat2moon.
3536
- macOS app: keep attach-only mode and the Debug Settings launchd toggle marker-only, so launching with `--attach-only`/`--no-launchd` no longer uninstalls the Gateway LaunchAgent or drops active sessions. (#72174) Thanks @DolencLuka.
3637
- Plugin SDK: restore the deprecated `plugin-sdk/zalouser` command-auth facade so published Lark/Zalo plugins that import it load on current hosts. Fixes #74702. Thanks @Goron01.

src/channels/turn/kernel.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,50 @@ describe("channel turn kernel", () => {
342342
expect(result.dispatchResult.queuedFinal).toBe(true);
343343
});
344344

345+
it("suppresses prepared dispatch for observe-only full turns", async () => {
346+
const events: string[] = [];
347+
const onFinalize = vi.fn();
348+
const runDispatch = vi.fn(async () => {
349+
events.push("custom-dispatch");
350+
return {
351+
queuedFinal: true,
352+
counts: { tool: 0, block: 0, final: 1 },
353+
};
354+
});
355+
const result = await runChannelTurn({
356+
channel: "test",
357+
raw: { id: "msg-1", text: "hello" },
358+
adapter: {
359+
ingest: () => ({ id: "msg-1", rawText: "hello" }),
360+
preflight: () => ({ kind: "observeOnly", reason: "broadcast-observer" }),
361+
resolveTurn: () => ({
362+
channel: "test",
363+
routeSessionKey: "agent:observer:test:peer",
364+
storePath: "/tmp/sessions.json",
365+
ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }),
366+
recordInboundSession: createRecordInboundSession(events),
367+
runDispatch,
368+
}),
369+
onFinalize,
370+
},
371+
});
372+
373+
expect(result.admission).toEqual({ kind: "observeOnly", reason: "broadcast-observer" });
374+
expect(result.dispatched).toBe(true);
375+
expect(events).toEqual(["record"]);
376+
expect(runDispatch).not.toHaveBeenCalled();
377+
if (!result.dispatched) {
378+
throw new Error("expected dispatch");
379+
}
380+
expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false);
381+
expect(onFinalize).toHaveBeenCalledWith(
382+
expect.objectContaining({
383+
admission: { kind: "observeOnly", reason: "broadcast-observer" },
384+
dispatched: true,
385+
}),
386+
);
387+
});
388+
345389
it("finalizes failed dispatches before rethrowing", async () => {
346390
const onFinalize = vi.fn();
347391
const dispatchError = new Error("dispatch failed");

src/channels/turn/kernel.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
22
import { clearHistoryEntriesIfEnabled } from "../../auto-reply/reply/history.js";
3+
import { EMPTY_CHANNEL_TURN_DISPATCH_COUNTS } from "./dispatch-result.js";
34
export { buildChannelTurnContext, filterChannelTurnSupplementalContext } from "./context.js";
45
export type { BuildChannelTurnContextParams } from "./context.js";
56
import type {
@@ -15,6 +16,7 @@ import type {
1516
PreparedChannelTurn,
1617
PreflightFacts,
1718
RunChannelTurnParams,
19+
RunResolvedChannelTurnParams,
1820
} from "./types.js";
1921
export {
2022
EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
@@ -49,6 +51,7 @@ export type {
4951
ReplyPlanFacts,
5052
RouteFacts,
5153
RunChannelTurnParams,
54+
RunResolvedChannelTurnParams,
5255
SenderFacts,
5356
SupplementalContextFacts,
5457
} from "./types.js";
@@ -110,6 +113,15 @@ function clearPendingHistoryAfterTurn(params?: ChannelTurnHistoryFinalizeOptions
110113
});
111114
}
112115

116+
function resolveObserveOnlyDispatchResult<TDispatchResult>(
117+
params: PreparedChannelTurn<TDispatchResult>,
118+
): TDispatchResult {
119+
return (params.observeOnlyDispatchResult ?? {
120+
queuedFinal: false,
121+
counts: EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
122+
}) as TDispatchResult;
123+
}
124+
113125
export async function dispatchAssembledChannelTurn(
114126
params: AssembledChannelTurn,
115127
): Promise<DispatchedChannelTurnResult> {
@@ -158,7 +170,14 @@ async function dispatchResolvedChannelTurn<TDispatchResult>(
158170
},
159171
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
160172
if (isPreparedChannelTurn(params)) {
161-
return await runPreparedChannelTurn(params);
173+
return await runPreparedChannelTurn(
174+
params.admission.kind === "observeOnly"
175+
? {
176+
...params,
177+
runDispatch: async () => resolveObserveOnlyDispatchResult(params),
178+
}
179+
: params,
180+
);
162181
}
163182
return (await dispatchAssembledChannelTurn(
164183
params,
@@ -431,3 +450,21 @@ export async function runChannelTurn<
431450

432451
return result;
433452
}
453+
454+
export async function runResolvedChannelTurn<
455+
TRaw,
456+
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
457+
>(
458+
params: RunResolvedChannelTurnParams<TRaw, TDispatchResult>,
459+
): Promise<ChannelTurnResult<TDispatchResult>> {
460+
return await runChannelTurn({
461+
channel: params.channel,
462+
accountId: params.accountId,
463+
raw: params.raw,
464+
log: params.log,
465+
adapter: {
466+
ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input),
467+
resolveTurn: params.resolveTurn,
468+
},
469+
});
470+
}

src/channels/turn/types.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
235235
history?: ChannelTurnHistoryFinalizeOptions;
236236
onPreDispatchFailure?: (err: unknown) => void | Promise<void>;
237237
runDispatch: () => Promise<TDispatchResult>;
238+
observeOnlyDispatchResult?: TDispatchResult;
238239
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
239240
log?: (event: ChannelTurnLogEvent) => void;
240241
messageId?: string;
@@ -315,3 +316,18 @@ export type RunChannelTurnParams<TRaw, TDispatchResult = DispatchFromConfigResul
315316
adapter: ChannelTurnAdapter<TRaw, TDispatchResult>;
316317
log?: (event: ChannelTurnLogEvent) => void;
317318
};
319+
320+
export type RunResolvedChannelTurnParams<TRaw, TDispatchResult = DispatchFromConfigResult> = {
321+
channel: string;
322+
accountId?: string;
323+
raw: TRaw;
324+
input:
325+
| NormalizedTurnInput
326+
| ((raw: TRaw) => Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null);
327+
resolveTurn: (
328+
input: NormalizedTurnInput,
329+
eventClass: ChannelEventClass,
330+
preflight: PreflightFacts,
331+
) => Promise<ChannelTurnResolved<TDispatchResult>> | ChannelTurnResolved<TDispatchResult>;
332+
log?: (event: ChannelTurnLogEvent) => void;
333+
};

src/plugin-sdk/test-helpers/plugin-runtime-mock.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,16 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
157157
}
158158
throw err;
159159
}
160-
const dispatchResult = await params.runDispatch();
160+
const admission = params.admission ?? { kind: "dispatch" as const };
161+
const dispatchResult =
162+
admission.kind === "observeOnly"
163+
? (params.observeOnlyDispatchResult ?? {
164+
queuedFinal: false,
165+
counts: { tool: 0, block: 0, final: 0 },
166+
})
167+
: await params.runDispatch();
161168
return {
162-
admission: params.admission ?? { kind: "dispatch" as const },
169+
admission,
163170
dispatched: true,
164171
ctxPayload: params.ctxPayload,
165172
routeSessionKey: params.routeSessionKey,
@@ -617,8 +624,24 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
617624
},
618625
turn: {
619626
run: runChannelTurnMock,
627+
runResolved: vi.fn(
628+
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) =>
629+
await runChannelTurnMock({
630+
channel: params.channel,
631+
accountId: params.accountId,
632+
raw: params.raw,
633+
log: params.log,
634+
adapter: {
635+
ingest: (raw) =>
636+
typeof params.input === "function" ? params.input(raw) : params.input,
637+
resolveTurn: params.resolveTurn,
638+
},
639+
}),
640+
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
620641
buildContext: buildChannelTurnContextMock,
621642
runPrepared: runPreparedChannelTurnMock,
643+
dispatchAssembled:
644+
dispatchAssembledChannelTurnMock as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
622645
},
623646
threadBindings: {
624647
setIdleTimeoutBySessionKey:

src/plugins/runtime/runtime-channel.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ import {
5454
buildChannelTurnContext,
5555
runChannelTurn,
5656
runPreparedChannelTurn,
57+
runResolvedChannelTurn,
58+
dispatchAssembledChannelTurn,
5759
} from "../../channels/turn/kernel.js";
5860
import {
5961
resolveChannelGroupPolicy,
@@ -172,8 +174,10 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
172174
},
173175
turn: {
174176
run: runChannelTurn,
177+
runResolved: runResolvedChannelTurn,
175178
buildContext: buildChannelTurnContext,
176179
runPrepared: runPreparedChannelTurn,
180+
dispatchAssembled: dispatchAssembledChannelTurn,
177181
},
178182
threadBindings: {
179183
setIdleTimeoutBySessionKey: ({ channelId, targetSessionKey, accountId, idleTimeoutMs }) =>

src/plugins/runtime/types-channel.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,12 @@ export type PluginRuntimeChannel = {
153153
};
154154
turn: {
155155
run: typeof import("../../channels/turn/kernel.js").runChannelTurn;
156+
/** @deprecated Prefer `run(...)`. */
157+
runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn;
156158
buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext;
157159
runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn;
160+
/** @deprecated Prefer `run(...)` or `runPrepared(...)`. */
161+
dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;
158162
};
159163
threadBindings: {
160164
setIdleTimeoutBySessionKey: (params: {

0 commit comments

Comments
 (0)