You need some other code in the thread running the server to receive a signal from the main thread and shut itself down.
Fortunately, due to asyncio nature, this control can be built in a separate function, without interfering at all with the function implementing the server itself. Only the function that creates the loop and calls the server task have to arrange for some code that will check for this signal from the other thread to arrive, in another task - asyncio will take care that both tasks run in turn.
The proper way to communicate across threads is to use a queue - though in this case, even a module level (global) variable would work. Note that even though there are "asyncio queues" - in this case we want to send a message from one thread to another, and there are no two async tasks trying to read it in parallel, so we use the "traditional" multi-threading Queue
class in the queue
module.
Also, not related, but I changed the code starting the asyncio loop to the new way, using asyncio.run
, without all the boilerplate that was needed in the first Python versions featuring asyncio.
import asyncio
import threading
import pytest
import websockets
from queue import Queue, Empty
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def websocket_server():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
async def coordinate(q):
server = asyncio.create_task(websocket_server())
while True:
await asyncio.sleep(0) # this is necessary to allow the asyncio loop to switch tasks.
try:
q.get_nowait()
except Empty:
pass
else: # block will run whenever there is _any_ message in the queue.
server.cancel()
return
server.cancel()
def _run_server(q):
asyncio.run(coordinate(q))
@pytest.fixture
def run_server():
command = Queue()
thread = threading.Thread(target=_run_server, args=(command,))
thread.start()
yield thread
command.put("quit")
thread.join()
@pytest.mark.asyncio
async def test_websocket(run_server):
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("Hello!")
response = await websocket.recv()
assert response == "Hello!"
A second method, without the need for the message-monitoring code in the server thread, is simply to make a call to cancel the server task from the thread running the tests.
Asyncio has a prevision for that in the call loop.call_soon_threadsafe
- we just need a reference to the loop and the server task (so we can get its .cancel
method) in the original thread - which can be done with module level (global) variables. The "run_server" function won't return, so the global variables are needed as their values can be checked in the parent thread as soon as they are set. Otherwise, if you don't want to resort to these due to their global state the threading queue could be used to post the "loop" and "server" objects from the child thread to the fixture code, just as well. Using global variables would prevent the tests from running in parallel properly.
import asyncio
import threading
import pytest
import websockets
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def websocket_server():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
def _run_server():
global loop, server
loop = asyncio.new_event_loop()
server = loop.create_task(websocket_server())
try:
loop.run_until_complete(server)
except asyncio.CancelledError:
pass
loop.close()
@pytest.fixture
def run_server():
thread = threading.Thread(target=_run_server)
thread.start()
yield thread
loop.call_soon_threadsafe(server.cancel)
thread.join()
@pytest.mark.asyncio
async def test_websocket(run_server):
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("Hello!")
response = await websocket.recv()
assert response == "Hello!"
This time around we need an explicit reference to the asyncio loop object itself, so instead of calling asyncio.run
, we do the "create_loop", "run_until_complete" calls.
(Thanks for providing the complete, self-contained, executable, minimal example - without which I would not had spent time with this question)