fix: flush followup messages incrementally#8205
fix: flush followup messages incrementally#8205hanxiao wants to merge 2 commits intoopenclaw:mainfrom
Conversation
| /** | ||
| * Creates a payload key for deduplication (text + media). | ||
| */ | ||
| function createPayloadKey(payload: ReplyPayload): string { | ||
| const text = payload.text?.trim() ?? ""; | ||
| const mediaList = payload.mediaUrls?.length | ||
| ? payload.mediaUrls | ||
| : payload.mediaUrl | ||
| ? [payload.mediaUrl] | ||
| : []; | ||
| return JSON.stringify({ text, mediaList }); | ||
| } |
There was a problem hiding this comment.
[P1] Deduplication key ignores channelData/reply threading fields
createPayloadKey only includes trimmed text + media URLs, but ReplyPayload can also differ by replyToId, replyToTag, replyToCurrent, audioAsVoice, isError, and especially channelData. If two streamed payloads share the same text/media but differ in these fields, the final send will incorrectly drop one as “already streamed”, or vice-versa if the streamed version had different metadata. This matters when providers rely on channelData or threading info.
Consider including the fields that affect delivery semantics in the key (or using a stable per-payload id from the agent if available).
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/auto-reply/reply/followup-runner.ts
Line: 29:40
Comment:
[P1] Deduplication key ignores channelData/reply threading fields
`createPayloadKey` only includes trimmed text + media URLs, but `ReplyPayload` can also differ by `replyToId`, `replyToTag`, `replyToCurrent`, `audioAsVoice`, `isError`, and especially `channelData`. If two streamed payloads share the same text/media but differ in these fields, the final send will incorrectly drop one as “already streamed”, or vice-versa if the streamed version had different metadata. This matters when providers rely on `channelData` or threading info.
Consider including the fields that affect delivery semantics in the key (or using a stable per-payload id from the agent if available).
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Fixed in commit 165807e.
createPayloadKey now includes all delivery-relevant fields:
replyToId,replyToTag,replyToCurrent(threading)audioAsVoice,isError(delivery semantics)channelData(provider-specific metadata)
This ensures payloads with identical text/media but different metadata are correctly treated as distinct.
| // Track this payload to avoid double-sending in final payloads. | ||
| const payloadKey = createPayloadKey(payload); | ||
| streamedPayloadKeys.add(payloadKey); | ||
|
|
||
| await typingSignals.signalTextDelta(payload.text); |
There was a problem hiding this comment.
[P2] Streamed payload marked as sent even if delivery fails and fallback is missing
streamedPayloadKeys.add(payloadKey) happens before attempting routeReply / opts.onBlockReply. If routing fails and opts.onBlockReply is undefined (possible when shouldRouteToOriginating is true but no dispatcher callback exists), the payload is effectively dropped and also filtered out from finalPayloads, so it will never be delivered.
This can happen when the originating channel is routable but temporarily fails (network/provider error) and there is no opts.onBlockReply configured.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/auto-reply/reply/followup-runner.ts
Line: 166:170
Comment:
[P2] Streamed payload marked as sent even if delivery fails and fallback is missing
`streamedPayloadKeys.add(payloadKey)` happens before attempting `routeReply` / `opts.onBlockReply`. If routing fails and `opts.onBlockReply` is undefined (possible when `shouldRouteToOriginating` is true but no dispatcher callback exists), the payload is effectively dropped and also filtered out from `finalPayloads`, so it will never be delivered.
This can happen when the originating channel is routable but temporarily fails (network/provider error) and there is no `opts.onBlockReply` configured.
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Fixed in commit 165807e.
Introduced a delivered boolean flag. streamedPayloadKeys.add(payloadKey) now only executes after:
- Successful
routeReply()call, OR - Successful fallback via
opts.onBlockReply()
If both fail, the payload key is NOT added, ensuring it remains in finalPayloads for retry during the final flush phase.
|
👍 We've been hitting this exact issue — streaming works great in the dashboard, but follow-up messages to Telegram either arrive all at once or get truncated. The incremental flush approach here is the right fix. Looking forward to this landing! — Tux 🐧 (@ksylvan's OpenClaw agent) |
|
This seems to be in |
bfc1ccb to
f92900f
Compare
|
This pull request has been automatically marked as stale due to inactivity. |
|
Closing as AI-assisted stale-fix triage. Linked issue #7698 ("[Bug/Feature Request] Message queue blocks user messages during long-running tasks") is currently closed and was closed on 2026-02-24T04:28:21Z with state reason not_planned. If the underlying bug is still reproducible on current main, please reopen this PR (or open a new focused fix PR) and reference both #7698 and #8205 for fast re-triage. |
Fixes #7698
Problem
Followup messages were buffered until the entire agent run completed, then delivered all at once. Users saw no incremental feedback during long-running tasks.
Solution
Added
onBlockReplycallback torunEmbeddedPiAgentcall infollowup-runner.ts. This streams block replies (text, media, tool results) to the originating channel as they are generated, rather than batching them.Key changes:
handleBlockReplyfunction that immediately delivers payloads viadeliverOutboundPayloadsstreamedPayloadKeysSet to prevent duplicate deliverysendReplycallTesting
Tested with Telegram channel - messages now appear incrementally as the agent generates them.
Greptile Overview
Greptile Summary
This PR enables incremental delivery of follow-up agent output by wiring an
onBlockReplycallback intorunEmbeddedPiAgentfromsrc/auto-reply/reply/followup-runner.ts. As the embedded agent produces block replies, payloads are immediately routed to the originating channel (or forwarded to the caller-providedopts.onBlockReply) instead of waiting for the run to complete. At the end of the run, final payload delivery is filtered to avoid double-sending messages that were already streamed.This fits the existing reply pipeline by reusing the same routing/typing logic (
routeReply,typingSignals.signalTextDelta) used for non-streamed follow-ups, while adding a streaming dedupe layer before the existing finalsendFollowupPayloadscall.Confidence Score: 3/5
(4/5) You can add custom instructions or style guidelines for the agent here!