Handling async streaming requests in gRPC Python
I am trying to understand how to handle a gRPC API with bidirectional streaming (using the Python API).

Say I have the following simple server definition:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg
{
 string msg = 1;
}

Say that the messages sent from the client arrive asynchronously (as a consequence of the user selecting some UI elements).

The generated Python stub for the client will contain a method Translate that accepts a generator function and returns an iterator.

What is not clear to me is how I would write the generator function so that it returns messages as they are created by the user. Sleeping on the thread while waiting for messages doesn't sound like the best solution.

Corinnacorinne answered 6/3, 2019 at 17:50 Comment(0)

This is a bit clunky right now, but you can accomplish your use case as follows:

#!/usr/bin/env python

from __future__ import print_function

import collections
import threading

from concurrent import futures
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
  def Translate(self, request_iterator, context):
    for req in request_iterator:
      print("Translating message: {}".format(req.msg))
      yield Msg(msg=translate_next(req.msg))

class TranslatorClient(object):
  def __init__(self):
    self._stop_event = threading.Event()
    self._request_condition = threading.Condition()
    self._response_condition = threading.Condition()
    self._requests = collections.deque()
    self._last_request = None
    self._expected_responses = collections.deque()
    self._responses = {}

  def _next(self):
    with self._request_condition:
      while not self._requests and not self._stop_event.is_set():
        self._request_condition.wait()
      if len(self._requests) > 0:
        return self._requests.popleft()
      else:
        raise StopIteration()

  def next(self):
    return self._next()

  def __next__(self):
    return self._next()

  def add_response(self, response):
    with self._response_condition:
      request = self._expected_responses.popleft()
      self._responses[request] = response
      self._response_condition.notify_all()

  def add_request(self, request):
    with self._request_condition:
      self._requests.append(request)
      with self._response_condition:
        self._expected_responses.append(request.msg)
      self._request_condition.notify()

  def close(self):
    self._stop_event.set()
    with self._request_condition:
      self._request_condition.notify()

  def translate(self, to_translate):
    self.add_request(to_translate)
    with self._response_condition:
      # Check before waiting: the response may already have arrived.
      while to_translate.msg not in self._responses:
        self._response_condition.wait()
      return self._responses[to_translate.msg]


def _run_client(address, translator_client):
  with grpc.insecure_channel(address) as channel:
    stub = TestServiceStub(channel)
    responses = stub.Translate(translator_client)
    for resp in responses:
      translator_client.add_response(resp)

def main():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_TestServiceServicer_to_server(Translator(), server)
  server.add_insecure_port('[::]:50054')
  server.start()
  translator_client = TranslatorClient()
  client_thread = threading.Thread(
      target=_run_client, args=('localhost:50054', translator_client))
  client_thread.start()

  def _translate(to_translate):
    return translator_client.translate(Msg(msg=to_translate)).msg

  translator_pool = futures.ThreadPoolExecutor(max_workers=4)
  to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
  translations = translator_pool.map(_translate, to_translate)
  # list() is needed on Python 3, where zip returns a lazy iterator.
  print("Translations: {}".format(list(zip(to_translate, translations))))

  translator_client.close()
  client_thread.join()
  server.stop(None)


if __name__ == "__main__":
  main()

The basic idea is to have an object called TranslatorClient running on a separate thread, correlating requests and responses. It expects that responses will return in the order that requests were sent out. It also implements the iterator interface so that you can pass it directly to an invocation of the Translate method on your stub.
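
If you do not need to correlate responses with requests, the iterator part of this idea can be reduced to a queue-backed object. The following is a minimal sketch for Python 3 using only the standard library; RequestIterator, send, close and produce_then_consume are hypothetical names that are not part of gRPC or of the code above, and the list(requests) call stands in for passing the object to stub.Translate.

```python
import queue
import threading


class RequestIterator(object):
    """Blocking iterator fed from other threads (hypothetical helper)."""

    _CLOSED = object()  # sentinel marking the end of the stream

    def __init__(self):
        self._queue = queue.Queue()

    def send(self, request):
        # Called from the UI thread whenever the user produces a message.
        self._queue.put(request)

    def close(self):
        # Ends iteration once all previously queued requests are consumed.
        self._queue.put(self._CLOSED)

    def __iter__(self):
        return self

    def __next__(self):
        # Blocks without busy-waiting until a request or the sentinel arrives.
        item = self._queue.get()
        if item is self._CLOSED:
            # Put the sentinel back so later calls also stop immediately.
            self._queue.put(self._CLOSED)
            raise StopIteration()
        return item


def produce_then_consume():
    requests = RequestIterator()

    def producer():
        # Stand-in for UI events arriving on another thread.
        for word in ("hello", "goodbye"):
            requests.send(word)
        requests.close()

    thread = threading.Thread(target=producer)
    thread.start()
    # In real code this would be: responses = stub.Translate(requests)
    consumed = list(requests)
    thread.join()
    return consumed
```

queue.Queue.get blocks on an internal condition variable, so the consuming thread sleeps until a message is available rather than polling. On Python 2 the module is named Queue and the class would also need a next method.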

We spin up a thread running _run_client which pulls responses out of TranslatorClient and feeds them back in the other end with add_response.

The main function I included here is really just a strawman since I don't have the particulars of your UI code. I'm running _translate in a ThreadPoolExecutor to demonstrate that, even though translator_client.translate is synchronous, it blocks only the calling thread while it waits, allowing you to have multiple in-flight requests at once.
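
The in-order correlation can also be sketched with one future per request instead of a dictionary keyed by message text. This is an illustrative alternative, not the code above: InOrderCorrelator, register and resolve are hypothetical names, and it assumes, as the answer does, that responses arrive in the order the requests were sent.

```python
import collections
import threading
from concurrent import futures


class InOrderCorrelator(object):
    """Pairs each response with the oldest unanswered request (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = collections.deque()

    def register(self):
        # Call when a request is queued for sending.
        # Returns a future that will hold the matching response.
        future = futures.Future()
        with self._lock:
            self._pending.append(future)
        return future

    def resolve(self, response):
        # Call from the response-reading thread, once per response,
        # in the order the responses arrive.
        with self._lock:
            future = self._pending.popleft()
        future.set_result(response)


# Simulated exchange: two requests registered, two responses in the same order.
correlator = InOrderCorrelator()
first = correlator.register()
second = correlator.register()
correlator.resolve("olleh")
correlator.resolve("eybdoog")
print(first.result(timeout=1), second.result(timeout=1))
```

Because each request gets its own future, two in-flight requests with identical text do not share a dictionary key. For the ordering assumption to hold, register would have to be called under the same lock that appends the request to the outgoing queue, as add_request does above. The concurrent.futures documentation notes that Future objects are normally created by executors, so treat the direct construction here as a convenience for the sketch.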

We recognize that this is a lot of code to write for such a simple use case. Ultimately, the answer will be asyncio support. We have plans for this in the not-too-distant future. But for the moment, this sort of solution should keep you going whether you're running Python 2 or Python 3.

Sectionalism answered 6/3, 2019 at 21:48 Comment(1)
Great answer! Thank you very much. In my actual case I don't really need to synchronise the output and the input (I don't need to know which request generated which output), so that simplifies my code quite a bit. Looking forward to having asyncio support in gRPC. - Corinnacorinne

Another option is the gRPC AsyncIO API (grpc.aio), which the gRPC Performance best practices also mention as a way to improve performance. You can create two coroutines, one for requests and one for responses, and run them concurrently. Here is example code that applies this approach, similar to @Richard's example but with grpc.aio.


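import asyncio

import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub


async def main():
    # Create the event inside the running event loop
    event = asyncio.Event()

    # Shared state between the two coroutines
    state = type('', (), {})()

    # The address is an assumption; it matches the server in the answer above
    async with grpc.aio.insecure_channel('localhost:50054') as channel:
        stub = TestServiceStub(channel)

        async def request(event):

            # Request generator
            async def request_():

                # Start with the first request ("hello" is a placeholder message)
                yield Msg(msg="hello")

                while True:
                    # Wait until a response has been received from the server
                    await event.wait()
                    event.clear()

                    # Send another request
                    yield Msg(msg="hello")

            # Start the call; this does not block, so state.iterator is set
            # before receive() begins iterating
            state.iterator = stub.Translate(request_())

        async def receive(event):
            async for answer in state.iterator:
                # Here is the answer from the server
                print(answer.msg)

                # Let the request generator send another request
                event.set()

        # Start tasks
        await asyncio.gather(request(event), receive(event))


if __name__ == "__main__":
    asyncio.run(main())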
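
The example above sends a new request each time a response arrives. For the case in the question, where requests come from user actions, the request generator could instead read from an asyncio.Queue. The following is a minimal sketch with no gRPC dependency: request_stream, demo and the _DONE sentinel are hypothetical names, and the list comprehension stands in for iterating over stub.Translate(request_stream(queue)).

```python
import asyncio

_DONE = object()  # sentinel that ends the request stream


async def request_stream(queue):
    # Yields queued messages as they arrive; suspends while the queue is empty.
    while True:
        item = await queue.get()
        if item is _DONE:
            return
        yield item


async def demo():
    queue = asyncio.Queue()

    async def user():
        # Stand-in for UI callbacks putting messages on the queue.
        for word in ("hello", "goodbye"):
            await queue.put(word)
            await asyncio.sleep(0)
        await queue.put(_DONE)

    async def consume():
        # In real code:
        #   async for answer in stub.Translate(request_stream(queue)): ...
        return [item async for item in request_stream(queue)]

    _, consumed = await asyncio.gather(user(), consume())
    return consumed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

While the queue is empty the generator is suspended on queue.get(), so nothing sleeps or polls. If the UI runs on a different thread than the event loop, the messages would need to be handed over with loop.call_soon_threadsafe(queue.put_nowait, message), since asyncio.Queue is not thread-safe.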

Seftton answered 25/8, 2023 at 7:32 Comment(0)
