zmq-cpp: recv() waits for data despite ZMQ_DONTWAIT being set
Asked Answered
P

2

10

I'm trying to implement a non-blocking receive method with ZeroMQ using the ZMQ_DONTWAIT flag but recv() behaves like being called without the flag:

    auto start = std::chrono::steady_clock::now();

    auto have_data = sock_->recv(&reply, ZMQ_DONTWAIT);

    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - start).count();

    std::cout << duration << " " << have_data;

sock_ is a zmq::socket_t instantiated as REQ socket.

In this case have_data is always true and duration is whatever the REP server takes to reply (0 to several hundreds milliseconds).

Note that I'm talking about the cpp binding of ZeroMQ defined in zmq.hpp where recv() is differently declared than in zmq.h:

inline bool recv (message_t *msg_, int flags_ = 0);

Here recv() returns true if data has been received and false if errno is EAGAIN

Are there any preconditions to ZMQ_DONTWAIT making recv() return immediately?

(I'm using zmq version 4.1.2)

Paschal answered 29/9, 2015 at 12:9 Comment(2)
REQ socket bind or connect? Add this part of code, please.County
In my case it´s connectPaschal
C
2

Yes. ( ZeroMQ preconditions should be taken into account. )

A minor note: ZeroMQ allows one to setup a setsockopt() with ZMQ_RCVTIMEO == 0 or a performance-wise reasonable value.

The main issue however is hidden inside the REQ/REP behaviour pattern.

If an Application, on REQ "jumps" right into a state [*] and wait there for anything that might have already arrived into the [REQ]-<Rx>-buffer ( which is principally not possible in this use case ) or that might be will arrive, at some later time, but there is a trouble as the REP counterparty has nothing to reply to and must not and will not .send() anything without a prior .recv() a REQ's request.

    ]-<Tx>- - - - - - - - - +                      + - - - - - - - -<Tx>-[
    ]-<Rx>- - - - - - - -+  |                      :  +- - - - - - -<Rx>-[
[REQ]____________________:__|                      :__|__________________[REP]
                         :  |                      :  |
APP.send() ]--->.send()--:->|                      :  |
           ]             |  :\                     :  |
           ]             |  : \____________________:__|>
           ]             |  :                      :  |M
           .             .  .                      .  .M
           ?             ?  ?                      ?  ?
           .             .  .                      .  .M
           ]             |  :                      :  |M_.recv()--->[ APP.recv()
           ]             |  :                      |  :             [ and
           .             .  .                      .  .             [ can 
           ?             ?  ?                      ?  ?             ?
           .             .  .                      .  .             [ 
           ]             |  :                      |  :             [    .send()
           ]             |  :                      |  :             [ after
           ]             |  :                      |  :             [    .recv()
           ]             |  :                      |  :             [ 
           ]             |  :                      |<-:--.send()<---[ APP.send()
           ]             |  :                     /:  |
           ]            <|__:____________________/ :  |
           ]            M|  :                      :  |
           .            M.  .                      .  .
           ?             ?  ?                      ?  ?
           .            M.  .                      .  .
[*]        ]            M|  :                      :  |
APP.recv() ]<---.recv()_M|  :                      :  |
                         :  |                      :  |
Centaur answered 23/10, 2015 at 1:36 Comment(0)
H
0

You can look at the high level ZeroMQ api called czmq. It comes with ZeroMQ library. It has function zstr_recv_nowait() details: zstr man page

Example:

//some code
void *listener = zsocket_new (ctx, ZMQ_SUB); //zctx_t *ctx
//...
while (!zctx_interrupted)
{
     char* message = zstr_recv_nowait(listener);
     if (message && message[0]!='\0') {
      //do some work
     }
}
Hebdomadal answered 29/9, 2015 at 12:47 Comment(1)
zstr_recv_nowait() actually calls zmq_recv() with ZMQ_DONTWAIT.Thompkins

© 2022 - 2024 — McMap. All rights reserved.