Skip to content

Commit 2eee70e

Browse files
authored
refactor: run prepared Discord and Slack turns
Route Discord and Slack prepared message turns through the core prepared-turn runner directly. Local proof before landing: - node scripts/run-vitest.mjs src/channels/turn/kernel.test.ts extensions/discord/src/monitor/message-handler.process.test.ts extensions/slack/src/monitor/message-handler/prepare.test.ts extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts - node scripts/run-tsgo.mjs -p tsconfig.core.json --incremental false - node scripts/run-tsgo.mjs -p tsconfig.extensions.json --incremental false - OPENCLAW_TESTBOX_REMOTE_RUN=1 OPENCLAW_VITEST_MAX_WORKERS=1 pnpm check:changed - codex-review clean after accepted Slack bot-loop history cleanup finding was fixed in core GitHub checks had no failures; Blacksmith/GitHub runner jobs were still queued when maintainer approved landing based on local proof.
1 parent 369917f commit 2eee70e

6 files changed

Lines changed: 327 additions & 345 deletions

File tree

extensions/discord/src/monitor/message-handler.process.ts

Lines changed: 160 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
2626
import {
2727
hasFinalInboundReplyDispatch,
2828
recordChannelBotPairLoopAndCheckSuppression,
29-
runInboundReplyTurn,
29+
runPreparedInboundReplyTurn,
3030
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
3131
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
3232
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
@@ -631,184 +631,169 @@ export async function processDiscordMessage(
631631
await settleDispatchBeforeStart();
632632
return;
633633
}
634-
const preparedResult = await runInboundReplyTurn({
634+
const preparedResult = await runPreparedInboundReplyTurn({
635635
channel: "discord",
636636
accountId: route.accountId,
637-
raw: ctx,
638-
adapter: {
639-
ingest: () => ({
640-
id: message.id,
641-
timestamp: message.timestamp ? Date.parse(message.timestamp) : undefined,
642-
rawText: text,
643-
textForAgent: ctxPayload.BodyForAgent,
644-
textForCommands: ctxPayload.CommandBody,
645-
raw: message,
646-
}),
647-
resolveTurn: () => ({
648-
channel: "discord",
649-
accountId: route.accountId,
650-
routeSessionKey: persistedSessionKey,
651-
storePath: turn.storePath,
652-
ctxPayload,
653-
recordInboundSession,
654-
record: turn.record,
655-
history: {
656-
isGroup: isGuildMessage,
657-
historyKey: messageChannelId,
658-
historyMap: guildHistories,
659-
limit: historyLimit,
660-
},
661-
onPreDispatchFailure: settleDispatchBeforeStart,
662-
runDispatch: async () => {
663-
return await dispatchInboundMessage({
664-
ctx: ctxPayload,
665-
cfg,
666-
dispatcher,
667-
replyOptions: {
668-
...replyOptions,
669-
abortSignal,
670-
skillFilter: channelConfig?.skills,
671-
sourceReplyDeliveryMode,
672-
disableBlockStreaming: sourceRepliesAreToolOnly
673-
? true
674-
: (draftPreview.disableBlockStreamingForDraft ??
675-
(typeof resolvedBlockStreamingEnabled === "boolean"
676-
? !resolvedBlockStreamingEnabled
677-
: undefined)),
678-
onPartialReply: draftPreview.draftStream
679-
? (payload) => draftPreview.updateFromPartial(payload.text)
680-
: undefined,
681-
onAssistantMessageStart: draftPreview.draftStream
682-
? () => draftPreview.handleAssistantMessageBoundary()
683-
: undefined,
684-
onReasoningEnd: draftPreview.draftStream
685-
? () => draftPreview.handleAssistantMessageBoundary()
686-
: undefined,
687-
onModelSelected,
688-
suppressDefaultToolProgressMessages:
689-
draftPreview.suppressDefaultToolProgressMessages ? true : undefined,
690-
onReasoningStream: async (payload) => {
691-
await statusReactions.setThinking();
692-
const formattedText = payload?.text
693-
? formatReasoningMessage(payload.text)
694-
: undefined;
695-
await draftPreview.pushReasoningProgress(formattedText);
696-
},
697-
onToolStart: async (payload) => {
698-
if (isProcessAborted(abortSignal)) {
699-
return;
700-
}
701-
await maybeBindStatusReactionsToToolReaction(payload);
702-
await statusReactions.setTool(payload.name);
703-
await draftPreview.pushToolProgress(
704-
buildChannelProgressDraftLineForEntry(
705-
discordConfig,
706-
{
707-
event: "tool",
708-
name: payload.name,
709-
phase: payload.phase,
710-
args: payload.args,
711-
},
712-
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
713-
),
714-
{ toolName: payload.name },
715-
);
716-
},
717-
onItemEvent: async (payload) => {
718-
await draftPreview.pushToolProgress(
719-
buildChannelProgressDraftLineForEntry(discordConfig, {
720-
event: "item",
721-
itemId: payload.itemId,
722-
itemKind: payload.kind,
723-
title: payload.title,
724-
name: payload.name,
725-
phase: payload.phase,
726-
status: payload.status,
727-
summary: payload.summary,
728-
progressText: payload.progressText,
729-
meta: payload.meta,
730-
}),
731-
);
732-
},
733-
onPlanUpdate: async (payload) => {
734-
if (payload.phase !== "update") {
735-
return;
736-
}
737-
await draftPreview.pushToolProgress(
738-
buildChannelProgressDraftLine({
739-
event: "plan",
740-
phase: payload.phase,
741-
title: payload.title,
742-
explanation: payload.explanation,
743-
steps: payload.steps,
744-
}),
745-
);
746-
},
747-
onApprovalEvent: async (payload) => {
748-
if (payload.phase !== "requested") {
749-
return;
750-
}
751-
await draftPreview.pushToolProgress(
752-
buildChannelProgressDraftLine({
753-
event: "approval",
754-
phase: payload.phase,
755-
title: payload.title,
756-
command: payload.command,
757-
reason: payload.reason,
758-
message: payload.message,
759-
}),
760-
);
761-
},
762-
onCommandOutput: async (payload) => {
763-
if (payload.phase !== "end") {
764-
return;
765-
}
766-
await draftPreview.pushToolProgress(
767-
buildChannelProgressDraftLine({
768-
event: "command-output",
769-
phase: payload.phase,
770-
title: payload.title,
771-
name: payload.name,
772-
status: payload.status,
773-
exitCode: payload.exitCode,
774-
}),
775-
);
776-
},
777-
onPatchSummary: async (payload) => {
778-
if (payload.phase !== "end") {
779-
return;
780-
}
781-
await draftPreview.pushToolProgress(
782-
buildChannelProgressDraftLine({
783-
event: "patch",
784-
phase: payload.phase,
785-
title: payload.title,
786-
name: payload.name,
787-
added: payload.added,
788-
modified: payload.modified,
789-
deleted: payload.deleted,
790-
summary: payload.summary,
791-
}),
792-
);
793-
},
794-
onCompactionStart: async () => {
795-
if (isProcessAborted(abortSignal)) {
796-
return;
797-
}
798-
await statusReactions.setCompacting();
799-
},
800-
onCompactionEnd: async () => {
801-
if (isProcessAborted(abortSignal)) {
802-
return;
803-
}
804-
statusReactions.cancelPending();
805-
await statusReactions.setThinking();
806-
},
807-
},
808-
});
637+
routeSessionKey: persistedSessionKey,
638+
storePath: turn.storePath,
639+
ctxPayload,
640+
recordInboundSession,
641+
record: turn.record,
642+
history: {
643+
isGroup: isGuildMessage,
644+
historyKey: messageChannelId,
645+
historyMap: guildHistories,
646+
limit: historyLimit,
647+
},
648+
onPreDispatchFailure: settleDispatchBeforeStart,
649+
runDispatch: async () =>
650+
await dispatchInboundMessage({
651+
ctx: ctxPayload,
652+
cfg,
653+
dispatcher,
654+
replyOptions: {
655+
...replyOptions,
656+
abortSignal,
657+
skillFilter: channelConfig?.skills,
658+
sourceReplyDeliveryMode,
659+
disableBlockStreaming: sourceRepliesAreToolOnly
660+
? true
661+
: (draftPreview.disableBlockStreamingForDraft ??
662+
(typeof resolvedBlockStreamingEnabled === "boolean"
663+
? !resolvedBlockStreamingEnabled
664+
: undefined)),
665+
onPartialReply: draftPreview.draftStream
666+
? (payload) => draftPreview.updateFromPartial(payload.text)
667+
: undefined,
668+
onAssistantMessageStart: draftPreview.draftStream
669+
? () => draftPreview.handleAssistantMessageBoundary()
670+
: undefined,
671+
onReasoningEnd: draftPreview.draftStream
672+
? () => draftPreview.handleAssistantMessageBoundary()
673+
: undefined,
674+
onModelSelected,
675+
suppressDefaultToolProgressMessages: draftPreview.suppressDefaultToolProgressMessages
676+
? true
677+
: undefined,
678+
onReasoningStream: async (payload) => {
679+
await statusReactions.setThinking();
680+
const formattedText = payload?.text
681+
? formatReasoningMessage(payload.text)
682+
: undefined;
683+
await draftPreview.pushReasoningProgress(formattedText);
684+
},
685+
onToolStart: async (payload) => {
686+
if (isProcessAborted(abortSignal)) {
687+
return;
688+
}
689+
await maybeBindStatusReactionsToToolReaction(payload);
690+
await statusReactions.setTool(payload.name);
691+
await draftPreview.pushToolProgress(
692+
buildChannelProgressDraftLineForEntry(
693+
discordConfig,
694+
{
695+
event: "tool",
696+
name: payload.name,
697+
phase: payload.phase,
698+
args: payload.args,
699+
},
700+
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
701+
),
702+
{ toolName: payload.name },
703+
);
704+
},
705+
onItemEvent: async (payload) => {
706+
await draftPreview.pushToolProgress(
707+
buildChannelProgressDraftLineForEntry(discordConfig, {
708+
event: "item",
709+
itemId: payload.itemId,
710+
itemKind: payload.kind,
711+
title: payload.title,
712+
name: payload.name,
713+
phase: payload.phase,
714+
status: payload.status,
715+
summary: payload.summary,
716+
progressText: payload.progressText,
717+
meta: payload.meta,
718+
}),
719+
);
720+
},
721+
onPlanUpdate: async (payload) => {
722+
if (payload.phase !== "update") {
723+
return;
724+
}
725+
await draftPreview.pushToolProgress(
726+
buildChannelProgressDraftLine({
727+
event: "plan",
728+
phase: payload.phase,
729+
title: payload.title,
730+
explanation: payload.explanation,
731+
steps: payload.steps,
732+
}),
733+
);
734+
},
735+
onApprovalEvent: async (payload) => {
736+
if (payload.phase !== "requested") {
737+
return;
738+
}
739+
await draftPreview.pushToolProgress(
740+
buildChannelProgressDraftLine({
741+
event: "approval",
742+
phase: payload.phase,
743+
title: payload.title,
744+
command: payload.command,
745+
reason: payload.reason,
746+
message: payload.message,
747+
}),
748+
);
749+
},
750+
onCommandOutput: async (payload) => {
751+
if (payload.phase !== "end") {
752+
return;
753+
}
754+
await draftPreview.pushToolProgress(
755+
buildChannelProgressDraftLine({
756+
event: "command-output",
757+
phase: payload.phase,
758+
title: payload.title,
759+
name: payload.name,
760+
status: payload.status,
761+
exitCode: payload.exitCode,
762+
}),
763+
);
764+
},
765+
onPatchSummary: async (payload) => {
766+
if (payload.phase !== "end") {
767+
return;
768+
}
769+
await draftPreview.pushToolProgress(
770+
buildChannelProgressDraftLine({
771+
event: "patch",
772+
phase: payload.phase,
773+
title: payload.title,
774+
name: payload.name,
775+
added: payload.added,
776+
modified: payload.modified,
777+
deleted: payload.deleted,
778+
summary: payload.summary,
779+
}),
780+
);
781+
},
782+
onCompactionStart: async () => {
783+
if (isProcessAborted(abortSignal)) {
784+
return;
785+
}
786+
await statusReactions.setCompacting();
787+
},
788+
onCompactionEnd: async () => {
789+
if (isProcessAborted(abortSignal)) {
790+
return;
791+
}
792+
statusReactions.cancelPending();
793+
await statusReactions.setThinking();
794+
},
809795
},
810796
}),
811-
},
812797
});
813798
if (!preparedResult.dispatched) {
814799
return;

0 commit comments

Comments
 (0)