Skip to content

Commit 8e5183c

Browse files
committed
refactor: move channel message sdk compat into core
1 parent ef17bba commit 8e5183c

13 files changed

Lines changed: 458 additions & 385 deletions
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
83399e00723ea5cc4e7d3a4db0baaef73ad681a42baf72d18b088c649c1c7772 plugin-sdk-api-baseline.json
2-
89fd85479942e9cc3bf30692a0a94a0a0ebfed72ebe9eaf36cec650103cddb11 plugin-sdk-api-baseline.jsonl
1+
ce09dfd1c6f67d49916da2557fb208744b7d8a4912bde944004f44c0998c8e9d plugin-sdk-api-baseline.json
2+
371bdfb13fda61dda885827ffeb922bd46e97ca30e09fa0d09baab80c58a7d1e plugin-sdk-api-baseline.jsonl

docs/plugins/sdk-channel-message.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ This page moved to [Channel outbound API](/plugins/sdk-channel-outbound).
99
`openclaw/plugin-sdk/channel-message-runtime` remain deprecated compatibility
1010
subpaths for older plugins. New channel plugins should use
1111
`openclaw/plugin-sdk/channel-outbound` for message lifecycle, receipt, durable
12-
send, and live preview helpers.
12+
send, and live preview helpers. The deprecated subpaths are thin aliases over
13+
the shared channel message core and the focused inbound/outbound SDK surfaces;
14+
do not add new helpers there.
1315

1416
Removal plan: keep these aliases through the external plugin migration window,
1517
then remove them in the next major SDK cleanup after callers have moved to

