Skip to content

Commit bab18d5

Browse files
authored
refactor(plugin-sdk): persist dedupe state in sqlite
1 parent a4e78ae commit bab18d5

7 files changed

Lines changed: 474 additions & 182 deletions

File tree

extensions/nextcloud-talk/src/doctor.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
// Nextcloud Talk tests cover doctor plugin behavior.
2+
import fs from "node:fs/promises";
3+
import os from "node:os";
4+
import path from "node:path";
5+
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
26
import { beforeEach, describe, expect, it, vi } from "vitest";
7+
import { createNextcloudTalkReplayGuard } from "./replay-guard.js";
38

49
const hoisted = vi.hoisted(() => ({
510
probeNextcloudTalkBotResponseFeature: vi.fn(),
@@ -24,6 +29,7 @@ function getNextcloudTalkCompatibilityNormalizer(): NonNullable<
2429
describe("nextcloud-talk doctor", () => {
2530
beforeEach(() => {
2631
hoisted.probeNextcloudTalkBotResponseFeature.mockReset();
32+
resetPluginStateStoreForTests();
2733
});
2834

2935
it("normalizes legacy private-network aliases", () => {
@@ -85,4 +91,48 @@ describe("nextcloud-talk doctor", () => {
8591
'- channels.nextcloud-talk.default: Nextcloud Talk bot "OpenClaw" (1) is missing the response feature (features=9); outbound replies will fail.',
8692
]);
8793
});
94+
95+
it("migrates legacy replay dedupe JSON into SQLite during doctor repair", async () => {
96+
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-nextcloud-doctor-"));
97+
const legacyDir = path.join(stateDir, "nextcloud-talk", "replay-dedupe");
98+
const legacyPath = path.join(legacyDir, "account-a.json");
99+
await fs.mkdir(legacyDir, { recursive: true });
100+
await fs.writeFile(
101+
legacyPath,
102+
JSON.stringify({
103+
"room-1:msg-1": Date.now(),
104+
}),
105+
);
106+
107+
const mutation = await nextcloudTalkDoctor.repairConfig?.({
108+
cfg: {
109+
channels: {
110+
"nextcloud-talk": {
111+
accounts: {
112+
"account-a": {
113+
baseUrl: "https://cloud.example.com",
114+
botSecret: "secret",
115+
},
116+
},
117+
},
118+
},
119+
} as never,
120+
doctorFixCommand: "openclaw doctor --fix",
121+
env: { ...process.env, OPENCLAW_STATE_DIR: stateDir },
122+
});
123+
124+
expect(mutation?.changes.join("\n")).toContain(
125+
'Migrated Nextcloud Talk replay dedupe cache for account "account-a" to SQLite',
126+
);
127+
await expect(fs.access(legacyPath)).rejects.toThrow();
128+
129+
const guard = createNextcloudTalkReplayGuard({ stateDir });
130+
await expect(
131+
guard.shouldProcessMessage({
132+
accountId: "account-a",
133+
roomToken: "room-1",
134+
messageId: "msg-1",
135+
}),
136+
).resolves.toBe(false);
137+
});
88138
});

extensions/nextcloud-talk/src/doctor.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,42 @@
11
// Nextcloud Talk plugin module implements doctor behavior.
2+
import fs from "node:fs/promises";
3+
import os from "node:os";
4+
import path from "node:path";
25
import type { ChannelDoctorAdapter } from "openclaw/plugin-sdk/channel-contract";
6+
import { migratePersistentDedupeLegacyJsonFile } from "openclaw/plugin-sdk/persistent-dedupe";
7+
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
38
import { listNextcloudTalkAccountIds, resolveNextcloudTalkAccount } from "./accounts.js";
49
import { probeNextcloudTalkBotResponseFeature } from "./bot-preflight.js";
510
import {
611
legacyConfigRules as NEXTCLOUD_TALK_LEGACY_CONFIG_RULES,
712
normalizeCompatibilityConfig as normalizeNextcloudTalkCompatibilityConfig,
813
} from "./doctor-contract.js";
14+
import {
15+
NEXTCLOUD_TALK_PLUGIN_ID,
16+
NEXTCLOUD_TALK_REPLAY_DEDUPE_NAMESPACE_PREFIX,
17+
} from "./replay-guard.js";
918
import type { CoreConfig } from "./types.js";
1019

20+
const REPLAY_DEDUPE_TTL_MS = 24 * 60 * 60 * 1000;
21+
const REPLAY_DEDUPE_MAX_ENTRIES = 10_000;
22+
23+
function sanitizeLegacyReplaySegment(value: string): string {
24+
const trimmed = value.trim();
25+
if (!trimmed) {
26+
return "default";
27+
}
28+
return trimmed.replace(/[^a-zA-Z0-9_-]/g, "_");
29+
}
30+
31+
async function fileExists(filePath: string): Promise<boolean> {
32+
try {
33+
await fs.access(filePath);
34+
return true;
35+
} catch {
36+
return false;
37+
}
38+
}
39+
1140
async function collectNextcloudTalkBotResponseWarnings(params: {
1241
cfg: CoreConfig;
1342
}): Promise<string[]> {
@@ -33,9 +62,59 @@ async function collectNextcloudTalkBotResponseWarnings(params: {
3362
return warnings;
3463
}
3564

65+
async function repairNextcloudTalkReplayDedupeState(params: {
66+
cfg: CoreConfig;
67+
env?: NodeJS.ProcessEnv;
68+
}): Promise<{ changes: string[]; warnings: string[] }> {
69+
const changes: string[] = [];
70+
const warnings: string[] = [];
71+
const env = params.env ?? process.env;
72+
const stateDir = resolveStateDir(env, os.homedir);
73+
const replayDir = path.join(stateDir, "nextcloud-talk", "replay-dedupe");
74+
75+
for (const accountId of listNextcloudTalkAccountIds(params.cfg)) {
76+
const legacyPath = path.join(replayDir, `${sanitizeLegacyReplaySegment(accountId)}.json`);
77+
if (!(await fileExists(legacyPath))) {
78+
continue;
79+
}
80+
try {
81+
const result = await migratePersistentDedupeLegacyJsonFile({
82+
filePath: legacyPath,
83+
namespace: accountId,
84+
ttlMs: REPLAY_DEDUPE_TTL_MS,
85+
memoryMaxSize: 0,
86+
pluginId: NEXTCLOUD_TALK_PLUGIN_ID,
87+
namespacePrefix: NEXTCLOUD_TALK_REPLAY_DEDUPE_NAMESPACE_PREFIX,
88+
stateMaxEntries: REPLAY_DEDUPE_MAX_ENTRIES,
89+
env,
90+
});
91+
changes.push(
92+
`Migrated Nextcloud Talk replay dedupe cache for account "${accountId}" to SQLite (${result.imported} imported, ${result.skippedExpired} expired, ${result.skippedExisting} already current).`,
93+
);
94+
} catch (error) {
95+
warnings.push(
96+
`Skipped Nextcloud Talk replay dedupe cache for account "${accountId}": ${String(error)}`,
97+
);
98+
}
99+
}
100+
101+
return { changes, warnings };
102+
}
103+
36104
export const nextcloudTalkDoctor: ChannelDoctorAdapter = {
37105
legacyConfigRules: NEXTCLOUD_TALK_LEGACY_CONFIG_RULES,
38106
normalizeCompatibilityConfig: normalizeNextcloudTalkCompatibilityConfig,
39107
collectPreviewWarnings: async ({ cfg }) =>
40108
await collectNextcloudTalkBotResponseWarnings({ cfg: cfg as CoreConfig }),
109+
repairConfig: async ({ cfg, env }) => {
110+
const repair = await repairNextcloudTalkReplayDedupeState({
111+
cfg: cfg as CoreConfig,
112+
...(env ? { env } : {}),
113+
});
114+
return {
115+
config: cfg,
116+
changes: repair.changes,
117+
warnings: repair.warnings,
118+
};
119+
},
41120
};

extensions/nextcloud-talk/src/replay-guard.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
11
// Nextcloud Talk plugin module implements replay guard behavior.
2-
import path from "node:path";
32
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
43

4+
export const NEXTCLOUD_TALK_PLUGIN_ID = "nextcloud-talk";
5+
export const NEXTCLOUD_TALK_REPLAY_DEDUPE_NAMESPACE_PREFIX = "replay-dedupe";
56
const DEFAULT_REPLAY_TTL_MS = 24 * 60 * 60 * 1000;
67
const DEFAULT_MEMORY_MAX_SIZE = 1_000;
7-
const DEFAULT_FILE_MAX_ENTRIES = 10_000;
8-
9-
function sanitizeSegment(value: string): string {
10-
const trimmed = value.trim();
11-
if (!trimmed) {
12-
return "default";
13-
}
14-
return trimmed.replace(/[^a-zA-Z0-9_-]/g, "_");
15-
}
8+
const DEFAULT_STATE_MAX_ENTRIES = 10_000;
169

1710
function buildReplayKey(params: { roomToken: string; messageId: string }): string | null {
1811
const roomToken = params.roomToken.trim();
@@ -27,6 +20,8 @@ type NextcloudTalkReplayGuardOptions = {
2720
stateDir?: string;
2821
ttlMs?: number;
2922
memoryMaxSize?: number;
23+
stateMaxEntries?: number;
24+
/** @deprecated Use stateMaxEntries. */
3025
fileMaxEntries?: number;
3126
onDiskError?: (error: unknown) => void;
3227
};
@@ -67,14 +62,14 @@ export function createNextcloudTalkReplayGuard(
6762
stateDir
6863
? {
6964
...baseOptions,
70-
fileMaxEntries: options.fileMaxEntries ?? DEFAULT_FILE_MAX_ENTRIES,
71-
resolveFilePath: (namespace) =>
72-
path.join(
73-
stateDir,
74-
"nextcloud-talk",
75-
"replay-dedupe",
76-
`${sanitizeSegment(namespace)}.json`,
77-
),
65+
pluginId: NEXTCLOUD_TALK_PLUGIN_ID,
66+
namespacePrefix: NEXTCLOUD_TALK_REPLAY_DEDUPE_NAMESPACE_PREFIX,
67+
stateMaxEntries:
68+
options.stateMaxEntries ?? options.fileMaxEntries ?? DEFAULT_STATE_MAX_ENTRIES,
69+
env: {
70+
...process.env,
71+
OPENCLAW_STATE_DIR: stateDir,
72+
},
7873
onDiskError: options.onDiskError,
7974
}
8075
: baseOptions,

src/channels/plugins/types.adapters.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ export type ChannelDoctorAdapter = {
526526
repairConfig?: (params: {
527527
cfg: OpenClawConfig;
528528
doctorFixCommand: string;
529+
env?: NodeJS.ProcessEnv;
529530
}) => ChannelDoctorConfigMutation | Promise<ChannelDoctorConfigMutation>;
530531
runConfigSequence?: (params: {
531532
cfg: OpenClawConfig;

src/commands/doctor/shared/channel-doctor.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ export async function collectChannelDoctorRepairMutations(params: {
407407
const mutation = await entry.doctor.repairConfig?.({
408408
cfg: nextCfg,
409409
doctorFixCommand: params.doctorFixCommand,
410+
...(params.env ? { env: params.env } : {}),
410411
});
411412
if (!mutation || mutation.changes.length === 0) {
412413
if (mutation?.warnings?.length) {

src/plugin-sdk/memory-host-events.test.ts

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
*/
44
import fs from "node:fs/promises";
55
import path from "node:path";
6-
import { describe, expect, it } from "vitest";
6+
import { afterEach, describe, expect, it } from "vitest";
7+
import { resetPluginStateStoreForTests } from "../plugin-state/plugin-state-store.js";
78
import {
89
appendMemoryHostEvent,
910
readMemoryHostEvents,
@@ -18,11 +19,17 @@ function createDedupe(root: string, overrides?: { ttlMs?: number }) {
1819
return createPersistentDedupe({
1920
ttlMs: overrides?.ttlMs ?? 24 * 60 * 60 * 1000,
2021
memoryMaxSize: 100,
21-
fileMaxEntries: 1000,
22-
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
22+
pluginId: "test-persistent-dedupe",
23+
namespacePrefix: "test-dedupe",
24+
stateMaxEntries: 1000,
25+
env: { ...process.env, OPENCLAW_STATE_DIR: root },
2326
});
2427
}
2528

29+
afterEach(() => {
30+
resetPluginStateStoreForTests();
31+
});
32+
2633
describe("memory host event journal helpers", () => {
2734
it("appends and reads typed workspace events", async () => {
2835
const workspaceDir = await createTempDir("memory-host-events-");
@@ -94,8 +101,10 @@ describe("createPersistentDedupe", () => {
94101
const dedupe = createPersistentDedupe({
95102
ttlMs: Number.NaN,
96103
memoryMaxSize: Number.NaN,
97-
fileMaxEntries: Number.NaN,
98-
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
104+
pluginId: "test-persistent-dedupe",
105+
namespacePrefix: "test-bounds",
106+
stateMaxEntries: Number.NaN,
107+
env: { ...process.env, OPENCLAW_STATE_DIR: root },
99108
});
100109

101110
expect(await dedupe.checkAndRecord("m1", { namespace: "a", now: 100 })).toBe(true);
@@ -104,33 +113,32 @@ describe("createPersistentDedupe", () => {
104113
expect(dedupe.memorySize()).toBe(0);
105114
});
106115

107-
it("falls back to memory-only behavior on disk errors", async () => {
116+
it("uses legacy JSON paths only as SQLite namespace identifiers", async () => {
117+
const root = await createTempDir("openclaw-legacy-dedupe-");
118+
const legacyPath = path.join(root, "legacy.json");
108119
const dedupe = createPersistentDedupe({
109120
ttlMs: 10_000,
110121
memoryMaxSize: 100,
111122
fileMaxEntries: 1000,
112-
resolveFilePath: () => path.join("/dev/null", "dedupe.json"),
123+
resolveFilePath: () => legacyPath,
124+
env: { ...process.env, OPENCLAW_STATE_DIR: root },
113125
});
114126

115-
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(true);
116-
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(false);
127+
expect(await dedupe.checkAndRecord("sqlite-only", { namespace: "x" })).toBe(true);
128+
expect(await dedupe.checkAndRecord("sqlite-only", { namespace: "x" })).toBe(false);
129+
await expect(fs.access(legacyPath)).rejects.toThrow();
117130
});
118131

119-
it("warms empty namespaces and skips expired disk entries", async () => {
132+
it("warms empty namespaces and ignores retired JSON cache files", async () => {
120133
const root = await createTempDir("openclaw-dedupe-");
121134
const emptyReader = createDedupe(root, { ttlMs: 10_000 });
122135
expect(await emptyReader.warmup("nonexistent")).toBe(0);
123136

124-
const oldNow = Date.now() - 2000;
125-
await fs.writeFile(
126-
path.join(root, "acct.json"),
127-
JSON.stringify({ "old-msg": oldNow, "new-msg": Date.now() }),
128-
);
137+
await fs.writeFile(path.join(root, "acct.json"), JSON.stringify({ "retired-msg": Date.now() }));
129138

130139
const reader = createDedupe(root, { ttlMs: 1000 });
131-
expect(await reader.warmup("acct")).toBe(1);
132-
expect(await reader.checkAndRecord("old-msg", { namespace: "acct" })).toBe(true);
133-
expect(await reader.checkAndRecord("new-msg", { namespace: "acct" })).toBe(false);
140+
expect(await reader.warmup("acct")).toBe(0);
141+
expect(await reader.checkAndRecord("retired-msg", { namespace: "acct" })).toBe(true);
134142
});
135143
});
136144

@@ -189,8 +197,10 @@ describe("createClaimableDedupe", () => {
189197
const writer = createClaimableDedupe({
190198
ttlMs: 10_000,
191199
memoryMaxSize: 100,
192-
fileMaxEntries: 1000,
193-
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
200+
pluginId: "test-claimable-dedupe",
201+
namespacePrefix: "test-claimable-dedupe",
202+
stateMaxEntries: 1000,
203+
env: { ...process.env, OPENCLAW_STATE_DIR: root },
194204
});
195205

196206
await expect(writer.claim("m1", { namespace: "acct" })).resolves.toEqual({ kind: "claimed" });
@@ -199,8 +209,10 @@ describe("createClaimableDedupe", () => {
199209
const reader = createClaimableDedupe({
200210
ttlMs: 10_000,
201211
memoryMaxSize: 100,
202-
fileMaxEntries: 1000,
203-
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
212+
pluginId: "test-claimable-dedupe",
213+
namespacePrefix: "test-claimable-dedupe",
214+
stateMaxEntries: 1000,
215+
env: { ...process.env, OPENCLAW_STATE_DIR: root },
204216
});
205217

206218
expect(await reader.hasRecent("m1", { namespace: "acct" })).toBe(true);

0 commit comments

Comments
 (0)