Skip to content

Commit 3a478d7

Browse files
udaymanish6vincentkoc
authored andcommitted
fix(ollama): yield during dense stream processing
1 parent b9dc3c3 commit 3a478d7

2 files changed

Lines changed: 137 additions & 4 deletions

File tree

extensions/ollama/src/stream.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,4 +356,90 @@ describe("createOllamaStreamFn thinking events", () => {
356356
arguments: { path: "/path/to/file", line_start: 1, line_end: 400 },
357357
});
358358
});
359+
360+
it("yields to the event loop while processing dense native stream chunks", async () => {
361+
const chunks = [
362+
...Array.from({ length: 65 }, (_value, index) => ({
363+
model: "qwen3.5",
364+
created_at: `2026-01-01T00:00:${String(index % 60).padStart(2, "0")}Z`,
365+
message: { role: "assistant" as const, content: "x" },
366+
done: false,
367+
})),
368+
makeOllamaResponse({ content: "" }),
369+
];
370+
const body = makeNdjsonBody(chunks);
371+
fetchWithSsrFGuardMock.mockResolvedValue({
372+
response: new Response(body, { status: 200 }),
373+
release: vi.fn(async () => undefined),
374+
});
375+
376+
const streamFn = createOllamaStreamFn("http://localhost:11434");
377+
const stream = streamFn(
378+
{ api: "ollama", provider: "ollama", id: "qwen3.5", contextWindow: 65536 } as never,
379+
{ messages: [{ role: "user", content: "test" }] } as never,
380+
{},
381+
);
382+
383+
let timerFired = false;
384+
const timerPromise = new Promise<void>((resolve) => {
385+
setTimeout(() => {
386+
timerFired = true;
387+
resolve();
388+
}, 0);
389+
});
390+
let yieldedBeforeDone = false;
391+
for await (const event of stream as AsyncIterable<{ type: string }>) {
392+
if (timerFired && event.type !== "done") {
393+
yieldedBeforeDone = true;
394+
}
395+
}
396+
await timerPromise;
397+
398+
expect(yieldedBeforeDone).toBe(true);
399+
});
400+
401+
it("reports caller aborts during dense native stream processing as aborted", async () => {
402+
const chunks = [
403+
...Array.from({ length: 65 }, (_value, index) => ({
404+
model: "qwen3.5",
405+
created_at: `2026-01-01T00:00:${String(index % 60).padStart(2, "0")}Z`,
406+
message: { role: "assistant" as const, content: "x" },
407+
done: false,
408+
})),
409+
makeOllamaResponse({ content: "" }),
410+
];
411+
const body = makeNdjsonBody(chunks);
412+
fetchWithSsrFGuardMock.mockResolvedValue({
413+
response: new Response(body, { status: 200 }),
414+
release: vi.fn(async () => undefined),
415+
});
416+
417+
const controller = new AbortController();
418+
const streamFn = createOllamaStreamFn("http://localhost:11434");
419+
const stream = streamFn(
420+
{ api: "ollama", provider: "ollama", id: "qwen3.5", contextWindow: 65536 } as never,
421+
{ messages: [{ role: "user", content: "test" }] } as never,
422+
{ signal: controller.signal },
423+
);
424+
425+
setTimeout(() => {
426+
controller.abort();
427+
}, 0);
428+
429+
const events: Array<{ type: string; reason?: string; error?: { stopReason?: string } }> = [];
430+
for await (const event of stream as AsyncIterable<{
431+
type: string;
432+
reason?: string;
433+
error?: { stopReason?: string };
434+
}>) {
435+
events.push(event);
436+
}
437+
438+
const lastEvent = events.at(-1);
439+
expect(lastEvent).toMatchObject({
440+
type: "error",
441+
reason: "aborted",
442+
error: { stopReason: "aborted" },
443+
});
444+
});
359445
});

extensions/ollama/src/stream.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,49 @@ const log = createSubsystemLogger("ollama-stream");
5151

5252
export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL;
5353

