Skip to content

Commit c0f574b

Browse files
udaymanish6 authored and vincentkoc committed
fix(ollama): yield during dense stream processing
1 parent c8cc010 commit c0f574b

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

extensions/ollama/src/stream.test.ts

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -356,4 +356,45 @@ describe("createOllamaStreamFn thinking events", () => {
356356
arguments: { path: "/path/to/file", line_start: 1, line_end: 400 },
357357
});
358358
});
359+
360+
// Regression test: consuming a dense NDJSON stream must let other event-loop
// work (here, a zero-delay timer) run before the stream reports "done".
it("yields to the event loop while processing dense native stream chunks", async () => {
  // 200 small non-final content chunks followed by one response built by the test helper.
  const chunks = [
    ...Array.from({ length: 200 }, (_value, index) => ({
      model: "qwen3.5",
      created_at: `2026-01-01T00:00:${String(index % 60).padStart(2, "0")}Z`,
      message: { role: "assistant" as const, content: "x" },
      done: false,
    })),
    makeOllamaResponse({ content: "" }),
  ];
  const body = makeNdjsonBody(chunks);
  fetchWithSsrFGuardMock.mockResolvedValue({
    response: new Response(body, { status: 200 }),
    release: vi.fn(async () => undefined),
  });

  const streamFn = createOllamaStreamFn("http://localhost:11434");
  const stream = streamFn(
    { api: "ollama", provider: "ollama", id: "qwen3.5", contextWindow: 65536 } as never,
    { messages: [{ role: "user", content: "test" }] } as never,
    {},
  );

  // A zero-delay timer can only fire if the consumer hands control back to
  // the event loop while the stream is still being processed.
  let timerFired = false;
  const timerPromise = new Promise<void>((resolve) => {
    setTimeout(() => {
      timerFired = true;
      resolve();
    }, 0);
  });

  // Record whether any non-final event was observed after the timer ran.
  let yieldedBeforeDone = false;
  for await (const event of stream as AsyncIterable<{ type: string }>) {
    if (timerFired && event.type !== "done") {
      yieldedBeforeDone = true;
    }
  }
  // Make sure the timer has settled before asserting so it cannot outlive the test.
  await timerPromise;

  expect(yieldedBeforeDone).toBe(true);
});
359400
});

extensions/ollama/src/stream.ts

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,11 +51,49 @@ const log = createSubsystemLogger("ollama-stream");
5151

5252
export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL;
5353

54+
/**
 * Time budget, in milliseconds, for the cooperative stream scheduler: once at
 * least this much time has elapsed since its last recorded yield, the next
 * processed event triggers a yield to the event loop.
 */
const OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12;
/**
 * Event budget for the cooperative stream scheduler: a yield is forced once
 * this many events have been processed since the last yield, regardless of
 * how little time has elapsed.
 */
const OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64;
5456
/** Matches text containing `glm` or `kimi` as a whole word, case-insensitively. */
const GARBLED_VISIBLE_TEXT_MODEL_RE = /\b(?:glm|kimi)\b/i;
/**
 * Character-count threshold of 80.
 * NOTE(review): presumably the minimum visible-text length before the
 * garbled-text check applies — confirm against the code that reads it.
 */
const GARBLED_VISIBLE_TEXT_MIN_CHARS = 80;
/** Global matcher for a single ASCII punctuation/symbol character from a fixed set. */
const GARBLED_VISIBLE_TEXT_SYMBOL_RE = /[$#%&="'_~`^|\\/*+\-[\]{}()<>:;,.!?]/gu;
/** Global matcher for a single Unicode letter or number character. */
const LETTER_OR_DIGIT_RE = /[\p{L}\p{N}]/gu;
5860

61+
/** Handle used by the stream loop to cooperatively share the event loop. */
type OllamaStreamCooperativeScheduler = {
  /**
   * Awaited after an event has been processed; the returned promise settles
   * when the caller may continue with the next event.
   */
  afterEvent: () => Promise<void>;
};
64+
65+
/**
 * Guard that stops stream processing once the caller has cancelled the request.
 *
 * @param signal - Optional abort signal; when omitted the guard never throws.
 * @throws Error with the message "Request was aborted" if `signal.aborted` is true.
 */
function throwIfOllamaStreamAborted(signal?: AbortSignal): void {
  if (signal?.aborted) {
    throw new Error("Request was aborted");
  }
}
70+
71+
/**
 * Creates a scheduler that periodically hands control back to the event loop
 * while a stream is being consumed.
 *
 * `afterEvent()` returns without yielding while both budgets hold: fewer than
 * `OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS` events since the last yield and
 * less than `OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS` milliseconds since
 * the last yield. When either budget is exhausted it waits for one
 * `setImmediate` turn before resolving.
 *
 * @param signal - Optional abort signal, checked on entry to `afterEvent()`
 *   and again after a yield.
 * @returns A scheduler whose `afterEvent()` rejects with
 *   "Request was aborted" once the signal is aborted.
 */
function createOllamaStreamCooperativeScheduler(
  signal?: AbortSignal,
): OllamaStreamCooperativeScheduler {
  let lastYieldedAt = Date.now();
  let eventsSinceYield = 0;
  return {
    async afterEvent() {
      throwIfOllamaStreamAborted(signal);
      eventsSinceYield += 1;
      const withinEventBudget = eventsSinceYield < OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS;
      const withinTimeBudget =
        Date.now() - lastYieldedAt < OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS;
      if (withinEventBudget && withinTimeBudget) {
        return;
      }
      eventsSinceYield = 0;
      await new Promise<void>((resolve) => {
        setImmediate(resolve);
      });
      // Restart the time budget from the moment control comes back. Stamping
      // the clock before the yield would count time spent in other event-loop
      // work against this stream and, on a busy loop, force a yield after
      // every single event.
      lastYieldedAt = Date.now();
      // The request may have been cancelled while other work was running.
      throwIfOllamaStreamAborted(signal);
    },
  };
}
96+
5997
function countMatches(text: string, re: RegExp): number {
6098
re.lastIndex = 0;
6199
return Array.from(text.matchAll(re)).length;
@@ -1169,6 +1207,7 @@ function createRawOllamaStreamFn(
11691207
let pendingFinalVisibleContent: string | undefined;
11701208
const modelInfo = { api: model.api, provider: model.provider, id: model.id };
11711209
const visibleContentSanitizer = createOllamaVisibleContentSanitizer(model.id);
1210+
const cooperativeScheduler = createOllamaStreamCooperativeScheduler(options?.signal);
11721211
let streamStarted = false;
11731212
let thinkingStarted = false;
11741213
let thinkingEnded = false;
@@ -1289,6 +1328,7 @@ function createRawOllamaStreamFn(
12891328
};
12901329

12911330
for await (const chunk of parseNdjsonStream(reader)) {
1331+
throwIfOllamaStreamAborted(options?.signal);
12921332
const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
12931333
if (thinkingDelta) {
12941334
if (!streamStarted) {
@@ -1341,6 +1381,7 @@ function createRawOllamaStreamFn(
13411381
finalResponse = chunk;
13421382
break;
13431383
}
1384+
await cooperativeScheduler.afterEvent();
13441385
}
13451386

13461387
if (!finalResponse) {

0 commit comments

Comments
 (0)