Skip to content

Commit e40e6a4

Browse files
committed
fix(ollama): yield during dense stream processing
1 parent f6a49a4 commit e40e6a4

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

extensions/ollama/src/stream.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,4 +249,45 @@ describe("createOllamaStreamFn thinking events", () => {
249249
auditContext: "ollama-stream.chat",
250250
});
251251
});
252+
253+
it("yields to the event loop while processing dense native stream chunks", async () => {
  // 200 one-character content deltas: enough consecutive events that the
  // stream loop has to give up control at least once before finishing.
  const denseDeltas = Array.from({ length: 200 }, (_value, index) => {
    const seconds = String(index % 60).padStart(2, "0");
    return {
      model: "qwen3.5",
      created_at: `2026-01-01T00:00:${seconds}Z`,
      message: { role: "assistant" as const, content: "x" },
      done: false,
    };
  });
  // NOTE(review): makeOllamaResponse presumably produces the terminating chunk — confirm.
  const chunks = [...denseDeltas, makeOllamaResponse({ content: "" })];
  const ndjsonBody = makeNdjsonBody(chunks);
  fetchWithSsrFGuardMock.mockResolvedValue({
    response: new Response(ndjsonBody, { status: 200 }),
    release: vi.fn(async () => undefined),
  });

  const createStream = createOllamaStreamFn("http://localhost:11434");
  const eventStream = createStream(
    { api: "ollama", provider: "ollama", id: "qwen3.5", contextWindow: 65536 } as never,
    { messages: [{ role: "user", content: "test" }] } as never,
    {},
  );

  // Probe: a zero-delay timer can only run once the event loop gets a turn.
  let timerFired = false;
  const timerSettled = new Promise<void>((resolve) => {
    setTimeout(() => {
      timerFired = true;
      resolve();
    }, 0);
  });

  // The stream yielded if any non-terminal event is observed after the timer ran.
  let sawEventAfterTimer = false;
  for await (const event of eventStream as AsyncIterable<{ type: string }>) {
    if (!timerFired || event.type === "done") {
      continue;
    }
    sawEventAfterTimer = true;
  }
  // Let the timer settle so it does not outlive the test.
  await timerSettled;

  expect(sawEventAfterTimer).toBe(true);
});
252293
});

extensions/ollama/src/stream.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,49 @@ const log = createSubsystemLogger("ollama-stream");
5050

5151
export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL;
5252

53+
/**
 * Time budget, in milliseconds of `Date.now()` difference since the last yield.
 * createOllamaStreamCooperativeScheduler yields once this much time has elapsed.
 */
const OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12;
54+
/**
 * Event budget: createOllamaStreamCooperativeScheduler yields once this many
 * events have been counted since the last yield, regardless of elapsed time.
 */
const OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64;
5355
const GARBLED_VISIBLE_TEXT_MODEL_RE = /\b(?:glm|kimi)\b/i;
5456
const GARBLED_VISIBLE_TEXT_MIN_CHARS = 80;
5557
const GARBLED_VISIBLE_TEXT_SYMBOL_RE = /[$#%&="'_~`^|\\/*+\-[\]{}()<>:;,.!?]/gu;
5658
const LETTER_OR_DIGIT_RE = /[\p{L}\p{N}]/gu;
5759

60+
/**
 * Handle exposing the single hook that a stream loop awaits after handling an
 * event.
 */
type OllamaStreamCooperativeScheduler = {
  /** Resolves when the caller may continue with the next event. */
  afterEvent(): Promise<void>;
};
63+
64+
/**
 * Throws if the given abort signal has already been aborted; otherwise returns
 * without doing anything.
 *
 * @param signal - Optional abort signal. When omitted, the call is a no-op.
 * @throws Error with the message "Request was aborted" when `signal.aborted` is truthy.
 */
function throwIfOllamaStreamAborted(signal?: AbortSignal): void {
  if (!signal?.aborted) {
    return;
  }
  throw new Error("Request was aborted");
}
69+
70+
/**
 * Creates a scheduler that lets a stream-processing loop periodically hand
 * control back to the event loop.
 *
 * The returned `afterEvent` hook counts events and checks elapsed time. Once
 * either OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS events have been counted or
 * OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS milliseconds have passed since
 * the last yield, it waits for a `setImmediate` callback before resolving.
 *
 * @param signal - Optional abort signal, checked on entry and again after each
 *   yield.
 * @returns A scheduler whose `afterEvent` is meant to be awaited after each
 *   handled event.
 * @throws Whatever `throwIfOllamaStreamAborted` throws when `signal` is aborted
 *   (surfaced as a rejection of the `afterEvent` promise).
 */
function createOllamaStreamCooperativeScheduler(
  signal?: AbortSignal,
): OllamaStreamCooperativeScheduler {
  let lastYieldedAt = Date.now();
  let eventsSinceYield = 0;
  return {
    async afterEvent() {
      throwIfOllamaStreamAborted(signal);
      eventsSinceYield += 1;
      const now = Date.now();
      const elapsedMs = now - lastYieldedAt;
      const withinEventBudget = eventsSinceYield < OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS;
      // Date.now() follows the wall clock, so a backwards adjustment makes the
      // difference negative. Treat that as "budget spent" rather than letting
      // it silence the time-based trigger until the clock catches up.
      const withinTimeBudget =
        elapsedMs >= 0 && elapsedMs < OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS;
      if (withinEventBudget && withinTimeBudget) {
        return;
      }
      eventsSinceYield = 0;
      // Stamp before waiting so an overlapping call made while this yield is
      // still pending does not trigger a second time-based yield.
      lastYieldedAt = now;
      await new Promise<void>((resolve) => {
        setImmediate(resolve);
      });
      // Restart the interval from the moment control returns. Time spent parked
      // in the event loop is not this stream's own work; charging it to the
      // next interval would force a yield after every event whenever a loop
      // turn takes longer than the interval.
      lastYieldedAt = Date.now();
      // The signal may have been aborted while other work was running.
      throwIfOllamaStreamAborted(signal);
    },
  };
}
95+
5896
function countMatches(text: string, re: RegExp): number {
5997
re.lastIndex = 0;
6098
return Array.from(text.matchAll(re)).length;
@@ -1155,6 +1193,7 @@ export function createOllamaStreamFn(
11551193
let pendingFinalVisibleContent: string | undefined;
11561194
const modelInfo = { api: model.api, provider: model.provider, id: model.id };
11571195
const visibleContentSanitizer = createOllamaVisibleContentSanitizer(model.id);
1196+
const cooperativeScheduler = createOllamaStreamCooperativeScheduler(options?.signal);
11581197
let streamStarted = false;
11591198
let thinkingStarted = false;
11601199
let thinkingEnded = false;
@@ -1275,6 +1314,7 @@ export function createOllamaStreamFn(
12751314
};
12761315

12771316
for await (const chunk of parseNdjsonStream(reader)) {
1317+
throwIfOllamaStreamAborted(options?.signal);
12781318
const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
12791319
if (thinkingDelta) {
12801320
if (!streamStarted) {
@@ -1327,6 +1367,7 @@ export function createOllamaStreamFn(
13271367
finalResponse = chunk;
13281368
break;
13291369
}
1370+
await cooperativeScheduler.afterEvent();
13301371
}
13311372

13321373
if (!finalResponse) {

0 commit comments

Comments
 (0)