Using ZeroMQ together with Boost::ASIO

I've got a C++ application that uses ZeroMQ for some messaging. But it also has to provide an SCGI connection for an AJAX / Comet based web service.

For this I need a normal TCP socket. I could do that with plain POSIX sockets, but to stay portable across platforms and make my life easier (I hope...) I was thinking of using Boost::ASIO.

But now I have a clash: ZMQ wants to use its own zmq_poll() and ASIO its io_service.run()...

Is there a way to get ASIO to work together with the 0MQ zmq_poll()?

Or is there another recommended way to achieve such a setup?

Note: I could solve this by using multiple threads, but the program will run on a small single-core box with a very low amount of SCGI traffic, so multithreading would be a waste of resources...

Mab answered 10/10, 2012 at 21:53 Comment(9)
For those of us unfamiliar with ZeroMQ, can you expand upon zmq_poll()? I assume it is an event loop of some kind? – Kursk
Yes, zmq_poll() is basically an event loop. It can wait for ZMQ sockets and native sockets (given by their integer file descriptor) at the same time (cf. api.zeromq.org/3-2:zmq-poll). Extending it in a deeper or more C++-like way is not possible, as it's a C API. – Mab
zmq_poll() doesn't wait or block. That is the point of polling it instead of just calling sockets.recv() (which will block until it gets a message). – Sporogonium
zguide.zeromq.org/page:all#Handling-Multiple-Sockets has a zmq_poll example (check the second code block). According to the zmq_poll documentation, setting the timeout of zmq_poll() to 0 makes it return immediately. – Sporogonium
But that would make the code spin (busy waiting). Not very good for performance... – Mab
"Multithreading would be a waste of resources" - based on what? The fact that it's a single core CPU is not a problem for multithreading, especially when the threads have little if any work to do. The waste of resources would be concerning if you were drastically limited in the amount of memory available (for an embedded solution, for instance), but you don't say that is the case. – Dimpledimwit
What are your latency requirements for handling the zmq_poll() requests? You could run zmq_poll() from within the io_service (by handling it in a deadline_timer). If you think that would be acceptable, I can code up an example in an answer. – Dimpledimwit
@Chad: yes, it's an embedded device, but luckily one of the "bigger" ones. I don't have to count bytes; kilobytes are interesting and megabytes relevant. So the additional stack of one or two threads is not a problem, but of course I'd love to avoid it if possible... – Mab
@Dimpledimwit #2: Think of 1 ZMQ message per minute and two kinds of SCGI requests. One asks about an internal state and can and should be answered immediately (less than 100 ms round trip, better in the range of 10-20 ms); for the other, the reply is held back (-> COMET pattern) until a relevant ZMQ message arrives. So there are always quite a few open SCGI sockets waiting for the ZMQ message. Once it arrives they have to be answered immediately (10-20 ms). – Mab

After reading the documentation here and here, specifically this paragraph:

ZMQ_FD: Retrieve file descriptor associated with the socket. The ZMQ_FD option shall retrieve the file descriptor associated with the specified socket. The returned file descriptor can be used to integrate the socket into an existing event loop; the ØMQ library shall signal any pending events on the socket in an edge-triggered fashion by making the file descriptor become ready for reading.

I think you can use null_buffers for every zmq_pollitem_t and defer the event loop to an io_service, bypassing zmq_poll() altogether. There appear to be some caveats in the aforementioned documentation, however, notably:

The ability to read from the returned file descriptor does not necessarily indicate that messages are available to be read from, or can be written to, the underlying socket; applications must retrieve the actual event state with a subsequent retrieval of the ZMQ_EVENTS option.

So when the handler for one of your zmq sockets fires, I think you'll have to do a little more work before handling the event. An uncompiled sketch follows:

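// zsock is your ZeroMQ socket; on POSIX the ZMQ_FD value is an int.
int fd = -1;
size_t fd_len = sizeof(fd);
zmq_getsockopt(zsock, ZMQ_FD, &fd, &fd_len);
boost::asio::posix::stream_descriptor socket(_io_service, fd);
socket.async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error, std::size_t /*bytes*/)
    {
       if (!error) {
           // The fd only signals that *something* happened: check
           // ZMQ_EVENTS, read every pending message with ZMQ_DONTWAIT,
           // then start the next async_read_some.
       }
     }
);
// The fd belongs to ZeroMQ: call socket.release() before the
// stream_descriptor is destroyed so that ASIO doesn't close it.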

Note that you don't have to use a lambda here; boost::bind to a member function would be sufficient.

Kursk answered 11/10, 2012 at 22:13 Comment(2)
That sounds very promising. I got a file descriptor out of ZMQ (using a connection of type ipc). But async_read_some(null_buffers(), ...) gives me a Bad file descriptor error... – Mab
@Mab you need to assign the zmq file descriptor to the asio stream_descriptor before calling async_read_some. I'll update my answer with an example when I'm not on my iPhone. – Kursk

In the end I figured out there are two possible solutions:

  • Sam Miller's, where we use ASIO's event loop
  • ZeroMQ's event loop, by getting the ASIO file descriptors through the .native() methods of the acceptor and the socket and inserting them into the zmq_pollitem_t array

I have accepted Sam Miller's answer, as for me that's the best solution in the SCGI case, where new connections are constantly being created and closed. Handling the ever-changing zmq_pollitem_t array is a big hassle that can be avoided by using the ASIO event loop.

Mab answered 14/10, 2012 at 16:9 Comment(0)

Obtaining the socket from ZeroMQ is the smallest part of the battle. ZeroMQ is based on a protocol that is layered over TCP, so you will have to reimplement ZeroMQ within a custom Boost.Asio io_service if you go this route. I ran into the same problem when creating an asynchronous ENet service with Boost.Asio: at first I simply tried to catch traffic from an ENet client using a Boost.Asio UDP service. ENet is a TCP-like protocol layered over UDP, so all I achieved at that point was catching packets in a virtually useless state.

Boost.Asio is template-based, and the built-in io_services use templates to basically wrap the system socket library to create TCP and UDP services. My final solution was to create a custom io_service that wrapped the ENet library instead of the system's socket library, allowing it to use ENet's transport functions rather than having to reimplement them on top of the built-in UDP transport.

The same can be done for ZeroMQ, but ZeroMQ is already a very high-performance network library in its own right that already provides async I/O. I think you can create a viable solution by receiving messages with ZeroMQ's existing API and passing them into an io_service thread pool. That way messages/tasks will still be handled asynchronously using Boost.Asio's reactor pattern without having to rewrite anything: ZeroMQ provides the async I/O, and Boost.Asio provides the async task handlers/workers.

The existing io_service can still be coupled to a TCP socket as well, allowing the thread pool to handle both TCP (HTTP in your case) and ZeroMQ. In such a setup the ZeroMQ task handlers can access the TCP service's session objects, allowing you to send the results of a ZeroMQ message/task back to a TCP client.

The following is just to illustrate the concept.

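// Keep run() from returning while there is momentarily nothing to do.
boost::asio::io_service::work work(io_service_);

// Create a pool of threads to run the io_service.
std::vector<boost::shared_ptr<boost::thread> > threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(
        new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (true) {
    char buffer[256];
    int size = zmq_recv(responder_, buffer, sizeof(buffer), 0);  // blocks
    if (size < 0) break;                                         // error / shutdown
    if (size > static_cast<int>(sizeof(buffer))) size = sizeof(buffer);  // truncated
    // Copy the data: the handler runs later on a pool thread,
    // while buffer is reused by the next zmq_recv().
    std::string msg(buffer, size);
    io_service_.post(boost::bind(&server::handle_zeromq_message, this, msg));
}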
Pantagruel answered 10/2, 2014 at 5:44 Comment(0)

Two years after this question was asked, someone posted a project that does exactly this: https://github.com/zeromq/azmq. The blog post discussing the design is here: https://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/.

Here is the sample code copied from the readme:

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>

namespace asio = boost::asio;

int main(int argc, char** argv) {
    asio::io_service ios;
    azmq::sub_socket subscriber(ios);
    subscriber.connect("tcp://192.168.55.112:5556");
    subscriber.connect("tcp://192.168.55.201:7721");
    subscriber.set_option(azmq::socket::subscribe("NASDAQ"));

    azmq::pub_socket publisher(ios);
    publisher.bind("ipc://nasdaq-feed");

    std::array<char, 256> buf;
    for (;;) {
        auto size = subscriber.receive(asio::buffer(buf));
        publisher.send(asio::buffer(buf, size)); // forward only the bytes received
    }
    return 0;
}

Looks nice. If you try it, let me know in the comments whether it still works in 2019 (I will probably try in a couple of months and then update this answer). Note that the repo looks stale: the last commit was a year ago.

Reluctance answered 4/4, 2019 at 14:46 Comment(0)

The solution is to call poll() on your io_service as well, instead of run().

Check out this solution for some poll() info.

Using poll instead of run will allow you to poll zmq's connections without any blocking issues.

Sporogonium answered 11/10, 2012 at 12:58 Comment(4)
But wouldn't that create the problem that I'm stuck waiting for a ZMQ message before I can also look for an ASIO/SCGI message, as ZMQ is blocking but ASIO isn't? Or, using non-blocking calls for both, I'd be doing heavy polling myself by checking whether either of the two has some work to do? – Mab
You're already using zmq_poll(), and my answer assumed that's still true. So the answer to your second question is yes: you'd be polling BOTH asio and zmq. I didn't mean to suggest that you change the zmq method from zmq_poll() to zsock.recv(), which would make zmq block... – Sporogonium
Classic polling (i.e. repeatedly and actively asking whether there's work to do) is not an option. The name zmq_poll() is misleading in this respect, as it's not "classic polling": its name derives from poll() as in linux.die.net/man/2/poll, which isn't classic polling. So far the only solutions I can see are to make ZMQ and ASIO return immediately, which is classic polling resulting in spinning (bad), or to make one or both block and thus cause huge latencies in handling ZMQ and/or ASIO messages. Or I didn't understand your comment correctly :( – Mab
Those are your only two options: either classic polling, or one with blocking. You can add a sleep to the classic poll loop so that it doesn't just spin and waste CPU time. The CPU will context-switch away during the sleep, though you probably won't be able to guarantee latency with a sleep... – Sporogonium
