@@ -50,11 +50,49 @@ const log = createSubsystemLogger("ollama-stream");
5050
5151export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL ;
5252
53+ const OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12 ;
54+ const OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64 ;
5355const GARBLED_VISIBLE_TEXT_MODEL_RE = / \b (?: g l m | k i m i ) \b / i;
5456const GARBLED_VISIBLE_TEXT_MIN_CHARS = 80 ;
5557const GARBLED_VISIBLE_TEXT_SYMBOL_RE = / [ $ # % & = " ' _ ~ ` ^ | \\ / * + \- [ \] { } ( ) < > : ; , . ! ? ] / gu;
5658const LETTER_OR_DIGIT_RE = / [ \p{ L} \p{ N} ] / gu;
5759
60+ type OllamaStreamCooperativeScheduler = {
61+ afterEvent : ( ) => Promise < void > ;
62+ } ;
63+
64+ function throwIfOllamaStreamAborted ( signal ?: AbortSignal ) : void {
65+ if ( signal ?. aborted ) {
66+ throw new Error ( "Request was aborted" ) ;
67+ }
68+ }
69+
70+ function createOllamaStreamCooperativeScheduler (
71+ signal ?: AbortSignal ,
72+ ) : OllamaStreamCooperativeScheduler {
73+ let lastYieldedAt = Date . now ( ) ;
74+ let eventsSinceYield = 0 ;
75+ return {
76+ async afterEvent ( ) {
77+ throwIfOllamaStreamAborted ( signal ) ;
78+ eventsSinceYield += 1 ;
79+ const now = Date . now ( ) ;
80+ if (
81+ eventsSinceYield < OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS &&
82+ now - lastYieldedAt < OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS
83+ ) {
84+ return ;
85+ }
86+ eventsSinceYield = 0 ;
87+ lastYieldedAt = now ;
88+ await new Promise < void > ( ( resolve ) => {
89+ setImmediate ( resolve ) ;
90+ } ) ;
91+ throwIfOllamaStreamAborted ( signal ) ;
92+ } ,
93+ } ;
94+ }
95+
5896function countMatches ( text : string , re : RegExp ) : number {
5997 re . lastIndex = 0 ;
6098 return Array . from ( text . matchAll ( re ) ) . length ;
@@ -1155,6 +1193,7 @@ export function createOllamaStreamFn(
11551193 let pendingFinalVisibleContent : string | undefined ;
11561194 const modelInfo = { api : model . api , provider : model . provider , id : model . id } ;
11571195 const visibleContentSanitizer = createOllamaVisibleContentSanitizer ( model . id ) ;
1196+ const cooperativeScheduler = createOllamaStreamCooperativeScheduler ( options ?. signal ) ;
11581197 let streamStarted = false ;
11591198 let thinkingStarted = false ;
11601199 let thinkingEnded = false ;
@@ -1275,6 +1314,7 @@ export function createOllamaStreamFn(
12751314 } ;
12761315
12771316 for await ( const chunk of parseNdjsonStream ( reader ) ) {
1317+ throwIfOllamaStreamAborted ( options ?. signal ) ;
12781318 const thinkingDelta = chunk . message ?. thinking ?? chunk . message ?. reasoning ;
12791319 if ( thinkingDelta ) {
12801320 if ( ! streamStarted ) {
@@ -1327,6 +1367,7 @@ export function createOllamaStreamFn(
13271367 finalResponse = chunk ;
13281368 break ;
13291369 }
1370+ await cooperativeScheduler . afterEvent ( ) ;
13301371 }
13311372
13321373 if ( ! finalResponse ) {
0 commit comments