Skip to content

Commit 9a9cd0c

Browse files
committed
refactor(channels): add shared turn kernel
1 parent 4396361 commit 9a9cd0c

62 files changed

Lines changed: 3444 additions & 1328 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
cb1975fe65fcab0d50f4bf368118e61640d870a13bb8d9a44a9abb0f79f3c729 plugin-sdk-api-baseline.json
2-
c8e2ebe7dc13d170b83b96109dd46fc33057e6f4200f981dc5ea9623e73affab plugin-sdk-api-baseline.jsonl
1+
6e8aa3634daa81d054c339d2a8b6a526ec22b93e737980d21191ff7d53449eda plugin-sdk-api-baseline.json
2+
6bb635a9d95b671c24251406d098ac052a6773551a1db30529bdc97caf1bb735 plugin-sdk-api-baseline.jsonl

extensions/bluebubbles/src/monitor-processing.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,11 +1714,18 @@ async function processMessageAfterDedupe(
17141714
},
17151715
},
17161716
});
1717-
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
1718-
ctx: ctxPayload,
1717+
await core.channel.turn.dispatchAssembled({
17191718
cfg: config,
1720-
dispatcherOptions: {
1721-
...replyPipeline,
1719+
channel: "bluebubbles",
1720+
accountId: account.accountId,
1721+
agentId: route.agentId,
1722+
routeSessionKey: route.sessionKey,
1723+
storePath,
1724+
ctxPayload,
1725+
recordInboundSession: core.channel.session.recordInboundSession,
1726+
dispatchReplyWithBufferedBlockDispatcher:
1727+
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
1728+
delivery: {
17221729
deliver: async (payload, info) => {
17231730
const rawReplyToId =
17241731
privateApiEnabled && typeof payload.replyToId === "string"
@@ -1845,8 +1852,6 @@ async function processMessageAfterDedupe(
18451852
}
18461853
}
18471854
},
1848-
onReplyStart: typingCallbacks?.onReplyStart,
1849-
onIdle: typingCallbacks?.onIdle,
18501855
onError: (err, info) => {
18511856
// Flag the outer dedupe wrapper so it releases the claim instead
18521857
// of committing. Without this, a transient BlueBubbles send failure
@@ -1864,13 +1869,23 @@ async function processMessageAfterDedupe(
18641869
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`);
18651870
},
18661871
},
1872+
dispatcherOptions: {
1873+
...replyPipeline,
1874+
onReplyStart: typingCallbacks?.onReplyStart,
1875+
onIdle: typingCallbacks?.onIdle,
1876+
},
18671877
replyOptions: {
18681878
onModelSelected,
18691879
disableBlockStreaming:
18701880
typeof account.config.blockStreaming === "boolean"
18711881
? !account.config.blockStreaming
18721882
: undefined,
18731883
},
1884+
record: {
1885+
onRecordError: (err) => {
1886+
runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`);
1887+
},
1888+
},
18741889
});
18751890
} finally {
18761891
const shouldStopTyping =

extensions/discord/src/monitor/agent-components.dispatch.ts

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
resolveEnvelopeFormatOptions,
55
} from "openclaw/plugin-sdk/channel-inbound";
66
import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/dangerous-name-runtime";
7+
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
78
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
89
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
910
import { createNonExitingRuntime, logVerbose } from "openclaw/plugin-sdk/runtime-env";
@@ -238,35 +239,6 @@ export async function dispatchDiscordComponentEvent(params: {
238239
resolveDiscordComponentOriginatingTo(interactionCtx) ?? `channel:${interactionCtx.channelId}`,
239240
});
240241