54+
const OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12;
55+
const OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64;
5456
const GARBLED_VISIBLE_TEXT_MODEL_RE = /\b(?:glm|kimi)\b/i;
5557
const GARBLED_VISIBLE_TEXT_MIN_CHARS = 80;
5658
const GARBLED_VISIBLE_TEXT_SYMBOL_RE = /[$#%&="'_~`^|\\/*+\-[\]{}()<>:;,.!?]/gu;
5759
const LETTER_OR_DIGIT_RE = /[\p{L}\p{N}]/gu;
5860

61+
type OllamaStreamCooperativeScheduler = {
62+
afterEvent: () => Promise<void>;
63+
};
64+
65+
function throwIfOllamaStreamAborted(signal?: AbortSignal): void {
66+
if (signal?.aborted) {
67+
throw new Error("Request was aborted");
68+
}
69+
}
70+
71+
function createOllamaStreamCooperativeScheduler(
72+
signal?: AbortSignal,
73+
): OllamaStreamCooperativeScheduler {
74+
let lastYieldedAt = Date.now();
75+
let eventsSinceYield = 0;
76+
return {
77+
async afterEvent() {
78+
throwIfOllamaStreamAborted(signal);
79+
eventsSinceYield += 1;
80+
const now = Date.now();
81+
if (
82+
eventsSinceYield < OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS &&
83+
now - lastYieldedAt < OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS
84+
) {
85+
return;
86+
}
87+
eventsSinceYield = 0;
88+
lastYieldedAt = now;
89+
await new Promise<void>((resolve) => {
90+
setTimeout(resolve, 0);
91+
});
92+
throwIfOllamaStreamAborted(signal);
93+
},
94+
};
95+
}
96+
5997
function countMatches(text: string, re: RegExp): number {
6098
re.lastIndex = 0;
6199
return Array.from(text.matchAll(re)).length;
@@ -523,18 +561,22 @@ function buildStreamAssistantMessage(params: {
523561

524562
function buildStreamErrorAssistantMessage(params: {
525563
model: StreamModelDescriptor;
564+
stopReason: Extract<StopReason, "aborted" | "error">;
526565
errorMessage: string;
527566
timestamp?: number;
528-
}): AssistantMessage & { stopReason: "error"; errorMessage: string } {
567+
}): AssistantMessage & {
568+
stopReason: Extract<StopReason, "aborted" | "error">;
569+
errorMessage: string;
570+
} {
529571
return {
530572
...buildStreamAssistantMessage({
531573
model: params.model,
532574
content: [],
533-
stopReason: "error",
575+
stopReason: params.stopReason,
534576
usage: buildUsageWithNoCost({}),
535577
timestamp: params.timestamp,
536578
}),
537-
stopReason: "error",
579+
stopReason: params.stopReason,
538580
errorMessage: params.errorMessage,
539581
};
540582
}
@@ -1169,6 +1211,7 @@ function createRawOllamaStreamFn(
11691211
let pendingFinalVisibleContent: string | undefined;
11701212
const modelInfo = { api: model.api, provider: model.provider, id: model.id };
11711213
const visibleContentSanitizer = createOllamaVisibleContentSanitizer(model.id);
1214+
const cooperativeScheduler = createOllamaStreamCooperativeScheduler(options?.signal);
11721215
let streamStarted = false;
11731216
let thinkingStarted = false;
11741217
let thinkingEnded = false;
@@ -1289,6 +1332,7 @@ function createRawOllamaStreamFn(
12891332
};
12901333

12911334
for await (const chunk of parseNdjsonStream(reader)) {
1335+
throwIfOllamaStreamAborted(options?.signal);
12921336
const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
12931337
if (thinkingDelta) {
12941338
if (!streamStarted) {
@@ -1341,6 +1385,7 @@ function createRawOllamaStreamFn(
13411385
finalResponse = chunk;
13421386
break;
13431387
}
1388+
await cooperativeScheduler.afterEvent();
13441389
}
13451390

13461391
if (!finalResponse) {
@@ -1392,11 +1437,13 @@ function createRawOllamaStreamFn(
13921437
await release();
13931438
}
13941439
} catch (err) {
1440+
const stopReason = options?.signal?.aborted ? "aborted" : "error";
13951441
stream.push({
13961442
type: "error",
1397-
reason: "error",
1443+
reason: stopReason,
13981444
error: buildStreamErrorAssistantMessage({
13991445
model,
1446+
stopReason,
14001447
errorMessage: formatErrorMessage(err),
14011448
}),
14021449
});

0 commit comments

Comments
 (0)