There is no one-size-fits-all road to making an asyncio coroutine-based codebase useable from traditional synchronous codebases. You have to make choices per codepath.
Pick and choose from a series of tools:
Synchronous versions using asyncio.run()
Provide synchronous wrappers around coroutines, which block until the coroutine completes.
Even an async generator function such as ticker() can be handled this way, in a loop:
import asyncio

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        agen = self.a_ticker(to)
        try:
            while True:
                # block until the async generator produces its next value
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
These synchronous wrappers can be generated with helper functions:
import asyncio
from functools import wraps

def sync_agen_method(agen_method):
    @wraps(agen_method)
    def wrapper(self, *args, **kwargs):
        agen = agen_method(self, *args, **kwargs)
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
    # drop the a_ prefix, so a_ticker becomes ticker
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper
Then just use ticker = sync_agen_method(a_ticker) in the class definition.
Plain coroutine methods (not async generators) can be wrapped with:
def sync_method(async_method):
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        return asyncio.run(async_method(self, *args, **kwargs))
    # drop the a_ prefix, so a_method becomes method
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper
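As a minimal sketch of how such a wrapper might be used, the block below repeats a sync_method helper so that it runs on its own and applies it inside a class body. The Greeter class and its a_greet method are hypothetical names made up for this illustration.

```python
import asyncio
from functools import wraps


def sync_method(async_method):
    """Return a blocking wrapper that runs the coroutine method to completion."""
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        return asyncio.run(async_method(self, *args, **kwargs))
    # Drop the a_ prefix so the wrapper reports the synchronous name.
    if wrapper.__name__.startswith("a_"):
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper


class Greeter:  # hypothetical example class
    def __init__(self, delay):
        self.delay = delay

    async def a_greet(self, name):
        await asyncio.sleep(self.delay)
        return f"Hello, {name}!"

    # Inside the class body a_greet is still a plain function, so it can
    # be passed to the helper directly.
    greet = sync_method(a_greet)


greeter = Greeter(0.01)
message = greeter.greet("world")  # blocks until the coroutine finishes
```

Async callers can keep awaiting a_greet(), while synchronous callers use greet(). Each synchronous call starts and tears down its own event loop, which is simple but not free.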
Factor out common components
Refactor the synchronous parts out into generators, context managers, utility functions, etc.
For your specific example, pulling out the for loop into a separate generator would reduce the duplicated code to the way the two versions sleep:
import asyncio
from time import sleep

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    def _ticker_gen(self, to):
        # shared, purely synchronous logic
        yield from range(to)

    async def a_ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            sleep(self.delay)
While this doesn't make much of a difference here, it can work well in other contexts.
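One context where this tends to pay off is when the non-I/O work (parsing, validation, formatting) is larger than the I/O itself. The sketch below is an illustration only: Sensor, parse_reading and the 'name = value' format are hypothetical, and the sleeps stand in for real blocking and non-blocking I/O.

```python
import asyncio
import time


def parse_reading(raw):
    """Shared, I/O-free logic: turn 'name = value' into a (name, float) pair."""
    name, _, value = raw.partition("=")
    return name.strip(), float(value)


class Sensor:  # hypothetical example class
    def __init__(self, delay, raw="temp = 21.5"):
        self.delay = delay
        self.raw = raw

    async def a_read(self):
        await asyncio.sleep(self.delay)  # stand-in for real async I/O
        return parse_reading(self.raw)

    def read(self):
        time.sleep(self.delay)  # stand-in for real blocking I/O
        return parse_reading(self.raw)


sensor = Sensor(0.01)
sync_result = sensor.read()
async_result = asyncio.run(sensor.a_read())
```

Only the line that waits differs between the two methods; everything that can be tested without an event loop lives in parse_reading().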
Abstract Syntax Tree transformation
Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful about how you recognise utility functions such as asyncio.sleep() vs time.sleep():
import inspect
import ast
import textwrap
import time

asynciomap = {
    # asyncio function to (additional globals, replacement source) tuples
    "sleep": ({"time": time}, "time.sleep")
}

class AsyncToSync(ast.NodeTransformer):
    def __init__(self):
        self.globals = {}

    def visit_AsyncFunctionDef(self, node):
        # async def -> def, with the same name, arguments, body and decorators
        return ast.copy_location(
            ast.FunctionDef(
                node.name,
                self.visit(node.args),
                [self.visit(stmt) for stmt in node.body],
                [self.visit(stmt) for stmt in node.decorator_list],
                node.returns and self.visit(node.returns),
            ),
            node,
        )

    def visit_Await(self, node):
        # await expr -> expr
        return self.visit(node.value)

    def visit_Attribute(self, node):
        # asyncio.<name> -> the synchronous replacement listed in asynciomap
        if (
            isinstance(node.value, ast.Name)
            and isinstance(node.value.ctx, ast.Load)
            and node.value.id == "asyncio"
            and node.attr in asynciomap
        ):
            g, replacement = asynciomap[node.attr]
            self.globals.update(g)
            return ast.copy_location(
                ast.parse(replacement, mode="eval").body,
                node
            )
        return node

def transform_sync(f):
    filename = inspect.getfile(f)
    lines, lineno = inspect.getsourcelines(f)
    ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
    # keep line numbers in tracebacks aligned with the original source
    ast.increment_lineno(ast_tree, lineno - 1)

    transformer = AsyncToSync()
    transformer.visit(ast_tree)
    transformed_globals = {**f.__globals__, **transformer.globals}
    exec(compile(ast_tree, filename, 'exec'), transformed_globals)
    return transformed_globals[f.__name__]
While the above is probably far from complete enough to fit all needs, and transforming ASTs can be daunting, it would let you maintain just the async version and derive the synchronous version from it directly:
>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../example.py", line 32, in main
    func(ue)
  File "/.../example.py", line 21, in func
    for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4
asyncio.run() will fail if another event loop is already running. This is very important if you want to support usage in a Jupyter notebook (because there is a background loop running in the kernel all the time). – Afford
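One possible way around that limitation, sketched here as an assumption rather than a recommendation, is to fall back to a worker thread with its own event loop whenever a loop is already running in the calling thread. The run_sync helper and the a_double coroutine below are hypothetical names for this illustration.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_sync(coro):
    """Run coro to completion, even when called from inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running here (for example in a Jupyter kernel):
    # run the coroutine on a fresh loop in a worker thread and block
    # until its result is available.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def a_double(x):
    await asyncio.sleep(0.01)
    return 2 * x


plain = run_sync(a_double(21))  # called with no loop running


async def caller():
    # Synchronous call made while this coroutine's loop is running.
    return run_sync(a_double(4))


nested = asyncio.run(caller())
```

The fallback blocks the calling loop until the worker thread finishes, and the coroutine must not rely on objects bound to the outer loop, so this only suits short, self-contained calls.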