Skip to content

Commit 9e0d632

Browse files
committed
fix(gateway): unify session history snapshots
1 parent 8838fdc commit 9e0d632

4 files changed

Lines changed: 180 additions & 53 deletions

File tree

src/gateway/session-history-state.test.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, expect, test, vi } from "vitest";
2-
import { SessionHistorySseState } from "./session-history-state.js";
2+
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
33
import * as sessionUtils from "./session-utils.js";
44

55
describe("SessionHistorySseState", () => {
@@ -12,9 +12,9 @@ describe("SessionHistorySseState", () => {
1212
},
1313
]);
1414
try {
15-
const state = new SessionHistorySseState({
15+
const state = SessionHistorySseState.fromRawSnapshot({
1616
target: { sessionId: "sess-main" },
17-
initialRawMessages: [
17+
rawMessages: [
1818
{
1919
role: "assistant",
2020
content: [{ type: "text", text: "fresh snapshot message" }],
@@ -53,4 +53,26 @@ describe("SessionHistorySseState", () => {
5353
readSpy.mockRestore();
5454
}
5555
});
56+
57+
test("reuses one canonical array for items and messages", () => {
58+
const snapshot = buildSessionHistorySnapshot({
59+
rawMessages: [
60+
{
61+
role: "assistant",
62+
content: [{ type: "text", text: "first" }],
63+
__openclaw: { seq: 1 },
64+
},
65+
{
66+
role: "assistant",
67+
content: [{ type: "text", text: "second" }],
68+
__openclaw: { seq: 2 },
69+
},
70+
],
71+
limit: 1,
72+
});
73+
74+
expect(snapshot.history.items).toBe(snapshot.history.messages);
75+
expect(snapshot.history.messages[0]?.__openclaw?.seq).toBe(2);
76+
expect(snapshot.rawTranscriptSeq).toBe(2);
77+
});
5678
});

src/gateway/session-history-state.ts

Lines changed: 88 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,26 @@ import {
44
} from "./server-methods/chat.js";
55
import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js";
66

7+
type SessionHistoryTranscriptMeta = {
8+
seq?: number;
9+
};
10+
11+
export type SessionHistoryMessage = Record<string, unknown> & {
12+
__openclaw?: SessionHistoryTranscriptMeta;
13+
};
14+
715
export type PaginatedSessionHistory = {
8-
items: unknown[];
9-
messages: unknown[];
16+
items: SessionHistoryMessage[];
17+
messages: SessionHistoryMessage[];
1018
nextCursor?: string;
1119
hasMore: boolean;
1220
};
1321

22+
export type SessionHistorySnapshot = {
23+
history: PaginatedSessionHistory;
24+
rawTranscriptSeq: number;
25+
};
26+
1427
type SessionHistoryTranscriptTarget = {
1528
sessionId: string;
1629
storePath?: string;
@@ -26,20 +39,33 @@ function resolveCursorSeq(cursor: string | undefined): number | undefined {
2639
return Number.isFinite(value) && value > 0 ? value : undefined;
2740
}
2841

29-
export function resolveMessageSeq(message: unknown): number | undefined {
30-
if (!message || typeof message !== "object" || Array.isArray(message)) {
31-
return undefined;
32-
}
33-
const meta = (message as { __openclaw?: unknown }).__openclaw;
34-
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
35-
return undefined;
36-
}
37-
const seq = (meta as { seq?: unknown }).seq;
42+
function toSessionHistoryMessages(messages: unknown[]): SessionHistoryMessage[] {
43+
return messages.filter(
44+
(message): message is SessionHistoryMessage =>
45+
Boolean(message) && typeof message === "object" && !Array.isArray(message),
46+
);
47+
}
48+
49+
function buildPaginatedSessionHistory(params: {
50+
messages: SessionHistoryMessage[];
51+
hasMore: boolean;
52+
nextCursor?: string;
53+
}): PaginatedSessionHistory {
54+
return {
55+
items: params.messages,
56+
messages: params.messages,
57+
hasMore: params.hasMore,
58+
...(params.nextCursor ? { nextCursor: params.nextCursor } : {}),
59+
};
60+
}
61+
62+
export function resolveMessageSeq(message: SessionHistoryMessage | undefined): number | undefined {
63+
const seq = message?.__openclaw?.seq;
3864
return typeof seq === "number" && Number.isFinite(seq) && seq > 0 ? seq : undefined;
3965
}
4066

4167
export function paginateSessionMessages(
42-
messages: unknown[],
68+
messages: SessionHistoryMessage[],
4369
limit: number | undefined,
4470
cursor: string | undefined,
4571
): PaginatedSessionHistory {
@@ -58,30 +84,36 @@ export function paginateSessionMessages(
5884
}
5985
}
6086
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
61-
const items = messages.slice(start, endExclusive);
62-
const firstSeq = resolveMessageSeq(items[0]);
63-
return {
64-
items,
65-
messages: items,
87+
const paginatedMessages = messages.slice(start, endExclusive);
88+
const firstSeq = resolveMessageSeq(paginatedMessages[0]);
89+
return buildPaginatedSessionHistory({
90+
messages: paginatedMessages,
6691
hasMore: start > 0,
6792
...(start > 0 && typeof firstSeq === "number" ? { nextCursor: String(firstSeq) } : {}),
68-
};
93+
});
6994
}
7095

71-
function sanitizeRawTranscriptMessages(params: {
96+
export function buildSessionHistorySnapshot(params: {
7297
rawMessages: unknown[];
7398
maxChars?: number;
7499
limit?: number;
75100
cursor?: string;
76-
}): PaginatedSessionHistory {
77-
return paginateSessionMessages(
78-
sanitizeChatHistoryMessages(
79-
params.rawMessages,
80-
params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
101+
}): SessionHistorySnapshot {
102+
const history = paginateSessionMessages(
103+
toSessionHistoryMessages(
104+
sanitizeChatHistoryMessages(
105+
params.rawMessages,
106+
params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
107+
),
81108
),
82109
params.limit,
83110
params.cursor,
84111
);
112+
const rawHistoryMessages = toSessionHistoryMessages(params.rawMessages);
113+
return {
114+
history,
115+
rawTranscriptSeq: resolveMessageSeq(rawHistoryMessages.at(-1)) ?? rawHistoryMessages.length,
116+
};
85117
}
86118

87119
export class SessionHistorySseState {
@@ -92,6 +124,22 @@ export class SessionHistorySseState {
92124
private sentHistory: PaginatedSessionHistory;
93125
private rawTranscriptSeq: number;
94126

127+
static fromRawSnapshot(params: {
128+
target: SessionHistoryTranscriptTarget;
129+
rawMessages: unknown[];
130+
maxChars?: number;
131+
limit?: number;
132+
cursor?: string;
133+
}): SessionHistorySseState {
134+
return new SessionHistorySseState({
135+
target: params.target,
136+
maxChars: params.maxChars,
137+
limit: params.limit,
138+
cursor: params.cursor,
139+
initialRawMessages: params.rawMessages,
140+
});
141+
}
142+
95143
constructor(params: {
96144
target: SessionHistoryTranscriptTarget;
97145
maxChars?: number;
@@ -104,13 +152,14 @@ export class SessionHistorySseState {
104152
this.limit = params.limit;
105153
this.cursor = params.cursor;
106154
const rawMessages = params.initialRawMessages ?? this.readRawMessages();
107-
this.sentHistory = sanitizeRawTranscriptMessages({
155+
const snapshot = buildSessionHistorySnapshot({
108156
rawMessages,
109157
maxChars: this.maxChars,
110158
limit: this.limit,
111159
cursor: this.cursor,
112160
});
113-
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
161+
this.sentHistory = snapshot.history;
162+
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
114163
}
115164

116165
snapshot(): PaginatedSessionHistory {
@@ -133,28 +182,31 @@ export class SessionHistorySseState {
133182
if (sanitized.length === 0) {
134183
return null;
135184
}
136-
const sanitizedMessage = sanitized[0];
137-
this.sentHistory = {
138-
items: [...this.sentHistory.items, sanitizedMessage],
139-
messages: [...this.sentHistory.items, sanitizedMessage],
185+
const [sanitizedMessage] = toSessionHistoryMessages(sanitized);
186+
if (!sanitizedMessage) {
187+
return null;
188+
}
189+
const nextMessages = [...this.sentHistory.messages, sanitizedMessage];
190+
this.sentHistory = buildPaginatedSessionHistory({
191+
messages: nextMessages,
140192
hasMore: false,
141-
};
193+
});
142194
return {
143195
message: sanitizedMessage,
144196
messageSeq: resolveMessageSeq(sanitizedMessage),
145197
};
146198
}
147199

148200
refresh(): PaginatedSessionHistory {
149-
const rawMessages = this.readRawMessages();
150-
this.rawTranscriptSeq = resolveMessageSeq(rawMessages.at(-1)) ?? rawMessages.length;
151-
this.sentHistory = sanitizeRawTranscriptMessages({
152-
rawMessages,
201+
const snapshot = buildSessionHistorySnapshot({
202+
rawMessages: this.readRawMessages(),
153203
maxChars: this.maxChars,
154204
limit: this.limit,
155205
cursor: this.cursor,
156206
});
157-
return this.sentHistory;
207+
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
208+
this.sentHistory = snapshot.history;
209+
return snapshot.history;
158210
}
159211

160212
private readRawMessages(): unknown[] {

src/gateway/sessions-history-http.test.ts

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
createGatewaySuiteHarness,
1414
installGatewayTestHooks,
1515
rpcReq,
16+
startServerWithClient,
1617
writeSessionStore,
1718
} from "./test-helpers.server.js";
1819

@@ -33,6 +34,10 @@ async function createSessionStoreFile(): Promise<string> {
3334
cleanupDirs.push(dir);
3435
const storePath = path.join(dir, "sessions.json");
3536
testState.sessionStorePath = storePath;
37+
await writeSessionStore({
38+
entries: {},
39+
storePath,
40+
});
3641
return storePath;
3742
}
3843

@@ -504,6 +509,51 @@ describe("session history HTTP endpoints", () => {
504509
});
505510
});
506511

512+
test("seeds SSE raw sequence state from startup snapshots, not only visible history", async () => {
513+
const { storePath } = await seedSession({ text: "first message" });
514+
await appendTranscriptMessage({
515+
sessionKey: "agent:main:main",
516+
storePath,
517+
message: makeTranscriptAssistantMessage({ text: "NO_REPLY" }),
518+
emitInlineMessage: false,
519+
});
520+
521+
await withGatewayHarness(async (harness) => {
522+
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
523+
headers: { Accept: "text/event-stream" },
524+
});
525+
526+
expect(res.status).toBe(200);
527+
const reader = res.body?.getReader();
528+
expect(reader).toBeTruthy();
529+
const streamState = { buffer: "" };
530+
const historyEvent = await readSseEvent(reader!, streamState);
531+
expect(historyEvent.event).toBe("history");
532+
expect(
533+
(
534+
historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }
535+
).messages?.map((message) => message.content?.[0]?.text),
536+
).toEqual(["first message"]);
537+
538+
const visible = await appendAssistantMessageToSessionTranscript({
539+
sessionKey: "agent:main:main",
540+
text: "third visible message",
541+
storePath,
542+
});
543+
expect(visible.ok).toBe(true);
544+
545+
const messageEvent = await readSseEvent(reader!, streamState);
546+
expect(messageEvent.event).toBe("message");
547+
expect(
548+
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
549+
?.content?.[0]?.text,
550+
).toBe("third visible message");
551+
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3);
552+
553+
await reader?.cancel();
554+
});
555+
});
556+
507557
test("suppresses NO_REPLY-only SSE fast-path updates while preserving raw sequence numbering", async () => {
508558
const { storePath } = await seedSession({ text: "first message" });
509559

@@ -629,8 +679,8 @@ describe("session history HTTP endpoints", () => {
629679
test("rejects session history when operator.read is not requested", async () => {
630680
await seedSession({ text: "scope-guarded history" });
631681

632-
const harness = await createGatewaySuiteHarness();
633-
const ws = await harness.openWs();
682+
const started = await startServerWithClient("test-gateway-token-1234567890");
683+
const { server, ws, port, envSnapshot } = started;
634684
try {
635685
const connect = await connectReq(ws, {
636686
token: "test-gateway-token-1234567890",
@@ -646,7 +696,7 @@ describe("session history HTTP endpoints", () => {
646696
expect(wsHistory.error?.message).toBe("missing scope: operator.read");
647697

648698
const httpHistory = await fetch(
649-
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
699+
`http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
650700
{
651701
headers: {
652702
...AUTH_HEADER,
@@ -664,7 +714,7 @@ describe("session history HTTP endpoints", () => {
664714
});
665715

666716
const httpHistoryWithoutScopes = await fetch(
667-
`http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
717+
`http://127.0.0.1:${port}/sessions/${encodeURIComponent("agent:main:main")}/history?limit=1`,
668718
{
669719
headers: AUTH_HEADER,
670720
},
@@ -679,7 +729,8 @@ describe("session history HTTP endpoints", () => {
679729
});
680730
} finally {
681731
ws.close();
682-
await harness.close();
732+
await server.close();
733+
envSnapshot.restore();
683734
}
684735
});
685736
});

0 commit comments

Comments
 (0)