Skip to content

Concurrency is not being optimized #10

@adriangb

Description

@adriangb

It looks like concurrency / parallelism is not being maximized due to the grouping of dependencies into node groups. Here's a simple example:

import asyncio
from time import time
from typing import Annotated

async def a():
    await asyncio.sleep(1)

async def b():
    await asyncio.sleep(2)

async def c(a):
    await asyncio.sleep(1)

async def d(b, c):
    pass

async def main_asyncinjector():
    reg = Registry(a, b, c, d)
    start = time()
    await reg.resolve(d)
    print(time()-start)

asyncio.run(main_asyncinjector())

This should take 2 seconds to run (start a and b, once a finishes start c, b and c finish at the same time and you're done) but takes 3 seconds (start a and b, wait for both to finish then start c).

This happens because graphlib.TopologicalSorter is not used online and instead it is being used to statically compute groups of dependencies.

I don't think it would be too hard to address this, but I'm not sure how much you'd want to change to accommodate this.
I work on a similar project (https://github.com/adriangb/di) and there I found it very useful to break out the concept of an "executor" out of the container/registry concept, which means that instead of a parallel option you'd have pluggable executors that could choose to use concurrency, limit concurrency, use threads instead, etc.
FWIW here's what that looks like with this example:

import asyncio
from time import time
from typing import Annotated

from asyncinject import Registry
from di.dependant import Marker, Dependant
from di.container import Container
from di.executors import ConcurrentAsyncExecutor


async def a():
    await asyncio.sleep(1)

async def b():
    await asyncio.sleep(2)

async def c(a: Annotated[None, Marker(a)]):
    await asyncio.sleep(1)

async def d(b: Annotated[None, Marker(b)], c: Annotated[None, Marker(c)]):
    pass

async def main_asyncinjector():
    reg = Registry(a, b, c, d)
    start = time()
    await reg.resolve(d)
    print(time()-start)


async def main_di():
    container = Container()
    solved = container.solve(Dependant(d), scopes=[None])
    executor = ConcurrentAsyncExecutor()
    async with container.enter_scope(None) as state:
        start = time()
        await container.execute_async(solved, executor, state=state)
        print(time()-start)

asyncio.run(main_asyncinjector())  # 3 seconds
asyncio.run(main_di())  # 2 seconds

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions