How to broadcast in gRPC from server to client?
Asked Answered
H

5

11

I'm creating a small chat application in gRPC right now and I've run into the issue where if a user wants to connect to the gRPC server as a client, I'd like to broadcast that the event has occurred to all other connected clients.

I'm thinking of using some sort of observer but I"m confused as to how the server knows of who is connected and how I would broadcast the event to all clients and not just one or two.

I know using streams is part of the answer, but because each client is creating it's own stream with the server, I'm unsure of how it can subscribe to other server-client streams.

Hardin answered 30/3, 2018 at 20:9 Comment(8)
Keep a list of connected clients.Sapwood
#41583718 says that there is no actual unique identifier for clientsHardin
And as the answer to the question you just linked states, "One possible solution is handshake protocol in app level. You can add rpc method 'Connect' and send clientId as response from server. After that you can attach custom headers (metadata) to your rpc calls."Sapwood
mhm...sorry I missed that. I was thinking that maybe using gRPC supported bi-directional streams would be the answer, but I suppose something like a list of clients would work (albeit it feels weird to have to cycle through all of them each time a request is made to the server)Hardin
If you want to broadcast a message to all clients, you'll have to cycle through them - I don't see any way that could be avoidable.Sapwood
@VenelinVasilev if you have a question, please post a new question rather than posting as a comment on a question from two years ago. That way more people will be able to see & answer it.Sapwood
@Sapwood I do have, please take a look: #64119130Durance
@VenelinVasilev that question isn't even about the same language. I'm not a C++ dev so I won't be much help. Good luck!Sapwood
R
4

Another option would be to use a long-polling approach. That is try something like below (code in Python, since that is what I'm most familiar with, but go should be very similar). This was not tested, and is meant to just give you an idea of how to do long-polling in gRPC:

.PROTO defs
-------------------------------------------------
service Updater {
    rpc GetUpdates(GetUpdatesRequest) returns (GetUpdatesResponse);
}

message GetUpdatesRequest {
    int64 last_received_update = 1;
}

message GetUpdatesResponse {
    repeated Update updates = 1;
    int64 update_index = 2;
}

message Update {
    // your update structure
}


SERVER
-----------------------------------------------------------
class UpdaterServer(UpdaterServicer):
    def __init__(self):
        self.condition = threading.Condition()
        self.updates = []

    def post_update(self, update):
        """
        Used whenever the clients should be updated about something. It will
        trigger their long-poll calls to return
        """
        with self.condition:
            # TODO: You should probably remove old updates after some time
            self.updates.append(updates)
            self.condition.notify_all()

    def GetUpdates(self, req, context):
        with self.condition:
            while self.updates[req.last_received_update + 1:] == []:
                self.condition.wait()
            new_updates = self.updates[req.last_received_update + 1:]
            response = GetUpdatesResponse()
            for update in new_updates:
                response.updates.add().CopyFrom(update)
            response.update_index = req.last_received_update + len(new_updates)
            return response


SEPARATE THREAD IN THE CLIENT
----------------------------------------------
request = GetUpdatesRequest()
request.last_received_update = -1
while True:
    stub = UpdaterStub(channel)
    try:
        response = stub.GetUpdates(request, timeout=60*10)
        handle_updates(response.updates)
        request.last_received_update = response.update_index
    except grpc.FutureTimeoutError:
        pass
Radioscope answered 6/4, 2018 at 16:42 Comment(0)
T
3

Yup, I don't see any other way than keeping a global data structure containing all the connected streams and looping through them, telling each about the even that just occurred.

Tilt answered 4/4, 2018 at 20:23 Comment(1)
How can I get the connected stream so I can put it list ?Durance
Y
1

Another approach is to spawn a grpc-server on client side too. On app-level you have some handshake from client to server to exchange the clients grpc-server ip and port. You probably want to create a client for that address at this point and store the client in a list.

Now you can push messages to the clients from the list with default unary RPC calls. No [bidi] stream needed. Pros:

  • Possible to separate the clients "Push"-API from the server API.
  • Unary RPC push calls.

Cons:

  • Additional "server". Don't know if that is possible in every scenario.
Yapok answered 11/12, 2018 at 20:21 Comment(0)
G
1

A global map structure is needed, you can create a new chan for each connection. What I come up with is an intermediate channel to deal with the global map structure.

An example for server streaming:

func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
    //get trace id or generated a random string or whatever you want to indicate this goroutine
    ID:="randomString"
    //create a chan to receive response message
    conn := make(chan *pb.SubscribeResponse)
    //an intermediate channel which has the ownership of the `map`
    s.broadcast <- &broadcastPayload {
        //an unique identifier
        ID: ID
        //the chan corresponse to the ID
        Conn: conn
        //event to indicate add, remove or send message to broadcast channel
        Event: EventEnum.AddConnection
    }
    
    for {
        select {  
            case <-srv.Context().Done():  
                s.broadcast <- &entity.BroadcastPayload{  
                     ID: ID,
                     Event: EventEnum.RemoveConnection
                }
                return nil  
            case response := <-conn:  
                if status, ok := status.FromError(srv.Send(response)); ok {  
                    switch status.Code() {  
                    case codes.OK:  
                        //noop  
                    case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:  
                        return nil  
                    default:  
                        return nil  
             }  
         }}
    }
}

For the broadcast go routine:

//this goroutine has the ownership of the map[string]chan *pb.SubscribeResponse
go func(){
    for v:=range s.broadcast {
        //do something based on the event
        switch v.Event {
            //add the ID and conn to the map
            case EventEnum.AddConnection:
                ...
            //delete map key and close conn channel here
            case EventEnum.RemoveConnection:
                ...
            //receive message from business logic, send the message to suiteable conn in the map as you like
            case EventEnum.ReceiveResponse:
                ...
        }
    }
}

I put some details here

Graner answered 30/5, 2021 at 16:21 Comment(0)
C
1

A simple chat server/client implemented with gRPC in Go sample

All clients are stored in the map[string]chan *chat.StreamResponse

type server struct {
    Host, Password string

    Broadcast chan *chat.StreamResponse

    ClientNames   map[string]string
    ClientStreams map[string]chan *chat.StreamResponse

    namesMtx, streamsMtx sync.RWMutex
}

And broadcast messages to all clients

func (s *server) broadcast(_ context.Context) {
    for res := range s.Broadcast {
        s.streamsMtx.RLock()
        for _, stream := range s.ClientStreams {
            select {
            case stream <- res:
                // noop
            default:
                ServerLogf(time.Now(), "client stream full, dropping message")
            }
        }
        s.streamsMtx.RUnlock()
    }
}

// send messages in individual client
func (s *server) sendBroadcasts(srv chat.Chat_StreamServer, tkn string) {
    stream := s.openStream(tkn)
    defer s.closeStream(tkn)

    for {
        select {
        case <-srv.Context().Done():
            return
        case res := <-stream:
            if s, ok := status.FromError(srv.Send(res)); ok {
                switch s.Code() {
                case codes.OK:
                    // noop
                case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
                    DebugLogf("client (%s) terminated connection", tkn)
                    return
                default:
                    ClientLogf(time.Now(), "failed to send to client (%s): %v", tkn, s.Err())
                    return
                }
            }
        }
    }
}

Cruickshank answered 22/4, 2022 at 11:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.