Skip to content

Commit 10315ce

Browse files
samzongsteipete
authored andcommitted
fix(gateway): add incremental chat delta payloads
1 parent 2a67a7f commit 10315ce

23 files changed

Lines changed: 215 additions & 7 deletions

docs/gateway/protocol.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,9 @@ enumeration of `src/gateway/server-methods/*.ts`.
468468
### Common event families
469469

470470
- `chat`: UI chat updates such as `chat.inject` and other transcript-only chat
471-
events.
471+
events. Delta payloads keep `message` as the cumulative assistant snapshot for
472+
compatibility and may include `deltaText` when the Gateway can provide a safe
473+
additive text suffix.
472474
- `session.message` and `session.tool`: transcript/event-stream updates for a
473475
subscribed session.
474476
- `sessions.changed`: session index or metadata changed.

packages/sdk/src/client.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ function readChatProjectionText(payload: Record<string, unknown>): string | unde
240240
return text.length > 0 ? text : undefined;
241241
}
242242

243+
function readChatProjectionDeltaText(payload: Record<string, unknown>): string | undefined {
244+
return typeof payload.deltaText === "string" ? payload.deltaText : undefined;
245+
}
246+
243247
function isAssistantRunEvent(event: OpenClawEvent): boolean {
244248
return event.type === "assistant.delta" || event.type === "assistant.message";
245249
}
@@ -259,8 +263,9 @@ function normalizeChatProjectionEvent(
259263
previousText: string | undefined,
260264
): OpenClawEvent {
261265
const text = readChatProjectionText(projection.payload);
266+
const deltaText = readChatProjectionDeltaText(projection.payload);
262267
const isReplacement = Boolean(
263-
previousText && text !== undefined && !text.startsWith(previousText),
268+
deltaText === undefined && previousText && text !== undefined && !text.startsWith(previousText),
264269
);
265270
return {
266271
...event,
@@ -270,7 +275,7 @@ function normalizeChatProjectionEvent(
270275
? text !== undefined
271276
? {
272277
text,
273-
delta: isReplacement ? text : text.slice(previousText?.length ?? 0),
278+
delta: deltaText ?? (isReplacement ? text : text.slice(previousText?.length ?? 0)),
274279
...(isReplacement ? { replace: true } : {}),
275280
}
276281
: event.data

packages/sdk/src/index.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,77 @@ describe("OpenClaw SDK", () => {
776776
}
777777
});
778778

779+
it("uses chat projection deltaText when present", async () => {
780+
const ts = 1_777_000_000_300;
781+
const transport = new FakeTransport({
782+
agent: (
783+
_params: unknown,
784+
_options: GatewayRequestOptions | undefined,
785+
fake: FakeTransport,
786+
) => {
787+
fake.emit({
788+
event: "chat",
789+
seq: 1,
790+
payload: {
791+
runId: "run_chat_delta_text",
792+
sessionKey: "chat-delta-text",
793+
state: "delta",
794+
deltaText: "hello",
795+
message: {
796+
role: "assistant",
797+
content: [{ type: "text", text: "hello" }],
798+
timestamp: ts,
799+
},
800+
},
801+
});
802+
fake.emit({
803+
event: "chat",
804+
seq: 2,
805+
payload: {
806+
runId: "run_chat_delta_text",
807+
sessionKey: "chat-delta-text",
808+
state: "delta",
809+
deltaText: " provided",
810+
message: {
811+
role: "assistant",
812+
content: [{ type: "text", text: "hello again" }],
813+
timestamp: ts + 1,
814+
},
815+
},
816+
});
817+
return { status: "accepted", runId: "run_chat_delta_text", sessionKey: "chat-delta-text" };
818+
},
819+
});
820+
const oc = new OpenClaw({ transport });
821+
822+
const run = await oc.runs.create({
823+
input: "stream with chat deltaText",
824+
idempotencyKey: "chat-delta-text-events",
825+
sessionKey: "chat-delta-text",
826+
});
827+
const iterator = run.events()[Symbol.asyncIterator]();
828+
829+
try {
830+
const first = await iterator.next();
831+
expect(first.done).toBe(false);
832+
if (first.done !== false) {
833+
throw new Error("expected first chat projection event");
834+
}
835+
expect(first.value.type).toBe("assistant.delta");
836+
expect(first.value.data).toEqual({ text: "hello", delta: "hello" });
837+
838+
const second = await iterator.next();
839+
expect(second.done).toBe(false);
840+
if (second.done !== false) {
841+
throw new Error("expected second chat projection event");
842+
}
843+
expect(second.value.type).toBe("assistant.delta");
844+
expect(second.value.data).toEqual({ text: "hello again", delta: " provided" });
845+
} finally {
846+
await iterator.return?.();
847+
}
848+
});
849+
779850
it("creates a session and sends a message as a run", async () => {
780851
const transport = new FakeTransport({
781852
"sessions.create": { key: "session-main", label: "Main" },

src/gateway/chat-abort.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ function createOps(params: {
5353
chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []),
5454
chatDeltaSentAt: new Map([[runId, Date.now()]]),
5555
chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]),
56+
chatDeltaLastBroadcastText: new Map(buffer !== undefined ? [[runId, buffer]] : []),
5657
chatAbortedRuns: new Map(),
5758
removeChatRun,
5859
agentRunSeq: new Map(),
@@ -108,6 +109,7 @@ describe("abortChatRunById", () => {
108109
expect(ops.chatRunBuffers.has(runId)).toBe(false);
109110
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
110111
expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
112+
expect(ops.chatDeltaLastBroadcastText.has(runId)).toBe(false);
111113
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
112114
expect(ops.agentRunSeq.has(runId)).toBe(false);
113115
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);

src/gateway/chat-abort.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ export type ChatAbortOps = {
112112
chatRunBuffers: Map<string, string>;
113113
chatDeltaSentAt: Map<string, number>;
114114
chatDeltaLastBroadcastLen: Map<string, number>;
115+
chatDeltaLastBroadcastText: Map<string, string>;
115116
chatAbortedRuns: Map<string, number>;
116117
removeChatRun: (
117118
sessionId: string,
@@ -176,6 +177,7 @@ export function abortChatRunById(
176177
ops.chatRunBuffers.delete(runId);
177178
ops.chatDeltaSentAt.delete(runId);
178179
ops.chatDeltaLastBroadcastLen.delete(runId);
180+
ops.chatDeltaLastBroadcastText.delete(runId);
179181
const removed = ops.removeChatRun(runId, runId, sessionKey);
180182
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
181183
emitAgentEvent({

src/gateway/protocol/index.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { describe, expect, it } from "vitest";
33
import { TALK_TEST_PROVIDER_ID } from "../../test-utils/talk-test-provider.js";
44
import {
55
formatValidationErrors,
6+
validateChatEvent,
67
validateModelsListParams,
78
validateNodeEventResult,
89
validateNodePairRequestParams,
@@ -449,6 +450,24 @@ describe("validateWakeParams", () => {
449450
});
450451
});
451452

453+
describe("validateChatEvent", () => {
454+
it("accepts additive chat delta text", () => {
455+
expect(
456+
validateChatEvent({
457+
runId: "run-chat",
458+
sessionKey: "agent:main:main",
459+
seq: 1,
460+
state: "delta",
461+
deltaText: "hello",
462+
message: {
463+
role: "assistant",
464+
content: [{ type: "text", text: "hello" }],
465+
},
466+
}),
467+
).toBe(true);
468+
});
469+
});
470+
452471
describe("validateModelsListParams", () => {
453472
it("accepts the supported model catalog views", () => {
454473
expect(validateModelsListParams({})).toBe(true);

src/gateway/protocol/schema/logs-chat.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ export const ChatEventSchema = Type.Object(
8383
Type.Literal("error"),
8484
]),
8585
message: Type.Optional(Type.Unknown()),
86+
deltaText: Type.Optional(Type.String()),
8687
errorMessage: Type.Optional(Type.String()),
8788
errorKind: Type.Optional(
8889
Type.Union([

src/gateway/server-chat-state.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export type ChatRunState = {
7070
deltaSentAt: Map<string, number>;
7171
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
7272
deltaLastBroadcastLen: Map<string, number>;
73+
deltaLastBroadcastText: Map<string, string>;
7374
abortedRuns: Map<string, number>;
7475
clear: () => void;
7576
};
@@ -80,6 +81,7 @@ export function createChatRunState(): ChatRunState {
8081
const buffers = new Map<string, string>();
8182
const deltaSentAt = new Map<string, number>();
8283
const deltaLastBroadcastLen = new Map<string, number>();
84+
const deltaLastBroadcastText = new Map<string, string>();
8385
const abortedRuns = new Map<string, number>();
8486

8587
const clear = () => {
@@ -88,6 +90,7 @@ export function createChatRunState(): ChatRunState {
8890
buffers.clear();
8991
deltaSentAt.clear();
9092
deltaLastBroadcastLen.clear();
93+
deltaLastBroadcastText.clear();
9194
abortedRuns.clear();
9295
};
9396

@@ -97,6 +100,7 @@ export function createChatRunState(): ChatRunState {
97100
buffers,
98101
deltaSentAt,
99102
deltaLastBroadcastLen,
103+
deltaLastBroadcastText,
100104
abortedRuns,
101105
clear,
102106
};

src/gateway/server-chat.agent-events.test.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,11 @@ describe("agent event handler", () => {
383383
expect(chatCalls).toHaveLength(1);
384384
const payload = chatCalls[0]?.[1] as {
385385
state?: string;
386+
deltaText?: string;
386387
message?: { content?: Array<{ text?: string }> };
387388
};
388389
expect(payload.state).toBe("delta");
390+
expect(payload.deltaText).toBe("Hello world");
389391
expect(payload.message?.content?.[0]?.text).toBe("Hello world");
390392
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(1);
391393
nowSpy?.mockRestore();
@@ -619,14 +621,17 @@ describe("agent event handler", () => {
619621

620622
const chatCalls = chatBroadcastCalls(broadcast);
621623
expect(chatCalls).toHaveLength(3);
622-
const firstPayload = chatCalls[0]?.[1] as { state?: string };
624+
const firstPayload = chatCalls[0]?.[1] as { state?: string; deltaText?: string };
623625
const secondPayload = chatCalls[1]?.[1] as {
624626
state?: string;
627+
deltaText?: string;
625628
message?: { content?: Array<{ text?: string }> };
626629
};
627630
const thirdPayload = chatCalls[2]?.[1] as { state?: string };
628631
expect(firstPayload.state).toBe("delta");
632+
expect(firstPayload.deltaText).toBe("Hello");
629633
expect(secondPayload.state).toBe("delta");
634+
expect(secondPayload.deltaText).toBe(" world");
630635
expect(secondPayload.message?.content?.[0]?.text).toBe("Hello world");
631636
expect(thirdPayload.state).toBe("final");
632637
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
@@ -665,13 +670,15 @@ describe("agent event handler", () => {
665670
expect(chatCalls).toHaveLength(3);
666671
const secondPayload = chatCalls[1]?.[1] as {
667672
state?: string;
673+
deltaText?: string;
668674
message?: { content?: Array<{ text?: string }> };
669675
};
670676
const finalPayload = chatCalls[2]?.[1] as {
671677
state?: string;
672678
message?: { content?: Array<{ text?: string }> };
673679
};
674680
expect(secondPayload.state).toBe("delta");
681+
expect(secondPayload.deltaText).toBe("\nAfter tool call");
675682
expect(secondPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
676683
expect(finalPayload.state).toBe("final");
677684
expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
@@ -711,13 +718,15 @@ describe("agent event handler", () => {
711718
expect(chatCalls).toHaveLength(3);
712719
const flushPayload = chatCalls[1]?.[1] as {
713720
state?: string;
721+
deltaText?: string;
714722
message?: { content?: Array<{ text?: string }> };
715723
};
716724
const finalPayload = chatCalls[2]?.[1] as {
717725
state?: string;
718726
message?: { content?: Array<{ text?: string }> };
719727
};
720728
expect(flushPayload.state).toBe("delta");
729+
expect(flushPayload.deltaText).toBe("\nAfter tool call");
721730
expect(flushPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
722731
expect(finalPayload.state).toBe("final");
723732
expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
@@ -764,6 +773,46 @@ describe("agent event handler", () => {
764773
nowSpy.mockRestore();
765774
});
766775

776+
it("omits deltaText when a non-prefix replacement is broadcast", () => {
777+
let now = 11_300;
778+
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
779+
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
780+
chatRunState.registry.add("run-replacement", {
781+
sessionKey: "session-replacement",
782+
clientRunId: "client-replacement",
783+
});
784+
785+
handler({
786+
runId: "run-replacement",
787+
seq: 1,
788+
stream: "assistant",
789+
ts: Date.now(),
790+
data: { text: "Hello world" },
791+
});
792+
793+
now = 11_500;
794+
handler({
795+
runId: "run-replacement",
796+
seq: 2,
797+
stream: "assistant",
798+
ts: Date.now(),
799+
data: { text: "Goodbye world" },
800+
});
801+
802+
const chatCalls = chatBroadcastCalls(broadcast);
803+
expect(chatCalls).toHaveLength(2);
804+
const firstPayload = chatCalls[0]?.[1] as { deltaText?: string };
805+
const replacementPayload = chatCalls[1]?.[1] as {
806+
deltaText?: string;
807+
message?: { content?: Array<{ text?: string }> };
808+
};
809+
expect(firstPayload.deltaText).toBe("Hello world");
810+
expect(replacementPayload.message?.content?.[0]?.text).toBe("Goodbye world");
811+
expect(replacementPayload.deltaText).toBeUndefined();
812+
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2);
813+
nowSpy.mockRestore();
814+
});
815+
767816
it("cleans up agent run sequence tracking when lifecycle completes", () => {
768817
const { agentRunSeq, chatRunState, handler, nowSpy } = createHarness({ now: 2_500 });
769818
chatRunState.registry.add("run-cleanup", {
@@ -887,9 +936,11 @@ describe("agent event handler", () => {
887936
expect(chatCalls).toHaveLength(2);
888937
const flushedPayload = chatCalls[1]?.[1] as {
889938
state?: string;
939+
deltaText?: string;
890940
message?: { content?: Array<{ text?: string }> };
891941
};
892942
expect(flushedPayload.state).toBe("delta");
943+
expect(flushedPayload.deltaText).toBe(" expanded");
893944
expect(flushedPayload.message?.content?.[0]?.text).toBe("Before tool expanded");
894945
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2);
895946

0 commit comments

Comments
 (0)