How can I receive multipart messages with ZeroMQ?
I can't get the ZeroMQ C++ wrapper to receive multipart messages. The same code using the C API works fine, but the C++ version throws an exception with no explanation. The multipart handling code is as follows:

int _tmain(int argc, _TCHAR* argv[])
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    while(true) {
        // the following two lines lead to exception
        zmq::message_t request;
        socket.recv(&request);

        //zmq_msg_t message;
        //zmq_msg_init (&message);
        //zmq_recv (socket, &message, 0);   
    }

    return 0;
}

It is extremely simple, yet this version does not work. If I comment out the first two lines in the while loop and uncomment the C version below them, it works. This is Windows XP SP3, ZeroMQ 2.1.1 and Visual Studio 2010 Express.

If I send single part messages, both versions work fine. What am I doing wrong?

Kiefer answered 1/6, 2011 at 15:18 Comment(1)
I've been told to upgrade to the latest version of ZeroMQ; other than that, the code is considered to be fine. - Kiefer

I decided to use the C version of the code. In general all examples seem to be in C anyway.

Kiefer answered 12/6, 2011 at 15:34 Comment(0)

I'm also new to ZeroMQ, and I struggled to understand multipart messaging over REQ/REP. I had to go through several resources and stitch the pieces together, so I'm sharing the client and server code here in the hope that it helps others. I have tested this code and it works for me. Being a newcomer, I may have missed something vital, so corrections are welcome.

Server Code

#include <zmq.hpp>
#include <windows.h>   // Sleep
#include <cstring>     // memcpy
#include <iostream>
#include <string>

void
serverMultipartREPREQ()
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t socket(context, ZMQ_REP);
        socket.bind("tcp://*:5556");
        std::cout << "Listening at port 5556..." << std::endl;

        while (true)
        {
            // A REP socket has to receive a request before every reply,
            // so the recv belongs inside the loop.
            zmq::message_t request;
            socket.recv(request, zmq::recv_flags::none);
            auto req = std::string(static_cast<char*>(request.data()), request.size());

            std::cout << "Received: " << req << std::endl;

            // A request with the text "exit" stops the server.
            if (req == "exit")
                break;

            // Reply with three parts: the first two carry sndmore,
            // the last one closes the multipart message.
            for (int j = 0; j < 3; ++j)
            {
                std::string s("Message no - " + std::to_string(j));

                zmq::message_t message(s.length());
                memcpy(message.data(), s.c_str(), s.length());

                std::cout << "Sending: " << s << std::endl;

                if (j != 2)
                    socket.send(message, zmq::send_flags::sndmore);
                else
                    socket.send(message, zmq::send_flags::none);
            }
        }
    }
    catch (const zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }

    Sleep(5000);
}
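
The send loop above follows one rule: every part except the last carries the "more" flag. Here is a minimal sketch of that rule, detached from the socket so it can be tried without a running peer. The names `send_multipart` and `SendPart` are hypothetical; in a real program the callback would presumably wrap `socket.send` with `zmq::send_flags::sndmore` or `zmq::send_flags::none`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical callback type: sends one part; `more` is true when further parts follow.
using SendPart = std::function<void(const std::string& payload, bool more)>;

// Sends every element of `parts` as one multipart message:
// all parts but the last are flagged "more", the last one closes the message.
void send_multipart(const std::vector<std::string>& parts, const SendPart& send_part)
{
    for (std::size_t i = 0; i < parts.size(); ++i)
    {
        const bool more = (i + 1 < parts.size());
        send_part(parts[i], more);
    }
}
```

Keeping the flag computation in one place avoids hard-coding the index of the last part, as the `j != 2` test does.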

Client code

// Uses the same includes as the server code.
void
clientMultipartREQREP()
{
    try
    {
        zmq::context_t context(1);

        std::cout << "Connecting to socket at 5556" << std::endl;
        zmq::socket_t socket(context, ZMQ_REQ);
        socket.connect("tcp://localhost:5556");
        std::cout << "Connected to socket at 5556" << std::endl;

        std::string msg("Hii this is client...");
        zmq::message_t message(msg.length());
        memcpy(message.data(), msg.c_str(), msg.length());

        socket.send(message, zmq::send_flags::none); // send to server (request message)

        // The flag lives outside the loop so that it survives between
        // iterations; resetting it to 1 on every pass would never terminate.
        bool more = true;
        while (more)
        {
            zmq::message_t part;
            socket.recv(part, zmq::recv_flags::none);
            auto rep = std::string(static_cast<char*>(part.data()), part.size());
            std::cout << "Received from server: " << rep << std::endl;

            // true while further parts of the same message follow
            more = part.more();
        }

        std::cout << "Done..." << std::endl;
    }
    catch (const zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }
    Sleep(5000);
}
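
REQ and REP sockets enforce a strict alternation: a REP socket has to receive a complete request before it may send, and has to finish its reply before it may receive again. Calling send or recv out of turn is what produces "Operation cannot be accomplished in current state". The rule can be illustrated with a toy model; `RepStateModel` is a hypothetical class written for this explanation and is not how libzmq implements it.

```cpp
#include <cassert>
#include <stdexcept>

// Toy model of the REP-side alternation rule; not libzmq's actual implementation.
class RepStateModel
{
public:
    // Receive one request part; `more` says whether further request parts follow.
    void recv(bool more)
    {
        if (replying_)
            throw std::logic_error("Operation cannot be accomplished in current state");
        if (!more)
            replying_ = true;   // full request received, a reply is now due
    }

    // Send one reply part; `more` says whether further reply parts follow.
    void send(bool more)
    {
        if (!replying_)
            throw std::logic_error("Operation cannot be accomplished in current state");
        if (!more)
            replying_ = false;  // reply complete, the next request may be received
    }

private:
    bool replying_ = false;     // false: waiting for a request, true: reply in progress
};
```

In this model a three-part reply is `send(true)`, `send(true)`, `send(false)`; a fourth `send` throws until another `recv(false)` has completed a new request.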
Venu answered 7/8, 2020 at 7:19 Comment(1)
I appreciate the effort here, but I'm a bit confused. Normally I'd expect the client to send multiple frames of data to the server, but that's just my use case. This example makes some sense, but it allows only one multi-frame message before failing with "Exception: Operation cannot be accomplished in current state". As an example, I would have preferred a client that receives a set of multipart messages after each button press on the server. - Safekeeping

The C version of the code probably doesn't work either, but you don't check the return code of zmq_recv, so you don't notice. Also, when receiving multipart messages you should check whether more message parts are waiting on the socket, like this:

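// ZeroMQ 2.x reports ZMQ_RCVMORE as int64_t; 3.x and later use int.
int64_t more = 0;
size_t more_size = sizeof(more);
socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
if (more != 0)
{
  // more parts of the same message follow; receive again
}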

Also, take a look at the ZmqMessage C++ library, designed specifically for sending and receiving ZeroMQ multipart messages.
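
The RCVMORE check is normally run in a loop that keeps receiving until a part arrives without the flag. Here is a minimal sketch of that loop, written against a callback so it can be tried without a socket. `Frame` and `receive_all_parts` are hypothetical names; in a real program the callback would presumably receive one message and then query ZMQ_RCVMORE as shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// One received part: its payload plus the "more parts follow" flag.
struct Frame
{
    std::string payload;
    bool more;
};

// Hypothetical helper: pulls frames from `next_frame` until one arrives
// with more == false, and returns the payloads of that single message.
std::vector<std::string> receive_all_parts(const std::function<Frame()>& next_frame)
{
    std::vector<std::string> parts;
    bool more = true;
    while (more)
    {
        Frame f = next_frame();
        more = f.more;
        parts.push_back(std::move(f.payload));
    }
    return parts;
}
```

The loop stops at the message boundary, so a part belonging to the next message is left for the next call.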

Mitchelmitchell answered 12/9, 2011 at 11:29 Comment(0)
