How can I clean up properly when recv is blocking?

Consider the example code below (I typed it up quickly as an example, if there are errors it doesn't matter - I'm interested in the theory).

#include <windows.h>
#include <zmq.hpp>

bool shutDown = false; //global

DWORD WINAPI MessengerLoop( LPVOID lpParam ); //defined below

int main()
{
  CreateThread(NULL, 0, &MessengerLoop, NULL, 0, NULL);
  //do other programmy stuff...
}


DWORD WINAPI MessengerLoop( LPVOID lpParam )
{
  zmq::context_t context(1);
  zmq::socket_t socket (context, ZMQ_SUB);
  socket.connect("tcp://localhost:5556");
  socket.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    zmq_msg_recv (&getMessage, socket, 0); //This line will wait forever for a message
    processMessage(getMessage); //application-specific handler, defined elsewhere
    zmq_msg_close(&getMessage); //release the message once it has been handled
  }
  return 0;
}

A thread is created to wait for incoming messages and to handle them appropriately. The thread is looping until shutDown is set to true.

In ZeroMQ the Guide specifically states what must be cleaned up, namely the messages, socket and context.

My issue is: Since recv will wait forever for a message, blocking the thread, how can I shut down this thread safely if a message is never received?

Azoth answered 25/4, 2013 at 10:37 Comment(5)
Use the ZMQ_DONTWAIT flag in zmq_msg_recv, and add a manual Sleep for a short time period? So use a more polling method?Wakayama
I don't know about the zmq_* API, but in the plain socket API it is enough to close the socket in another thread, and recv exits with failure. To do this, you need to connect to the socket in another thread, so that the socket variable is available for closing. MessengerLoop must contain only the endless recv loop.Berry
@AlexFarber I had considered that but, unless I'm mistaken, wouldn't that result in all of the message processing being done in that same MessengerLoop thread? In which case I then have to set up some sort of signalling system to differentiate between "I'm waiting for a message, so yes you can terminate me" and "I'm busy processing something, don't terminate me yet". This seems messier than a polling-based system. I'm pretty new to this network messaging malarkey, am I missing something or is that the normal way to do things?Azoth
I am not familiar with zmq_* functions, but looking at the code you posted, I think the only line that can fail in the loop, when the socket is closed, is zmq_msg_recv call. But this is just my guess. Once this call fails, exit the loop and the whole thread.Berry
Also, after closing the socket from the main thread, you need to wait for the thread handle, when the thread actually exits - general Win32 multithreading programming rules.Berry
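The rule in the last comment (signal the worker, then wait for it to exit before tearing anything down) can be sketched portably. This is only an illustration: it assumes std::thread and std::atomic in place of CreateThread and WaitForSingleObject, and the Worker class and its body callback are hypothetical names that do not come from this thread.

```cpp
#include <atomic>
#include <thread>

// Hypothetical helper: runs `body` repeatedly on its own thread until stop() is called.
// Assumption: `body` returns regularly (for example a receive with a timeout),
// otherwise the flag is never rechecked.
class Worker {
public:
    template <typename Body>
    explicit Worker(Body body)
        : thread_([this, body]() mutable {
              while (!stop_.load()) body();
          }) {}

    // Signal first, then wait. Once stop() returns, the thread has exited,
    // so anything it was using can be cleaned up safely.
    void stop() {
        stop_.store(true);
        if (thread_.joinable()) thread_.join();
    }

    ~Worker() { stop(); }

    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

private:
    std::atomic<bool> stop_{false};  // declared before thread_, so initialised first
    std::thread thread_;
};
```

An atomic flag is used here because a plain global bool written by one thread and read by another is a data race in C++.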
M
11

The blocking call will exit in a few ways. First, and this depends on your language and binding, an interrupt (Ctrl-C, SIGINT, SIGTERM) will exit the call. You'll get back (again, depending on your binding) an error or a null message (libzmq returns an EINTR error).

Second, if you terminate the context in another thread, the blocking call will also exit (libzmq returns an ETERM error).

Third, you can set a receive timeout on the socket (the ZMQ_RCVTIMEO option) so the call returns after that interval if there's no data (libzmq returns an EAGAIN error). We don't often do this but it can be useful in some cases.

Finally, what we do in practice is never do blocking receives but use zmq_poll to find out when sockets have messages waiting, then receive from those sockets. This is how you scale out to handling more sockets.
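The exit paths described above each surface as a failed receive with a different error code, so the loop can decide what to do from the return value and the error alone. The following is a rough sketch of that decision, not libzmq code: RecvAction and classifyRecv are hypothetical names, and kEtermStandIn is a placeholder so the snippet compiles without libzmq. Real code would use ETERM from <zmq.h> and read the error with zmq_errno().

```cpp
#include <cerrno>

// Placeholder for ETERM so this sketch builds without <zmq.h>.
// Assumption: real code uses the ETERM constant that libzmq defines.
constexpr int kEtermStandIn = 156384765;

enum class RecvAction { Process, Retry, Stop };

// rc:  return value of the receive call (-1 on failure).
// err: error code read immediately after the call.
// shutDownRequested: the application's own shutdown flag.
inline RecvAction classifyRecv(int rc, int err, bool shutDownRequested) {
    if (rc >= 0) return RecvAction::Process;            // a message arrived
    if (err == kEtermStandIn) return RecvAction::Stop;  // context terminated elsewhere
    if (shutDownRequested) return RecvAction::Stop;     // asked to stop while waiting
    if (err == EINTR || err == EAGAIN)                  // interrupted, or timeout expired
        return RecvAction::Retry;
    return RecvAction::Stop;                            // any other error: give up
}
```

In a receive loop this would be called right after each receive, with Process leading to the message handler, Retry going back to the top of the loop, and Stop breaking out so the thread can close its own socket.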

Megdal answered 25/4, 2013 at 12:38 Comment(3)
would you still use polling for REQ - REP pairs?Azoth
Sure. Blocking reads are fine in simple tasks that work with exactly one socket, but most realistic tasks use multiple sockets. You might for instance use a REQ socket to receive work, and then also log your status to a PUB socket every few minutes.Megdal
that's exactly what I've implemented :) Good to know I'm on the right track. Thanks!Azoth
K
2

You can use the non-blocking flag ZMQ_DONTWAIT:

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    while(-1 == zmq_msg_recv(&getMessage, socket, ZMQ_DONTWAIT))
    {
      if (EAGAIN != errno || shutDown)
      {
        break;
      }
      Sleep(100);
    }
    processMessage(getMessage); 
  }
Kiersten answered 25/4, 2013 at 12:31 Comment(1)
two minor logic errors: 1) if (EAGAIN == errno || shutDown) 2) add if (shutDown) { break; } after break from recv while loop, i.e., skip processMessageAviate
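The second point in the comment is a real gap: when the inner loop breaks, processMessage still runs on an empty message. The first suggestion looks inverted, though. With EAGAIN == errno the loop would stop on the first empty poll, so the answer's original condition seems to be the intended one. A minimal sketch of the loop with the gap closed follows. To keep it buildable without libzmq, the receive and the handler are passed in as callables: tryRecv stands in for zmq_msg_recv(&msg, socket, ZMQ_DONTWAIT) and process stands in for processMessage; both names, and messengerLoop itself, are hypothetical.

```cpp
#include <atomic>
#include <cerrno>
#include <chrono>
#include <thread>

// tryRecv: returns -1 and sets errno on failure, like the libzmq call it stands in for.
// process: invoked only when a message was actually received.
template <typename TryRecv, typename Process>
void messengerLoop(const std::atomic<bool>& shutDown,
                   TryRecv tryRecv,
                   Process process,
                   std::chrono::milliseconds idle = std::chrono::milliseconds(100)) {
    while (!shutDown) {
        int rc = tryRecv();
        if (rc >= 0) {          // got a message: handle it and poll again
            process();
            continue;
        }
        if (errno != EAGAIN)    // a real error, not just "nothing queued": leave the loop
            break;
        std::this_thread::sleep_for(idle);  // back off, then recheck shutDown
    }
}
```

On Windows, reading the error through libzmq's zmq_errno() rather than errno is the safer choice, since the library may be linked against a different C runtime than the application.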
D
0

Whenever the zmq context is destroyed, zmq_msg_recv returns -1 (libzmq sets the error to ETERM). I use this as the terminating condition in all of my code.

while (!shutDown)
{
    ..
    ..
    int rc = zmq_msg_recv (&getMessage, socket, 0);
    if (rc != -1)
    {
      processMessage(getMessage);
    }
    else
      break; // recv failed, e.g. because the context was destroyed
}

Remember to destroy the zmq context at the end of your main() for a proper clean-up.

zmq_ctx_destroy(zctx); 
Diorite answered 8/3, 2017 at 1:12 Comment(0)
G
0

Let's say you have a class, say SUB (subscriber), that manages receiving your ZMQ messages. In the destructor or exit function of your main function/class, call the following:

pub->close();

///
/// Close the publish context
///
void PUB::close()
{
    zmq_close (socket);
    zmq_ctx_destroy (context);
}

This makes the blocking 'recv' return with an error that you can ignore, and the application can then exit cleanly. This is the right method. Good luck!

Grearson answered 12/1, 2018 at 15:50 Comment(1)
My read from wiki.zeromq.org/whitepapers:0mq-termination is that calling zmq_close from another thread while that same socket is waiting on recv isn't valid. "each thread will have to remain responsible for the open and closure of its own socket"Pudgy
