Idea of LifespanManager to have multiple lifespans in app
#9397
Replies: 5 comments 5 replies
-
@uriyyo You could use this:

    import contextlib
    from collections.abc import AsyncIterator, Callable, Sequence
    from contextlib import AbstractAsyncContextManager

    from fastapi import FastAPI


    @contextlib.asynccontextmanager
    async def _manager(
        app: FastAPI,
        lifespans: Sequence[Callable[[FastAPI], AbstractAsyncContextManager[None]]],
    ) -> AsyncIterator[None]:
        exit_stack = contextlib.AsyncExitStack()
        async with exit_stack:
            for lifespan in lifespans:
                await exit_stack.enter_async_context(lifespan(app))
            yield


    class Lifespans:
        def __init__(
            self,
            lifespans: Sequence[Callable[[FastAPI], AbstractAsyncContextManager[None]]],
        ) -> None:
            self.lifespans = lifespans

        def __call__(self, app: FastAPI) -> AbstractAsyncContextManager[None]:
            self.app = app
            return _manager(app, lifespans=self.lifespans)


    @contextlib.asynccontextmanager
    async def one(app: FastAPI) -> AsyncIterator[None]:
        print("One")
        yield
        print("End One")


    @contextlib.asynccontextmanager
    async def two(app: FastAPI) -> AsyncIterator[None]:
        print("Two")
        yield
        print("End Two")


    app = FastAPI(lifespan=Lifespans([one, two]))
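To see the ordering this pattern gives, here is a minimal sketch that uses only the standard library. It assumes nothing about FastAPI itself: a plain `object()` stands in for the app, and `make_lifespan`, `combined` and `events` are hypothetical names made up for the illustration. The point is that `AsyncExitStack` enters the lifespans in list order and exits them in reverse.

```python
import asyncio
import contextlib
from collections.abc import AsyncIterator

# Records what happened, in order, so the sequence can be inspected.
events: list[str] = []


def make_lifespan(name: str):
    """Build a toy lifespan that logs its own startup and shutdown."""

    @contextlib.asynccontextmanager
    async def lifespan(app: object) -> AsyncIterator[None]:
        events.append(f"start {name}")
        yield
        events.append(f"stop {name}")

    return lifespan


@contextlib.asynccontextmanager
async def combined(app: object, lifespans) -> AsyncIterator[None]:
    # Same idea as the _manager helper above: enter each lifespan in turn.
    async with contextlib.AsyncExitStack() as stack:
        for lifespan in lifespans:
            await stack.enter_async_context(lifespan(app))
        yield


async def main() -> list[str]:
    events.clear()
    # object() is a stand-in for the application (an assumption of this sketch).
    async with combined(object(), [make_lifespan("one"), make_lifespan("two")]):
        events.append("serving")
    return list(events)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded sequence is `start one`, `start two`, `serving`, `stop two`, `stop one`, so a lifespan listed later can rely on resources set up by an earlier one.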
-
Thanks both for your help and inspiration! I wouldn't have found my solution without you. I confess I did not run this example, but the code is ripped from a working version that I am using, so apologies for any oversights.
-
For good reason, it's not possible. It's been known since the 1970s that Model-View-Controller (MVC) and its derivatives rule. Build the database externally. It's an unorthodox solution, better start a task.
-
The solutions above discuss multiple lifespan functions at the top level. But for mounted sub-applications, I've found that lifespan handlers are only called on the top-level application. In my case, I would like two sub-applications mounted at

You could imagine the top-level lifespan handler might look like this, with a cascade_lifespan helper:

    import contextlib


    @contextlib.asynccontextmanager
    async def top_lifespan(app):
        async with cascade_lifespan(app):
            # now all of our children have already started up
            yield
            # now all of our children are going to shut down

A simple version looks like this:

    import fastapi.routing


    @contextlib.asynccontextmanager
    async def cascade_lifespan(parent):
        async with contextlib.AsyncExitStack() as stack:
            for route in parent.router.routes:
                if isinstance(route, fastapi.routing.Mount):
                    # enter the mounted app's own lifespan context
                    await stack.enter_async_context(
                        route.app.router.lifespan_context(route.app)
                    )
            yield

But this is unfortunately too simple and breaks down:

Here is a slightly more complete, but pretty convoluted implementation. It creates a task for each app callable to send it lifespan events. This version also doesn't handle exceptions or state correctly.

    import contextlib

    import anyio
    import fastapi.routing


    @contextlib.asynccontextmanager
    async def memory_stream_context(buffer_size):
        send, receive = anyio.create_memory_object_stream(buffer_size)
        try:
            yield send, receive
        finally:
            await send.aclose()
            await receive.aclose()


    def _collect_subapps(app):
        for r in getattr(app, 'routes', ()):
            if isinstance(r, fastapi.routing.Mount):
                yield from _collect_subapps(r.app)  # comment out if you don't want recursive enumeration
                yield r.app


    def _lifespan_driver(app, tg):
        @contextlib.asynccontextmanager
        async def _cm():
            scope = {
                'type': 'lifespan',
                'asgi': {'version': '3.0', 'spec_version': '1.0'},
                'state': {},
            }
            async with (
                memory_stream_context(0) as (drv_send, app_recv),
                memory_stream_context(0) as (app_send, drv_recv),
            ):
                # run the sub-app as an ASGI callable in its own task
                tg.start_soon(app, scope, app_recv.receive, app_send.send)
                # startup
                await drv_send.send({'type': 'lifespan.startup'})
                msg = await drv_recv.receive()
                if msg['type'] != 'lifespan.startup.complete':
                    raise RuntimeError(f'startup failed: {msg}')
                try:
                    yield
                finally:
                    # shutdown
                    await drv_send.send({'type': 'lifespan.shutdown'})
                    msg = await drv_recv.receive()
                    if msg['type'] != 'lifespan.shutdown.complete':
                        raise RuntimeError(f'shutdown failed: {msg}')

        return _cm()


    @contextlib.asynccontextmanager
    async def cascade_lifespan(root_app):
        async with (
            anyio.create_task_group() as tg,
            contextlib.AsyncExitStack() as stack,
        ):
            for sub in _collect_subapps(root_app):  # inner → outer
                await stack.enter_async_context(_lifespan_driver(sub, tg))
            yield
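For anyone who wants to see the message exchange in isolation, here is a rough sketch of driving the ASGI lifespan protocol by hand, using only asyncio. It is not the implementation above: `toy_app`, `drive_lifespan` and `log` are hypothetical names, the toy app only speaks the lifespan protocol, and error handling is kept deliberately thin.

```python
import asyncio
import contextlib

log: list[str] = []


async def toy_app(scope, receive, send):
    """Hypothetical minimal ASGI app that only handles lifespan events."""
    assert scope["type"] == "lifespan"
    while True:
        message = await receive()
        if message["type"] == "lifespan.startup":
            log.append("app started")
            await send({"type": "lifespan.startup.complete"})
        elif message["type"] == "lifespan.shutdown":
            log.append("app stopped")
            await send({"type": "lifespan.shutdown.complete"})
            return


@contextlib.asynccontextmanager
async def drive_lifespan(app):
    """Send startup/shutdown events to `app` and wait for its replies."""
    to_app: asyncio.Queue = asyncio.Queue()
    from_app: asyncio.Queue = asyncio.Queue()
    scope = {
        "type": "lifespan",
        "asgi": {"version": "3.0", "spec_version": "2.0"},
        "state": {},
    }
    # The app runs in its own task; the queues play the role of receive/send.
    task = asyncio.create_task(app(scope, to_app.get, from_app.put))

    async def exchange(event: str) -> None:
        await to_app.put({"type": f"lifespan.{event}"})
        reply = await from_app.get()
        if reply["type"] != f"lifespan.{event}.complete":
            raise RuntimeError(f"{event} failed: {reply}")

    try:
        await exchange("startup")
        try:
            yield scope["state"]
        finally:
            await exchange("shutdown")
        await task  # the app returns once shutdown has completed
    finally:
        if not task.done():
            task.cancel()  # something failed: don't leave the app task running


async def main() -> list[str]:
    log.clear()
    async with drive_lifespan(toy_app):
        log.append("serving")
    return list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The driver yields the `state` dict from the scope, so a parent lifespan could in principle copy a sub-app's state into its own, which is one of the gaps mentioned above.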
-
Or ... undeprecate the original event handlers which are, anyway, a cleaner syntax. The stated reason for deprecation is that global (or, at least, module-level) variables are required to coordinate startup/shutdown work. This is not a compelling rationale when the service-level features typically access singleton objects (as shown in all the examples!), especially in a language with weak member access controls like Python. In short, it wasn't broken, so why not un-fix it?
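In the meantime, the handler-list style can be approximated on top of a lifespan. The following is only a sketch under my own assumptions: `lifespan_from_handlers` is a hypothetical helper, not something FastAPI or Starlette provides, and it takes zero-argument handlers that may be sync or async.

```python
import asyncio
import contextlib
import inspect
from collections.abc import AsyncIterator, Callable, Sequence


def lifespan_from_handlers(
    on_startup: Sequence[Callable[[], object]] = (),
    on_shutdown: Sequence[Callable[[], object]] = (),
):
    """Hypothetical adapter: wrap startup/shutdown handler lists in one lifespan."""

    async def _run(handlers: Sequence[Callable[[], object]]) -> None:
        for handler in handlers:
            result = handler()
            if inspect.isawaitable(result):  # support async handlers too
                await result

    @contextlib.asynccontextmanager
    async def lifespan(app: object) -> AsyncIterator[None]:
        await _run(on_startup)
        try:
            yield
        finally:
            # Shutdown handlers run even if the body raised.
            await _run(on_shutdown)

    return lifespan


calls: list[str] = []


def open_cache() -> None:
    calls.append("cache open")


async def connect_db() -> None:
    calls.append("db connected")


async def close_db() -> None:
    calls.append("db closed")


lifespan = lifespan_from_handlers([open_cache, connect_db], [close_db])


async def main() -> list[str]:
    calls.clear()
    # object() stands in for the application in this sketch.
    async with lifespan(object()):
        calls.append("serving")
    return list(calls)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The resulting `lifespan` is an ordinary callable taking the app, so it could be passed wherever a lifespan is expected, or combined with others using the exit-stack approach from earlier in the thread.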
-
The old approach with on_startup, on_shutdown is deprecated and will be removed in future versions of starlette.

The new lifespan approach has a small issue in that you can't have multiple lifespans defined. It's great to have a separate lifespan for db connection, cache connection, etc.

I have created a library called fastapi-lifespan-manager which aims to fix this issue.

I am happy to open a PR to add this feature to fastapi because the library is pretty small itself.

Usage example: