Skip to content

Commit 2ead150

Browse files
committed
feat: route outbound sends through durable lifecycle
1 parent 8bfabd6 commit 2ead150

26 files changed

Lines changed: 2535 additions & 119 deletions

src/auto-reply/reply/agent-runner-payloads.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { describe, expect, it } from "vitest";
1+
import { beforeEach, describe, expect, it } from "vitest";
22
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js";
33
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
44
import {
@@ -31,6 +31,10 @@ async function expectSameTargetRepliesDelivered(params: { provider: string; to:
3131
}
3232

3333
describe("buildReplyPayloads media filter integration", () => {
34+
beforeEach(() => {
35+
resetPluginRuntimeStateForTest();
36+
});
37+
3438
it("strips legacy bracket tool blocks from heartbeat replies", async () => {
3539
const { replyPayloads } = await buildReplyPayloads({
3640
...baseParams,

src/auto-reply/reply/reply-dispatcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type ReplyDispatchSkipHandler = (
2929
type ReplyDispatchDeliverer = (
3030
payload: ReplyPayload,
3131
info: { kind: ReplyDispatchKind },
32-
) => Promise<void>;
32+
) => Promise<unknown>;
3333

3434
export type ReplyDispatchBeforeDeliver = (
3535
payload: ReplyPayload,

src/auto-reply/reply/reply-payloads-dedupe.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
22
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js";
33
import { getChannelPlugin } from "../../channels/plugins/index.js";
4+
import { getLoadedChannelPluginForRead } from "../../channels/plugins/registry-loaded-read.js";
45
import { normalizeAnyChannelId } from "../../channels/registry.js";
5-
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
66
import {
77
channelRouteTargetsMatchExact,
88
stringifyRouteThreadId,
@@ -91,6 +91,18 @@ function normalizeThreadIdForComparison(value?: string): string | undefined {
9191
return stringifyRouteThreadId(value);
9292
}
9393

94+
function normalizeTargetForDedupe(provider: string, rawTarget?: string): string | undefined {
95+
const fallback = normalizeOptionalString(rawTarget);
96+
if (!fallback) {
97+
return undefined;
98+
}
99+
const providerId = normalizeProviderForComparison(provider);
100+
const normalizer = providerId
101+
? getLoadedChannelPluginForRead(providerId)?.messaging?.normalizeTarget
102+
: undefined;
103+
return normalizeOptionalString(normalizer?.(rawTarget ?? "") ?? fallback);
104+
}
105+
94106
function resolveTargetProviderForComparison(params: {
95107
currentProvider: string;
96108
targetProvider?: string;
@@ -113,7 +125,7 @@ function normalizeRouteTargetForDedupe(params: {
113125
accountId?: string;
114126
threadId?: string;
115127
}): MessagingToolDedupeRouteTarget | null {
116-
const to = normalizeTargetForProvider(params.provider, params.rawTarget);
128+
const to = normalizeTargetForDedupe(params.provider, params.rawTarget);
117129
if (!to) {
118130
return null;
119131
}

src/gateway/server-methods/chat.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
type SavedMedia,
3434
saveMediaBuffer,
3535
} from "../../media/store.js";
36-
import { createChannelReplyPipeline } from "../../plugin-sdk/channel-reply-pipeline.js";
36+
import { createChannelMessageReplyPipeline } from "../../plugin-sdk/channel-message.js";
3737
import { isPluginOwnedSessionBindingRecord } from "../../plugins/conversation-binding.js";
3838
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
3939
import { resolveSendPolicy } from "../../sessions/send-policy.js";
@@ -2247,7 +2247,7 @@ export const chatHandlers: GatewayRequestHandlers = {
22472247
ctx.MediaStaged = true;
22482248
}
22492249

2250-
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
2250+
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
22512251
cfg,
22522252
agentId,
22532253
channel: INTERNAL_MESSAGE_CHANNEL,

src/gateway/server-restart-sentinel.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
33

44
type LoadedSessionEntry = ReturnType<typeof import("./session-utils.js").loadSessionEntry>;
55
type RecordInboundSessionAndDispatchReplyParams = Parameters<
6-
typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply
6+
typeof import("../plugin-sdk/channel-message.js").recordChannelMessageReplyDispatch
77
>[0];
88

99
const mocks = vi.hoisted(() => {
@@ -194,8 +194,8 @@ vi.mock("../infra/system-events.js", () => ({
194194
enqueueSystemEvent: mocks.enqueueSystemEvent,
195195
}));
196196

197-
vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({
198-
recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply,
197+
vi.mock("../plugin-sdk/channel-message.js", () => ({
198+
recordChannelMessageReplyDispatch: mocks.recordInboundSessionAndDispatchReply,
199199
}));
200200

201201
vi.mock("../infra/heartbeat-wake.js", async () => {

src/gateway/server-restart-sentinel.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import {
3434
} from "../infra/session-delivery-queue.js";
3535
import { enqueueSystemEvent } from "../infra/system-events.js";
3636
import { createSubsystemLogger } from "../logging/subsystem.js";
37+
import { recordChannelMessageReplyDispatch } from "../plugin-sdk/channel-message.js";
3738
import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js";
38-
import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js";
3939
import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js";
4040
import {
4141
deliveryContextFromSession,
@@ -272,7 +272,7 @@ async function deliverQueuedSessionDelivery(params: {
272272
config: cfg,
273273
});
274274
let dispatchError: unknown;
275-
await recordInboundSessionAndDispatchReply({
275+
await recordChannelMessageReplyDispatch({
276276
cfg,
277277
channel: route.channel,
278278
accountId: route.accountId,
Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,56 @@
1+
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
2+
import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js";
13
import type { OpenClawConfig } from "../../config/types.openclaw.js";
4+
import { resolveRuntimePluginRegistry } from "../../plugins/loader.js";
5+
import {
6+
getActivePluginChannelRegistry,
7+
getActivePluginChannelRegistryVersion,
8+
} from "../../plugins/runtime.js";
29
import type { DeliverableMessageChannel } from "../../utils/message-channel.js";
310

11+
const bootstrapAttempts = new Set<string>();
12+
413
export function resetOutboundChannelBootstrapStateForTests(): void {
5-
// Runtime channel plugins are loaded during Gateway startup now.
14+
bootstrapAttempts.clear();
615
}
716

817
export function bootstrapOutboundChannelPlugin(params: {
918
channel: DeliverableMessageChannel;
1019
cfg?: OpenClawConfig;
1120
}): void {
12-
void params;
21+
const cfg = params.cfg;
22+
if (!cfg) {
23+
return;
24+
}
25+
26+
const activeChannelRegistry = getActivePluginChannelRegistry();
27+
const activeHasRequestedChannel = activeChannelRegistry?.channels?.some(
28+
(entry) => entry?.plugin?.id === params.channel,
29+
);
30+
if (activeHasRequestedChannel) {
31+
return;
32+
}
33+
34+
const attemptKey = `${getActivePluginChannelRegistryVersion()}:${params.channel}`;
35+
if (bootstrapAttempts.has(attemptKey)) {
36+
return;
37+
}
38+
bootstrapAttempts.add(attemptKey);
39+
40+
const autoEnabled = applyPluginAutoEnable({ config: cfg });
41+
const defaultAgentId = resolveDefaultAgentId(autoEnabled.config);
42+
const workspaceDir = resolveAgentWorkspaceDir(autoEnabled.config, defaultAgentId);
43+
try {
44+
resolveRuntimePluginRegistry({
45+
config: autoEnabled.config,
46+
activationSourceConfig: cfg,
47+
autoEnabledReasons: autoEnabled.autoEnabledReasons,
48+
workspaceDir,
49+
runtimeOptions: {
50+
allowGatewaySubagentBinding: true,
51+
},
52+
});
53+
} catch {
54+
bootstrapAttempts.delete(attemptKey);
55+
}
1356
}

src/infra/outbound/channel-resolution.test.ts

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,28 +128,33 @@ describe("outbound channel resolution", () => {
128128
).toBe(plugin);
129129
});
130130

131-
it("does not load registries while resolving outbound plugins", async () => {
131+
it("bootstraps configured channel plugins when the active registry is missing the target", async () => {
132132
const plugin = { id: "alpha" };
133133
getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin);
134-
const channelResolution = await importChannelResolution("no-bootstrap");
134+
const channelResolution = await importChannelResolution("bootstrap-missing-target");
135135

136136
expect(
137137
channelResolution.resolveOutboundChannelPlugin({
138138
channel: "alpha",
139139
cfg: { channels: {} } as never,
140+
allowBootstrap: true,
140141
}),
141142
).toBe(plugin);
142-
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
143-
144-
getChannelPluginMock.mockReturnValue(undefined);
145-
channelResolution.resolveOutboundChannelPlugin({
146-
channel: "alpha",
147-
cfg: { channels: {} } as never,
148-
});
149-
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
143+
expect(applyPluginAutoEnableMock).toHaveBeenCalledWith({ config: { channels: {} } });
144+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledWith(
145+
expect.objectContaining({
146+
config: { autoEnabled: true },
147+
activationSourceConfig: { channels: {} },
148+
autoEnabledReasons: {},
149+
workspaceDir: "/tmp/workspace",
150+
runtimeOptions: {
151+
allowGatewaySubagentBinding: true,
152+
},
153+
}),
154+
);
150155
});
151156

152-
it("does not load when the active registry has other channels but not the requested one", async () => {
157+
it("attempts activation when the active registry has other channels but not the requested one", async () => {
153158
getLoadedChannelPluginMock.mockReturnValue(undefined);
154159
getChannelPluginMock.mockReturnValue(undefined);
155160
getActivePluginRegistryMock.mockReturnValue({
@@ -164,47 +169,80 @@ describe("outbound channel resolution", () => {
164169
channelResolution.resolveOutboundChannelPlugin({
165170
channel: "alpha",
166171
cfg: { channels: {} } as never,
172+
allowBootstrap: true,
167173
}),
168174
).toBeUndefined();
169-
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
175+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
170176
});
171177

172178
it("does not retry registry loads after a missing outbound plugin", async () => {
173179
getChannelPluginMock.mockReturnValue(undefined);
174-
resolveRuntimePluginRegistryMock.mockImplementationOnce(() => {
175-
throw new Error("transient");
176-
});
177180
const channelResolution = await importChannelResolution("bootstrap-retry");
178181

179182
expect(
180183
channelResolution.resolveOutboundChannelPlugin({
181184
channel: "alpha",
182185
cfg: { channels: {} } as never,
186+
allowBootstrap: true,
183187
}),
184188
).toBeUndefined();
185189

186190
channelResolution.resolveOutboundChannelPlugin({
187191
channel: "alpha",
188192
cfg: { channels: {} } as never,
193+
allowBootstrap: true,
189194
});
190-
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
195+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
191196
});
192197

193-
it("does not load when the pinned channel registry version changes", async () => {
198+
it("allows another activation attempt when the pinned channel registry version changes", async () => {
194199
getChannelPluginMock.mockReturnValue(undefined);
195200
const channelResolution = await importChannelResolution("channel-version-change");
196201

197202
channelResolution.resolveOutboundChannelPlugin({
198203
channel: "alpha",
199204
cfg: { channels: {} } as never,
205+
allowBootstrap: true,
200206
});
201-
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
207+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
202208

203209
getActivePluginChannelRegistryVersionMock.mockReturnValue(2);
204210
channelResolution.resolveOutboundChannelPlugin({
205211
channel: "alpha",
206212
cfg: { channels: {} } as never,
213+
allowBootstrap: true,
207214
});
215+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(2);
216+
});
217+
218+
it("resolves message adapters through the activation-aware channel plugin path", async () => {
219+
const message = { send: { text: vi.fn() } };
220+
const plugin = { id: "alpha", message };
221+
getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin);
222+
const channelResolution = await importChannelResolution("message-adapter-bootstrap");
223+
224+
expect(
225+
channelResolution.resolveOutboundChannelMessageAdapter({
226+
channel: "alpha",
227+
cfg: { channels: {} } as never,
228+
allowBootstrap: true,
229+
}),
230+
).toBe(message);
231+
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
232+
});
233+
234+
it("does not bootstrap by default for outbound hot-path resolution", async () => {
235+
const plugin = { id: "alpha" };
236+
getLoadedChannelPluginMock.mockReturnValue(undefined);
237+
getChannelPluginMock.mockReturnValue(plugin);
238+
const channelResolution = await importChannelResolution("no-bootstrap-default");
239+
240+
expect(
241+
channelResolution.resolveOutboundChannelPlugin({
242+
channel: "alpha",
243+
cfg: { channels: {} } as never,
244+
}),
245+
).toBe(plugin);
208246
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
209247
});
210248
});

src/infra/outbound/channel-resolution.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ChannelMessageAdapterShape } from "../../channels/message/types.js";
12
import { getChannelPlugin, getLoadedChannelPlugin } from "../../channels/plugins/index.js";
23
import type { ChannelPlugin } from "../../channels/plugins/types.plugin.js";
34
import type { OpenClawConfig } from "../../config/types.openclaw.js";
@@ -52,6 +53,7 @@ function resolveDirectFromActiveRegistry(
5253
export function resolveOutboundChannelPlugin(params: {
5354
channel: string;
5455
cfg?: OpenClawConfig;
56+
allowBootstrap?: boolean;
5557
}): ChannelPlugin | undefined {
5658
const normalized = normalizeDeliverableOutboundChannel(params.channel);
5759
if (!normalized) {
@@ -69,6 +71,18 @@ export function resolveOutboundChannelPlugin(params: {
6971
return directCurrent;
7072
}
7173

74+
if (params.allowBootstrap !== true) {
75+
return resolve();
76+
}
77+
7278
maybeBootstrapChannelPlugin({ channel: normalized, cfg: params.cfg });
7379
return resolveLoaded() ?? resolveDirectFromActiveRegistry(normalized) ?? resolve();
7480
}
81+
82+
export function resolveOutboundChannelMessageAdapter(params: {
83+
channel: string;
84+
cfg?: OpenClawConfig;
85+
allowBootstrap?: boolean;
86+
}): ChannelMessageAdapterShape | undefined {
87+
return resolveOutboundChannelPlugin(params)?.message;
88+
}

src/infra/outbound/deliver-types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { MessageReceipt } from "../../channels/message/types.js";
12
import type { ChannelId } from "../../channels/plugins/channel-id.types.js";
23

34
export type OutboundDeliveryResult = {
@@ -10,6 +11,7 @@ export type OutboundDeliveryResult = {
1011
timestamp?: number;
1112
toJid?: string;
1213
pollId?: string;
14+
receipt?: MessageReceipt;
1315
// Channel docking: stash channel-specific fields here to avoid core type churn.
1416
meta?: Record<string, unknown>;
1517
};

0 commit comments

Comments
 (0)