grpc asynchronous bi-directional server (Java/Python)
Asked Answered
J

1

6

Here is my scenario. The grpc server is an asynchronous sever which subscribes data from other source. Also it provides a subscribe function to its client, thus it could push the data to the grpc client once it receives data from other source. The server is implemented in Java.

@Override
public StreamObserver<Message> subscribe (
        StreamObserver<Message> responseObserver) {
    return new StreamObserver<Message>() {

        @Override
        public void onNext(Message message) {
            api.subscribe(message.value, Message -> {
                synchronized (responseObserver) {
                    ...
                    ...
                    // get data from other source

                    responseObserver.onNext(Converter.create().toProtobuf(Message.class, data));
                }
            });
        }

        @Override
        public void onError(Throwable t) {
            log.warn("Encountered error in sub", t);
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}

I wanna use python to implement a grpc client to subscribe from this server. However, it appears weird once python subscribe data, it immediately shutdown without waiting for the asynchronous return from Java server. However, the Java client could run forever and waiting for the asynchronous data from the server.

Proto

message Message{
    string value = 1;
}
service test {
    rpc subscribe(stream Message) returns (stream Message) {}
}

The Python client Code(not working)

def gen_message():
    yield test.Message(value="2")

def run():
    channel = grpc.insecure_channel('localhost:50051')
    stub = test_grpc.MessengerStub(channel)

    stream = stub.subscribe(gen_message())
    try:
        for e in stream:
            print(e)
    except grpc._channel._Rendezvous as err:
        print(err)

run()

The Java Code(working)

StreamObserver<Message> requestObserver = stub.subscribe(new StreamObserver<Message>() {
    @Override
    public void onNext(Message message) {
        System.out.println(message)
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onCompleted() {

    }
});
Message message = Message.newBuilder().build();
requestObserver.onNext(message);

I have been got confused. How to implement the same feature in python client? Thanks~

p.s. If the server is an while True server other than asynchronous server, the python client works. I suspects that the python client does not know the "asynchronous" server and once its stream has no new data it close the connection.

Jamila answered 1/8, 2018 at 1:20 Comment(1)
I'm facing the same issue. Did you solve the problem?Paisa
T
1

this is a "bug" feature for Python API, it will automatically add an onComplete message when sending requests to the server once the request_iterator is exhausted. in your case to fix it you could change the request_iterator to a queue, and create an infinite generator based on the queue. if you also own the Java serverside, you could ignore the onComplete message which also works.

Thorlie answered 21/2, 2024 at 16:15 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.