How to call an async function from synchronous code in Python

I'm locked to a Python 3.6.2 interpreter that ships with my desktop application.

I want to call an async function from a synchronous method or function.

When the desktop application calls the Python function, it has to be a normal function that cannot be awaited.

The desktop application sends a list of URLs, and I want to send back the response from every URL in an asynchronous manner.

Here is my attempt. I've marked the SyntaxError, which I don't know how to get around.

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)
        
    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature
        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature

I have tried making these changes:

import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)
        
    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)

        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]


        

but now I end up with this error:

Python Exception <TypeError>: object ClientResponse can't be used in 'await' expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
Aime answered 9/8, 2018 at 8:29 Comment(2)
You may want to have a look at the trio library. It has a much more straightforward interface than the asyncio standard library. – Selfcontained
Cool, looks like it implements run similar to asyncio in Python 3.7. I will have a look at this. – Aime
D
76

@deceze's answer is probably the best you can do in Python 3.6. In Python 3.7 and later, you can call asyncio.run directly:

newfeature = asyncio.run(main(urls))

It creates a new event loop, runs the coroutine to completion, and closes the loop afterwards.
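A minimal sketch of that pattern, assuming Python 3.7+ (so it would not run on the asker's 3.6.2 interpreter). The names fetch_one and input_handler are made up for illustration, and asyncio.sleep stands in for the real network request:

```python
import asyncio


async def fetch_one(url: str) -> str:
    # Hypothetical stand-in for a real HTTP request.
    await asyncio.sleep(0.01)
    return f"response from {url}"


async def main(urls):
    # Run all requests concurrently; gather returns results in input order.
    return await asyncio.gather(*(fetch_one(u) for u in urls))


def input_handler(urls):
    # Ordinary synchronous function: asyncio.run creates a fresh event loop,
    # runs main() to completion, and closes the loop before returning.
    return asyncio.run(main(urls))


results = input_handler(["a", "b"])
print(results)
```

Note that input_handler must not itself be called from code that is already running inside an event loop; asyncio.run raises RuntimeError in that case.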

Dismissal answered 9/8, 2018 at 8:52 Comment(4)
But what if the code is already running in an asyncio.run call? If you use asyncio.run inside a function that is invoked with asyncio.run then you get RuntimeError: asyncio.run() cannot be called from a running event loop – Peralta
@Peralta If you are already inside an event loop you can simply call it via result = await main(urls). – Inexplicable
@Inexplicable That doesn't solve the problem in the original question, you cannot use await unless the function is declared "async def" which may not be possible to do, e.g. for magic methods or interfaces that can't be changed. – Skysail
Either asyncio.get_event_loop or asyncio.get_running_loop can be used to check for (and retrieve) any running loop; if there is one, the coroutine can be scheduled by calling asyncio.ensure_future. – Benedicite
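A rough sketch of the check described in the last comment, assuming Python 3.7+. The helper name call_from_sync and the coroutine work are hypothetical. When a loop is already running in the current thread, the helper cannot block, so it only schedules the coroutine and returns the Task; something async still has to await that Task to get the result:

```python
import asyncio


async def work(x):
    await asyncio.sleep(0.01)
    return x * 2


def call_from_sync(coro):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: safe to block until done.
        return asyncio.run(coro)
    # A loop is already running here: blocking would stall it, so schedule
    # the coroutine and hand back the Task instead of the result.
    return asyncio.ensure_future(coro)


# Called from plain synchronous code: returns the actual result.
plain_result = call_from_sync(work(21))


async def caller():
    # Called while a loop is running: returns a Task that must be awaited.
    task = call_from_sync(work(5))
    return await task


nested_result = asyncio.run(caller())
```

This does not remove the limitation raised above: a synchronous function running on the loop's own thread still cannot wait for the result without blocking the loop.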
P
36

You would use an event loop to execute the asynchronous function to completion:

newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))

(This technique is already used inside main, though I'm not sure why: since main is async, you could and should simply use await fetch_all(...) there.)
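As a sketch of how the two points fit together on Python 3.6: main awaits fetch_all directly, and only the synchronous caller touches the event loop. The helper run_sync is a made-up name, and this version creates and closes its own loop with asyncio.new_event_loop rather than calling asyncio.get_event_loop(), which is a variation on the one-liner above and also works in threads that have no current loop:

```python
import asyncio


async def fetch(url):
    await asyncio.sleep(0.01)  # placeholder for real I/O
    return url.upper()


async def fetch_all(urls):
    return await asyncio.gather(*(fetch(u) for u in urls))


async def main(urls):
    # Inside a coroutine, await other coroutines directly instead of
    # calling run_until_complete on a loop that is already running.
    return await fetch_all(urls)


def run_sync(coro):
    # Hypothetical helper: drive one coroutine to completion on a
    # private event loop, then release the loop's resources.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


newfeature = run_sync(main(["a", "b"]))
```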

Penang answered 9/8, 2018 at 8:34 Comment(3)
But then I probably need to rewrite main, since it already has an event loop? – Aime
Interesting point, I'm not actually sure off the top of my head whether that would cause any issues. But as I wrote, it makes little sense to use run_until_complete inside an async function to begin with, you should simply await it. – Penang
It works for me. Note that if there is no event loop and you get the error "RuntimeError: There is no current event loop in thread 'Thread-n'", you can add asyncio.set_event_loop(asyncio.new_event_loop()) in your function to set one. – Burkle
T
20

There are also libraries that exist to handle this and aim to always do the right thing. One example is asgiref.sync, which provides async_to_sync and sync_to_async for performing these conversions:

from asgiref.sync import async_to_sync

@async_to_sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously

More info from the docs for asgiref.sync:

AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.

SyncToAsync lets async code call a synchronous function, which is run in a threadpool and control returned to the async coroutine when the synchronous function completes.

There are also other similar projects like koil

Tarver answered 3/5, 2020 at 4:38 Comment(4)
How come this answer has so few upvotes? Thanks, it's great for writing packages with aiohttp and for beginners taking their first steps with asynchronous features in Python. – Tip
...because adding fragile third-party dependencies to do something Python's standard library already trivially does out-of-the-box is a bad idea – always. Just directly call asyncio.run() or asyncio.get_event_loop().run_until_complete(). In either case, it's a trivial one-liner. The asyncio module exists for a reason. – Philipines
This is not a simple thing that is handled by the standard library in any way I see described here. As commented around, functions can be nested in other threads, other run loops, and calling hierarchies of both sync and async code, and there may or may not be a run loop already for the current thread. A function that handles and documents all this well is needed. – Tacet
That is right, asyncio.run() and run_until_complete() just don't work in general. Sure, why use a third-party library when you can use the standard library in some obvious way, but in this case there really isn't one. – Sidewalk
C
5

I was able to get this working in pure Python 3.10 using the built-in asyncio.run_coroutine_threadsafe.

This is new to me, so there are probably some caveats. For example, since the async method is not actually awaited, the process could (and will) exit before the callback completes unless you do something to ensure it doesn't.

For an example of where this occurs in the wild, see the disconnected_callback of the BleakClient class in the bleak BLE library, and then try to emit from that callback using the async version of the socket.io client, AsyncClient.

Concise problem/solution:

import asyncio
from typing import Callable

Callback = Callable[[int], None]


class SomeSystem:
    """Some library you don't control that is mostly async, but provides a callback that
    is _not_ async."""

    def __init__(self, callback: Callback):
        self._callback = callback

    async def do_something(self):
        """do some work and then call the non-async callback"""
        await asyncio.sleep(1.0)
        self._callback(1)
        await asyncio.sleep(1.0)
        self._callback(2)


async def some_async_method(value: int):
    """some long-running operation normally called by async code"""
    await asyncio.sleep(0.1)
    print(f"long-running: {value}")


async def main():
    """main is async and started as normal with asyncio.run"""
    print("BEGIN main")

    loop = asyncio.get_running_loop()

    def cb(value: int) -> None:
        """This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
        # some_async_method(value)  # RuntimeWarning: coroutine 'some_async_method' was never awaited
        asyncio.run_coroutine_threadsafe(some_async_method(value), loop)  # okay

    system = SomeSystem(cb)
    await system.do_something()

    # maybe ensure the last call to async method is awaited? Without this call, the final callback
    # won't be handled, since it's never being awaited. If anyone knows how to properly wait
    # for this, let me know in the comments!
    await asyncio.sleep(1.0)

    print("END main")


if __name__ == "__main__":
    asyncio.run(main())

Output

BEGIN main
long-running: 1
long-running: 2
END main
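One possible way to wait properly instead of sleeping, and to collect return values, is to keep the futures that run_coroutine_threadsafe returns and await them through asyncio.wrap_future. This is a sketch, not part of the original answer; the pending list and the modified some_async_method that returns a value are assumptions made for illustration:

```python
import asyncio


async def some_async_method(value: int) -> int:
    await asyncio.sleep(0.1)
    return value * 10


async def main():
    loop = asyncio.get_running_loop()
    pending = []  # concurrent.futures.Future objects created by the callback

    def cb(value: int) -> None:
        # Non-async callback: schedule the coroutine and keep its handle.
        fut = asyncio.run_coroutine_threadsafe(some_async_method(value), loop)
        pending.append(fut)

    # Stand-in for the library invoking the callback twice.
    cb(1)
    cb(2)

    # wrap_future converts each concurrent future into an awaitable, so the
    # loop keeps running while we wait for every scheduled coroutine.
    return await asyncio.gather(*(asyncio.wrap_future(f) for f in pending))


results = asyncio.run(main())
print(results)
```

Calling fut.result() directly inside cb would block the loop's own thread and deadlock, which is why the sketch awaits the wrapped futures from async code instead.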
Coen answered 24/5, 2023 at 0:19 Comment(1)
How can I get the result of some_async_method if it returns a value? – Martinet
G
0

I used the following code to call async functions from ROS 1 callbacks.
Wrap the async functions in a decorator that has access to the "loop" variable and calls "run_coroutine_threadsafe".

The wrapped functions can then be used directly inside the ROS 1 subscriber callback.

This method creates a separate thread to run the async loop.
Otherwise, rospy.spin() and other blocking calls would prevent the async loop from running.

from contextlib import contextmanager
from asyncio import AbstractEventLoop as Loop
import asyncio
from threading import Thread
import time


@contextmanager
def async_context():
    """
    Creates an async loop on a separate thread.
    Run tasks on the async loop from other threads with:
        asyncio.run_coroutine_threadsafe(async_function(*args, **kwargs), loop)
    """

    def start_event_loop(loop: Loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

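    # Create and start the loop before entering try, so the cleanup in
    # finally never refers to names that were not assigned.
    loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_event_loop, args=(loop,), daemon=True)
    loop_thread.start()
    try:
        yield loop
    finally:
        # Stop the loop from its own thread, wait for it, then free it.
        loop.call_soon_threadsafe(loop.stop)
        loop_thread.join()
        loop.close()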


async def example_async_function(seconds: float):
    # Parameter renamed from "time" so it no longer shadows the time module.
    print("async sleep: started")
    await asyncio.sleep(seconds)
    print("async sleep: done")


def main():
    with async_context() as loop:
        asyncio.run_coroutine_threadsafe(example_async_function(2), loop)
        print("sync sleep: started")
        time.sleep(5)
        print("sync sleep: done")


if __name__ == "__main__":
    main()
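If the synchronous caller needs the coroutine's return value, the concurrent.futures.Future returned by run_coroutine_threadsafe can be used to block for it. A minimal sketch of that, with a hypothetical coroutine add_later and an arbitrary 5-second timeout:

```python
import asyncio
from threading import Thread


async def add_later(a, b):
    await asyncio.sleep(0.05)
    return a + b


# Run an event loop in a background thread, as in the answer above.
loop = asyncio.new_event_loop()
thread = Thread(target=loop.run_forever, daemon=True)
thread.start()

try:
    # Schedule the coroutine on the background loop from this sync thread.
    future = asyncio.run_coroutine_threadsafe(add_later(2, 3), loop)
    # Block this thread (not the loop) until the coroutine has finished.
    total = future.result(timeout=5)
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()

print(total)
```

This only works because the caller is on a different thread than the loop; blocking on future.result() from the loop's own thread would deadlock.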
Greenleaf answered 28/5, 2024 at 15:38 Comment(0)
