How to safely write to a socket from multiple threads?
I'm using asio (non-Boost) to create a TCP server, and while my code works, it isn't done properly because I'm calling asio::async_write from multiple threads. I think I should use strands, but the more I read about them the more lost I am.

#include <cstdlib>
#include <iostream>
#include <utility>
#include <thread>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"

using asio::ip::tcp;

class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {
    }

    void start()
    {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

private:
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    buffer_.append(data_, length);
                    size_t pos;
                    while ((pos = buffer_.find('\0')) != std::string::npos)
                    {
                        std::string message = buffer_.substr(0, pos);
                        buffer_.erase(0, pos + 1);

                        // PROBLEM: each message is handled on a detached thread,
                        // and process_message() then calls async_write from there
                        std::thread(&session::process_message, this, message).detach();
                    }

                    do_read();
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Read error: " << ec.message() << '\n';
                }
            });
    }

    void do_write(std::string message)
    {
        auto self(shared_from_this());
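        // called from the detached worker threads, so multiple async_write
        // operations can overlap on the same socket; this is the problem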
        asio::async_write(socket_, asio::buffer(message),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Write error: " << ec.message() << '\n';
                }
            });
    }

    void process_message(std::string message) {
        std::string response = handler.processMessage(message);
        do_write(response);
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
    std::string buffer_;
    MessageHandler handler;
};

class server
{
public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
        socket_(io_context)
    {
        do_accept();
    }

private:
    void do_accept()
    {
        acceptor_.async_accept(socket_,
            [this](std::error_code ec)
            {
                if (!ec)
                {
                    std::make_shared<session>(std::move(socket_))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit()
{
    try
    {
        asio::io_context io_context;

        server s(io_context, 0);

        io_context.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << '\n';
    }
}

Bindery answered 14/2/2022 at 11:41 (2 comments):
Drinker: What exactly confuses you about strands? Note that they won't completely solve your problem: they only stop multiple threads running at the same time, and you still need to wait until one async_write has completed before initiating the next one. One solution is to use a queue of messages to write; using strands then means that queue doesn't need to be thread-safe.
Bindery: Most snippets I've found were for Boost, and I'm not using it. And when I finally got a strand object, it didn't have the wrap method? In the end I still don't know what I don't know, and while strands still confuse me, the answer below solved the problem!

You only have one thread running the IO service, so everything runs on an implicit strand (see Why do I need strand per connection when using boost::asio?). There is no need to worry until you start using new threads, which your detached process_message threads do.

The simplest fix, then, would be to make sure that sending the replies happens on the IO service as well:

void process_message(std::string const& message) {
    std::string response = handler.processMessage(message);
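    // we are on a detached worker thread here, so hop back onto the
    // socket's executor before touching the socket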
    post(socket_.get_executor(),
         std::bind(&session::do_write, shared_from_this(), response));
}

Now, if you wanted to be able to run the IO service on multiple threads, you would just make sure that the socket uses a strand executor.
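
For example, a minimal sketch (assuming io_context is the execution context whose run() you call from several threads):

// construct the per-connection socket on its own strand, so that all of its
// completion handlers are serialized even when multiple threads run the context
tcp::socket socket{asio::make_strand(io_context)};

The BONUS section below shows the same idea applied at the accept site.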

HOWEVER

This doesn't guarantee that you won't see overlapping async_write operations, because incoming messages might be processed faster than the responses can be sent. Therefore the customary solution is

Queueing

In my examples I typically call this FIFO queue "outbox_" and I prefer to use deque for reasons of iterator/reference stability (see Iterator invalidation rules for C++ containers):

void do_write(std::string message)
{
    outbox_.push_back(std::move(message)); // assumed on (implicit) strand
    if (outbox_.size() == 1) {
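        // otherwise a write_loop is already in flight and will send
        // this message when it reaches the front of the queue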
        write_loop();
    }
}

void write_loop() {
    if (outbox_.empty())
        return;

    // write the front message; deque gives stable references, so the buffer
    // stays valid across the async operation while new messages are queued
    asio::async_write( //
        socket_, asio::buffer(outbox_.front()),
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t /*length*/) {
            if (!ec) {
                outbox_.pop_front();
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << '\n';
            }
        });
}

Demo

Here's a fixed listing with a stub messages.h.

It also greatly simplifies the reading/buffer handling by using the existing async_read_until composed operation, which does everything you had manually written.

Live On Coliru

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
    #include "messages.h"
#else // mock messages.h
    #include <boost/lexical_cast.hpp>
    #include <iomanip>
    struct MessageHandler {
        std::string initialMessage() const { return "Initial\n"; }
        std::string processMessage(std::string const& req) const {
            return "Processed " +
                boost::lexical_cast<std::string>(std::quoted(req)) + "\n";
        }
    };
#endif

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(buffer_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
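                    // still one thread per message, as in the question; passing
                    // the shared_ptr keeps the session alive for the detached thread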
                    std::thread(&session::process_message, shared_from_this(),
                                buffer_.substr(0, length - 1))
                        .detach();
                    buffer_.erase(0, length);

                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(std::string message)
    {
        outbox_.push_back(std::move(message)); // assumed on (implicit) strand
        if (outbox_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Write error: " << ec.message() << std::endl;
                }
            });
    }

    void process_message(std::string const& message) {
        std::string response = handler.processMessage(message);
        // dispatch/post to executor because we are on a different thread
        post(socket_.get_executor(),
             std::bind(&session::do_write, shared_from_this(), response));
    }

    tcp::socket socket_;
    std::string buffer_;
    std::deque<std::string> outbox_;
    MessageHandler handler;
};

class server
{
  public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
        socket_(io_context)
    {
        do_accept();
    }

  private:
    void do_accept()
    {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            if (!ec) {
                std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit() {
    try {
        asio::io_context io_context;

        server s(io_context, 8989);

        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() { serverInit(); }

When sending a large burst of requests (bash brace expansion turns this into 100 NUL-terminated messages):

printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1

The server prints, e.g.:

Accepted 127.0.0.1:34862

And the client receives, e.g. (note that responses can arrive out of order, because each message is processed on its own thread):

Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"

BONUS: Adding the strand

Minimal changes:

class server
{
  public:
    server(asio::any_io_executor ex, unsigned short port)
        : acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

  private:
    void do_accept()
    {
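        // every accepted connection gets its own strand, so its completion
        // handlers never run concurrently, even on the thread pool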
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](error_code ec, tcp::socket&& s) {
            if (!ec) {
                std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(s))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

void serverInit() {
    try {
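        // still named io_context to keep the changes minimal, but this is a
        // thread_pool: handlers now run on multiple threads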
        asio::thread_pool io_context;

        server s(io_context.get_executor(), 8989);

        io_context.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

Lodging answered 14/2/2022 at 14:06 (4 comments):
Bindery: Thank you, I didn't expect such a detailed answer! One question: in your code you had dynbuffer_ but didn't use it. Was it supposed to be used in place of asio::dynamic_buffer(buffer_)? Are there performance differences? I had completely misunderstood how async_read_until works; after re-reading the docs I get it now. Also, thank you for using std::bind instead of boost::bind. I didn't know they were interchangeable, and I don't use Boost.
Lodging: Good spot. Yeah, that was a leftover. In some cases it could be beneficial, but mainly to make your code more generic: you could have said dynbuffer_.consume(length) instead of buffer_.erase(0, length), with the same effect. It's more generic because you could then replace dynbuffer_ with anything satisfying the dynamic buffer concept (e.g. asio::streambuf). I ended up removing it because I thought the manual type declaration was too noisy to warrant the genericity.
Aircool: @Lodging printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1 doesn't seem right. Shouldn't it be \n rather than \0, to make sure the messages are sent out one by one? I tried it with Ubuntu and bash. What do you think?
Lodging: @Aircool They're different things: buffering versus framing. The \0 is for message framing; \n happens to control buffering (depending on the implementation of netcat and the operating system). My test did not concern itself with when to flush; it just sends 100 messages with the framing expected by the application protocol.
