Skip to content

Commit 8a9258b

Browse files
authored
šŸ› Fix, avoid yield from a TaskGroup, only as an async context manager, closed in the request async exit stack (#15038)
1 parent 6038507 commit 8a9258b

1 file changed

Lines changed: 50 additions & 10 deletions

File tree

fastapi/routing.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import anyio
3232
from annotated_doc import Doc
33+
from anyio.abc import ObjectReceiveStream
3334
from fastapi import params
3435
from fastapi._compat import (
3536
ModelField,
@@ -526,14 +527,24 @@ def _serialize_sse_item(item: Any) -> bytes:
526527
else:
527528
sse_aiter = iterate_in_threadpool(gen)
528529

529-
async def _async_stream_sse() -> AsyncIterator[bytes]:
530+
@asynccontextmanager
531+
async def _sse_producer_cm() -> AsyncIterator[
532+
ObjectReceiveStream[bytes]
533+
]:
530534
# Use a memory stream to decouple generator iteration
531535
# from the keepalive timer. A producer task pulls items
532536
# from the generator independently, so
533537
# `anyio.fail_after` never wraps the generator's
534538
# `__anext__` directly - avoiding CancelledError that
535539
# would finalize the generator and also working for sync
536540
# generators running in a thread pool.
541+
#
542+
# This context manager is entered on the request-scoped
543+
# AsyncExitStack so its __aexit__ (which cancels the
544+
# task group) is called by the exit stack after the
545+
# streaming response completes — not by async generator
546+
# finalization via GeneratorExit.
547+
# Ref: https://peps.python.org/pep-0789/
537548
send_stream, receive_stream = anyio.create_memory_object_stream[
538549
bytes
539550
](max_buffer_size=1)
@@ -543,25 +554,54 @@ async def _producer() -> None:
543554
async for raw_item in sse_aiter:
544555
await send_stream.send(_serialize_sse_item(raw_item))
545556

546-
async with anyio.create_task_group() as tg:
547-
tg.start_soon(_producer)
548-
async with receive_stream:
557+
send_keepalive, receive_keepalive = (
558+
anyio.create_memory_object_stream[bytes](max_buffer_size=1)
559+
)
560+
561+
async def _keepalive_inserter() -> None:
562+
"""Read from the producer and forward to the output,
563+
inserting keepalive comments on timeout."""
564+
async with send_keepalive, receive_stream:
549565
try:
550566
while True:
551567
try:
552568
with anyio.fail_after(_PING_INTERVAL):
553569
data = await receive_stream.receive()
554-
yield data
555-
# To allow for cancellation to trigger
556-
# Ref: https://github.com/fastapi/fastapi/issues/14680
557-
await anyio.sleep(0)
570+
await send_keepalive.send(data)
558571
except TimeoutError:
559-
yield KEEPALIVE_COMMENT
572+
await send_keepalive.send(KEEPALIVE_COMMENT)
560573
except anyio.EndOfStream:
561574
pass
562575

576+
async with anyio.create_task_group() as tg:
577+
tg.start_soon(_producer)
578+
tg.start_soon(_keepalive_inserter)
579+
yield receive_keepalive
580+
tg.cancel_scope.cancel()
581+
582+
# Enter the SSE context manager on the request-scoped
583+
# exit stack. The stack outlives the streaming response,
584+
# so __aexit__ runs via proper structured teardown, not
585+
# via GeneratorExit thrown into an async generator.
586+
sse_receive_stream = await async_exit_stack.enter_async_context(
587+
_sse_producer_cm()
588+
)
589+
# Ensure the receive stream is closed when the exit stack
590+
# unwinds, preventing ResourceWarning from __del__.
591+
async_exit_stack.push_async_callback(sse_receive_stream.aclose)
592+
593+
async def _sse_with_checkpoints(
594+
stream: ObjectReceiveStream[bytes],
595+
) -> AsyncIterator[bytes]:
596+
async for data in stream:
597+
yield data
598+
# Guarantee a checkpoint so cancellation can be
599+
# delivered even when the producer is faster than
600+
# the consumer and receive() never suspends.
601+
await anyio.sleep(0)
602+
563603
sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
564-
_async_stream_sse()
604+
_sse_with_checkpoints(sse_receive_stream)
565605
)
566606

567607
response = StreamingResponse(

0 commit comments

Comments
(0)