One ZeroMQ socket per thread or per call?
Asked Answered
A

4

5

As we all know, a ZeroMQ socket shall not be shared among application threads.
context_t instances however can.

I have a multi-threaded-application and I'd like to have each thread exchange messages from time to time with a REQ/REP-socket counterparty ( event, exceptions and the like ), depending on what they are doing ( they are doing non-ZeroMQ-stuff ).

To send messages to my REQ/REP-socket I use the following function
( a half-C++ half-pseudo-code ):

sendMessage:

bool sendMessage(std::string s)
{
    zmq::socket_t socket(globalContext(), ZMQ_REQ);
    socket.connect("ipc://http-concentrator");

    zmq::message_t message(s.size());
    memcpy(message.data(), s.data(), s.size());
    if (!socket.send(message))
        return false;

    // poll on socket for POLLIN with timeout

    socket.recv(&message);
    // do something with message

    return true;
}

This function is called from every thread when needed. It creates a local socket, connects, sends the message, and receives a response. At exit, the socket is disconnected and removed ( at least I'm assuming that it is closed ).

This way, I don't need to bother to maintain a socket in each of my threads. This comes at the cost of creating and connecting each time I call this function.

I have stressed this code and I didn't see much difference between reusing one socket and this reconnect-implementation. ( I have 20k REP/REQ transactions per second, including a JSON-decode/encode, on both side of the use-case )

Q: Is there a more correct ZeroMQ-way of doing this?

Alemannic answered 30/1, 2017 at 17:16 Comment(6)
There is a major performance difference, but if you haven't noticed it it may not be a problem. If there is a connection pool available for ZeroMQ I would use it, rather then either a socket over thread or a socket per cal.Distraint
If you're using REQ<->REP then you should put a ROUTER<->DEALER proxy in-between the two. Then you can simply spin up X number of REQ and/or REP threads to do work.Whetstone
What I like about my approach is, that the threads do not need to carry a socket with them. I'm willing to accept this overhead. I will nevertheless look into the ROUTER/DEALER-pattern.Alemannic
@EJP Thanks for your hint regarding the connection pool. It inspired me to create a simple one while keeping the simplicity of usage in my threads.Alemannic
@PatrickB. would you mind to accept the best answer provided? It took some time, so might be a fair step is to be done, isn't it? ( anyway - great stories at [Filter Failure] the Gare d’Austerlitz smoked' Linux is quite a frequent case for a poor heat-management embedded controllers :o) not just at RATP ) Enjoy the day & Stay tuned.Homoeo
@Homoeo I haven't yet accepted an answer for the reason that none gave me a (new) solution to my problem. I ended up implementing a per-thread-socket-pool. Your answer is great and helps people (me) to understand some considerations, but it doesn't provide an answer to my problem.Alemannic
A
5

Here is my (current) solution, in C++11 you can assign object to a thread_local-storage. Storing the socket_t-instance static and thread_local in a function gives me the functionality I was looking for:

class socketPool
{
    std::string endpoint_;

public:
    socketPool(const std::string &ep) : endpoint_(ep) {}

    zmq::socket_t & operator()()
    {
        thread_local static zmq::socket_t socket(
                globalContext(), 
                ZMQ_REQ);
        thread_local static bool connected;

        if (!connected) {
            connected = true;
            socket.connect(endpoint_);
        }

        return socket;
    }
};

// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");

In my sendMessage()-function instead of creating and connecting I simply do

bool sendMessage(std::string s)
{
    zmq::socket_t &socket = httpReqPool();

    // the rest as above
}

Regarding performance, well, it's 7 times faster on my machine. (140k REQ/REP per second).

Alemannic answered 31/1, 2017 at 10:36 Comment(1)
Out of curiosity, are you proposing that creating and connecting socket every time you want to send a message increased the performance by 7 times?Rebel
H
6

Nota Bene: this answer was posted before O/P was changed from 20k TPS to 140k TPS on ipc:// transport-class

Q: Is there a more correct ZeroMQ-way of doing this?

A:Not easy to say what is "this" and what are the parameters of the "correctness"-metric

Given that,
the points below will be more general
and applicable for a system design-phase reasoning:


Resources Utilisation Overheads Avoidance

This point is a dual-edge sword. Some overheads are always associated with both an infrastructure element setup and disposal ( yes, even the closing and dismantling ) of the REQ-AccessPoint to the REQ/REP-pattern and the associated socket-based transport-class impose some remarkable overheads on both the REQ-side host and also the REP-side.

It was fair you've noted, that you took some care about quantitatively testing this, on a level of some 20k TPS, and did not observe any adverse effects of such approach. What was not clear, whether any other scenario was also tested in-vivo on the same SUT ( System-under-Test ), so as to provide some baseline for comparisons of each respective design ( and to allow to determine the difference of the overheads per se ).

While a well designed framework hides this part of the system internal behaviour from the user-maintained code, it does not mean, it is all a cheap, the less a free-of-charge processing.

It is obvious there are jobs performed under the hood in the Context()-instance thread(s) ( ... yes, plural is correct here, as some high-performance code may benefit from using more than one I/O-threads per a Context() instance and positively influence the workload distribution by explicitly defined affinity-mapping between a pattern-socket and it's respective I/O-thread handler ( so as to somehow balance, if not able to deterministically level, the expected I/O-throughput, incl. all the associated overheads ).

If still in doubts, one shall always remember, that an imperative programming style function or an object-oriented methods are principally victims of the external caller, who decides at which moment & how often such "en-slaved" code-execution unit is called on duty and being executed. The function/method does not have any natural means of back-throtling ( a suppresion of ) the frequency of it's own invocations from external caller(s) and robust designs simply cannot remain to just rely on optimistic assumptions that such calls do not come more often than XYZ-k TPS ( 20k being cited above may be fine for in-vitro testing, but real deployment may shift that several orders of manitude ( be it artificially - during testing, or not - during some peak-hour or user(system)-panic or due to some technical error or hardware failure ( we've all heard so many times about NIC-card flooding an L1/L2 traffic beyond all imaginable limits et al - we just do not and cannot know, when / where it will happen next time again ).

