python: proper way to run an async routine in a pytest fixture?
Asked Answered
A

3

6

The test below passes, but I have doubts that I am using asyncio correctly:

  • The code mixes asyncio and threading
  • The test is passing but never exits (probably because the "loop.run_until_complete" never ends)
import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(websocket_server())
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    # no idea how to stop the loop here
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

(note: for stopping the loop I attempted the solution proposed here (How to stop websocket server created with websockets.serve()?) but this resulted in the server not starting)

Amphicoelous answered 16/6, 2023 at 9:0 Comment(2)
Would you mind changing the question to "proper way to cancel an asyncio server running in other thread" or something like that ? That answer is more generic than test-case scenarios.Fournier
@Amphicoelous Have you seen my (simpler) answer?Genevagenevan
F
3

You need some other code in the thread running the server to receive a signal from the main thread and shut itself down.

Fortunately, due to asyncio nature, this control can be built in a separate function, without interfering at all with the function implementing the server itself. Only the function that creates the loop and calls the server task have to arrange for some code that will check for this signal from the other thread to arrive, in another task - asyncio will take care that both tasks run in turn.

The proper way to communicate across threads is to use a queue - though in this case, even a module level (global) variable would work. Note that even though there are "asyncio queues" - in this case we want to send a message from one thread to another, and there are no two async tasks trying to read it in parallel, so we use the "traditional" multi-threading Queue class in the queue module.

Also, not related, but I changed the code starting the asyncio loop to the new way, using asyncio.run, without all the boilerplate that was needed in the first Python versions featuring asyncio.

import asyncio
import threading
import pytest
import websockets
from queue import Queue, Empty

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

async def coordinate(q):
    server = asyncio.create_task(websocket_server())
    while True:
        await asyncio.sleep(0)  # this is necessary to allow the asyncio loop to switch tasks.
        try:
            q.get_nowait()
        except Empty:
            pass
        else:  # block will run whenever there is _any_ message in the queue.
            server.cancel()
            return
    server.cancel()


def _run_server(q):
    asyncio.run(coordinate(q))

@pytest.fixture
def run_server():
    command  = Queue()
    thread = threading.Thread(target=_run_server, args=(command,))
    thread.start()
    yield thread
    command.put("quit")
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

A second method, without the need for the message-monitoring code in the server thread, is simply to make a call to cancel the server task from the thread running the tests. Asyncio has a prevision for that in the call loop.call_soon_threadsafe - we just need a reference to the loop and the server task (so we can get its .cancel method) in the original thread - which can be done with module level (global) variables. The "run_server" function won't return, so the global variables are needed as their values can be checked in the parent thread as soon as they are set. Otherwise, if you don't want to resort to these due to their global state the threading queue could be used to post the "loop" and "server" objects from the child thread to the fixture code, just as well. Using global variables would prevent the tests from running in parallel properly.

import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    global loop, server
    loop = asyncio.new_event_loop()
    server = loop.create_task(websocket_server())
    try:
        loop.run_until_complete(server)
    except asyncio.CancelledError:
        pass
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    loop.call_soon_threadsafe(server.cancel)
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

This time around we need an explicit reference to the asyncio loop object itself, so instead of calling asyncio.run, we do the "create_loop", "run_until_complete" calls.

(Thanks for providing the complete, self-contained, executable, minimal example - without which I would not had spent time with this question)

Fournier answered 20/6, 2023 at 18:29 Comment(1)
I think it would be worth it to say that, at this level of complexity, using an encapsulating object/class would be a good idea.Arcature
O
0

A simple way to fix this is to force the thread to terminate after a timeout.

import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(websocket_server())
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    # no idea how to stop the loop here
    thread.join(timeout=3) # <--- set timeout for thread so it doesn't run forever.


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

Not super elegant but indeed if the server is running, the test will never terminate until the fixture terminates

Octogenarian answered 20/6, 2023 at 16:44 Comment(0)
G
0

You don't need threads for this. Use the pytest-asyncio plugin.

import pytest
import pytest_asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

@pytest_asyncio.fixture
async def run_server():
    async with websockets.serve(echo, "localhost", 8765):
        yield

@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"
Genevagenevan answered 23/6, 2023 at 14:34 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.