Duplication of code for synchronous and asynchronous implementations

When implementing classes that have uses in both synchronous and asynchronous applications, I find myself maintaining virtually identical code for both use cases.

Just as an example, consider:

from time import sleep
import asyncio


class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in range(to):
            yield i
            sleep(self.delay)


def func(ue):
    for value in ue.ticker(5):
        print(value)


async def a_func(ue):
    async for value in ue.a_ticker(5):
        print(value)


def main():
    ue = UselessExample(1)
    func(ue)
    asyncio.run(a_func(ue))


if __name__ == '__main__':
    main()

In this example it's not too bad; the ticker methods of UselessExample are easy to maintain in tandem. But you can imagine that exception handling and more complicated functionality can quickly grow a method and make it more of an issue, even though both methods can remain virtually identical (only replacing certain elements with their asynchronous counterparts).

Assuming there's no substantial difference that makes it worth having both fully implemented, what is the best (and most Pythonic) way of maintaining a class like this and avoiding needless duplication?

Pammy asked 14/3/2019 at 0:04

There is no one-size-fits-all road to making an asyncio coroutine-based codebase usable from traditional synchronous codebases. You have to make choices per code path.

Pick and choose from a series of tools:

Synchronous versions using asyncio.run()

Provide synchronous wrappers around coroutines, which block until the coroutine completes.

Even an async generator function such as a_ticker() can be handled this way, in a loop:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        agen = self.a_ticker(to)
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
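
For example (a quick demo, assuming the class above; note that every yielded value advances the async generator inside a fresh event loop):

ue = UselessExample(1)
for value in ue.ticker(3):
    print(value)  # prints 0, 1, 2, one second apart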

These synchronous wrappers can be generated with helper functions:

from functools import wraps

def sync_agen_method(agen_method):
    @wraps(agen_method)
    def wrapper(self, *args, **kwargs):
        agen = agen_method(self, *args, **kwargs)
        try:
            while True:
                # each step of the async generator runs in a fresh event loop
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
    # strip the 'a_' prefix so the wrapper takes the synchronous name
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper

then just use ticker = sync_agen_method(a_ticker) in the class definition.
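
That is, something along these lines (a sketch using the helper above):

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    # generated synchronous wrapper; the helper renames 'a_ticker' to 'ticker'
    ticker = sync_agen_method(a_ticker)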

Straight-up coroutine methods (not async generator methods) could be wrapped with:

def sync_method(async_method):
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        return asyncio.run(async_method(self, *args, **kwargs))
    # strip the 'a_' prefix so the wrapper takes the synchronous name
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper
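
Used the same way in the class body; a_do_thing here is a hypothetical coroutine method, purely for illustration:

class Example:
    async def a_do_thing(self, arg):
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        return arg

    # blocking wrapper, exposed as 'do_thing'
    do_thing = sync_method(a_do_thing)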

Factor out common components

Refactor out the synchronous parts into generators, context managers, utility functions, etc.

For your specific example, pulling out the for loop into a separate generator would minimise the duplicated code to the way the two versions sleep:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    def _ticker_gen(self, to):
        yield from range(to)

    async def a_ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            sleep(self.delay)

While this doesn't make much of a difference here, it can work in other contexts.
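
One such context: shared error handling can be factored into a single synchronous context manager, since a plain with statement works the same inside sync and async functions. A sketch (not from the original answer):

import asyncio
from contextlib import contextmanager
from time import sleep

@contextmanager
def _handle_tick_errors():
    # one shared place for error translation, used by both variants
    try:
        yield
    except ValueError as exc:
        raise RuntimeError('ticker failed') from exc


class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        with _handle_tick_errors():
            for i in range(to):
                yield i
                await asyncio.sleep(self.delay)

    def ticker(self, to):
        with _handle_tick_errors():
            for i in range(to):
                yield i
                sleep(self.delay)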

Abstract Syntax Tree transformation

Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful about how you recognise utility functions such as asyncio.sleep() vs time.sleep():

import inspect
import ast
import textwrap
import time

asynciomap = {
    # asyncio function to (additional globals, replacement source) tuples
    "sleep": ({"time": time}, "time.sleep")
}


class AsyncToSync(ast.NodeTransformer):
    def __init__(self):
        self.globals = {}

    def visit_AsyncFunctionDef(self, node):
        return ast.copy_location(
            ast.FunctionDef(
                node.name,
                self.visit(node.args),
                [self.visit(stmt) for stmt in node.body],
                [self.visit(stmt) for stmt in node.decorator_list],
                node.returns and self.visit(node.returns),
            ),
            node,
        )

    def visit_Await(self, node):
        return self.visit(node.value)

    def visit_Attribute(self, node):
        if (
            isinstance(node.value, ast.Name)
            and isinstance(node.value.ctx, ast.Load)
            and node.value.id == "asyncio"
            and node.attr in asynciomap
        ):
            g, replacement = asynciomap[node.attr]
            self.globals.update(g)
            return ast.copy_location(
                ast.parse(replacement, mode="eval").body,
                node
            )
        return node


def transform_sync(f):
    filename = inspect.getfile(f)
    lines, lineno = inspect.getsourcelines(f)
    ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
    ast.increment_lineno(ast_tree, lineno - 1)

    transformer = AsyncToSync()
    transformer.visit(ast_tree)
    transformed_globals = {**f.__globals__, **transformer.globals}
    exec(compile(ast_tree, filename, 'exec'), transformed_globals)
    return transformed_globals[f.__name__]

While the above is probably far from complete enough to fit all needs, and transforming ASTs can be daunting, it would let you maintain just the async version and map that version to synchronous versions directly:

>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../example.py", line 32, in main
    func(ue)
  File "/.../example.py", line 21, in func
    for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4
Keg answered 26/3/2019 at 20:11. Comments:
  • The first solution doesn't work in the case that the sync method is called from async code, because asyncio.run will fail if another event loop is already running. This is very important if you want to support usage in a Jupyter notebook (because there is a background loop running in the kernel all the time). - Afford
  • Thanks - no magical fix in there, but I wasn't really expecting one either; I think you addressed the problem in a meaningful way and it has some useful suggestions I will use. Hopefully the same is true for others struggling with this. - Pammy
  • @gdlmx: true, I've updated the answer to use a helper function that falls back to using the existing loop. - Keg
  • @MartijnPieters There was nothing wrong with your original wrapper. run_until_complete will fail with the same error as run if the event loop is already running. Actually it is not possible to write a wrapper to await a coroutine, future or task inside a sync function. Although it is possible to submit the coroutine to the existing event loop, the coroutine will only be called after the sync function returns. There's only a single thread running anyway. - Afford
  • @gdlmx: gah, yes, you are quite right. Using run_until_complete() was a dumb idea, as it'll also stop the loop on completion. It may require using a new thread in that case. - Keg
  • Cool to see someone else explaining example #1. I use this in my scenario since the people who need the sync version for my use case are generally not using asyncio at all. - Ula

async/await is infectious by design.

Accept that your code will have different users — synchronous and asynchronous — with different requirements, and that over time the implementations will diverge.

Publish separate libraries

For example, compare aiohttp vs. aiohttp-requests vs. requests.

Likewise, compare asyncpg vs. psycopg2.

How to get there

Opt1. (easy) Clone the implementation and allow the two to diverge.

Opt2. (sensible) Partial refactor: let e.g. the async library depend on and import the sync library.

Opt3. (radical) Create a "pure" library that can be used in both sync and async programs. For example, see https://github.com/python-hyper/hyper-h2 .

On the upside, testing is easier and more thorough. Consider how hard (or impossible) it is to force the test framework to evaluate all possible concurrent execution orders in an async program. A pure library doesn't need that :)

On the downside, this style of programming requires different thinking, is not always straightforward, and may be suboptimal. For example, instead of await socket.read(2**20) you'd write for event in fsm.push(data): ... and rely on your library's user to provide you with data in good-sized chunks.

For context, see the backpressure argument in https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
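
A minimal sketch of that "pure" style, with hypothetical names - the protocol object never performs I/O itself, so a blocking driver and an async driver can share the one implementation:

class LineProtocol:
    """Sans-I/O parser: push bytes in, get completed lines out."""

    def __init__(self):
        self._buffer = b''

    def push(self, data):
        self._buffer += data
        # everything before the last newline is a complete line
        *lines, self._buffer = self._buffer.split(b'\n')
        return lines


def read_lines_sync(sock, proto):
    # blocking driver: a plain socket feeds the state machine
    while data := sock.recv(4096):
        yield from proto.push(data)


async def read_lines_async(reader, proto):
    # async driver: an asyncio.StreamReader feeds the same state machine
    while data := await reader.read(4096):
        for line in proto.push(data):
            yield line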

Ultramicroscopic answered 1/4/2019 at 1:37. Comments:
  • I don't disagree with the principle, but it does nothing to change the fact that these libraries can end up having extremely similar code and would have to be maintained side by side. The question was what the best practices would be to limit the amount of replication between such libraries - whether all in one file, or split into separate libraries (which isn't bad advice). - Pammy
  • "Consider how hard (or impossible) it is to force the test framework to evaluate all possible concurrent execution orders in an async program." There's no point in async if you have only a single task, so the natural equivalent is a threaded sync program. So consider how hard it is to force the test framework to evaluate possible concurrent execution orders in a threaded sync program! Async has defined places where it can change task, and these can be controlled via mocking and async locks. With sync, well, it's actually impossible. - Patronymic
  • 👆 These researchers beg to differ: researchgate.net/publication/… TL;DR: a tool that hijacks thread scheduling to run a test case under any (many) possible orderings, for sync programs. My point was about pure libraries, where an explicit event-order test harness is easier to create than for an async library. - Ultramicroscopic

I want to give an additional approach, which I ended up using.

My use case was to implement a web service API SDK that could be used synchronously and asynchronously on the fly, i.e. without committing to one beforehand, and if possible, even with the ability to use sync calls from an async environment and the other way around.

The accepted answer's solutions work in most cases. However, each approach has its downsides:

  • asyncio.run(): This way the sync methods cannot be called from an async environment, because nested event loops are forbidden. nest_asyncio exists, but has undocumented trade-offs. Furthermore, some libraries' objects cannot run across multiple event loops, which was the case for me as I was using httpx.
  • Factor out common components: This solution needs three methods for a single "functionality": the actual implementation, the sync wrapper and the async wrapper. I was planning to implement a lot of API endpoints, which would result in the code being mostly boilerplate. Furthermore, in order to supply docstrings, the documentation would need to be duplicated two or three times.
  • Abstract Syntax Tree transformation: This makes static analysis infeasible and your IDE won't be able to give code completion or type hints anymore.

My solution is close to the "factor out common components" approach, but with the distinct advantage of not having to implement three methods for each endpoint.

The gist of the implementation is the following:

import httpx


class Api:

    def __init__(self):
        self.sync_client = httpx.Client()
        self.async_client = httpx.AsyncClient()

    def _prepare_request(self, method: str, url: str):
        return PreparedRequest(self.sync_client, self.async_client, method, url)

    def some_endpoint(self):
        return self._prepare_request("POST", "https://httpbin.org/post")

    def some_other_endpoint(self):
        return self._prepare_request("GET", "https://httpbin.org/get")

class PreparedRequest:

    def __init__(self, sync_client: httpx.Client, async_client: httpx.AsyncClient, method: str, url: str):
        self.sync_client = sync_client
        self.async_client = async_client
        self.method = method
        self.url = url

    def send_sync(self):
        return self.sync_client.request(self.method, self.url)

    async def send_async(self):
        return await self.async_client.request(self.method, self.url)

Here, the Api class only prepares requests, which themselves implement the sending. All PreparedRequests share references to the sync and async clients, which are managed by the Api.

It is then used in the following way:

import asyncio


async def main():
    api = Api()

    await api.some_endpoint().send_async()
    await api.some_other_endpoint().send_async()
    api.some_endpoint().send_sync()  # you can also make sync calls here


if __name__ == '__main__':
    asyncio.run(main())

As you can see, there is no code duplication and minimal boilerplate; async and sync calls are available on the same object, and sync calls can be made from async environments and vice versa.

This solution has additional advantages:

  • The send_sync() and send_async() methods can accept additional parameters that control how the request is sent. They can also have completely different implementations, which may be necessary in some cases.
  • It is possible to add a layer of abstraction that wraps both clients into one; users could then extend it to plug in their own HTTP client library (see the sketch below).
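
A possible shape for that abstraction layer (a sketch with hypothetical names, not part of the original answer):

import httpx


class DualClient:
    """Wraps the sync and async httpx clients behind one interface;
    subclass and override these methods to swap in another HTTP library."""

    def __init__(self):
        self._sync = httpx.Client()
        self._async = httpx.AsyncClient()

    def request_sync(self, method, url):
        return self._sync.request(method, url)

    async def request_async(self, method, url):
        return await self._async.request(method, url)

PreparedRequest would then hold a single DualClient instead of two separate client references.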
Frizzell answered 2/11/2023 at 10:06
