To follow on from this, I was tinkering with Motiejus Jakštys's solution to get it working with asynchronous code (and with pytest), and this is what I came up with. I thought I'd share it in case it's useful to anyone. I'll also include the synchronous version for a side-by-side comparison.
Overview
- If your gRPC server runs synchronously, you can just start the server and then run your test, as server.start() is non-blocking.
- If your gRPC server is asynchronous, it's similar, but you have to wrap your server.start() in an asyncio.gather(), for reasons I do not know. (Because of this, it's probably easier to write a separate composition root for your tests, e.g. create your own servicer and server, rather than trying to reuse anything from your application.)
Synchronous
    from concurrent import futures

    import grpc
    import pytest

    # pb2, pb2_grpc and Servicer come from your generated code and application.


    @pytest.fixture(scope="function")
    def stub():
        servicer = Servicer()
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        pb2_grpc.add_Servicer_to_server(servicer, server)
        # Port 0 lets the OS pick a free port; add_insecure_port returns it.
        port = server.add_insecure_port("[::]:0")
        server.start()  # non-blocking

        try:
            with grpc.insecure_channel(f"localhost:{port}") as channel:
                yield pb2_grpc.Stub(channel)
        finally:
            server.stop(None)


    def test_request(stub):
        request: pb2.Request = pb2.Request()
        response: pb2.Response = stub.Endpoint(request)
        assert response.value == 1
Asynchronous
Basically the same as above, but:
- Use grpc.aio.server and grpc.aio.insecure_channel instead of grpc.server and grpc.insecure_channel.
- Don't use grpc.aio.insecure_channel in a plain with statement. It didn't work for me, most likely because the aio channel is an asynchronous context manager (meant for async with).
- Replace server.start() with asyncio.gather(server.start()). I'm not sure why this works, but it was the only solution I found. (This also means the function that calls this line should stay synchronous, as you do not use an await.)
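A possible explanation for why asyncio.gather(server.start()) helps: in grpc.aio, start() is a coroutine function, so calling it only creates a coroutine object, and asyncio.gather() wraps that coroutine in a task scheduled on the event loop. The sketch below illustrates that behaviour with plain asyncio. Note that fake_start is a hypothetical stand-in (not part of grpc), and that the demo runs inside an already running loop, which is not exactly the situation in a synchronous fixture.

```python
import asyncio

log = []  # records the order in which things happen


async def fake_start():
    # Hypothetical stand-in for a coroutine such as an aio server's start().
    log.append("started")


async def main():
    coro = fake_start()  # only creates a coroutine object; nothing runs yet
    log.append("coroutine created")

    pending = asyncio.gather(coro)  # wraps it in a task scheduled on the loop
    log.append("gather called")

    await asyncio.sleep(0)  # yield control once so the loop can run the task
    log.append("after yield")

    await pending  # collect the result so no future is left dangling
    return log


result = asyncio.run(main())
print(result)
```

The body of fake_start only runs once the event loop gets control, which would be consistent with the server actually starting when the async test begins to run.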
NB: I would suggest setting asyncio_mode = auto (a pytest-asyncio option) in your pytest configuration file. When I tried combining the pytest.fixture and pytest.mark.asyncio decorators instead, my test received an async_generator object rather than the yielded value.
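For reference, a minimal sketch of that setting, assuming pytest-asyncio is installed and your configuration lives in pytest.ini (in pyproject.toml the same key goes under [tool.pytest.ini_options], with the value quoted as "auto"):

```ini
[pytest]
asyncio_mode = auto
```

In auto mode, pytest-asyncio picks up async tests and fixtures without needing explicit markers.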
This gives us:
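    import asyncio
    from concurrent import futures

    import grpc
    import pytest

    # pb2, pb2_grpc and Servicer come from your generated code and application.


    @pytest.fixture(scope="function")
    def stub():
        servicer = Servicer()
        server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
        pb2_grpc.add_Servicer_to_server(servicer, server)
        port = server.add_insecure_port("[::]:0")
        # start() is a coroutine in grpc.aio; gather() schedules it without await.
        asyncio.gather(server.start())

        try:
            # No with statement here: it did not work with the aio channel.
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            yield pb2_grpc.Stub(channel)
        finally:
            # Note: stop() is also a coroutine in grpc.aio and is not awaited here.
            server.stop(None)


    async def test_request(stub):
        request: pb2.Request = pb2.Request()
        response: pb2.Response = await stub.Endpoint(request)
        assert response.value == 1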