You only have one thread running the IO service, so everything is on an implicit strand (see Why do I need strand per connection when using boost::asio?). There is no need to worry UNTIL you start using a new thread.
The simplest fix, then, would seem to be to make sure that sending the replies happens on the IO service as well:
void process_message(std::string const& message) {
    std::string response = handler.processMessage(message);

    post(socket_.get_executor(),
         std::bind(&session::do_write, shared_from_this(), response));
}
Now if you wanted to be able to run the IO services on multiple threads, you would just make sure that the socket uses a strand executor.
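For example, a minimal sketch of that idea (illustrative only; the io_context, acceptor and port names here are placeholders, not part of the listing below):

// Sketch: bind each connection's socket to its own strand, so its completion
// handlers never run concurrently even when the IO service runs on several threads.
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8989));

tcp::socket socket(asio::make_strand(io_context)); // socket bound to a strand executor
acceptor.async_accept(socket, [](error_code ec) {
    // handlers for this socket are now serialized on its strand
});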
HOWEVER
This doesn't guarantee that you won't see overlapping async_write operations, because incoming messages may be processed faster than the replies can be sent. Therefore the customary solution is
Queueing
In my examples I typically call this FIFO queue "outbox_" and I prefer to use deque
for reasons of iterator/reference stability (see Iterator invalidation rules for C++ containers):
void do_write(std::string message) {
    outbox_.push_back(std::move(message)); // assumed on (implicit) strand

    if (outbox_.size() == 1) {
        write_loop();
    }
}

void write_loop() {
    if (outbox_.empty())
        return;

    asio::async_write( //
        socket_, asio::buffer(outbox_.front()),
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t /*length*/) {
            if (!ec) {
                outbox_.pop_front();
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << '\n';
            }
        });
}
Demo
Here's a fixed listing with a stub messages.h.
It also greatly simplifies the reading/buffer handling by using the existing async_read_until
composed operation, which does everything you had manually written.
Live On Coliru
#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>

#if 0
    #include "messages.h"
#else // mock messages.h
    #include <boost/lexical_cast.hpp>
    #include <iomanip>
    struct MessageHandler {
        std::string initialMessage() const { return "Initial\n"; }
        std::string processMessage(std::string const& req) const {
            return "Processed " +
                boost::lexical_cast<std::string>(std::quoted(req)) + "\n";
        }
    };
#endif

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(buffer_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    // pass shared_from_this() so the session stays alive
                    // while the detached processing thread runs
                    std::thread(&session::process_message, shared_from_this(),
                                buffer_.substr(0, length - 1))
                        .detach();
                    buffer_.erase(0, length);
                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(std::string message) {
        outbox_.push_back(std::move(message)); // assumed on (implicit) strand

        if (outbox_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Write error: " << ec.message() << std::endl;
                }
            });
    }

    void process_message(std::string const& message) {
        std::string response = handler.processMessage(message);

        // dispatch/post to the executor because we are on a different thread
        post(socket_.get_executor(),
             std::bind(&session::do_write, shared_from_this(), response));
    }

    tcp::socket             socket_;
    std::string             buffer_;
    std::deque<std::string> outbox_;
    MessageHandler          handler;
};

class server {
  public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , socket_(io_context) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            if (!ec) {
                std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(socket_))->start();
            }
            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket   socket_;
};

void serverInit() {
    try {
        asio::io_context io_context;

        server s(io_context, 8989);

        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() { serverInit(); }
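For reference, the demo should build locally with something along these lines (assuming a reasonably recent Boost; older versions may additionally need -lboost_system):

g++ -std=c++17 -O2 -pthread main.cpp -o server
./server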
When sending a large burst of requests:
printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1
The server correctly prints, e.g.:
Accepted 127.0.0.1:34862
And the client receives, e.g.:
Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"
BONUS: Adding the strand
Minimal changes:
class server {
  public:
    server(asio::any_io_executor ex, unsigned short port)
        : acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept( //
            make_strand(acceptor_.get_executor()),
            [this](error_code ec, tcp::socket&& s) {
                if (!ec) {
                    std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                    std::make_shared<session>(std::move(s))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

void serverInit() {
    try {
        asio::thread_pool io_context;

        server s(io_context.get_executor(), 8989);

        io_context.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

Because every accepted socket is now created on its own strand, socket_.get_executor() inside process_message refers to that strand, so do_write and write_loop stay serialized even though the thread_pool runs multiple threads.
Live demo: