3030
3131import anyio
3232from annotated_doc import Doc
33+ from anyio .abc import ObjectReceiveStream
3334from fastapi import params
3435from fastapi ._compat import (
3536 ModelField ,
@@ -526,14 +527,24 @@ def _serialize_sse_item(item: Any) -> bytes:
526527 else :
527528 sse_aiter = iterate_in_threadpool (gen )
528529
529- async def _async_stream_sse () -> AsyncIterator [bytes ]:
530+ @asynccontextmanager
531+ async def _sse_producer_cm () -> AsyncIterator [
532+ ObjectReceiveStream [bytes ]
533+ ]:
530534 # Use a memory stream to decouple generator iteration
531535 # from the keepalive timer. A producer task pulls items
532536 # from the generator independently, so
533537 # `anyio.fail_after` never wraps the generator's
534538 # `__anext__` directly - avoiding CancelledError that
535539 # would finalize the generator and also working for sync
536540 # generators running in a thread pool.
541+ #
542+ # This context manager is entered on the request-scoped
543+ # AsyncExitStack so its __aexit__ (which cancels the
544+ # task group) is called by the exit stack after the
545+ # streaming response completes ā not by async generator
546+ # finalization via GeneratorExit.
547+ # Ref: https://peps.python.org/pep-0789/
537548 send_stream , receive_stream = anyio .create_memory_object_stream [
538549 bytes
539550 ](max_buffer_size = 1 )
@@ -543,25 +554,54 @@ async def _producer() -> None:
543554 async for raw_item in sse_aiter :
544555 await send_stream .send (_serialize_sse_item (raw_item ))
545556
546- async with anyio .create_task_group () as tg :
547- tg .start_soon (_producer )
548- async with receive_stream :
557+ send_keepalive , receive_keepalive = (
558+ anyio .create_memory_object_stream [bytes ](max_buffer_size = 1 )
559+ )
560+
561+ async def _keepalive_inserter () -> None :
562+ """Read from the producer and forward to the output,
563+ inserting keepalive comments on timeout."""
564+ async with send_keepalive , receive_stream :
549565 try :
550566 while True :
551567 try :
552568 with anyio .fail_after (_PING_INTERVAL ):
553569 data = await receive_stream .receive ()
554- yield data
555- # To allow for cancellation to trigger
556- # Ref: https://github.com/fastapi/fastapi/issues/14680
557- await anyio .sleep (0 )
570+ await send_keepalive .send (data )
558571 except TimeoutError :
559- yield KEEPALIVE_COMMENT
572+ await send_keepalive . send ( KEEPALIVE_COMMENT )
560573 except anyio .EndOfStream :
561574 pass
562575
576+ async with anyio .create_task_group () as tg :
577+ tg .start_soon (_producer )
578+ tg .start_soon (_keepalive_inserter )
579+ yield receive_keepalive
580+ tg .cancel_scope .cancel ()
581+
582+ # Enter the SSE context manager on the request-scoped
583+ # exit stack. The stack outlives the streaming response,
584+ # so __aexit__ runs via proper structured teardown, not
585+ # via GeneratorExit thrown into an async generator.
586+ sse_receive_stream = await async_exit_stack .enter_async_context (
587+ _sse_producer_cm ()
588+ )
589+ # Ensure the receive stream is closed when the exit stack
590+ # unwinds, preventing ResourceWarning from __del__.
591+ async_exit_stack .push_async_callback (sse_receive_stream .aclose )
592+
593+ async def _sse_with_checkpoints (
594+ stream : ObjectReceiveStream [bytes ],
595+ ) -> AsyncIterator [bytes ]:
596+ async for data in stream :
597+ yield data
598+ # Guarantee a checkpoint so cancellation can be
599+ # delivered even when the producer is faster than
600+ # the consumer and receive() never suspends.
601+ await anyio .sleep (0 )
602+
563603 sse_stream_content : AsyncIterator [bytes ] | Iterator [bytes ] = (
564- _async_stream_sse ( )
604+ _sse_with_checkpoints ( sse_receive_stream )
565605 )
566606
567607 response = StreamingResponse (
0 commit comments