Skip to content

Commit 81db136

Browse files
committed
Embedded: stop replaying historical session replies
1 parent 8b667cb commit 81db136

5 files changed

Lines changed: 78 additions & 6 deletions

src/agents/pi-embedded-subscribe.handlers.messages.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ const stripTrailingDirective = (text: string): string => {
3838
return text.slice(0, openIndex);
3939
};
4040

41+
function resolveEligibleAssistantMessage(
42+
ctx: EmbeddedPiSubscribeContext,
43+
message: AgentMessage | undefined,
44+
) {
45+
if (!message || message.role !== "assistant") {
46+
return null;
47+
}
48+
if (ctx.state.initialReplayInProgress) {
49+
return null;
50+
}
51+
if (ctx.state.preexistingMessages.has(message)) {
52+
return null;
53+
}
54+
return message;
55+
}
56+
4157
function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) {
4258
if (!ctx.state.reasoningStreamOpen) {
4359
return;
@@ -133,8 +149,8 @@ export function handleMessageStart(
133149
ctx: EmbeddedPiSubscribeContext,
134150
evt: AgentEvent & { message: AgentMessage },
135151
) {
136-
const msg = evt.message;
137-
if (msg?.role !== "assistant") {
152+
const msg = resolveEligibleAssistantMessage(ctx, evt.message);
153+
if (!msg) {
138154
return;
139155
}
140156

@@ -152,8 +168,8 @@ export function handleMessageUpdate(
152168
ctx: EmbeddedPiSubscribeContext,
153169
evt: AgentEvent & { message: AgentMessage; assistantMessageEvent?: unknown },
154170
) {
155-
const msg = evt.message;
156-
if (msg?.role !== "assistant") {
171+
const msg = resolveEligibleAssistantMessage(ctx, evt.message);
172+
if (!msg) {
157173
return;
158174
}
159175

@@ -322,8 +338,8 @@ export function handleMessageEnd(
322338
ctx: EmbeddedPiSubscribeContext,
323339
evt: AgentEvent & { message: AgentMessage },
324340
) {
325-
const msg = evt.message;
326-
if (msg?.role !== "assistant") {
341+
const msg = resolveEligibleAssistantMessage(ctx, evt.message);
342+
if (!msg) {
327343
return;
328344
}
329345

src/agents/pi-embedded-subscribe.handlers.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import type {
2121

2222
export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeContext) {
2323
return (evt: EmbeddedPiSubscribeEvent) => {
24+
if (ctx.state.initialReplayInProgress) {
25+
return;
26+
}
2427
switch (evt.type) {
2528
case "message_start":
2629
handleMessageStart(ctx, evt as never);

src/agents/pi-embedded-subscribe.handlers.types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ export type ToolCallSummary = {
3333

3434
export type EmbeddedPiSubscribeState = {
3535
assistantTexts: string[];
36+
preexistingMessages: Set<AgentMessage>;
3637
toolMetas: Array<{ toolName?: string; meta?: string }>;
3738
toolMetaById: Map<string, ToolCallSummary>;
3839
toolSummaryById: Set<string>;
3940
lastToolError?: ToolErrorSummary;
41+
initialReplayInProgress: boolean;
4042

4143
blockReplyBreak: "text_end" | "message_end";
4244
reasoningMode: ReasoningLevel;

src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,32 @@ import {
1212
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
1313

1414
describe("subscribeEmbeddedPiSession", () => {
15+
function createReplayingAgentEventHarness(params: {
16+
replayEvents: unknown[];
17+
existingMessages?: unknown[];
18+
}) {
19+
let handler: ((evt: unknown) => void) | undefined;
20+
const session = {
21+
messages: params.existingMessages ?? [],
22+
subscribe: (fn: (evt: unknown) => void) => {
23+
handler = fn;
24+
for (const evt of params.replayEvents) {
25+
fn(evt);
26+
}
27+
return () => {};
28+
},
29+
} as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"];
30+
const onAgentEvent = vi.fn();
31+
32+
subscribeEmbeddedPiSession({
33+
session,
34+
runId: "run",
35+
onAgentEvent,
36+
});
37+
38+
return { emit: (evt: unknown) => handler?.(evt), onAgentEvent };
39+
}
40+
1541
function createAgentEventHarness(options?: { runId?: string; sessionKey?: string }) {
1642
const { session, emit } = createStubSessionHarness();
1743
const onAgentEvent = vi.fn();
@@ -293,6 +319,28 @@ describe("subscribeEmbeddedPiSession", () => {
293319
expectSingleAgentEventText(onAgentEvent.mock.calls, "Hello world");
294320
});
295321

322+
it("does not replay historical assistant finals when subscribing to an existing session", () => {
323+
const historicalAssistantMessage = {
324+
role: "assistant",
325+
content: [{ type: "text", text: "Earlier reply" }],
326+
} as AssistantMessage;
327+
const { emit, onAgentEvent } = createReplayingAgentEventHarness({
328+
existingMessages: [historicalAssistantMessage],
329+
replayEvents: [
330+
{ type: "agent_start" },
331+
{ type: "message_start", message: historicalAssistantMessage },
332+
{ type: "message_end", message: historicalAssistantMessage },
333+
{ type: "agent_end" },
334+
],
335+
});
336+
337+
emitMessageStartAndEndForAssistantText({ emit, text: "Current reply" });
338+
339+
const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
340+
expect(payloads).toHaveLength(1);
341+
expect(payloads[0]?.text).toBe("Current reply");
342+
});
343+
296344
it("does not emit duplicate agent events when message_end repeats", () => {
297345
const { emit, onAgentEvent } = createAgentEventHarness();
298346

src/agents/pi-embedded-subscribe.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
3939
const useMarkdown = toolResultFormat === "markdown";
4040
const state: EmbeddedPiSubscribeState = {
4141
assistantTexts: [],
42+
preexistingMessages: new Set(params.session.messages ?? []),
4243
toolMetas: [],
4344
toolMetaById: new Map(),
4445
toolSummaryById: new Set(),
4546
lastToolError: undefined,
47+
initialReplayInProgress: true,
4648
blockReplyBreak: params.blockReplyBreak ?? "text_end",
4749
reasoningMode,
4850
includeReasoning: reasoningMode === "on",
@@ -652,6 +654,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
652654
};
653655

654656
const sessionUnsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx));
657+
state.initialReplayInProgress = false;
655658

656659
const unsubscribe = () => {
657660
if (state.unsubscribed) {

0 commit comments

Comments
 (0)