boost::asio async_receive_from UDP endpoint shared between threads?
Boost asio specifically allows multiple threads to call the run() method on an io_service. This seems like a great way to create a multithreaded UDP server. However, I've hit a snag that I'm struggling to get an answer to.

Looking at a typical async_receive_from call:

m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

The remote endpoint and message buffer are not passed through to the handler, but are at a higher scope level (member variable in my example). The code to handle the UDP message when it arrives will look something like:

void udp_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}

If there are multiple threads running, how does the synchronisation work? Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread before calling the handler in another thread in the case that a message arrived in the meantime. That seems to negate the point of allowing multiple threads to call run in the first place.

If I want to get concurrent serving of requests, it looks like I need to hand off the work packets, along with a copy of the end point, to a separate thread allowing the handler method to return immediately so that asio can get on and pass another message in parallel to another one of the threads that called run().

That seems more than somewhat nasty. What am I missing here?

Unhook answered 2/11, 2014 at 19:47 Comment(3)
Re-reading your last paragraph, it looks like you have a good grasp of things already. However, the hand-off doesn't really need to be "nasty", as I hope my answer shows (the post(...) line does that).Heavy
I wish that the work of servicing the requests was, as you say, minimal. Sadly, in the case I'm working on, it's a long running complex operation, sometimes requiring calls out to other services. I did a bit of testing with Ubuntu 14 and, as guessed, was able to show that the handler is never called concurrently regardless of the number of threads that call run() on the io_service so using the approach of having boost manage my threads behind the scenes isn't going to work. Looks like I'm back to the explicit pool approach. Thanks for the answer all the same.Unhook
The demo in my answer does: if you make the processing take 1..3s, you can see that the service does the work concurrently. However, in general, you don't want to run any blocking operations on the IO queue, which is precisely the topic of this question. So yes, either make the blocking operations async, or use a separate queue for the blocking operations (a second io_service could be a simple option)Heavy

Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread

If you mean "when running the service with a single thread" then this is correct.

Otherwise, this isn't the case. Instead, Asio simply says the behaviour is "undefined" when you call operations on a single service object (i.e. the socket, not the io_service) concurrently.

That seems to negate the point of allowing multiple threads to call run in the first place.

Not unless processing takes a considerable amount of time.

The first paragraphs of the introduction of the Timer.5 sample seem like a good exposition about your topic.

Session

To separate the request-specific data (buffer and endpoint) you want some notion of a session. A popular mechanism in Asio is either bound shared_ptrs or a shared-from-this session class (boost bind supports binding to boost::shared_ptr instances directly).

Strand

To avoid concurrent, unsynchronized access to members of m_socket you can either add locks or use the strand approach as documented in the Timer.5 sample linked above.

Demo

Here for your enjoyment is the Daytime.6 asynchronous UDP daytime server, modified to work with many service IO threads.

Note that, logically, there's still only a single IO thread (the strand) so we don't violate the socket class's documented thread-safety.

However, unlike the official sample, the responses may get queued out of order, depending on the time taken by the actual processing in udp_session::handle_request.

Note:

  • a udp_session class to hold the buffers and remote endpoint per request
  • a pool of threads, which are able to scale the load of actual processing (not the IO) over multiple cores.
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    array<char, 100> recv_buffer_;
    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef shared_ptr<udp_session> shared_session;
  public:
    udp_server(asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = make_shared<udp_session>(this);

        socket_.async_receive_from(
                asio::buffer(session->recv_buffer_), 
                session->remote_endpoint_,
                strand_.wrap(
                    bind(&udp_server::handle_receive, this,
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                strand_.wrap(bind(&udp_session::handle_sent, 
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    udp::socket  socket_;
    asio::strand strand_;

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
    if (!error || error == asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow

        // let the server coordinate actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        asio::io_service io_service;
        udp_server server(io_service);

        thread_group group;
        for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
            group.create_thread(bind(&asio::io_service::run, ref(io_service)));

        group.join_all();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
}

Closing thoughts

Interestingly, in most cases you'll see the single-thread version performing just as well, and there's no reason to complicate the design.

Alternatively, you can use a single-threaded io_service dedicated to the IO and use an old-fashioned worker pool to do the background processing of the requests, if that is indeed the CPU-intensive part. First, this simplifies the design; second, it might improve the throughput on the IO tasks because there is no longer any need to coordinate the tasks posted on the strand.

Heavy answered 2/11, 2014 at 22:58 Comment(2)
You can see the demo Live On Coliru with 8 concurrent "remote clients" as well. Witness how responses are received out of order.Heavy
socket_.async_send_to may be invoked concurrently as it is executed outside of the strand, violating the thread safety. Consider using a shim function that dispatches udp_server::enqueue_response into the I/O strand.Eastbound

Since the "suggested edit queue" on the answer by @sehe is full, allow me to submit an update.

  • Replaced ctime() with something thread-safe
  • Updated to latest boost style, e.g. the new boost::bind, and the removal of socket_.get_io_service()
  • Got rid of the using namespace boost, to make it more obvious
  • Call async_send_to() in a thread-safe way (h/t to Tanner Sansbury)
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::udp;
using boost::system::error_code;

static std::string make_daytime_string()
{
    return boost::posix_time::to_simple_string(boost::posix_time::second_clock::local_time());
}

class udp_server; // forward declaration

struct udp_session : boost::enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    boost::array<char, 100> recv_buffer_;
    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef boost::shared_ptr<udp_session> shared_session;
  public:
    udp_server(boost::asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = boost::make_shared<udp_session>(this);

        socket_.async_receive_from(
                boost::asio::buffer(session->recv_buffer_),
                session->remote_endpoint_,
                strand_.wrap(
                    boost::bind(&udp_server::handle_receive, this,
                        session, // keep-alive of buffer/endpoint
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        boost::asio::post(socket_.get_executor(), boost::bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        // async_send_to() is not thread-safe, so use a strand.
        boost::asio::post(socket_.get_executor(),
            strand_.wrap(boost::bind(&udp_server::enqueue_response_strand, this, session)));
    }

    void enqueue_response_strand(shared_session const& session) {
        socket_.async_send_to(boost::asio::buffer(session->message), session->remote_endpoint_,
                strand_.wrap(boost::bind(&udp_session::handle_sent,
                        session, // keep-alive of buffer/endpoint
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred)));
    }

    udp::socket socket_;
    boost::asio::io_context::strand strand_;

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
    if (!error || error == boost::asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow
        message += "\n";

        // let the server coordinate actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        boost::asio::io_service io_service;
        udp_server server(io_service);

        boost::thread_group group;
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            group.create_thread(boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));

        group.join_all();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
Burdine answered 24/3, 2022 at 20:34 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.