How to write unit tests for your GRPC server in Python?
Asked Answered
E

5

12

I would like to use Python unittest to write tests for my GRPC server implementation. I have found grpcio-testing package but I could not find any documentation how to use this.

Let's say that I have the following server:

import helloworld_pb2
import helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):    
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

How do I create an unit test to call SayHello and check the response?

Eberhard answered 10/8, 2018 at 18:57 Comment(4)
Use the code as documentation - python libraries are mostly python code you can read. This question is off-topic here, as you have to provide some code attempt and ask a specific question.Various
Code has no documentation. See here. I added code for the server.Eberhard
@Eberhard I think that nosklo is suggesting that you dig into the code for grpcio-testing to figure out how to use it.Besmirch
Yes, I have done that and I could not figure it out. So I came to this great site for a bit of help. Probably any open source programming question on this site could be answered with "read the code and figure it out". I do not think this site exists for such answers.Eberhard
E
13

You can start a real server When setUp and stop the server when tearDown.

import unittest
from concurrent import futures


class RPCGreeterServerTest(unittest.TestCase):
    server_class = Greeter
    port = 50051

    def setUp(self):
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

        helloworld_pb2_grpc.add_GreeterServicer_to_server(self.server_class(), self.server)
        self.server.add_insecure_port(f'[::]:{self.port}')
        self.server.start()

    def tearDown(self):
        self.server.stop(None)

    def test_server(self):
        with grpc.insecure_channel(f'localhost:{self.port}') as channel:
            stub = helloworld_pb2_grpc.GreeterStub(channel)
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='Jack'))
        self.assertEqual(response.message, 'Hello, Jack!')
Excide answered 12/3, 2019 at 3:39 Comment(0)
M
12

I took J.C's idea and expanded it to be able to create a fake server (mock) for each test case. Also, bind on port 0 to avoid port conflicts:

@contextmanager
def helloworld(cls):
    """Instantiate a helloworld server and return a stub for use in tests"""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(cls(), server)
    port = server.add_insecure_port('[::]:0')
    server.start()

    try:
        with grpc.insecure_channel('localhost:%d' % port) as channel:
            yield helloworld_pb2_grpc.GreeterStub(channel)
    finally:
        server.stop(None)


class HelloWorldTest(unittest.TestCase):
    def test_hello_name(self):
        # may do something extra for this mock if it's stateful
        class FakeHelloworld(helloworld_pb2_grpc.GreeterServicer):
            def SayHello(self, request, context):
                return helloworld_pb2.SayHelloResponse()

        with helloworld(Fakehelloworld) as stub:
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='Jack'))
            self.assertEqual(response.message, 'Hello, Jack!')
Maxillary answered 3/4, 2019 at 16:40 Comment(0)
P
1

You can give pytest-grpc a try.

If you are using Django, you can have a look at django-grpc-framework testing.

Palstave answered 26/5, 2020 at 3:19 Comment(0)
A
0

There is inline API docstrings on the code elements that you can use. There's an issue filed to host it on grpc.io in a nice format: https://github.com/grpc/grpc/issues/13340

Addis answered 15/8, 2018 at 17:50 Comment(2)
Yes, but I would really appreciate some example or something.Eberhard
Please file an issue on GitHub.Addis
P
0

To follow on from this, I was tweaking with Motiejus Jakštys's solution to get it to work with asynchronous code (and with pytest) and this is what I came up with. I thought I'd share it in case it's useful to anyone. I'll also include the synchronous version to give a side-by-side comparison.

Overview

  • If your gRPC server is ran in a synchronous way, you actually can just start the server then run your test, as server.start() is non-blocking.
  • If your gRPC is asynchronous, it's similar, but you have to wrap your server.start() in an asyncio.gather() for reasons I do not know. (Because of this, it's probably easier to write a separate composition root for your tests (e.g. create your own servicer and server), rather than trying to reuse anything from your application.)

Synchronous

@pytest.fixture(scope="function")
def stub():
    servicer = Servicer()
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_Servicer_to_server(servicer, server)
    port = server.add_insecure_port("[::]:0")
    server.start()

    try:
        with grpc.insecure_channel(f"localhost:{port}") as channel:
            yield pb2_grpc.Stub(channel)
    finally:
        server.stop(None)


def test_request(stub):
    request: pb2.Request = pb2.Request()
    response: pb2.Response = stub.Endpoint(request)
    assert response.value == 1

Asynchronous

Basically the same as above, but use:

  • grpc.aio.server
  • grpc.aio.insecure_channel
  • Don't use grpc.aio.insecure_channel in a with statement - for some reason it doesn't work
  • Replace server.start() with asyncio.gather(server.start()) - I'm not sure why this works, but it was the only solution I found. (This also this means the function that calls this line should still be synchronous, as you do not use an await.)

NB: I would suggest setting asyncio_mode = auto in your configuration file. When I tried to use the pytest.fixture and pytest.mark.asyncio decorators I had issues with receiving an async_generator object in my test.

This gives us:

@pytest.fixture(scope="function")
def stub():
    servicer = Servicer()
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_Servicer_to_server(servicer, server)
    port = server.add_insecure_port("[::]:0")
    asyncio.gather(server.start())

    try:
        channel = grpc.aio.insecure_channel(f"localhost:{port}")
        yield pb2_grpc.Stub(channel)
    finally:
        server.stop(None)


async def test_request(stub):
    request: pb2.Request = pb2.Request()
    response: pb2.Response = await stub.Endpoint(request)
    assert response.value == 1
Paulpaula answered 14/12, 2023 at 16:12 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.