ZMQ DEALER - ROUTER Communication

I am currently working on a project that requires network communication of different data types between the entities of a distributed system, and I am using ZMQ.

The main goal of the project is to have a central node that services clients which can connect at any time. For each connected client, the central node should manage the message traffic between it and the other clients.

For the moment, all communication happens over TCP.

The clients need to send and receive messages at any time, so they use ZMQ_DEALER sockets and the central node uses a ZMQ_ROUTER socket.

Initially, the goal is that a message sent by one client arrives at all the other clients, so that every client sees the same data.

I am using the Asynchronous Client/Server pattern because I want several clients to talk to each other collaboratively, possibly through a server acting as a broker or middleware.

I have a ZMQ_DEALER client which connects to the ZMQ_ROUTER server:

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main(int argc, char *argv[])
{
    zmq::context_t context(1);
    zmq::socket_t client(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    client.connect(endpoint);

    for (int request = 0; request < 10; request++)
    {
        // Emulate the REQ envelope: empty delimiter frame, then the payload
        s_sendmore(client, "");
        s_send(client, "Testing sending some data");

        s_recv(client);                      // discard the empty delimiter frame
        std::string reply = s_recv(client);  // the actual payload

        std::cout << "Received reply " << request
                  << " [" << reply << "]" << std::endl;
    }
    return 0;
}

On the server side, I have a ZMQ_ROUTER which receives and manages the messages, bound to a well-known port. This server is written in Python:

import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)

print("Creating Server Network Manager Router")

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        print(message)
        frontend.send_multipart(message)

On my other peer/client I have the following:

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main (int argc, char *argv[])
{
    zmq::context_t context(1);
    zmq::socket_t peer2(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    peer2.connect(endpoint);

    for (int request = 0; request < 10; request++)
    {
        // Emulate the REQ envelope: empty delimiter frame, then the payload
        s_sendmore(peer2, "");
        s_send(peer2, "Probando");

        s_recv(peer2);                      // discard the empty delimiter frame
        std::string reply = s_recv(peer2);  // the actual payload

        std::cout << "Received reply " << request
                  << " [" << reply << "]" << std::endl;
    }
    return 0;
}

UPDATE

But each time I run a client, its messages do not arrive at the other peer clients. The messages arrive at the ZMQ_ROUTER and are returned to the originating ZMQ_DEALER.


This is because the ROUTER prepends the identity frame at the time of reception; when the message is sent back through the ROUTER, it removes that identity frame and uses its value to route the message back to the relevant DEALER, as described in the ZMQ_ROUTER section near the end of the zmq_socket page here.

And this makes sense: I am sending my DEALER's identity to the ROUTER, and the ROUTER takes that identity frame and returns the message to my DEALER.
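
To make the envelope concrete, this is roughly what the ROUTER-side recv_multipart() call above sees when PEER1 sends (a sketch; the exact bytes depend on the configured identities):

# Frames arriving at the ROUTER: [identity, empty delimiter, payload]
message = frontend.recv_multipart()
print(message)   # e.g. [b'PEER1', b'', b'Testing sending some data']

# Echoing the same frames back makes the ROUTER pop frame 0 (b'PEER1')
# and use it as the routing key, so the reply returns only to the sender.
frontend.send_multipart(message)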

As a first step in my implementation, I need any message sent by one DEALER to be seen by every other DEALER, no matter how many DEALERs (one or many) are connected to the ZMQ_ROUTER. In this sense ... is it necessary to know the identity frames of the other DEALER or DEALERs?

If I have DEALER A, DEALER B, and DEALER C, and ROUTER

then:

DEALER A sends a message ... and I want that message from DEALER A to arrive at DEALER B and DEALER C, and at any other DEALERs that may join my conversation session ...

In this order of ideas, does DEALER A need to know the identity frames of DEALER B and DEALER C beforehand so that the message can reach them?

How can I learn the identity frame of each DEALER present in my implementation? Is this done on the ROUTER side? This is not clear to me.

Alienist answered 14/3, 2018 at 23:12 Comment(0)

You could have all clients send an "I am here" message at start-up. The central server could then store all the IDs, cf. the initial communication between worker and router here: https://zguide.zeromq.org/docs/chapter3/#A-Load-Balancing-Message-Broker. The server would send out any received message to all currently known clients. You should add some heartbeating in order to detect disconnected clients, cf. https://zguide.zeromq.org/docs/chapter4/#Heartbeating.
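
A minimal sketch of that registration idea (assuming every client sends a b'READY' message at start-up; the b'READY' convention and the known_clients set are illustrative, not a ZeroMQ feature):

import zmq

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

known_clients = set()                      # identities seen so far

while True:
    identity, _, payload = frontend.recv_multipart()
    known_clients.add(identity)
    if payload == b'READY':                # registration only, nothing to broadcast
        continue
    # fan the message out to every known client except the sender
    for peer in known_clients:
        if peer != identity:
            frontend.send_multipart([peer, b'', payload])

Note that without heartbeating, known_clients only ever grows; a ROUTER silently drops frames addressed to identities that are no longer connected.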

However, ZeroMQ already comes with such a communication pattern: PUB-SUB. In essence, every client would have a DEALER and a SUB socket connected to the server's ROUTER and PUB sockets. The server simply sends out any received message via the PUB socket to all clients, including the originating one. If that is a problem for the originating client, you can include the client ID in the message so that each client can filter out messages carrying its own ID. See also this example from the guide: https://zguide.zeromq.org/docs/chapter5/#Getting-an-Out-of-Band-Snapshot
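
The client-side filtering might then look like this (a sketch; the ID prefix and the b':::' separator are an arbitrary application convention, not part of ZeroMQ):

import zmq

MY_ID = b'PEER1'                            # this client's own identity

ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, b'')   # receive everything, filter in code
subscriber.connect("tcp://localhost:5557")

while True:
    message = subscriber.recv()
    sender, _, payload = message.partition(b':::')
    if sender == MY_ID:
        continue                            # skip our own update echoed by the PUB
    print("update from %s: %s" % (sender, payload))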

Another interesting pattern would be Republishing Updates from Clients:

Here PUSH-PULL is used to send the updates to the server. This makes sense if there is no need for a reply message from the server. If you do not need the state request from that example, you can leave out the ROUTER-DEALER part. Here is a sample implementation, using Python for brevity. The server listens on the PULL socket and sends out everything via the PUB socket:

import zmq

def main():
    # context and sockets
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    while True:
        message = collector.recv()
        print "I: publishing update %s" % message
        publisher.send(message)

if __name__ == '__main__':
    main()

The client listens on its SUB socket for some time. If a message is received, it is logged. If the timeout is reached, a message is generated with roughly a 1-in-10 chance:

import random
import time

import zmq

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    while True:
        if subscriber.poll(100) & zmq.POLLIN:
            message = subscriber.recv()
            print "I: received message %s" % message
        else:
            rand = random.randint(1, 100)
            if rand < 10:
                publisher.send("%d" % rand)
                print "I: sending message %d" % rand

if __name__ == '__main__':
    main()
Gunther answered 21/3, 2018 at 16:12 Comment(11)
Great sample patterns. Initially, I am interested in sending messages from one client to many clients (to the clients connected at a given instant). According to the samples, the important thing here is to establish some kind of communication between the ROUTER and the PUBLISHER on the server side, right? Could this be implemented by polling both sockets (ROUTER and PUBLISHER) together with zmq.POLLIN?Alienist
@Alienist You can leave out the ROUTER-DEALER part if you only want to send messages from any client to all clients. See the example in my updated answer.Gunther
Ralf, the approach is correct. How should I think about scalability/redundancy for my server here? Threads, async? I've coded the C++ SUB-PUB client version and it works, although I don't know whether it is well built or could be optimized; can you check it, please? Just in case ...Alienist
@Alienist What redundancy requirements do you have? I suggest carefully reading the relevant sections in The Guide (chapters 4 and 5), which detail how to match different requirements with appropriate solutions. As for scalability, I would suggest doing some load tests first; I would expect the limiting factor to be the network connection. I see no obvious problems with your code. I guess the main challenge will be interleaving listening on the SUB socket with doing 'real work' that might result in sending via the PUSH socket.Gunther
Ralf, let me please ask another question: in this architecture I have Python clients and C++ clients, and the server is Python. Why do messages appear faster on my Python clients than on my C++ clients? Is this related to the server being coded in Python?Alienist
@Alienist That is unexpected. I think it would be best if you reduce it to a minimal example (only the PUB-SUB part?) and post that as a separate question.Gunther
I've published it as a separate question, #49439887, just in case. Thanks. :)Alienist
@Alienist With all due respect to the author, the proposed solution does not meet the specifications of the O/P definition. The code violates the requirement that the [Server]-entity deliver any message from agent-[A] to all other agents ( [B], [C], ... ), but not to agent-[A] itself. This explicit requirement was not met in the proposal submitted above.Telemetry
@Telemetry In this sense, would it be better to transport the ID of each client that sends data to the server, so that each client can filter out messages with its own ID? This was mentioned by Ralf too ...Alienist
@Alienist Negative, Sir, there is a better approach, one that principally avoids this duty being delegated back onto the sending client and that avoids all the overhead of sending data that is not intended to be delivered / consumed on the sending client, which the proposed approach principally did not solve.Telemetry
@Telemetry Can you tell us about that approach? ... Following the recommendations in this answer, I was thinking it is also possible for the server to be the one that filters the messages, sending them only to the connected clients other than the sender?Alienist

The (prematurely) awarded answer does not meet the defined properties.

Distributed systems need to operate both smartly and efficiently, as the agents are distributed, and both error analysis and production issues in deployment are extremely expensive to analyse / test / debug.

Thus, copy/paste reuse of a problem-incompatible idea is not a way to achieve the former, much less the latter.


So, let's review the efficiency first:

client-[A] .send()-s a message that the O/P wanted to be .recv()-ed on the server side [S] and re-broadcast to all other clients [B,C,...], except [A] itself.

The most resource-efficient approach is to properly configure the infrastructure tools to do exactly this, without re-inventing the wheel and/or using fragile, performance-devastating scaffolding code.

So:

on the client-[*] side, it is best to use the primitive agent concept sketched below. More complex setups, such as using event-handling facilities as clever as those Tkinter has evolved to pack into its .mainloop() soft-real-time system, are better, yet it is not easy to fight design battles on more than one front, so let's keep things simpler for the moment:

import zmq

zmq_VERSION      = zmq.zmq_version_info()
anAgentsIDENTITY = whateverHashOrHumanReadableSTRING  # <-- placeholder, supply your own
notMINE          = anAgentsIDENTITY

if     zmq_VERSION[0] < 4:
           print( "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION ) )
aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#SUB ---------------------------------------------------------------------------
aSUB = aCTX.socket( zmq.SUB )
aSUB.setsockopt(    zmq.LINGER,          0 )   # protect your agent
aSUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your agent from DoS
aSUB.setsockopt(    zmq.AFFINITY,        1 )   # protect your server resources
aSUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aSUB.setsockopt(    zmq.SUBSCRIBE,       notMINE )  #  NEVER .recv()-s  data back
...
#SUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

aSUB.connect(      "tcp://localhost:5557" )

#PUSH --------------------------------------------------------------------------
aPUSH = aCTX.socket( zmq.PUSH )
...
#PUSH PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True; anAgentSignsWithIdentityPREFIX = anAgentsIDENTITY
while notSoftFLAG:

    if aReasonToSendSomethingToServer:
       aPUSH.send( anAgentSignsWithIdentityPREFIX
                 + ":::"
                 + aMsgPAYLOAD,
                   zmq.DONTWAIT
                   )                          # inspect ZMQError
       ...
       pass

    if aSUB.poll( 100 ):
       message = aSUB.recv( zmq.DONTWAIT )    #  NEVER .recv()-s own data back
       ...
       pass


    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       pass

    if ...:
       ...
       pass

#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aSUB, aPUSH, ) ]
...

aCTX.term()
#   .term()
#########

The server can avoid ALL the hassle of any ad-hoc handling:

all being well tuned inside the ZeroMQ infrastructure:

import zmq
from time import sleep                         # used for the NOP-sleep below

pass;  zmq_VERSION = zmq.zmq_version_info()
if     zmq_VERSION[0] < 4:
           print( "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION ) )

aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#PUB ---------------------------------------------------------------------------
aPUB = aCTX.socket( zmq.PUB )
aPUB.setsockopt(    zmq.LINGER,          0 )   # protect your server
aPUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your server from DoS
aPUB.setsockopt(    zmq.AFFINITY,        3 )   # protect your server resources
aPUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aPUB.setsockopt(    zmq.IMMEDIATE,       1 )   # avoid Queueing for dead-ends
aPUB.setsockopt(    zmq.TOS,             tos ) # allow for L3-router TOS-policies
...
#PUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPUB.bind(   "tcp://*:5557" )                  # expose AccessPoint on tcp://

#PULL --------------------------------------------------------------------------
aPULL = aCTX.socket( zmq.PULL )
aPULL.setsockopt(    zmq.LINGER,          0 )  # protect your server
aPULL.setsockopt(    zmq.MAXMSGSIZE,      m )  # protect your server from DoS
aPULL.setsockopt(    zmq.AFFINITY,        3 )  # protect your server resources
aPULL.setsockopt(    zmq.HEARTBEAT_IVL,   ivl )#     server L3-helper Heartbeats
aPULL.setsockopt(    zmq.HEARTBEAT_TTL,   ttl )#     server L3-helper Heartbeats
...
#PULL PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPULL.bind(   "tcp://*:5558" )                 # expose AccessPoint on tcp://
...

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True
while notSoftFLAG:
    NOP_SLEEP = 0.010                         #  set a 10 [ms] sleep in case NOP (sleep() takes seconds)
    if aPULL.poll( 0 ):                       #  NEVER block/wait
       aMSG = aPULL.recv( zmq.DONTWAIT )      #  NEVER .recv()-s own data back
       #CPY = zmq_msg_copy( &aMSG );          // WARNING ABOUT NATIVE C-API
       #                                      // HANDLING, NEED .COPY()
       #                                      //           NEED .CLOSE()
       aPUB.send( aMSG,   zmq.DONTWAIT )      #  re-PUB-lish to all others but sender
       ...< process aMSG payload on server-side, if needed >...

       NOP_SLEEP = 0                          # !NOP, avoid 10[ms] NOP-loop sleep
       pass

    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       NOP_SLEEP = 0
       pass

    if ...:
       ...
       pass

    sleep( NOP_SLEEP )                        # a soft-real-time controlled sleep on NOP
#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aPUB, aPULL, ) ]
...

aCTX.term()
#   .term()
#########
Telemetry answered 24/3, 2018 at 8:34 Comment(2)
In the PULL section on the server side, we should create a PULL socket instead of PUSH, really? You typed: aPULL = aCTX.socket( zmq.PUSH ). Along those lines, are the notSoftFLAG and aReasonToFlagLoopEXIT some mechanism to mark the client identities?Alienist
Right, PULL was obvious. The logical flags have nothing to do with identities, but form additional potential loop-break mechanics.Telemetry
