Skip to content

Fix streaming text sanitization to prevent non-monotonic output#9412

Closed
ksylvan wants to merge 4 commits intoopenclaw:mainfrom
ksylvan:fix/delta-buffer-partial-overlap-handling
Closed

Fix streaming text sanitization to prevent non-monotonic output#9412
ksylvan wants to merge 4 commits intoopenclaw:mainfrom
ksylvan:fix/delta-buffer-partial-overlap-handling

Conversation

@ksylvan
Copy link

@ksylvan ksylvan commented Feb 5, 2026

Fix streaming text sanitization to prevent non-monotonic output

Fixes #8537

Summary

This PR fixes a streaming text bug where the sanitizeUserFacingText function's duplicate-block collapsing behavior caused non-monotonic output during streaming, resulting in lost textin the draft stream. The fix introduces a streaming mode that bypasses the duplicate-blockcollapsing logic while preserving other sanitization behaviors.

Files Changed

File Change Type Description
src/agents/pi-embedded-helpers/errors.ts Modified Added optional streaming parameter to sanitizeUserFacingText
src/agents/pi-embedded-helpers.sanitizeuserfacingtext.test.ts Modified Added test coverage for new streaming mode
src/agents/pi-embedded-subscribe.handlers.messages.ts Modified Added partial overlaphandling for delta buffer
src/auto-reply/reply/agent-runner-execution.ts Modified Enabled streaming mode when sanitizing text
src/telegram/bot-message-dispatch.ts Modified Fixed non-monotonic stream recovery topreserve accumulated text

Code Changes

errors.ts - New Streaming Mode

export function sanitizeUserFacingText(text: string, opts?: { streaming?: boolean }): string{
  // ... existing logic ...

  // Skip duplicate-block collapsing during streaming: the function is stateless
  // and can produce non-monotonic output as chunk boundaries shift, which causes
  // the downstream draft stream to lose accumulated text.
  if (opts?.streaming) {
    return stripped;
  }

  return collapseConsecutiveDuplicateBlocks(stripped);
}

pi-embedded-subscribe.handlers.messages.ts - Partial Overlap Handling

} else {
  // Partial overlap: deltaBuffer includes content as a substring but
  // neither is a prefix of the other. Use the suffix of content that
  // extends beyond the current deltaBuffer to avoid dropping text.
  const overlapIdx = content.indexOf(ctx.state.deltaBuffer);
  if (overlapIdx >= 0) {
    chunk = content.slice(0, overlapIdx);
  }
}

bot-message-dispatch.ts - Recovery Fix

} else {
  // Non-monotonic stream (e.g. sanitizer changed output shape).
  // Recover by using the full `text` as the new baseline instead of
  // losing all previously accumulated content.
  draftChunker?.reset();
  draftText = "";
  delta = text;  // <-- Previously missing, caused text loss
}

Reason for Changes

The collapseConsecutiveDuplicateBlocks function is stateless and operates on whatever textit receives at any given moment. During streaming, chunk boundaries shift as more content arrives, which can cause the function to produce different output shapes for the same logical content at different times. This non-monotonic behavior caused downstream consumers to detect a "reset" condition and discard accumulated text.

Impact of Changes

  1. Streaming reliability: Text will no longer be lost during streaming due to sanitizer output shape changes
  2. Final output unchanged: The final delivered message still uses extractAssistantText, so duplicate collapsing still applies to completed messages
  3. Telegram bot stability: Non-monotonic streams now recover gracefully instead of losing all prior content

Test Plan

New unit tests verify:

  • Streaming mode preserves text that would otherwise be collapsed
  • Streaming mode still strips <final> tags
  • Streaming mode still sanitizes error messages
it("skips duplicate-block collapsing in streaming mode", () => {
  const text = "Hello there!\n\nHello there!";
  expect(sanitizeUserFacingText(text)).toBe("Hello there!");
  expect(sanitizeUserFacingText(text, { streaming: true })).toBe(text);
});

Additional Notes

  • The partial overlap logic in handleMessageUpdate handles an edge case where neither the new content nor the buffer is a prefix of the other, but they share a substring relationship
  • The comment notes that leaving chunk as empty string is safe because final delivery doesn't rely on deltaBuffer

