How do I make sure that a message was received in gRPC bidirectional streaming?
How do I know that the message I am sending through a gRPC stream was received on the other end?

Is there a built-in way to do this in gRPC bidirectional streaming, or do I need to send an explicit response back over the stream myself?

Proto file:


service SimpleService {
    rpc SimpleRPC (stream SimpleData) returns (stream SimpleData) {}
}

message SimpleData {
    string msg = 1;
}

Go code:


client := pb.NewSimpleServiceClient(conn)
stream, err := client.SimpleRPC(context.Background())
if err != nil {
    log.Fatalf("SimpleRPC: %v", err)
}
waitc := make(chan struct{})

msg := &pb.SimpleData{Msg: "sup"}
go func() {
    defer close(waitc)
    for {
        // Send returns an error once the stream is broken,
        // e.g. if the connection to the server is lost.
        if err := stream.Send(msg); err != nil {
            log.Printf("send failed: %v", err)
            return
        }
    }
}()
<-waitc
stream.CloseSend()

Shrapnel answered 10/5, 2019 at 11:24 Comment(6)
It's TCP-based, so a successful send should suffice. But otherwise, add a tx id to each message being sent and pass it back in the response, so the sender can match a query to its response.Sternutation
So send back an async response with a message id or something, and resend if there is no response?Shrapnel
It might be async, or not; that depends on the protocol. But, again, this is a TCP-based stream, so sending / receiving are reliable.Sternutation
It's not needed: TCP connections automatically do packet-level acknowledgement, so adding another layer for this would be overkill. It is reliable, as @mh-cbon stated before.Sculpture
But if I send a message with stream.Send(msg) and at that moment the server goes offline, does gRPC tell me that there was an error?Shrapnel
The whole concept of bidirectional streaming is that messages are sent freely without locking up. In practice this means the transport level does minimal delivery control (as stated above, it is limited to TCP delivery acknowledgement). If you want guaranteed message delivery, I'd follow the suggestion above to send an acknowledgement based on a message ID. If that's not good enough and the transport environment is unreliable, then you should look towards message queuing systems like nats.io, RabbitMQ, Kafka, etc.Tace

Unfortunately, gRPC doesn't handle this case very well. It doesn't handle it for unidirectional streaming either, or even for single RPC requests. The failure mode (simple case without streaming) is like this:

  • gRPC client sends request to server, with a timeout at T
  • server responds successfully to client at T-1sec
  • getting the message to the client and/or parsing it takes 2sec, putting it at T+1sec (past the timeout deadline)
  • the client times out the request before that at time T, and sees an error

The best solution is to figure out a way to avoid doing this to begin with, by making the protocol stateless from the server's perspective -- in other words, if the client saw a failure let them retry. Or, you could reverse the direction of the connection so that the "server" sends RPCs to the client, and the client responds with an ack.

The place where this is hard is if the server is storing something that depends on the client's state. For example, let's say the server has an ordered queue of data that it's trying to send to the client, and it needs to know what the last thing the client has seen before removing items from the queue. The options here are:

  • If you have a single RPC, turn it into a (very short-lived) bidirectional RPC stream, where the client can send a ClientRequest object that internally is a oneof of either a ClientActualRequest object or an empty ClientSuccess object that indicates that the client saw the message. Then, in the implementation of the client, have it first send a ClientRequest { ClientActualRequest {...} }, and after receiving and processing the response from the server, have it send a ClientRequest { ClientSuccess {} } and then terminate the stream. (The server will have to know how to handle that too, of course.)
  • If you have streaming in either / both directions it gets harder. In the simplest case, you can do the same thing as the single-RPC case above, where first the client sends a real request, and then on every update from the server it sends an acknowledgement. Importantly, the client must process the stream in order and the server must process the acknowledgements in order, otherwise the server could think that the client has seen all the messages when it has only seen a subset. (Note that gRPC guarantees in-order delivery, so it's only reordering within your client implementation that can cause the problem; probably only possible if you're using the async gRPC API, unless you're doing something really weird.) Another twist on this would be, instead of ack messages, to have the client send last_known_sequence_number to the server as part of the protocol, so the server only deletes items up to that sequence number; however, there's no easy way to pass more complicated statuses back if you do it that way, because it's just a sequence number.
  • If ordered handling on the client is not feasible, then you have to go full crazy. Now your protocol needs the server to put sequence numbers on each message in the queue, and your client's ClientSuccess object needs to encode the sequence number that you're acknowledging, so that the server knows which things it can delete. This is basically what TCP does internally, but only ack'ing in the direction from server to client, of course... obviously we are talking about some pretty complicated stuff if you're comparing your solution to TCP.

Remember in all these cases that, by the same token that the server's ack isn't guaranteed to be seen by the client even if the RPC succeeded, in our new protocols the client's ack isn't guaranteed to be seen by the server either. For that reason your client needs to be able to handle seeing duplicated requests (up to whatever number of inflight requests you let it have at once) across a connection reset. Unless you implement an "ack of an ack", that is :-P.

Motherless answered 25/3, 2022 at 21:4 Comment(0)
