Skip to content

Commit aba97a4

Browse files
buyitsydneyvincentkoc
authored andcommitted
fix(memory): reindex archived session transcript updates
1 parent 4788870 commit aba97a4

6 files changed

Lines changed: 275 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai
6666
- CLI/devices: request `operator.admin` for `openclaw devices approve <requestId>` only when the exact pending device request would mint or inherit admin-scoped operator access, while keeping lower-scope approvals on the pairing scope.
6767
- Memory/embedding: broaden the embedding reindex retry classifier to include transient socket-layer errors (`fetch failed`, `ECONNRESET`, `socket hang up`, `UND_ERR_*`, `closed`) so memory reindex survives provider network hiccups instead of aborting mid-run. Related #56815, #44166. (#76311) Thanks @buyitsydney.
6868
- Memory/sessions: keep rotated and deleted session transcripts (`.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>`) searchable end-to-end by indexing their real content in `buildSessionEntry` instead of short-circuiting to empty entries, and by mapping archive hit paths back to their live transcript stem during `memory_search` visibility filtering so hits are no longer dropped at the guard. `.jsonl.bak.<iso>` backups and compaction checkpoints remain opaque. Refs #56131. Thanks @buyitsydney.
69+
- Memory/sessions: emit a `sessionTranscriptUpdate` event when `archiveFileOnDisk` rotates a live session transcript into `.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>` / `.jsonl.bak.<iso>`, and bypass the delta-bytes / delta-messages threshold gate in `processSessionDeltaBatch` for usage-counted archive paths (`.jsonl.reset.<iso>` and `.jsonl.deleted.<iso>`). Without the bypass the archive event was forwarded to the listener but dropped at the threshold check, because an archive is a one-shot file-rename mutation rather than an incremental append and would typically land below the default `deltaBytes: 100000` / `deltaMessages: 50` reindex thresholds. Archives now feed the memory sync incremental path the same way `appendMessage` / compaction / tool-result rewrite / chat inject / command execution events already do. Refs #56131. Thanks @buyitsydney.
6970
- Memory/search: keep sqlite-vec optional in packaged installs and point missing-extension recovery at the valid `agents.defaults.memorySearch.store.vector.extensionPath` setting. Thanks @willemsej and @vincentkoc.
7071
- Gateway: keep directly requested plugin tools invokable under restrictive tool profiles while preserving explicit deny lists and the HTTP safety deny list, preventing catalog/invoke mismatches that surface as "Tool not available". Thanks @BunsDev.
7172
- Gateway/update: allow beta binaries to refresh gateway services when the config was last written by the matching stable release version, avoiding false newer-config downgrade blocks during beta channel updates.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import fs from "node:fs/promises";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import type { DatabaseSync } from "node:sqlite";
5+
import type {
6+
OpenClawConfig,
7+
ResolvedMemorySearchConfig,
8+
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
9+
import type {
10+
MemorySource,
11+
MemorySyncProgressUpdate,
12+
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
13+
import { afterEach, beforeEach, describe, expect, it } from "vitest";
14+
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
15+
16+
type MemoryIndexEntry = {
17+
path: string;
18+
absPath: string;
19+
mtimeMs: number;
20+
size: number;
21+
hash: string;
22+
content?: string;
23+
};
24+
25+
type SyncParams = {
26+
reason?: string;
27+
force?: boolean;
28+
forceSessions?: boolean;
29+
sessionFile?: string;
30+
progress?: (update: MemorySyncProgressUpdate) => void;
31+
};
32+
33+
class SessionDeltaHarness extends MemoryManagerSyncOps {
34+
protected readonly cfg = {} as OpenClawConfig;
35+
protected readonly agentId = "main";
36+
protected readonly workspaceDir = "/tmp/openclaw-test-workspace";
37+
protected readonly settings = {
38+
sync: {
39+
sessions: {
40+
deltaBytes: 100_000,
41+
deltaMessages: 50,
42+
postCompactionForce: true,
43+
},
44+
},
45+
} as ResolvedMemorySearchConfig;
46+
protected readonly batch = {
47+
enabled: false,
48+
wait: false,
49+
concurrency: 1,
50+
pollIntervalMs: 0,
51+
timeoutMs: 0,
52+
};
53+
protected readonly vector = { enabled: false, available: false };
54+
protected readonly cache = { enabled: false };
55+
protected db = null as unknown as DatabaseSync;
56+
57+
readonly syncCalls: SyncParams[] = [];
58+
59+
addPendingSessionFile(sessionFile: string) {
60+
this.sessionPendingFiles.add(sessionFile);
61+
}
62+
63+
getDirtySessionFiles(): string[] {
64+
return Array.from(this.sessionsDirtyFiles);
65+
}
66+
67+
isSessionsDirty(): boolean {
68+
return this.sessionsDirty;
69+
}
70+
71+
async processPendingSessionDeltas(): Promise<void> {
72+
await (
73+
this as unknown as {
74+
processSessionDeltaBatch: () => Promise<void>;
75+
}
76+
).processSessionDeltaBatch();
77+
}
78+
79+
protected computeProviderKey(): string {
80+
return "test";
81+
}
82+
83+
protected async sync(params?: SyncParams): Promise<void> {
84+
this.syncCalls.push(params ?? {});
85+
}
86+
87+
protected async withTimeout<T>(
88+
promise: Promise<T>,
89+
_timeoutMs: number,
90+
_message: string,
91+
): Promise<T> {
92+
return await promise;
93+
}
94+
95+
protected getIndexConcurrency(): number {
96+
return 1;
97+
}
98+
99+
protected pruneEmbeddingCacheIfNeeded(): void {}
100+
101+
protected async indexFile(
102+
_entry: MemoryIndexEntry,
103+
_options: { source: MemorySource; content?: string },
104+
): Promise<void> {}
105+
}
106+
107+
describe("session archive delta bypass", () => {
108+
let tmpDir = "";
109+
110+
beforeEach(async () => {
111+
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-archive-delta-"));
112+
});
113+
114+
afterEach(async () => {
115+
await fs.rm(tmpDir, { recursive: true, force: true });
116+
});
117+
118+
async function writeSessionFile(name: string): Promise<string> {
119+
const filePath = path.join(tmpDir, name);
120+
await fs.writeFile(
121+
filePath,
122+
JSON.stringify({
123+
type: "message",
124+
message: { role: "user", content: "short archived session" },
125+
}) + "\n",
126+
"utf-8",
127+
);
128+
return filePath;
129+
}
130+
131+
it.each(["reset", "deleted"] as const)(
132+
"marks below-threshold %s archives dirty immediately",
133+
async (reason) => {
134+
const archivePath = await writeSessionFile(
135+
`session-a.jsonl.${reason}.2026-05-03T05-38-59.000Z`,
136+
);
137+
const harness = new SessionDeltaHarness();
138+
harness.addPendingSessionFile(archivePath);
139+
140+
await harness.processPendingSessionDeltas();
141+
142+
expect(harness.getDirtySessionFiles()).toEqual([archivePath]);
143+
expect(harness.isSessionsDirty()).toBe(true);
144+
expect(harness.syncCalls).toEqual([{ reason: "session-delta" }]);
145+
},
146+
);
147+
148+
it("keeps .jsonl.bak archives on the normal below-threshold delta path", async () => {
149+
const bakPath = await writeSessionFile("session-a.jsonl.bak.2026-05-03T05-38-59.000Z");
150+
const harness = new SessionDeltaHarness();
151+
harness.addPendingSessionFile(bakPath);
152+
153+
await harness.processPendingSessionDeltas();
154+
155+
expect(harness.getDirtySessionFiles()).toEqual([]);
156+
expect(harness.isSessionsDirty()).toBe(false);
157+
expect(harness.syncCalls).toEqual([]);
158+
});
159+
160+
it("keeps live transcripts below the configured thresholds", async () => {
161+
const livePath = await writeSessionFile("session-a.jsonl");
162+
const harness = new SessionDeltaHarness();
163+
harness.addPendingSessionFile(livePath);
164+
165+
await harness.processPendingSessionDeltas();
166+
167+
expect(harness.getDirtySessionFiles()).toEqual([]);
168+
expect(harness.isSessionsDirty()).toBe(false);
169+
expect(harness.syncCalls).toEqual([]);
170+
});
171+
});

extensions/memory-core/src/memory/manager-sync-ops.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import {
1717
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
1818
import {
1919
buildSessionEntry,
20+
isSessionArchiveArtifactName,
21+
isUsageCountedSessionTranscriptFileName,
2022
listSessionFilesForAgent,
2123
sessionPathForFile,
2224
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
@@ -491,6 +493,24 @@ export abstract class MemoryManagerSyncOps {
491493
this.sessionPendingFiles.clear();
492494
let shouldSync = false;
493495
for (const sessionFile of pending) {
496+
// Usage-counted session archives (`.jsonl.reset.<iso>` and
497+
// `.jsonl.deleted.<iso>`) are one-shot mutation events: the file is
498+
// written once by the archive rotation and then never touched again.
499+
// They carry no incremental `append` semantics, so the delta-bytes /
500+
// delta-messages thresholds (designed for live transcripts accumulating
501+
// appended messages) cannot gate them correctly — a short archive
502+
// below the threshold would simply never reindex. Mark them dirty
503+
// directly and skip the delta accounting.
504+
const baseName = path.basename(sessionFile);
505+
if (
506+
isSessionArchiveArtifactName(baseName) &&
507+
isUsageCountedSessionTranscriptFileName(baseName)
508+
) {
509+
this.sessionsDirtyFiles.add(sessionFile);
510+
this.sessionsDirty = true;
511+
shouldSync = true;
512+
continue;
513+
}
494514
const delta = await this.updateSessionDelta(sessionFile);
495515
if (!delta) {
496516
continue;

packages/memory-host-sdk/src/engine-qmd.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ export {
1212
type SessionFileEntry,
1313
type SessionTranscriptClassification,
1414
} from "./host/session-files.js";
15-
export { parseUsageCountedSessionIdFromFileName } from "./host/openclaw-runtime-session.js";
15+
export {
16+
isSessionArchiveArtifactName,
17+
isUsageCountedSessionTranscriptFileName,
18+
parseUsageCountedSessionIdFromFileName,
19+
} from "./host/openclaw-runtime-session.js";
1620
export { parseQmdQueryJson, type QmdQueryResult } from "./host/qmd-query-parser.js";
1721
export {
1822
deriveQmdScopeChannel,
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import fs from "node:fs";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, describe, expect, it } from "vitest";
5+
import {
6+
onSessionTranscriptUpdate,
7+
type SessionTranscriptUpdate,
8+
} from "../sessions/transcript-events.js";
9+
import { archiveFileOnDisk } from "./session-transcript-files.fs.js";
10+
11+
const subscriptions: Array<() => void> = [];
12+
13+
afterEach(() => {
14+
while (subscriptions.length > 0) {
15+
subscriptions.pop()?.();
16+
}
17+
});
18+
19+
describe("archiveFileOnDisk transcript updates", () => {
20+
it("emits a session transcript update for the archived path on reset", () => {
21+
const updates: SessionTranscriptUpdate[] = [];
22+
subscriptions.push(onSessionTranscriptUpdate((update) => updates.push(update)));
23+
24+
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "oc-archive-events-reset-"));
25+
try {
26+
const sessionFile = path.join(tmpDir, "live.jsonl");
27+
fs.writeFileSync(sessionFile, '{"type":"session-meta","agentId":"main"}\n');
28+
29+
const archived = archiveFileOnDisk(sessionFile, "reset");
30+
31+
expect(fs.existsSync(archived)).toBe(true);
32+
expect(fs.existsSync(sessionFile)).toBe(false);
33+
expect(archived).toContain(".jsonl.reset.");
34+
expect(updates).toHaveLength(1);
35+
expect(updates[0].sessionFile).toBe(archived);
36+
// Archive does not carry a messageId/message payload — this is a
37+
// pure-path mutation notification, matching how compaction-only
38+
// emits (sessionFile + sessionKey-only) behave.
39+
expect(updates[0].message).toBeUndefined();
40+
expect(updates[0].messageId).toBeUndefined();
41+
} finally {
42+
fs.rmSync(tmpDir, { recursive: true, force: true });
43+
}
44+
});
45+
46+
it("also emits for deleted and bak archive reasons", () => {
47+
const updates: SessionTranscriptUpdate[] = [];
48+
subscriptions.push(onSessionTranscriptUpdate((update) => updates.push(update)));
49+
50+
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "oc-archive-events-mixed-"));
51+
try {
52+
const deletedSource = path.join(tmpDir, "deleted.jsonl");
53+
fs.writeFileSync(deletedSource, "{}\n");
54+
const deletedArchived = archiveFileOnDisk(deletedSource, "deleted");
55+
56+
const bakSource = path.join(tmpDir, "bak.jsonl");
57+
fs.writeFileSync(bakSource, "{}\n");
58+
const bakArchived = archiveFileOnDisk(bakSource, "bak");
59+
60+
expect(deletedArchived).toContain(".jsonl.deleted.");
61+
expect(bakArchived).toContain(".jsonl.bak.");
62+
expect(updates.map((update) => update.sessionFile)).toEqual([deletedArchived, bakArchived]);
63+
} finally {
64+
fs.rmSync(tmpDir, { recursive: true, force: true });
65+
}
66+
});
67+
});

src/gateway/session-transcript-files.fs.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
resolveSessionTranscriptPathInDir,
1313
} from "../config/sessions/paths.js";
1414
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
15+
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
1516

1617
type ArchiveFileReason = SessionArchiveReason;
1718
export type ArchivedSessionTranscript = {
@@ -127,6 +128,16 @@ export function archiveFileOnDisk(filePath: string, reason: ArchiveFileReason):
127128
const ts = formatSessionArchiveTimestamp();
128129
const archived = `${filePath}.${reason}.${ts}`;
129130
fs.renameSync(filePath, archived);
131+
// Notify the session transcript subscribers (memory index, sessions-history
132+
// HTTP, etc.) that a mutation landed on a session-owned path. Without this
133+
// emit the memory sync's incremental path never learns the new archive
134+
// exists: chokidar does not watch the sessions directory, and the event bus
135+
// is the only channel gateway code uses to signal session-file mutations.
136+
// All other in-process mutations (append, compaction, tool-result rewrite,
137+
// chat inject, command execution) already emit here; archive was the sole
138+
// remaining gap, which is why `.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>`
139+
// files only surfaced in the index after a full reindex.
140+
emitSessionTranscriptUpdate({ sessionFile: archived });
130141
return archived;
131142
}
132143

0 commit comments

Comments
 (0)