Avoiding Risk of Blocking

The mentioned REQ/REP Scaleable Formal Communication Pattern is known for it's risk of falling into an externally unresolveable distributed internal dead-lock. This is always a risk to avoid. Mitigation strategies may depend on the actual use-case's value at risk ( a need to certify a medical instrument, fintech use-cases, control-loop use-cases, academia research paper code or a private hobby toy ).

Ref.: REQ/REP Deadlocks >>> https://mcmap.net/q/590377/-quot-server-quot-to-quot-server-quot-zeromq-communication

Fig.1: Why is it wrong to use a naive REQ/REP
all cases when [App1]in_WaitToRecvSTATE_W2R + [App2]in_WaitToRecvSTATE_W2R
are principally an unsalvageable distributed mutual deadlock of REQ-FSA/REP-FSA ( each of the both Finite-State-Automata waits for "the other" to move ) and will never reach the "next" in_WaitToSendSTATE_W2S internal state.

               XTRN_RISK_OF_FSA_DEADLOCKED ~ {  NETWORK_LoS
                                         :   || NETWORK_LoM
                                         :   || SIG_KILL( App2 )
                                         :   || ...
                                         :      }
                                         :
[App1]      ![ZeroMQ]                    :    [ZeroMQ]              ![App2] 
code-control! code-control               :    [code-control         ! code-control
+===========!=======================+    :    +=====================!===========+
|           ! ZMQ                   |    :    |              ZMQ    !           |
|           ! REQ-FSA               |    :    |              REP-FSA!           |
|           !+------+BUF> .connect()|    v    |.bind()  +BUF>------+!           |
|           !|W2S   |___|>tcp:>---------[*]-----(tcp:)--|___|W2R   |!           |
|     .send()>-o--->|___|           |         |         |___|-o---->.recv()     |
| ___/      !| ^  | |___|           |         |         |___| ^  | |!      \___ |
| REQ       !| |  v |___|           |         |         |___| |  v |!       REP |
| \___.recv()<----o-|___|           |         |         |___|<---o-<.send()___/ |
|           !|   W2R|___|           |         |         |___|   W2S|!           |
|           !+------<BUF+           |         |         <BUF+------+!           |
|           !                       |         |                     !           |
|           ! ZMQ                   |         |   ZMQ               !           |
|           ! REQ-FSA               |         |   REP-FSA           !           |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
|           ! /\/\/\/\/\/\/\/\/\/\/\|         |/\/\/\/\/\/\/\/\/\/\/!           |
|           ! \/\/\/\/\/\/\/\/\/\/\/|         |\/\/\/\/\/\/\/\/\/\/\!           |
+===========!=======================+         +=====================!===========+

Homoeo answered 31/1, 2017 at 10:43 Comment(1)
Thank you for your detailed answer. I'm aware of the risks of deadlocking and failing connection for REQ/REP-sockets. The 20k -> 140k jump is only explained by re-using connected sockets instead of reconnecting every time.Alemannic
A
5

Here is my (current) solution, in C++11 you can assign object to a thread_local-storage. Storing the socket_t-instance static and thread_local in a function gives me the functionality I was looking for:

class socketPool
{
    std::string endpoint_;

public:
    socketPool(const std::string &ep) : endpoint_(ep) {}

    zmq::socket_t & operator()()
    {
        thread_local static zmq::socket_t socket(
                globalContext(), 
                ZMQ_REQ);
        thread_local static bool connected;

        if (!connected) {
            connected = true;
            socket.connect(endpoint_);
        }

        return socket;
    }
};

// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");

In my sendMessage()-function instead of creating and connecting I simply do

bool sendMessage(std::string s)
{
    zmq::socket_t &socket = httpReqPool();

    // the rest as above
}

Regarding performance, well, it's 7 times faster on my machine. (140k REQ/REP per second).

Alemannic answered 31/1, 2017 at 10:36 Comment(1)
Out of curiosity, are you proposing that creating and connecting socket every time you want to send a message increased the performance by 7 times?Rebel
P
1

I think one different is performance.

With above code, that means you need do 20k times of create socket, establish connection, send message and close socket, which is time consuming from my perspective and you can run some performance tool analysis to check how much time is used in function sendMessage().

An alternative approach might create one request socket for each thread, and send data with socket of that thread it belongs to. ZeroMQ is not support multiple thread, or it will lead to errors, such as assert error (debug mode) or crash.

Predominance answered 30/1, 2017 at 23:44 Comment(0)
B
1

An alternative could be having one dedicated thread for the ZeroMQ communication with some FIFO queue (must be guarded with a mutex or similar, of course...). This dedicated thread should be sleeping as long as the queue is empty and wake up (being signalled appropriately) whenever this state changes.

Depending on general needs, whenever the response for some outgoing message is received, the dedicated thread could simply call some callback (at some dedicated object per thread); be aware that you have a different thread context then, so you might need some means of synchronisation to prevent race conditions.

Alternatively, the sending threads could just wait for the response, being signalled by the ZeroMQ thread on received response (well, this actually is one of those means to prevent race conditions...).

Blotch answered 4/12, 2017 at 11:25 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.