How do I handle streaming messages with Python gRPC?
I'm following this Route_Guide sample.

The sample in question fires off and reads messages without replying to a specific message. Replying to a specific message is what I'm trying to achieve.

Here's what I have so far:

import sys

import grpc
...

channel = grpc.insecure_channel(conn_str)
try:
    grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
    sys.exit('Error connecting to server')
else:
    stub = MyService_pb2_grpc.MyServiceStub(channel)
    print('Connected to gRPC server.')
    this_is_just_read_maybe(stub)


def this_is_just_read_maybe(stub):
    responses = stub.MyEventStream(stream())
    for response in responses:
        print(f'Received message: {response}')
        if response.something:
            pass  # okay, now what? how do I send a message here?

def stream():
    yield my_start_stream_msg
    # this is fine, i receive this server-side
    # but i can't check for incoming messages here

I don't seem to have a read() or write() on the stub; everything seems to be implemented with iterators.

How do I send a message from this_is_just_read_maybe(stub)? Is that even the right approach?

My Proto is a bidirectional stream:

service MyService {
  rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}
Seller answered 15/12, 2017 at 11:56 Comment(0)

What you're trying to do is perfectly possible, and it will probably involve writing your own request iterator object that can be given responses as they arrive, rather than using a simple generator as your request iterator. Perhaps something like:

import threading


class MySmarterRequestIterator(object):

    def __init__(self):
        self._lock = threading.Lock()
        self._responses_so_far = []

    def __iter__(self):
        return self

    def _next(self):
        # some logic that depends upon what responses have been seen
        # before returning the next request message
        return <your message value>

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

    def add_response(self, response):
        with self._lock:
            self._responses_so_far.append(response)

that you then use like:

my_smarter_request_iterator = MySmarterRequestIterator()
responses = stub.MyEventStream(my_smarter_request_iterator)
for response in responses:
    my_smarter_request_iterator.add_response(response)

There will probably be locking and blocking in your _next implementation to handle the situation of gRPC Python asking your object for the next request that it wants to send and your responding (in effect) "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out".
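The blocking the answer alludes to is not spelled out; one way to sketch it (my own assumption, not the answer author's code) is to back the iterator with a threading.Condition so that __next__ waits until add_response has queued a follow-up request. The "echo the response back as the next request" policy below is purely illustrative:

```python
import threading


class BlockingRequestIterator:
    """Request iterator whose __next__ blocks until a request is ready to send."""

    def __init__(self, first_request):
        self._condition = threading.Condition()
        self._responses = []
        self._pending = [first_request]  # requests ready to be sent
        self._closed = False

    def __iter__(self):
        return self

    def __next__(self):
        with self._condition:
            # Block until there is a request to send or the stream is closed.
            while not self._pending and not self._closed:
                self._condition.wait()
            if self._pending:
                return self._pending.pop(0)
            raise StopIteration  # closed: half-close the request stream

    def add_response(self, response):
        # Called from the response-reading loop; decide the next request here.
        with self._condition:
            self._responses.append(response)
            # Hypothetical policy: send a request derived from the response.
            self._pending.append(response)
            self._condition.notify()

    def close(self):
        with self._condition:
            self._closed = True
            self._condition.notify()
```

Feeding this to `stub.MyEventStream(...)` in place of the generator lets the response loop call `add_response` to unblock the sender, and `close()` ends the request stream cleanly.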

Kodok answered 21/12, 2017 at 21:42 Comment(3)
Thanks Nathaniel, I'll try that out. Any particular reason Python gRPC doesn't expose .send() and .recv() primitives, like the Golang implementation does, and pretty much all the others do?Seller
At the time the API was being designed it was viewed as a bit of an edge case for Python use cases, so we went with what's most common being what's most straightforward and what's less common being possible with just a little more work.Kodok
The thing is, how scalable is this solution when serving bidirectional connections to many clients with all these locks?Mummy

Instead of writing a custom iterator, you can also use a blocking queue to implement send- and receive-like behaviour for the client stub:

import queue
...

send_queue = queue.SimpleQueue()  # or Queue if using Python before 3.7
my_event_stream = stub.MyEventStream(iter(send_queue.get, None))

# send
send_queue.put(StreamingMessage())

# receive
response = next(my_event_stream)  # type: StreamingMessage

This makes use of the sentinel form of iter, which converts a regular function into an iterator that stops when it reaches a sentinel value (in this case None).
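The sentinel form can be demonstrated without a gRPC stub at all; this minimal sketch shows how the queue-backed iterator behaves (putting the sentinel is what would half-close the request stream):

```python
import queue

# iter(callable, sentinel) calls send_queue.get() repeatedly and stops
# as soon as the call returns the sentinel value (None here).
send_queue = queue.SimpleQueue()  # Queue() works too, before Python 3.7
request_iterator = iter(send_queue.get, None)

send_queue.put('first')
send_queue.put('second')
send_queue.put(None)  # sentinel: ends the iterator

print(list(request_iterator))  # → ['first', 'second']
```

Note that `send_queue.get()` blocks when the queue is empty, so the iterator naturally waits for the next message rather than terminating early.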

Nut answered 22/10, 2019 at 22:12 Comment(2)
If send_queue becomes empty, will stub.MyEventStream keep iterating it and pick up values that are added later?Landgrave
it should be "put" instead of "push"Dowell
