Skip to content

Commit 552ebcc

Browse files
committed
fix(slack): preserve buffered thread stream replies
1 parent 99b1726 commit 552ebcc

5 files changed

Lines changed: 86 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ Docs: https://docs.openclaw.ai
119119
- Web fetch: bound guarded dispatcher cleanup after request timeouts so timed-out fetches return tool errors instead of leaving Gateway tool lanes active. (#78439) Thanks @obviyus.
120120
- Mattermost/setup: prompt for and persist the server base URL after the bot token in `openclaw setup --wizard`, instead of failing validation before `--http-url` is collected. Fixes #76670. Thanks @jacobtomlinson.
121121
- Gate Slack startup user allowlist resolution [AI]. (#77898) Thanks @pgondhi987.
122+
- Slack/streaming: fall back to normal threaded delivery when native stream finalization rejects a locally buffered reply, so generated Slack thread replies no longer disappear before posting. Fixes #78061. Thanks @KennanHoa.
122123
- OpenAI/Codex: suppress stale `openai-codex` GPT-5.1/5.2/5.3 model refs that ChatGPT/Codex OAuth accounts now reject, keeping model lists, config validation, and forward-compat resolution on current 5.4/5.5 routes. Fixes #67158. Thanks @drpau.
123124
- CLI/update: keep pnpm package updates on the running custom global install root and pass pnpm's `--global-dir` so `openclaw update` does not create a second default-prefix install when `OPENCLAW_HOME` or the shell points at a custom OpenClaw directory. Fixes #78377. Thanks @amknight.
124125
- Google Meet/Voice Call: wait longer before playing PIN-derived Twilio DTMF for Meet dial-in prompts and retire stale delegated phone sessions instead of reusing completed calls.

extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,36 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
10911091
expect(session.stopped).toBe(true);
10921092
});
10931093

1094+
it("routes pending native stream text through chunked sender for unexpected finalize failures", async () => {
1095+
mockedNativeStreaming = true;
1096+
const session = {
1097+
channel: "C123",
1098+
threadTs: THREAD_TS,
1099+
stopped: false,
1100+
delivered: false,
1101+
pendingText: FINAL_REPLY_TEXT,
1102+
};
1103+
startSlackStreamMock.mockResolvedValueOnce(session);
1104+
stopSlackStreamMock.mockRejectedValueOnce(
1105+
new TestSlackStreamNotDeliveredError(
1106+
FINAL_REPLY_TEXT,
1107+
"method_not_supported_for_channel_type",
1108+
),
1109+
);
1110+
1111+
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
1112+
1113+
expect(postMessageMock).not.toHaveBeenCalled();
1114+
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
1115+
expect(deliverRepliesMock).toHaveBeenCalledWith(
1116+
expect.objectContaining({
1117+
replyThreadTs: THREAD_TS,
1118+
replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })],
1119+
}),
1120+
);
1121+
expect(session.stopped).toBe(true);
1122+
});
1123+
10941124
it("routes all pending native stream text through chunked sender when an append flush fails", async () => {
10951125
mockedNativeStreaming = true;
10961126
mockedDispatchSequence = [

extensions/slack/src/monitor/message-handler/dispatch.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,8 +1235,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
12351235
} catch (err) {
12361236
if (err instanceof SlackStreamNotDeliveredError) {
12371237
streamFallbackDelivered = await deliverPendingStreamFallback(finalStream, err);
1238+
if (!streamFallbackDelivered && !finalStream.delivered) {
1239+
dispatchError ??= err;
1240+
}
12381241
} else {
12391242
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));
1243+
if (!finalStream.delivered) {
1244+
dispatchError ??= err;
1245+
}
12401246
}
12411247
}
12421248
}

extensions/slack/src/streaming.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,40 @@ describe("stopSlackStream finalize error handling", () => {
8383
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world");
8484
});
8585

86+
it("throws SlackStreamNotDeliveredError for unexpected finalize codes while text is buffered", async () => {
87+
const session = makeSession({
88+
appendImpl: async () => null,
89+
stopImpl: async () => {
90+
throw slackApiError("method_not_supported_for_channel_type");
91+
},
92+
});
93+
await appendSlackStream({ session, text: "short thread reply" });
94+
95+
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
96+
97+
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
98+
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe(
99+
"method_not_supported_for_channel_type",
100+
);
101+
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("short thread reply");
102+
});
103+
104+
it("throws SlackStreamNotDeliveredError for non-Slack stop errors while text is buffered", async () => {
105+
const session = makeSession({
106+
appendImpl: async () => null,
107+
stopImpl: async () => {
108+
throw new Error("socket reset");
109+
},
110+
});
111+
await appendSlackStream({ session, text: "locally buffered reply" });
112+
113+
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
114+
115+
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
116+
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("unknown");
117+
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("locally buffered reply");
118+
});
119+
86120
it("clears pendingText after an append flush is acknowledged by Slack", async () => {
87121
const session = makeSession({
88122
appendImpl: async () => ({ ts: "1700000000.100203" }),

extensions/slack/src/streaming.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
197197
* After calling this the stream message becomes a normal Slack message.
198198
* Optionally include final text to append before stopping.
199199
*
200-
* If Slack's `chat.stopStream` responds with a known benign finalize error
201-
* (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) AND any prior `append`
202-
* has already landed on Slack, the error is swallowed and the session is
203-
* marked stopped - the already-delivered text stays visible.
200+
* If Slack's `chat.stopStream` responds with an error while text is still
201+
* buffered locally, this function throws a {@link SlackStreamNotDeliveredError}
202+
* carrying that pending text so the caller can deliver it through the normal
203+
* Slack reply path.
204204
*
205-
* If the same benign error fires while text is still only buffered locally
206-
* (e.g. short replies that never exceeded the SDK's buffer_size), this
207-
* function throws a {@link SlackStreamNotDeliveredError} carrying that pending
208-
* text so the caller can deliver it through the normal Slack reply path.
205+
* If Slack responds with a known benign finalize error (see
206+
* {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) after prior `append` calls already
207+
* landed, the error is swallowed and the session is marked stopped - the
208+
* already-delivered text stays visible.
209209
*
210210
* All other errors propagate unchanged.
211211
*/
@@ -233,13 +233,14 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
233233
session.delivered = true;
234234
session.pendingText = "";
235235
} catch (err) {
236+
const code = extractSlackErrorCode(err) ?? "unknown";
237+
if (session.pendingText) {
238+
// stop() can be the first network call for short replies. If Slack
239+
// rejects that finalize for any reason, the user has not seen the
240+
// SDK-buffered text yet. Let the caller fall back to chat.postMessage.
241+
throw new SlackStreamNotDeliveredError(session.pendingText, code);
242+
}
236243
if (isBenignSlackFinalizeError(err)) {
237-
const code = extractSlackErrorCode(err) ?? "unknown";
238-
if (session.pendingText) {
239-
// stop() can be the first network call for short replies. If Slack
240-
// Connect rejects it, the user has not seen the SDK-buffered text yet.
241-
throw new SlackStreamNotDeliveredError(session.pendingText, code);
242-
}
243244
if (session.delivered) {
244245
logVerbose(
245246
`slack-stream: finalize rejected by Slack (${code}); prior appends delivered, treating stream as stopped`,

0 commit comments

Comments
 (0)