How to handle a bidirectional grpc stream asynchronously

I have a game (or, for that matter, any remote user interface) with one server and multiple clients that communicate over the network. Both the client and the server should be able to send updates asynchronously.

The following seems to be a very natural service definition, which lets gRPC manage the sessions.

syntax = "proto3";

package mygame;

service Game {
    rpc participate(stream ClientRequest) returns (stream ServerResponse);
}

message ClientRequest {
    // Fields for the initial request and further updates
}

message ServerResponse {
    // Game updates
}

Implementing the client is trivial (although the following code is obviously incomplete and simplified).

import queue

import grpc

import game_pb2_grpc  # generated from the .proto file above


class Client:
    def __init__(self, channel):
        self.stub = game_pb2_grpc.GameStub(channel)
        self.output_queue = queue.Queue()

    def output_iter(self):
        # Blocks until the application queues another message to send.
        while True:
            client_output_msg = self.output_queue.get()
            self.output_queue.task_done()
            yield client_output_msg

    def do_work(self):
        for response in self.stub.participate(self.output_iter()):
            print(response)  # handle update


with grpc.insecure_channel("localhost:50051") as channel:
    client = Client(channel)
    client.do_work()

What seems hard is implementing the server without blocking.

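import queue
from concurrent.futures import ThreadPoolExecutor

import grpc

import game_pb2_grpc  # generated from the .proto file above


class Game(game_pb2_grpc.GameServicer):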
    def __init__(self):
        self.pending_events = queue.Queue()

    def participate(self, request_iter, context):
        for client_update in request_iter:
            print(client_update)
            # !!!
            # The next bit won't happen if the client has no updates
            # !!!
            try:
                while True:
                    server_update = self.pending_events.get_nowait()
                    yield server_update
            except queue.Empty:
                pass

server = grpc.server(ThreadPoolExecutor(max_workers=100))
game_pb2_grpc.add_GameServicer_to_server(Game(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()

As commented in the code, the client won't receive updates unless it keeps sending requests. Maybe an async approach would be better, and it might also solve other problems in this design.

PS: This issue has been solved for gRPC in Go here; however, I don't see how to translate that to Python's gRPC implementation.

I would be very happy about any help!

Plattdeutsch answered 18/12, 2020 at 21:1 Comment(0)

I was finally able to get it working using the Python asyncio API. The basic idea is to decouple reading and writing into two coroutines using asyncio.create_task. For anybody interested, here is a solution.

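import asyncio

import grpc

import game_pb2  # both modules are generated from the .proto file
import game_pb2_grpc


class Game(game_pb2_grpc.GameServicer):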
    async def read_client_requests(self, request_iter):
        async for client_update in request_iter:
            print("Received message from client:", client_update, end="")

    async def write_server_responses(self, context):
        for i in range(15):
            await context.write(game_pb2.ServerResponse(dummy_value=str(i)))
            await asyncio.sleep(0.5)

    async def participate(self, request_iter, context):
        read_task = asyncio.create_task(self.read_client_requests(request_iter))
        write_task = asyncio.create_task(self.write_server_responses(context))

        await read_task
        await write_task


async def serve():
    server = grpc.aio.server()
    game_pb2_grpc.add_GameServicer_to_server(Game(), server)
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())

Note that instead of using a separate write coroutine, it would also be sufficient to yield the responses directly from participate.
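To illustrate the yield variant, here is a minimal sketch of how it might look. It runs without gRPC so it can be tried on its own: the Game class here does not subclass the generated servicer, and fake_requests, pending_events, received, the None sentinel and demo are all hypothetical names I made up for the illustration. In a real server, participate would be a method of the GameServicer subclass and the request iterator would come from grpc.aio.

```python
import asyncio


class Game:  # in a real server: class Game(game_pb2_grpc.GameServicer)
    def __init__(self):
        # Hypothetical queue of outgoing updates, filled by the game logic.
        self.pending_events = asyncio.Queue()
        # Hypothetical log of client messages, only here to inspect the demo.
        self.received = []

    async def read_client_requests(self, request_iter):
        async for client_update in request_iter:
            self.received.append(client_update)

    async def participate(self, request_iter, context):
        # Reading runs as a background task, so waiting for client
        # messages never delays the yields below.
        read_task = asyncio.create_task(self.read_client_requests(request_iter))
        try:
            while True:
                server_update = await self.pending_events.get()
                if server_update is None:  # assumed end-of-stream sentinel
                    break
                yield server_update
        finally:
            read_task.cancel()


async def fake_requests():
    # Stand-in for the request iterator that grpc.aio would pass in.
    for msg in ("join", "move"):
        yield msg
        await asyncio.sleep(0)


async def demo():
    game = Game()

    async def produce():
        # Simulates game logic that emits updates independently of the client.
        for i in range(3):
            await game.pending_events.put(f"update {i}")
            await asyncio.sleep(0.01)
        await game.pending_events.put(None)

    producer = asyncio.create_task(produce())
    sent = [u async for u in game.participate(fake_requests(), None)]
    await producer
    return sent, game.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that the server keeps sending updates even after the client has stopped sending anything, because the only thing participate waits on is the outgoing queue.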

Plattdeutsch answered 19/12, 2020 at 20:28 Comment(0)