scripts/check-deprecated-api-usage.mjs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ const rules = [
166166
allowedFiles: [
167167
"src/channels/turn/durable-delivery.ts",
168168
"src/channels/turn/kernel.ts",
169+
"src/channels/message/inbound-reply-dispatch.ts",
169170
"src/infra/outbound/deliver-runtime.ts",
170171
"src/infra/outbound/deliver.ts",
171172
"src/plugin-sdk/channel-message-runtime.ts",

src/channels/message/adapter.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import type {
2+
ChannelMessageAdapter,
3+
ChannelMessageAdapterShape,
4+
ChannelMessageReceiveAdapterShape,
5+
} from "./types.js";
6+
7+
const defaultManualReceiveAdapter = {
8+
defaultAckPolicy: "manual",
9+
supportedAckPolicies: ["manual"],
10+
} as const satisfies ChannelMessageReceiveAdapterShape;
11+
12+
type ChannelMessageAdapterWithDefaultReceive<TAdapter extends ChannelMessageAdapterShape> =
13+
TAdapter & {
14+
receive: TAdapter["receive"] extends undefined
15+
? typeof defaultManualReceiveAdapter
16+
: NonNullable<TAdapter["receive"]>;
17+
};
18+
19+
export function defineChannelMessageAdapter<const TAdapter extends ChannelMessageAdapterShape>(
20+
adapter: TAdapter,
21+
): ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>> {
22+
return {
23+
...adapter,
24+
receive: adapter.receive ?? defaultManualReceiveAdapter,
25+
} as ChannelMessageAdapter<ChannelMessageAdapterWithDefaultReceive<TAdapter>>;
26+
}
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/**
2+
* Shared inbound reply dispatch helpers for channel message adapters and
3+
* deprecated SDK compatibility facades.
4+
*/
5+
6+
import { withReplyDispatcher } from "../../auto-reply/dispatch.js";
7+
import type { GetReplyOptions } from "../../auto-reply/get-reply-options.types.js";
8+
import {
9+
dispatchReplyFromConfig,
10+
type DispatchFromConfigResult,
11+
} from "../../auto-reply/reply/dispatch-from-config.js";
12+
import type { DispatchReplyWithBufferedBlockDispatcher } from "../../auto-reply/reply/provider-dispatcher.types.js";
13+
import type { ReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.types.js";
14+
import type { FinalizedMsgContext } from "../../auto-reply/templating.js";
15+
import type { OpenClawConfig } from "../../config/types.openclaw.js";
16+
import {
17+
normalizeOutboundReplyPayload,
18+
type OutboundReplyPayload,
19+
} from "../../infra/outbound/reply-payload-normalize.js";
20+
import {
21+
hasFinalChannelTurnDispatch,
22+
hasVisibleChannelTurnDispatch,
23+
deliverInboundReplyWithMessageSendContext,
24+
dispatchChannelInboundReply as dispatchChannelInboundReplyCore,
25+
isDurableInboundReplyDeliveryHandled,
26+
resolveChannelTurnDispatchCounts,
27+
recordDroppedChannelInboundHistory,
28+
runChannelInboundEvent as runChannelInboundEventCore,
29+
runPreparedInboundReply as runPreparedInboundReplyCore,
30+
throwIfDurableInboundReplyDeliveryFailed,
31+
} from "../turn/kernel.js";
32+
import type {
33+
ChannelTurnResult,
34+
DispatchedChannelTurnResult,
35+
DurableInboundReplyDeliveryOptions,
36+
} from "../turn/kernel.js";
37+
import type {
38+
AssembledChannelTurn,
39+
PreparedChannelTurn,
40+
RunChannelTurnParams,
41+
} from "../turn/types.js";
42+
43+
export type {
44+
ChannelTurnDroppedHistoryOptions,
45+
ChannelTurnDroppedHistoryOptions as ChannelInboundDroppedHistoryOptions,
46+
ChannelTurnRecordOptions,
47+
ChannelTurnRecordOptions as InboundReplyRecordOptions,
48+
} from "../turn/types.js";
49+
export type { DurableInboundReplyDeliveryParams } from "../turn/kernel.js";
50+
export type { ChannelBotLoopProtectionFacts } from "../turn/kernel.js";
51+
export { recordChannelBotPairLoopAndCheckSuppression } from "../turn/kernel.js";
52+
53+
type ReplyOptionsWithoutModelSelected = Omit<
54+
Omit<GetReplyOptions, "onBlockReply">,
55+
"onModelSelected"
56+
>;
57+
type RecordInboundSessionFn = typeof import("../session.js").recordInboundSession;
58+
59+
type ReplyDispatchFromConfigOptions = Omit<GetReplyOptions, "onBlockReply">;
60+
export type ChannelInboundEventRunnerParams<
61+
TRaw,
62+
TDispatchResult = DispatchFromConfigResult,
63+
> = RunChannelTurnParams<TRaw, TDispatchResult>;
64+
export type PreparedInboundReply<TDispatchResult> = PreparedChannelTurn<TDispatchResult>;
65+
export type AssembledInboundReply = AssembledChannelTurn;
66+
export type InboundReplyDispatchResult<TDispatchResult> = ChannelTurnResult<TDispatchResult>;
67+
68+
/** Run an already prepared inbound reply through shared session-record + dispatch ordering. */
69+
type PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult> =
70+
PreparedChannelTurn<TDispatchResult> & {
71+
botLoopProtection: NonNullable<PreparedChannelTurn<TDispatchResult>["botLoopProtection"]>;
72+
};
73+
74+
type PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult> = Omit<
75+
PreparedChannelTurn<TDispatchResult>,
76+
"botLoopProtection"
77+
> & {
78+
botLoopProtection?: undefined;
79+
};
80+
81+
export function runPreparedInboundReply<TDispatchResult>(
82+
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
83+
): Promise<ChannelTurnResult<TDispatchResult>>;
84+
export function runPreparedInboundReply<TDispatchResult>(
85+
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
86+
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
87+
export function runPreparedInboundReply<TDispatchResult>(
88+
params: PreparedChannelTurn<TDispatchResult>,
89+
): Promise<ChannelTurnResult<TDispatchResult>>;
90+
export async function runPreparedInboundReply<TDispatchResult>(
91+
params: PreparedChannelTurn<TDispatchResult>,
92+
): Promise<ChannelTurnResult<TDispatchResult>> {
93+
return await runPreparedInboundReplyCore(params);
94+
}
95+
96+
/** @deprecated Use `runPreparedInboundReply`. */
97+
export function runPreparedInboundReplyTurn<TDispatchResult>(
98+
params: PreparedInboundReplyTurnWithBotLoopProtection<TDispatchResult>,
99+
): Promise<ChannelTurnResult<TDispatchResult>>;
100+
export function runPreparedInboundReplyTurn<TDispatchResult>(
101+
params: PreparedInboundReplyTurnWithoutBotLoopProtection<TDispatchResult>,
102+
): Promise<DispatchedChannelTurnResult<TDispatchResult>>;
103+
export function runPreparedInboundReplyTurn<TDispatchResult>(
104+
params: PreparedChannelTurn<TDispatchResult>,
105+
): Promise<ChannelTurnResult<TDispatchResult>>;
106+
export async function runPreparedInboundReplyTurn<TDispatchResult>(
107+
params: PreparedChannelTurn<TDispatchResult>,
108+
): Promise<ChannelTurnResult<TDispatchResult>> {
109+
return await runPreparedInboundReply(params);
110+
}
111+
112+
export async function runChannelInboundEvent<TRaw, TDispatchResult = DispatchFromConfigResult>(
113+
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
114+
) {
115+
return await runChannelInboundEventCore(params);
116+
}
117+
118+
/** @deprecated Use `runChannelInboundEvent`. */
119+
export async function runInboundReplyTurn<TRaw, TDispatchResult = DispatchFromConfigResult>(
120+
params: ChannelInboundEventRunnerParams<TRaw, TDispatchResult>,
121+
) {
122+
return await runChannelInboundEvent(params);
123+
}
124+
125+
export async function dispatchChannelInboundReply(params: AssembledInboundReply) {
126+
return await dispatchChannelInboundReplyCore(params);
127+
}
128+
129+
export {
130+
hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch,
131+
hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch,
132+
deliverInboundReplyWithMessageSendContext as deliverDurableInboundReplyPayload,
133+
deliverInboundReplyWithMessageSendContext,
134+
recordDroppedChannelInboundHistory as recordDroppedChannelTurnHistory,
135+
recordDroppedChannelInboundHistory,
136+
resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts,
137+
};
138+
139+
/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */
140+
export async function dispatchReplyFromConfigWithSettledDispatcher(params: {
141+
cfg: OpenClawConfig;
142+
ctxPayload: FinalizedMsgContext;
143+
dispatcher: ReplyDispatcher;
144+
onSettled: () => void | Promise<void>;
145+
replyOptions?: ReplyDispatchFromConfigOptions;
146+
configOverride?: OpenClawConfig;
147+
}): Promise<DispatchFromConfigResult> {
148+
return await withReplyDispatcher({
149+
dispatcher: params.dispatcher,
150+
onSettled: params.onSettled,
151+
run: () =>
152+
dispatchReplyFromConfig({
153+
ctx: params.ctxPayload,
154+
cfg: params.cfg,
155+
dispatcher: params.dispatcher,
156+
replyOptions: params.replyOptions,
157+
configOverride: params.configOverride,
158+
}),
159+
});
160+
}
161+
162+
/** Assemble the common inbound reply dispatch dependencies for a resolved route. */
163+
export function buildInboundReplyDispatchBase(params: {
164+
cfg: OpenClawConfig;
165+
channel: string;
166+
accountId?: string;
167+
route: {
168+
agentId: string;
169+
sessionKey: string;
170+
};
171+
storePath: string;
172+
ctxPayload: FinalizedMsgContext;
173+
core: {
174+
channel: {
175+
session: {
176+
recordInboundSession: RecordInboundSessionFn;
177+
};
178+
reply: {
179+
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
180+
};
181+
};
182+
};
183+
}) {
184+
return {
185+
cfg: params.cfg,
186+
channel: params.channel,
187+
accountId: params.accountId,
188+
agentId: params.route.agentId,
189+
routeSessionKey: params.route.sessionKey,
190+
storePath: params.storePath,
191+
ctxPayload: params.ctxPayload,
192+
recordInboundSession: params.core.channel.session.recordInboundSession,
193+
dispatchReplyWithBufferedBlockDispatcher:
194+
params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
195+
};
196+
}
197+
198+
type BuildInboundReplyDispatchBaseParams = Parameters<typeof buildInboundReplyDispatchBase>[0];
199+
type RecordChannelMessageReplyDispatchParams = {
200+
cfg: OpenClawConfig;
201+
channel: string;
202+
accountId?: string;
203+
agentId: string;
204+
routeSessionKey: string;
205+
storePath: string;
206+
ctxPayload: FinalizedMsgContext;
207+
recordInboundSession: RecordInboundSessionFn;
208+
dispatchReplyWithBufferedBlockDispatcher: DispatchReplyWithBufferedBlockDispatcher;
209+
deliver: (payload: OutboundReplyPayload) => Promise<void>;
210+
durable?: false | DurableInboundReplyDeliveryOptions;
211+
onRecordError: (err: unknown) => void;
212+
onDispatchError: (err: unknown, info: { kind: string }) => void;
213+
replyOptions?: ReplyOptionsWithoutModelSelected;
214+
};
215+
216+
/**
217+
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
218+
*
219+
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
220+
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
221+
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
222+
* `sendDurableMessageBatch(...)`.
223+
*/
224+
export async function dispatchChannelMessageReplyWithBase(
225+
params: BuildInboundReplyDispatchBaseParams &
226+
Pick<
227+
RecordChannelMessageReplyDispatchParams,
228+
"deliver" | "durable" | "onRecordError" | "onDispatchError" | "replyOptions"
229+
>,
230+
): Promise<void> {
231+
const dispatchBase = buildInboundReplyDispatchBase(params);
232+
await recordChannelMessageReplyDispatch({
233+
...dispatchBase,
234+
deliver: params.deliver,
235+
durable: params.durable,
236+
onRecordError: params.onRecordError,
237+
onDispatchError: params.onDispatchError,
238+
replyOptions: params.replyOptions,
239+
});
240+
}
241+
242+
/**
243+
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
244+
*
245+
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
246+
* `message` adapter via `defineChannelMessageAdapter(...)` and use
247+
* `dispatchChannelMessageReplyWithBase` only for compatibility dispatchers that
248+
* have not moved to the message lifecycle yet.
249+
*/
250+
export async function dispatchInboundReplyWithBase(
251+
params: Parameters<typeof dispatchChannelMessageReplyWithBase>[0],
252+
): Promise<void> {
253+
await dispatchChannelMessageReplyWithBase(params);
254+
}
255+
256+
/**
257+
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
258+
*
259+
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
260+
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
261+
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
262+
* `sendDurableMessageBatch(...)`.
263+
*/
264+
export async function recordChannelMessageReplyDispatch(
265+
params: RecordChannelMessageReplyDispatchParams,
266+
): Promise<void> {
267+
await dispatchChannelInboundReplyCore({
268+
cfg: params.cfg,
269+
channel: params.channel,
270+
accountId: params.accountId,
271+
agentId: params.agentId,
272+
routeSessionKey: params.routeSessionKey,
273+
storePath: params.storePath,
274+
ctxPayload: params.ctxPayload,
275+
recordInboundSession: params.recordInboundSession,
276+
dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher,
277+
delivery: {
278+
preparePayload: (payload) =>
279+
(payload && typeof payload === "object"
280+
? normalizeOutboundReplyPayload(payload as Record<string, unknown>)
281+
: {}) as OutboundReplyPayload,
282+
deliver: async (payload, info) => {
283+
if (params.durable) {
284+
const durable = await deliverInboundReplyWithMessageSendContext({
285+
cfg: params.cfg,
286+
channel: params.channel,
287+
accountId: params.accountId,
288+
agentId: params.agentId,
289+
ctxPayload: params.ctxPayload,
290+
payload,
291+
info,
292+
...params.durable,
293+
});
294+
throwIfDurableInboundReplyDeliveryFailed(durable);
295+
if (isDurableInboundReplyDeliveryHandled(durable)) {
296+
return durable.delivery;
297+
}
298+
}
299+
return await params.deliver(payload as OutboundReplyPayload);
300+
},
301+
onError: params.onDispatchError,
302+
},
303+
replyPipeline: {},
304+
replyOptions: params.replyOptions,
305+
record: {
306+
onRecordError: params.onRecordError,
307+
},
308+
});
309+
}
310+
311+
/**
312+
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
313+
*
314+
* @deprecated Legacy inbound reply helper. New channel plugins should expose a
315+
* `message` adapter via `defineChannelMessageAdapter(...)` and use
316+
* `recordChannelMessageReplyDispatch` only for compatibility dispatchers that
317+
* have not moved to the message lifecycle yet.
318+
*/
319+
export async function recordInboundSessionAndDispatchReply(
320+
params: RecordChannelMessageReplyDispatchParams,
321+
): Promise<void> {
322+
await recordChannelMessageReplyDispatch(params);
323+
}
324+
325+
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
326+
export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase;
327+
/** @deprecated Compatibility helper for legacy reply dispatch results. */
328+
export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch;
329+
/** @deprecated Compatibility helper for legacy reply dispatch results. */
330+
export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch;
331+
/** @deprecated Compatibility helper for legacy reply dispatch results. */
332+
export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts;

src/channels/message/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export { deriveDurableFinalDeliveryRequirements } from "./capabilities.js";
2+
export { defineChannelMessageAdapter } from "./adapter.js";
23
export { createChannelMessageAdapterFromOutbound } from "./outbound-bridge.js";
34
export { createDurableInboundReceiveJournal } from "./durable-receive.js";
45
export {

0 commit comments

Comments
 (0)