4

There is a lot of libraries that use their custom version of Future. kafka and s3transfer are just two examples: all their custom future-like classes have object as the superclass.

Not surprisingly, you cannot directly call asyncio.wrap_future() on such objects and can't use await with them.

What is the proper way of wrapping such futures for use with asyncio?

4
  • If you want to use kafka or s3transfer -- perhaps you don't want to work with asyncio code. At least mixing kafka calls inside an executor with regular async program gives expectedly slow result. The same is for s3transfer. What you need is aiokafka and aiobotocore usage. Commented Mar 18, 2018 at 21:00
  • We tried aiokafka 0.4.0 in the first place, but unfortunately it's way too buggy -- e.g., it just hangs in position() call or throws obscure exceptions. Just replacing it with kafka instantly solved all our problems. Commented Mar 18, 2018 at 22:05
  • Also, there should be no problem mixing asyncio and foreign futures as long as the blocking calls are run outside the asyncio thread, and connected to asyncio using call_soon_threadsafe (shown in the answer) or run_coroutine_threadsafe. Commented Mar 19, 2018 at 7:28
  • @kirill.shirokov sorry, I'm not Kafka expert. aiokarka is driven by my friend Taras. Sure, hi does the best and evolves the project fast. Please file a bug on github tracker if you've found a problem. Commented Mar 19, 2018 at 14:05

1 Answer 1

7

If the future class supports standard future features such as done callbacks and the result method, just use something like this:

def wrap_future(f):
    loop = asyncio.get_event_loop()
    aio_future = loop.create_future()
    def on_done(*_):
        try:
            result = f.result()
        except Exception as e:
            loop.call_soon_threadsafe(aio_future.set_exception, e)
        else:
            loop.call_soon_threadsafe(aio_future.set_result, result)
    f.add_done_callback(on_done)
    return aio_future

Consider that code a template which you can customize to match the specifics of the future you are dealing with.

Intended usage is to call it from the thread that runs the asyncio event loop:

value = await wrap_future(some_foreign_future)

If you are calling it from a different thread, be sure to pass loop explicitly because asyncio.get_event_loop will fail when invoked from a thread not registered with asyncio.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.