Skip to content

Commit 58d60ba

Browse files
committed
fix(memory): yield while parsing session transcripts
1 parent 3338bac commit 58d60ba

2 files changed

Lines changed: 82 additions & 3 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { afterEach, describe, expect, it, vi } from "vitest";
2+
3+
const { fileState } = vi.hoisted(() => ({
4+
fileState: { raw: "" },
5+
}));
6+
7+
vi.mock("./fs-utils.js", () => ({
8+
readRegularFile: vi.fn(async () => ({
9+
buffer: Buffer.from(fileState.raw, "utf-8"),
10+
})),
11+
statRegularFile: vi.fn(async () => ({
12+
missing: false,
13+
stat: {
14+
mtimeMs: 1,
15+
size: Buffer.byteLength(fileState.raw, "utf-8"),
16+
},
17+
})),
18+
}));
19+
20+
import { buildSessionEntry } from "./session-files.js";
21+
22+
describe("buildSessionEntry responsiveness", () => {
23+
afterEach(() => {
24+
fileState.raw = "";
25+
vi.clearAllMocks();
26+
});
27+
28+
it("yields while parsing a single large transcript", async () => {
29+
fileState.raw = Array.from({ length: 25 }, (_value, index) =>
30+
JSON.stringify({
31+
type: "message",
32+
message: { role: "user", content: `message ${index}` },
33+
}),
34+
).join("\n");
35+
let immediateRan = false;
36+
const immediate = new Promise<void>((resolve) => {
37+
setImmediate(() => {
38+
immediateRan = true;
39+
resolve();
40+
});
41+
});
42+
43+
const entry = await buildSessionEntry("/tmp/session.jsonl", {
44+
generatedByCronRun: false,
45+
generatedByDreamingNarrative: false,
46+
parseYieldEveryLines: 10,
47+
});
48+
49+
expect(entry?.lineMap).toHaveLength(25);
50+
expect(immediateRan).toBe(true);
51+
await immediate;
52+
});
53+
});

packages/memory-host-sdk/src/host/session-files.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const DREAMING_NARRATIVE_RUN_PREFIX = "dreaming-narrative-";
2727
// toxic line. Wrapped continuation lines still map back to the same JSONL line.
2828
// This limit applies to content only; the role label adds up to 11 chars.
2929
const SESSION_EXPORT_CONTENT_WRAP_CHARS = 800;
30+
const SESSION_ENTRY_PARSE_YIELD_LINES = 250;
3031
const DIRECT_CRON_PROMPT_RE = /^\[cron:[^\]]+\]\s*/;
3132

3233
export type SessionFileEntry = {
@@ -51,6 +52,8 @@ export type BuildSessionEntryOptions = {
5152
generatedByDreamingNarrative?: boolean;
5253
/** Optional preclassification from a caller-managed cron transcript lookup. */
5354
generatedByCronRun?: boolean;
55+
/** Override for tests or specialized callers that need a tighter parse yield cadence. */
56+
parseYieldEveryLines?: number;
5457
};
5558

5659
export type SessionTranscriptClassification = {
@@ -520,6 +523,25 @@ function parseSessionTimestampMs(
520523
return 0;
521524
}
522525

526+
function resolveSessionEntryParseYieldLines(opts: BuildSessionEntryOptions): number {
527+
const configured = opts.parseYieldEveryLines;
528+
if (typeof configured === "number" && Number.isFinite(configured)) {
529+
return Math.max(1, Math.floor(configured));
530+
}
531+
return SESSION_ENTRY_PARSE_YIELD_LINES;
532+
}
533+
534+
async function yieldSessionEntryParseIfNeeded(
535+
lineIndex: number,
536+
everyLines: number,
537+
): Promise<void> {
538+
if (lineIndex > 0 && lineIndex % everyLines === 0) {
539+
await new Promise<void>((resolve) => {
540+
setImmediate(resolve);
541+
});
542+
}
543+
}
544+
523545
export async function buildSessionEntry(
524546
absPath: string,
525547
opts: BuildSessionEntryOptions = {},
@@ -543,10 +565,10 @@ export async function buildSessionEntry(
543565
};
544566
}
545567
const raw = (await readRegularFile({ filePath: absPath })).buffer.toString("utf-8");
546-
const lines = raw.split("\n");
547568
const collected: string[] = [];
548569
const lineMap: number[] = [];
549570
const messageTimestampsMs: number[] = [];
571+
const parseYieldEveryLines = resolveSessionEntryParseYieldLines(opts);
550572
const sessionStoreClassification =
551573
opts.generatedByDreamingNarrative === undefined || opts.generatedByCronRun === undefined
552574
? classifySessionTranscriptFromSessionStore(absPath)
@@ -559,8 +581,12 @@ export async function buildSessionEntry(
559581
opts.generatedByCronRun ?? sessionStoreClassification?.generatedByCronRun ?? false;
560582
const allowArchiveContentCronClassification =
561583
isUsageCountedSessionArchiveTranscriptPath(absPath);
562-
for (let jsonlIdx = 0; jsonlIdx < lines.length; jsonlIdx++) {
563-
const line = lines[jsonlIdx];
584+
for (let jsonlIdx = 0, lineStart = 0; lineStart <= raw.length; jsonlIdx++) {
585+
await yieldSessionEntryParseIfNeeded(jsonlIdx, parseYieldEveryLines);
586+
const newlineIndex = raw.indexOf("\n", lineStart);
587+
const lineEnd = newlineIndex === -1 ? raw.length : newlineIndex;
588+
const line = raw.slice(lineStart, lineEnd);
589+
lineStart = newlineIndex === -1 ? raw.length + 1 : newlineIndex + 1;
564590
if (!line.trim()) {
565591
continue;
566592
}

0 commit comments

Comments
 (0)