241-
await recordInboundSession({
242-
storePath,
243-
sessionKey: ctxPayload.SessionKey ?? sessionKey,
244-
ctx: ctxPayload,
245-
updateLastRoute: interactionCtx.isDirectMessage
246-
? {
247-
sessionKey: route.mainSessionKey,
248-
channel: "discord",
249-
to:
250-
resolveDiscordComponentOriginatingTo(interactionCtx) ?? `user:${interactionCtx.userId}`,
251-
accountId,
252-
mainDmOwnerPin: pinnedMainDmOwner
253-
? {
254-
ownerRecipient: pinnedMainDmOwner,
255-
senderRecipient: interactionCtx.userId,
256-
onSkip: ({ ownerRecipient, senderRecipient }) => {
257-
logVerbose(
258-
`discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
259-
);
260-
},
261-
}
262-
: undefined,
263-
}
264-
: undefined,
265-
onRecordError: (err) => {
266-
logVerbose(`discord: failed updating component session meta: ${String(err)}`);
267-
},
268-
});
269-
270242
const deliverTarget = `channel:${interactionCtx.channelId}`;
271243
const typingChannelId = interactionCtx.channelId;
272244
const { createChannelReplyPipeline } = await loadReplyPipelineRuntime();
@@ -298,48 +270,83 @@ export async function dispatchDiscordComponentEvent(params: {
298270
startId: params.replyToId,
299271
});
300272

301-
await dispatchReplyWithBufferedBlockDispatcher({
302-
ctx: ctxPayload,
303-
cfg: ctx.cfg,
304-
replyOptions: { onModelSelected },
305-
dispatcherOptions: {
306-
...replyPipeline,
307-
humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId),
308-
deliver: async (payload) => {
309-
const replyToId = replyReference.use();
310-
await deliverDiscordReply({
311-
cfg: ctx.cfg,
312-
replies: [payload],
313-
target: deliverTarget,
314-
token,
315-
accountId,
316-
rest: interaction.client.rest,
317-
runtime,
318-
replyToId,
319-
replyToMode,
320-
textLimit,
321-
maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({
322-
cfg: ctx.cfg,
323-
discordConfig: ctx.discordConfig,
273+
await runPreparedInboundReplyTurn({
274+
channel: "discord",
275+
accountId,
276+
routeSessionKey: sessionKey,
277+
storePath,
278+
ctxPayload,
279+
recordInboundSession,
280+
record: {
281+
updateLastRoute: interactionCtx.isDirectMessage
282+
? {
283+
sessionKey: route.mainSessionKey,
284+
channel: "discord",
285+
to:
286+
resolveDiscordComponentOriginatingTo(interactionCtx) ??
287+
`user:${interactionCtx.userId}`,
324288
accountId,
325-
}),
326-
tableMode,
327-
chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId),
328-
mediaLocalRoots,
329-
});
330-
replyReference.markSent();
331-
},
332-
onReplyStart: async () => {
333-
try {
334-
const { sendTyping } = await loadTypingRuntime();
335-
await sendTyping({ rest: feedbackRest, channelId: typingChannelId });
336-
} catch (err) {
337-
logVerbose(`discord: typing failed for component reply: ${String(err)}`);
338-
}
339-
},
340-
onError: (err) => {
341-
logError(`discord component dispatch failed: ${String(err)}`);
289+
mainDmOwnerPin: pinnedMainDmOwner
290+
? {
291+
ownerRecipient: pinnedMainDmOwner,
292+
senderRecipient: interactionCtx.userId,
293+
onSkip: ({ ownerRecipient, senderRecipient }) => {
294+
logVerbose(
295+
`discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
296+
);
297+
},
298+
}
299+
: undefined,
300+
}
301+
: undefined,
302+
onRecordError: (err) => {
303+
logVerbose(`discord: failed updating component session meta: ${String(err)}`);
342304
},
343305
},
306+
runDispatch: () =>
307+
dispatchReplyWithBufferedBlockDispatcher({
308+
ctx: ctxPayload,
309+
cfg: ctx.cfg,
310+
replyOptions: { onModelSelected },
311+
dispatcherOptions: {
312+
...replyPipeline,
313+
humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId),
314+
deliver: async (payload) => {
315+
const replyToId = replyReference.use();
316+
await deliverDiscordReply({
317+
cfg: ctx.cfg,
318+
replies: [payload],
319+
target: deliverTarget,
320+
token,
321+
accountId,
322+
rest: interaction.client.rest,
323+
runtime,
324+
replyToId,
325+
replyToMode,
326+
textLimit,
327+
maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({
328+
cfg: ctx.cfg,
329+
discordConfig: ctx.discordConfig,
330+
accountId,
331+
}),
332+
tableMode,
333+
chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId),
334+
mediaLocalRoots,
335+
});
336+
replyReference.markSent();
337+
},
338+
onReplyStart: async () => {
339+
try {
340+
const { sendTyping } = await loadTypingRuntime();
341+
await sendTyping({ rest: feedbackRest, channelId: typingChannelId });
342+
} catch (err) {
343+
logVerbose(`discord: typing failed for component reply: ${String(err)}`);
344+
}
345+
},
346+
onError: (err) => {
347+
logError(`discord component dispatch failed: ${String(err)}`);
348+
},
349+
},
350+
}),
344351
});
345352
}

