Skip to content

Commit 15d9d50

Browse files
giodl73-repoCopilot
andcommitted
Deduplicate preview-streamed final replies
Track the latest partial-preview reply text during reply-agent runs and suppress matching final text-only payloads so Telegram partial streaming does not resend already-previewed blocks when block streaming is disabled. Keep the dedupe exact-match based to avoid dropping unrelated short finals, preserve errors, and keep unsent media while stripping duplicate caption text. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 16e5d66 commit 15d9d50

4 files changed

Lines changed: 146 additions & 2 deletions

File tree

src/auto-reply/reply/agent-runner-payloads.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,63 @@ describe("buildReplyPayloads media filter integration", () => {
490490
expect(replyPayloads).toHaveLength(0);
491491
});
492492

493+
it("suppresses final text payloads already covered by partial preview streaming", async () => {
494+
const { replyPayloads } = await buildReplyPayloads({
495+
...baseParams,
496+
previewStreamedText: "First block\n\nSecond block",
497+
payloads: [{ text: "First block" }, { text: "Second block" }],
498+
});
499+
500+
expect(replyPayloads).toHaveLength(0);
501+
});
502+
503+
it("keeps final text that was not covered by partial preview streaming", async () => {
504+
const { replyPayloads } = await buildReplyPayloads({
505+
...baseParams,
506+
previewStreamedText: "Working...",
507+
payloads: [{ text: "Done." }],
508+
});
509+
510+
expect(replyPayloads).toHaveLength(1);
511+
expectFields(replyPayloads[0], { text: "Done." });
512+
});
513+
514+
it("does not suppress short final text just because it appears inside preview text", async () => {
515+
const { replyPayloads } = await buildReplyPayloads({
516+
...baseParams,
517+
previewStreamedText: "Working on item 3",
518+
payloads: [{ text: "3" }],
519+
});
520+
521+
expect(replyPayloads).toHaveLength(1);
522+
expectFields(replyPayloads[0], { text: "3" });
523+
});
524+
525+
it("preserves media while removing duplicate preview-streamed caption text", async () => {
526+
const { replyPayloads } = await buildReplyPayloads({
527+
...baseParams,
528+
previewStreamedText: "Here is the chart",
529+
payloads: [{ text: "Here is the chart", mediaUrl: "file:///tmp/chart.png" }],
530+
});
531+
532+
expect(replyPayloads).toHaveLength(1);
533+
expectFields(replyPayloads[0], {
534+
text: undefined,
535+
mediaUrl: "file:///tmp/chart.png",
536+
});
537+
});
538+
539+
it("preserves errors even when their text appears in partial preview streaming", async () => {
540+
const { replyPayloads } = await buildReplyPayloads({
541+
...baseParams,
542+
previewStreamedText: "Agent couldn't generate a response. Please try again.",
543+
payloads: [{ text: "Agent couldn't generate a response. Please try again.", isError: true }],
544+
});
545+
546+
expect(replyPayloads).toHaveLength(1);
547+
expectFields(replyPayloads[0], { isError: true });
548+
});
549+
493550
it("drops all final payloads when block pipeline streamed successfully", async () => {
494551
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
495552
didStream: () => true,

src/auto-reply/reply/agent-runner-payloads.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ export async function buildReplyPayloads(params: {
158158
blockStreamingEnabled: boolean;
159159
blockReplyPipeline: BlockReplyPipeline | null;
160160
/** Payload keys sent directly (not via pipeline) during tool flush. */
161+
previewStreamedText?: string;
161162
directlySentBlockKeys?: Set<string>;
162163
replyToMode: ReplyToMode;
163164
replyToChannel?: OriginatingChannelType;
@@ -323,6 +324,54 @@ export async function buildReplyPayloads(params: {
323324
: mediaFilteredPayloads;
324325
const isDirectlySentBlockPayload = (payload: ReplyPayload) =>
325326
Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload)));
327+
const normalizePreviewDedupeText = (value: string | undefined): string =>
328+
(value ?? "").replace(/\s+/g, " ").trim();
329+
const buildPreviewDedupeTextSet = (value: string | undefined): Set<string> => {
330+
const dedupeText = new Set<string>();
331+
const normalizedWhole = normalizePreviewDedupeText(value);
332+
if (normalizedWhole) {
333+
dedupeText.add(normalizedWhole);
334+
}
335+
for (const block of (value ?? "").split(/\n{2,}/u)) {
336+
const normalizedBlock = normalizePreviewDedupeText(block);
337+
if (normalizedBlock) {
338+
dedupeText.add(normalizedBlock);
339+
}
340+
}
341+
return dedupeText;
342+
};
343+
const previewStreamedText = buildPreviewDedupeTextSet(params.previewStreamedText);
344+
const isPreviewStreamedTextPayload = (payload: ReplyPayload): boolean => {
345+
if (previewStreamedText.size === 0 || payload.isError) {
346+
return false;
347+
}
348+
const text = normalizePreviewDedupeText(payload.text);
349+
return Boolean(text && previewStreamedText.has(text));
350+
};
351+
const preserveUnsentMediaAfterPreviewStream = (payload: ReplyPayload): ReplyPayload | null => {
352+
if (!isPreviewStreamedTextPayload(payload)) {
353+
return payload;
354+
}
355+
const reply = resolveSendableOutboundReplyParts(payload);
356+
if (!reply.hasMedia) {
357+
return null;
358+
}
359+
return copyReplyPayloadMetadata(payload, {
360+
...payload,
361+
text: undefined,
362+
audioAsVoice: payload.audioAsVoice || undefined,
363+
});
364+
};
365+
const suppressPreviewStreamedPayloads = (payloads: ReplyPayload[]): ReplyPayload[] => {
366+
const unsent: ReplyPayload[] = [];
367+
for (const payload of payloads) {
368+
const next = preserveUnsentMediaAfterPreviewStream(payload);
369+
if (next) {
370+
unsent.push(next);
371+
}
372+
}
373+
return unsent;
374+
};
326375
const preserveUnsentMediaAfterBlockStream = (payload: ReplyPayload): ReplyPayload | null => {
327376
if (payload.isError) {
328377
return payload;
@@ -383,7 +432,9 @@ export async function buildReplyPayloads(params: {
383432
}
384433
return unsent;
385434
})()
386-
: dedupedPayloads;
435+
: previewStreamedText.size > 0
436+
? suppressPreviewStreamedPayloads(dedupedPayloads)
437+
: dedupedPayloads;
387438
const blockSentMediaUrls = params.blockStreamingEnabled
388439
? await normalizeSentMediaUrlsForDedupe({
389440
sentMediaUrls: params.blockReplyPipeline?.getSentMediaUrls() ?? [],

src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,27 @@ describe("runReplyAgent typing (heartbeat)", () => {
622622
}
623623
});
624624

625+
it("suppresses final text blocks already delivered through partial preview streaming", async () => {
626+
const onPartialReply = vi.fn();
627+
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
628+
await params.onPartialReply?.({ text: "First block\n\nSecond block" });
629+
return {
630+
payloads: [{ text: "First block" }, { text: "Second block" }],
631+
meta: {},
632+
};
633+
});
634+
635+
const { run } = createMinimalRun({
636+
opts: { onPartialReply },
637+
typingMode: "message",
638+
});
639+
640+
const result = await run();
641+
642+
expect(onPartialReply).toHaveBeenCalledWith({ text: "First block\n\nSecond block" });
643+
expect(result).toBeUndefined();
644+
});
645+
625646
it("suppresses narrated silent-turn partials, block replies, and final payloads", async () => {
626647
const onPartialReply = vi.fn();
627648
const onBlockReply = vi.fn();

src/auto-reply/reply/agent-runner.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,20 @@ export async function runReplyAgent(params: {
12711271
: null;
12721272

12731273
const replySessionKey = sessionKey ?? followupRun.run.sessionKey;
1274+
let latestPreviewStreamedText: string | undefined;
1275+
const effectiveOpts = opts?.onPartialReply
1276+
? {
1277+
...opts,
1278+
onPartialReply: async (
1279+
payload: Parameters<NonNullable<GetReplyOptions["onPartialReply"]>>[0],
1280+
) => {
1281+
if (typeof payload.text === "string" && payload.text.trim()) {
1282+
latestPreviewStreamedText = payload.text;
1283+
}
1284+
await opts.onPartialReply?.(payload);
1285+
},
1286+
}
1287+
: opts;
12741288
let replyOperation: ReplyOperation;
12751289
try {
12761290
replyOperation =
@@ -1453,7 +1467,7 @@ export async function runReplyAgent(params: {
14531467
sessionCtx,
14541468
replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading,
14551469
replyOperation,
1456-
opts,
1470+
opts: effectiveOpts,
14571471
typingSignals,
14581472
blockReplyPipeline,
14591473
blockStreamingEnabled,
@@ -1667,6 +1681,7 @@ export async function runReplyAgent(params: {
16671681
silentExpected: followupRun.run.silentExpected,
16681682
blockStreamingEnabled,
16691683
blockReplyPipeline,
1684+
previewStreamedText: latestPreviewStreamedText,
16701685
directlySentBlockKeys,
16711686
replyToMode,
16721687
replyToChannel,

0 commit comments

Comments
 (0)