Skip to content

Commit a6497b1

Browse files
committed
fix(gateway): avoid duplicate v4 deltas
1 parent 150bebc commit a6497b1

2 files changed

Lines changed: 98 additions & 10 deletions

File tree

src/gateway/server-chat.agent-events.test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,40 @@ describe("agent event handler", () => {
773773
nowSpy.mockRestore();
774774
});
775775

776+
it("does not emit a delta when a repeated assistant snapshot is unchanged", () => {
777+
let now = 11_250;
778+
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
779+
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
780+
chatRunState.registry.add("run-unchanged-snapshot", {
781+
sessionKey: "session-unchanged-snapshot",
782+
clientRunId: "client-unchanged-snapshot",
783+
});
784+
785+
handler({
786+
runId: "run-unchanged-snapshot",
787+
seq: 1,
788+
stream: "assistant",
789+
ts: Date.now(),
790+
data: { text: "Hello world" },
791+
});
792+
793+
now = 11_450;
794+
handler({
795+
runId: "run-unchanged-snapshot",
796+
seq: 2,
797+
stream: "assistant",
798+
ts: Date.now(),
799+
data: { text: "Hello world" },
800+
});
801+
802+
const chatCalls = chatBroadcastCalls(broadcast);
803+
expect(chatCalls).toHaveLength(1);
804+
const payload = chatCalls[0]?.[1] as { deltaText?: string };
805+
expect(payload.deltaText).toBe("Hello world");
806+
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(1);
807+
nowSpy.mockRestore();
808+
});
809+
776810
it("marks non-prefix replacement deltas explicitly", () => {
777811
let now = 11_300;
778812
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
@@ -815,6 +849,51 @@ describe("agent event handler", () => {
815849
nowSpy.mockRestore();
816850
});
817851

852+
it("flushes throttled shorter replacement deltas before final", () => {
853+
let now = 11_700;
854+
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
855+
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
856+
chatRunState.registry.add("run-short-replacement-flush", {
857+
sessionKey: "session-short-replacement-flush",
858+
clientRunId: "client-short-replacement-flush",
859+
});
860+
861+
handler({
862+
runId: "run-short-replacement-flush",
863+
seq: 1,
864+
stream: "assistant",
865+
ts: Date.now(),
866+
data: { text: "Hello world" },
867+
});
868+
869+
now = 11_760;
870+
handler({
871+
runId: "run-short-replacement-flush",
872+
seq: 2,
873+
stream: "assistant",
874+
ts: Date.now(),
875+
data: { text: "Hi" },
876+
});
877+
878+
emitLifecycleEnd(handler, "run-short-replacement-flush", 3);
879+
880+
const chatCalls = chatBroadcastCalls(broadcast);
881+
expect(chatCalls).toHaveLength(3);
882+
const replacementPayload = chatCalls[1]?.[1] as {
883+
state?: string;
884+
deltaText?: string;
885+
replace?: boolean;
886+
message?: { content?: Array<{ text?: string }> };
887+
};
888+
expect(replacementPayload.state).toBe("delta");
889+
expect(replacementPayload.deltaText).toBe("Hi");
890+
expect(replacementPayload.replace).toBe(true);
891+
expect(replacementPayload.message?.content?.[0]?.text).toBe("Hi");
892+
expect((chatCalls[2]?.[1] as { state?: string }).state).toBe("final");
893+
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
894+
nowSpy.mockRestore();
895+
});
896+
818897
it("cleans up agent run sequence tracking when lifecycle completes", () => {
819898
const { agentRunSeq, chatRunState, handler, nowSpy } = createHarness({ now: 2_500 });
820899
chatRunState.registry.add("run-cleanup", {

src/gateway/server-chat.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,24 @@ function readChatErrorKind(value: unknown): ErrorKind | undefined {
174174
: undefined;
175175
}
176176

177+
type BroadcastDelta = { deltaText: string; replace?: true };
178+
177179
function resolveBroadcastDelta(params: {
178180
text: string;
179181
previousBroadcastText: string | undefined;
180-
}): { deltaText: string; replace?: true } {
181-
const previous = params.previousBroadcastText ?? "";
182-
if (previous && !params.text.startsWith(previous)) {
182+
}): BroadcastDelta | undefined {
183+
if (!params.text) {
184+
return undefined;
185+
}
186+
const previous = params.previousBroadcastText;
187+
if (previous === undefined) {
188+
return { deltaText: params.text };
189+
}
190+
if (!params.text.startsWith(previous)) {
183191
return { deltaText: params.text, replace: true };
184192
}
185193
const deltaText = params.text.slice(previous.length);
186-
return { deltaText: deltaText || params.text };
194+
return deltaText ? { deltaText } : undefined;
187195
}
188196

189197
export type AgentEventHandlerOptions = {
@@ -476,6 +484,9 @@ export function createAgentEventHandler({
476484
text: mergedText,
477485
previousBroadcastText: chatRunState.deltaLastBroadcastText.get(clientRunId),
478486
});
487+
if (!broadcastDelta) {
488+
return;
489+
}
479490
chatRunState.deltaSentAt.set(clientRunId, now);
480491
chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length);
481492
chatRunState.deltaLastBroadcastText.set(clientRunId, mergedText);
@@ -537,16 +548,14 @@ export function createAgentEventHandler({
537548
return;
538549
}
539550

540-
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
541-
if (text.length <= lastBroadcastLen) {
542-
return;
543-
}
544-
545551
const now = Date.now();
546552
const delta = resolveBroadcastDelta({
547553
text,
548554
previousBroadcastText: chatRunState.deltaLastBroadcastText.get(clientRunId),
549555
});
556+
if (!delta) {
557+
return;
558+
}
550559
const spawnedBy = resolveSpawnedBy(sessionKey);
551560
const flushPayload = {
552561
runId: clientRunId,
@@ -585,7 +594,7 @@ export function createAgentEventHandler({
585594
// Flush any throttled delta so streaming clients receive the complete text
586595
// before the final event. The 150 ms throttle in emitChatDelta may have
587596
// suppressed the most recent chunk, leaving the client with stale text.
588-
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
597+
// Only flush if the buffered text differs from the last broadcast to avoid duplicates.
589598
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq);
590599
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
591600
chatRunState.deltaLastBroadcastText.delete(clientRunId);

0 commit comments

Comments
 (0)