extensions/discord/src/monitor/message-handler.context.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import {
33
resolveEnvelopeFormatOptions,
44
} from "openclaw/plugin-sdk/channel-inbound";
55
import { resolveChannelContextVisibilityMode } from "openclaw/plugin-sdk/context-visibility-runtime";
6-
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
76
import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/dangerous-name-runtime";
87
import { finalizeInboundContext } from "openclaw/plugin-sdk/reply-dispatch-runtime";
98
import { buildPendingHistoryContextFromMap } from "openclaw/plugin-sdk/reply-history";
@@ -330,21 +329,6 @@ export async function buildDiscordMessageProcessContext(params: {
330329
});
331330
const persistedSessionKey = ctxPayload.SessionKey ?? route.sessionKey;
332331

333-
await recordInboundSession({
334-
storePath,
335-
sessionKey: persistedSessionKey,
336-
ctx: ctxPayload,
337-
updateLastRoute: {
338-
sessionKey: persistedSessionKey,
339-
channel: "discord",
340-
to: lastRouteTo,
341-
accountId: route.accountId,
342-
},
343-
onRecordError: (err) => {
344-
logVerbose(`discord: failed updating session meta: ${String(err)}`);
345-
},
346-
});
347-
348332
if (shouldLogVerbose()) {
349333
const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
350334
logVerbose(
@@ -355,6 +339,20 @@ export async function buildDiscordMessageProcessContext(params: {
355339
return {
356340
ctxPayload,
357341
persistedSessionKey,
342+
turn: {
343+
storePath,
344+
record: {
345+
updateLastRoute: {
346+
sessionKey: persistedSessionKey,
347+
channel: "discord",
348+
to: lastRouteTo,
349+
accountId: route.accountId,
350+
},
351+
onRecordError: (err: unknown) => {
352+
logVerbose(`discord: failed updating session meta: ${String(err)}`);
353+
},
354+
},
355+
},
358356
replyPlan,
359357
deliverTarget,
360358
replyTarget,

extensions/discord/src/monitor/message-handler.process.test.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,17 @@ let processDiscordMessage: typeof import("./message-handler.process.js").process
162162

163163
vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({
164164
dispatchInboundMessage: (params: DispatchInboundParams) => dispatchInboundMessage(params),
165+
settleReplyDispatcher: async (params: {
166+
dispatcher: { markComplete: () => void; waitForIdle: () => Promise<void> };
167+
onSettled?: () => void | Promise<void>;
168+
}) => {
169+
params.dispatcher.markComplete();
170+
try {
171+
await params.dispatcher.waitForIdle();
172+
} finally {
173+
await params.onSettled?.();
174+
}
175+
},
165176
createReplyDispatcherWithTyping: (opts: {
166177
deliver: (payload: unknown, info: { kind: string }) => Promise<void> | void;
167178
}) => ({

0 commit comments

Comments
 (0)