Here is my scenario. The grpc server is an asynchronous sever which subscribes data from other source. Also it provides a subscribe function to its client, thus it could push the data to the grpc client once it receives data from other source. The server is implemented in Java.
@Override
public StreamObserver<Message> subscribe (
StreamObserver<Message> responseObserver) {
return new StreamObserver<Message>() {
@Override
public void onNext(Message message) {
api.subscribe(message.value, Message -> {
synchronized (responseObserver) {
...
...
// get data from other source
responseObserver.onNext(Converter.create().toProtobuf(Message.class, data));
}
});
}
@Override
public void onError(Throwable t) {
log.warn("Encountered error in sub", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
I wanna use python to implement a grpc client to subscribe from this server. However, it appears weird once python subscribe data, it immediately shutdown without waiting for the asynchronous return from Java server. However, the Java client could run forever and waiting for the asynchronous data from the server.
Proto
message Message{
string value = 1;
}
service test {
rpc subscribe(stream Message) returns (stream Message) {}
}
The Python client Code(not working)
def gen_message():
yield test.Message(value="2")
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = test_grpc.MessengerStub(channel)
stream = stub.subscribe(gen_message())
try:
for e in stream:
print(e)
except grpc._channel._Rendezvous as err:
print(err)
run()
The Java Code(working)
StreamObserver<Message> requestObserver = stub.subscribe(new StreamObserver<Message>() {
@Override
public void onNext(Message message) {
System.out.println(message)
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
}
});
Message message = Message.newBuilder().build();
requestObserver.onNext(message);
I have been got confused. How to implement the same feature in python client? Thanks~
p.s. If the server is an while True server other than asynchronous server, the python client works. I suspects that the python client does not know the "asynchronous" server and once its stream has no new data it close the connection.