Greptile Overview

Greptile Summary

This PR introduces a streaming option to sanitizeUserFacingText so that streaming paths can bypass stateless duplicate-paragraph collapsing (which can change output shape across chunk boundaries), while preserving other sanitization behaviors like stripping <final> tags and rewriting common API/role-ordering errors. It also adjusts streaming/draft consumers (agent runner, Telegram draft streaming) to handle non-monotonic streams more safely by recovering with a full-text baseline instead of discarding accumulated text, and adds unit tests for the new streaming mode.

These changes fit into the codebase’s partial-reply pipelines by making sanitization predictable for incremental updates, while keeping the existing “final” message behavior (including duplicate collapsing) unchanged for completed deliveries.

Confidence Score: 5/5

  • This PR is safe to merge.
  • Most changes are localized and add test coverage for the new sanitization mode, and the Telegram recovery change is straightforward.

(2/5) Greptile learns from your feedback when you react with thumbs up/down!

@openclaw-barnacle openclaw-barnacle bot added channel: telegram Channel integration: telegram agents Agent runtime and tooling labels Feb 5, 2026
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 file reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

@ksylvan
Copy link
Author

ksylvan commented Feb 5, 2026

Good catch @greptile-apps! You're absolutely right — the overlapIdx logic was flawed.

When deltaBuffer.includes(content) is true but neither is a prefix of the other, content is already a substring within deltaBuffer, meaning there's nothing new to append. The slice computation was nonsensical.

Fixed in 79c4a99: simplified to just chunk = "" with a clearer comment explaining why.

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 5, 2026

You're welcome! The fix looks good — explicitly setting chunk = "" is much clearer and correctly handles the case where content is already present in deltaBuffer.

The updated logic now properly covers all three cases:

  1. Prefix: extend with new suffix
  2. Exact match: no new content
  3. Substring: no new content (your fix)

The comment in the new commit accurately explains why this is safe: the final delivery doesn't rely on deltaBuffer, so skipping the delta emission won't cause text loss.

- Add `streaming` option to `sanitizeUserFacingText` to skip duplicate-block collapsing
- Handle partial overlap in delta buffer to avoid dropping streamed text
- Pass streaming flag in agent runner execution for draft updates
- Recover gracefully from non-monotonic streams in Telegram dispatch
- Add comprehensive tests for streaming mode behavior
@ksylvan ksylvan force-pushed the fix/delta-buffer-partial-overlap-handling branch from 79c4a99 to ab69c5f Compare February 5, 2026 07:28
@ksylvan
Copy link
Author

ksylvan commented Feb 5, 2026

@steipete This fixes the common case I was seeing recently where long tool-use conversations were generating slightly garbled messages on both Telegram and the local gateway UI chat interface.

- Add buffer reset on content backtrack to prevent garbled output
- Clarify branch conditions with explicit comments for each case
- Reset `deltaBuffer`, `blockBuffer`, and `blockChunker` on content mismatch
- Reorder conditional branches from prefix-match to substring to fallback
- Remove stale comment about `extractAssistantText` fallback safety
- Clear `lastStreamedAssistant` and `lastStreamedAssistantCleaned` on backtrack rewrite
- Reset all streaming state, not just delta and block buffers
- Prevent stale cached assistant content from corrupting rewritten output
- Update comment to reflect full state reset scope
…tput

- Add `lastContentBlockIndex` to track current content block
- Flush pending block reply data when content block index changes
- Reset `deltaBuffer`, `blockBuffer`, and streamed state on new block
- Initialize `lastContentBlockIndex` to `-1` in subscribe state
- Reset `lastContentBlockIndex` on turn boundaries
- Prevent delta contamination across multi-block assistant responses
@ksylvan
Copy link
Author

ksylvan commented Feb 7, 2026

This fix does not completely work. Closing and refreshing.

@ksylvan ksylvan closed this Feb 7, 2026
@ksylvan ksylvan deleted the fix/delta-buffer-partial-overlap-handling branch February 7, 2026 18:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

agents Agent runtime and tooling channel: telegram Channel integration: telegram

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Draft stream preview garbles long replies on Telegram (streamMode: block)

1 participant