@@ -51,11 +51,49 @@ const log = createSubsystemLogger("ollama-stream");
5151
5252export const OLLAMA_NATIVE_BASE_URL = OLLAMA_DEFAULT_BASE_URL ;
5353
54+ const OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12 ;
55+ const OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64 ;
5456const GARBLED_VISIBLE_TEXT_MODEL_RE = / \b (?: g l m | k i m i ) \b / i;
5557const GARBLED_VISIBLE_TEXT_MIN_CHARS = 80 ;
5658const GARBLED_VISIBLE_TEXT_SYMBOL_RE = / [ $ # % & = " ' _ ~ ` ^ | \\ / * + \- [ \] { } ( ) < > : ; , . ! ? ] / gu;
5759const LETTER_OR_DIGIT_RE = / [ \p{ L} \p{ N} ] / gu;
5860
61+ type OllamaStreamCooperativeScheduler = {
62+ afterEvent : ( ) => Promise < void > ;
63+ } ;
64+
65+ function throwIfOllamaStreamAborted ( signal ?: AbortSignal ) : void {
66+ if ( signal ?. aborted ) {
67+ throw new Error ( "Request was aborted" ) ;
68+ }
69+ }
70+
71+ function createOllamaStreamCooperativeScheduler (
72+ signal ?: AbortSignal ,
73+ ) : OllamaStreamCooperativeScheduler {
74+ let lastYieldedAt = Date . now ( ) ;
75+ let eventsSinceYield = 0 ;
76+ return {
77+ async afterEvent ( ) {
78+ throwIfOllamaStreamAborted ( signal ) ;
79+ eventsSinceYield += 1 ;
80+ const now = Date . now ( ) ;
81+ if (
82+ eventsSinceYield < OLLAMA_STREAM_COOPERATIVE_YIELD_MAX_EVENTS &&
83+ now - lastYieldedAt < OLLAMA_STREAM_COOPERATIVE_YIELD_INTERVAL_MS
84+ ) {
85+ return ;
86+ }
87+ eventsSinceYield = 0 ;
88+ lastYieldedAt = now ;
89+ await new Promise < void > ( ( resolve ) => {
90+ setTimeout ( resolve , 0 ) ;
91+ } ) ;
92+ throwIfOllamaStreamAborted ( signal ) ;
93+ } ,
94+ } ;
95+ }
96+
5997function countMatches ( text : string , re : RegExp ) : number {
6098 re . lastIndex = 0 ;
6199 return Array . from ( text . matchAll ( re ) ) . length ;
@@ -523,18 +561,22 @@ function buildStreamAssistantMessage(params: {
523561
524562function buildStreamErrorAssistantMessage ( params : {
525563 model : StreamModelDescriptor ;
564+ stopReason : Extract < StopReason , "aborted" | "error" > ;
526565 errorMessage : string ;
527566 timestamp ?: number ;
528- } ) : AssistantMessage & { stopReason : "error" ; errorMessage : string } {
567+ } ) : AssistantMessage & {
568+ stopReason : Extract < StopReason , "aborted" | "error" > ;
569+ errorMessage : string ;
570+ } {
529571 return {
530572 ...buildStreamAssistantMessage ( {
531573 model : params . model ,
532574 content : [ ] ,
533- stopReason : "error" ,
575+ stopReason : params . stopReason ,
534576 usage : buildUsageWithNoCost ( { } ) ,
535577 timestamp : params . timestamp ,
536578 } ) ,
537- stopReason : "error" ,
579+ stopReason : params . stopReason ,
538580 errorMessage : params . errorMessage ,
539581 } ;
540582}
@@ -1169,6 +1211,7 @@ function createRawOllamaStreamFn(
11691211 let pendingFinalVisibleContent : string | undefined ;
11701212 const modelInfo = { api : model . api , provider : model . provider , id : model . id } ;
11711213 const visibleContentSanitizer = createOllamaVisibleContentSanitizer ( model . id ) ;
1214+ const cooperativeScheduler = createOllamaStreamCooperativeScheduler ( options ?. signal ) ;
11721215 let streamStarted = false ;
11731216 let thinkingStarted = false ;
11741217 let thinkingEnded = false ;
@@ -1289,6 +1332,7 @@ function createRawOllamaStreamFn(
12891332 } ;
12901333
12911334 for await ( const chunk of parseNdjsonStream ( reader ) ) {
1335+ throwIfOllamaStreamAborted ( options ?. signal ) ;
12921336 const thinkingDelta = chunk . message ?. thinking ?? chunk . message ?. reasoning ;
12931337 if ( thinkingDelta ) {
12941338 if ( ! streamStarted ) {
@@ -1341,6 +1385,7 @@ function createRawOllamaStreamFn(
13411385 finalResponse = chunk ;
13421386 break ;
13431387 }
1388+ await cooperativeScheduler . afterEvent ( ) ;
13441389 }
13451390
13461391 if ( ! finalResponse ) {
@@ -1392,11 +1437,13 @@ function createRawOllamaStreamFn(
13921437 await release ( ) ;
13931438 }
13941439 } catch ( err ) {
1440+ const stopReason = options ?. signal ?. aborted ? "aborted" : "error" ;
13951441 stream . push ( {
13961442 type : "error" ,
1397- reason : "error" ,
1443+ reason : stopReason ,
13981444 error : buildStreamErrorAssistantMessage ( {
13991445 model,
1446+ stopReason,
14001447 errorMessage : formatErrorMessage ( err ) ,
14011448 } ) ,
14021449 } ) ;
0 commit comments