boost asio async_write : how to not interleaving async_write calls?
Asked Answered
V

2

32

Here's my implementation :

  • Client A send a message for Client B
  • Server process the message by async_read the right amount of data and will wait for new data from Client A (in Order not to block Client A)
  • Afterwards Server will process the information (probably do a mysql query) and then send the message to Client B with async_write.

The problem is, if Client A send message really fast, async_writes will interleave before the previous async_write handler is called.

Is there a simple way to avoid this problem ?

EDIT 1 : If a Client C sends a message to Client B just after Client A, the same issue should appear...

EDIT 2 : This would work ? because it seems to block, I don't know where...

 namespace structure {                                                              
  class User {                                                                     
  public:                                                                          
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
      m_socket(io_service, context), m_strand(io_service), is_writing(false) {}    

    ssl_socket& getSocket() {                                                      
      return m_socket;                                                             
    }                                                                              

    boost::asio::strand getStrand() {                                              
      return m_strand;                                                             
    }                                                                              

    void push(std::string str) {                                                   
      m_strand.post(boost::bind(&structure::User::strand_push, this, str));        
    }                                                                              

    void strand_push(std::string str) {                                            

      std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;       
      m_queue.push(str);                                                           
      if (!is_writing) {                                                           
        write();                                                                   
        std::cout << "going to write" << std::endl;                                
      }                                                                            
      std::cout << "Already writing" << std::endl;                                 
    }                                                                              

    void write() {                                                                 
      std::cout << "writing" << std::endl;                                         
      is_writing = true;                                                           
      std::string str = m_queue.front();                                           
      boost::asio::async_write(m_socket,                                           
                               boost::asio::buffer(str.c_str(), str.size()),       
                               boost::bind(&structure::User::sent, this)           
                               );                                                  
    }                                                                              

    void sent() {                                                                  
      std::cout << "sent" << std::endl;                                            
      m_queue.pop();                                                               
      if (!m_queue.empty()) {                                                      
        write();                                                                   
        return;                                                                    
      }                                                                            
      else                                                                         
        is_writing = false;                                                        
      std::cout << "done sent" << std::endl;                                       
    }                                          

  private:                                     
    ssl_socket          m_socket;              
    boost::asio::strand m_strand;              
    std::queue<std::string>     m_queue;       
    bool                        is_writing;    
  };                                           
}                                              

#endif
Versicle answered 13/10, 2011 at 13:16 Comment(1)
Note that async write is much less valuable than async read. Most writes are virtually instant as the OS will locally buffer the data. Reads on the other hand may block waiting for the remote side, and there's nothing you can do about that locally. Synchronous write is therefore a viable way of implementing sequencing. This also solves the issue of data ownership - the code above is incorrect as str is destroyed when write() returns, which may be before boost::asio_async_write() accesses the buffer.Ormuz
D
52

Is there a simple way to avoid this problem ?

Yes, maintain an outgoing queue for each client. Inspect the queue size in the async_write completion handler, if non-zero, start another async_write operation. Here is a sample

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
            boost::asio::io_service& io_service
            ) :
        _io_service( io_service ),
        _strand( _io_service ),
        _socket( _io_service ),
        _outbox()
    {

    }

    void write( 
            const std::string& message
            )
    {
        _strand.post(
                boost::bind(
                    &Connection::writeImpl,
                    this,
                    message
                    )
                );
    }

private:
    void writeImpl(
            const std::string& message
            )
    {
        _outbox.push_back( message );
        if ( _outbox.size() > 1 ) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
                _socket,
                boost::asio::buffer( message.c_str(), message.size() ),
                _strand.wrap(
                    boost::bind(
                        &Connection::writeHandler,
                        this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred
                        )
                    )
                );
    }

    void writeHandler(
            const boost::system::error_code& error,
            const size_t bytesTransferred
            )
    {
        _outbox.pop_front();

        if ( error ) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if ( !_outbox.empty() ) {
            // more messages to send
            this->write();
        }
    }


private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo( io_service );
}

some key points

  • the boost::asio::io_service::strand protects access to Connection::_outbox
  • a handler is dispatched from Connection::write() since it is public

it wasn't obvious to me if you were using similar practices in the example in your question since all methods are public.

Dialectic answered 13/10, 2011 at 15:53 Comment(15)
I have tried this solution, thing is I have a single io_service with multiple thread running run(), thing is even with using strand.post to push data on queue it seems to segfault because it is called from 2 different threads... any idea why ?Versicle
@Versicle that sounds like a separate question to me. You likely have implemented your logic incorrectly, it can be easy to do with strands and multiple threads. The use of a queue is an appropriate solution for your original question.Dialectic
What would you use to know when you should pop data from queue ?Versicle
@Versicle I provided a sample implementation in my answer.Dialectic
Actually I did not took the time to declare them private when I have writted it on stackoverflow, but yeah that was the idea ! Thank anyway +1Versicle
Perhaps I'm misunderstanding something from the strand.post() documentation, but that would seem to indicate that post() returns immediately. That being the case, doesn't passing the string argument here as a const & potentially result in a dangling reference in writeImpl()? It would seem to me that due to the post(), writeImpl() will run not immediately, but rather, in the near future, and by that time the original string may have been destroyed on the calling thread.Pianist
@AllanBazinet read the documentation for boost::bind notably, the sentence: The arguments that bind takes are copied and held internally by the returned function objectDialectic
Using strand::dispatch() instead of strand::post() in the public write() method could provide marginally better performance. The code is correct as-is; perhaps dispatch() just hadn't been implemented when the answer was originally written.Danyel
@SamMiller because 'message' is copied and held by the function object returned by boost::bind, and "The strand will make a copy of the handler object as required" during strand::post, is the deque '_outbox' really necessary then? Isn't the strand itself a queue?Usk
Using async_write in this way is inefficient because it will work hard to send every single byte before finishing, which means the last packet will be smaller than it could have been if you just used async_write_some in a loop, supplying all of your buffers at once as a sequence.Intercept
Use strand to protect data is the keyChaperon
Thank you @SamMiller. I tweaked your implementation to work with boost 1.67Susian
@Usk I have the same question.Adulterate
What does the // outstanding async_write comment stands for?Anemochore
impeccable answer except when I try it (apart from being told the strand methods are deprecated) it doesn't work. Do I need to 'start' something running?Leek
I
10

Just trying to improve Sam's great answer. The improvement points are:

  • async_write tries hard to send every single byte from the buffer(s) before completing, which means you should supply all the input data that you have to the write operation, otherwise the framing overhead may increase due to TCP packets being smaller than they could have been.

  • asio::streambuf, while being very convenient to use, is not zero-copy. The example below demonstrates a zero-copy approach: keep the input data chunks where they are and use a scatter/gather overload of async_write that takes in a sequence of input buffers (which are just pointers to the actual input data).

Full source code:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

using namespace std::chrono_literals;
using boost::asio::ip::tcp;

class Server
{
  class Connection : public std::enable_shared_from_this<Connection>
  {
    friend class Server;
    void ProcessCommand(const std::string& cmd) {
      if (cmd == "stop") {
        server_.Stop();
        return;
      }
      if (cmd == "") {
        Close();
        return;
      }
      std::thread t([this, self = shared_from_this(), cmd] {
        for (int i = 0; i < 30; ++i) {
          Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
        }
        server_.io_service_.post([this, self] {
          DoReadCmd();
        });
      });
      t.detach();
    }

    void DoReadCmd() {
      read_timer_.expires_from_now(server_.read_timeout_);
      read_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Read timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) {
        read_timer_.cancel();
        if (!ec) {
          const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
          std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
          buf_in_.consume(bytes_read);
          ProcessCommand(cmd);
        }
        else {
          Close();
        }
      });
    }

    void DoWrite() {
      active_buffer_ ^= 1; // switch buffers
      for (const auto& data : buffers_[active_buffer_]) {
        buffer_seq_.push_back(boost::asio::buffer(data));
      }
      write_timer_.expires_from_now(server_.write_timeout_);
      write_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Write timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) {
        write_timer_.cancel();
        std::lock_guard<std::mutex> lock(buffers_mtx_);
        buffers_[active_buffer_].clear();
        buffer_seq_.clear();
        if (!ec) {
          std::cout << "Wrote " << bytes_transferred << " bytes\n";
          if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
            DoWrite();
        }
        else {
          Close();
        }
      });
    }
    bool Writing() const { return !buffer_seq_.empty(); }

    Server& server_;
    boost::asio::streambuf buf_in_;
    std::mutex buffers_mtx_;
    std::vector<std::string> buffers_[2]; // a double buffer
    std::vector<boost::asio::const_buffer> buffer_seq_;
    int active_buffer_ = 0;
    bool closing_ = false;
    bool closed_ = false;
    boost::asio::deadline_timer read_timer_, write_timer_;
    tcp::socket socket_;
  public:
    Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) {
    }

    void Start() {
      socket_.set_option(tcp::no_delay(true));
      DoReadCmd();
    }

    void Close() {
      closing_ = true;
      if (!Writing())
        Shutdown();
    }

    void Shutdown() {
      if (!closed_) {
        closing_ = closed_ = true;
        boost::system::error_code ec;
        socket_.shutdown(tcp::socket::shutdown_both, ec);
        socket_.close();
        server_.active_connections_.erase(shared_from_this());
      }
    }

    void Write(std::string&& data) {
      std::lock_guard<std::mutex> lock(buffers_mtx_);
      buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
      if (!Writing())
        DoWrite();
    }

  };

  void DoAccept() {
    if (acceptor_.is_open()) {
      auto session = std::make_shared<Connection>(*this);
      acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) {
        if (!ec) {
          active_connections_.insert(session);
          session->Start();
        }
        DoAccept();
      });
    }
  }

  boost::asio::io_service io_service_;
  tcp::acceptor acceptor_;
  std::unordered_set<std::shared_ptr<Connection>> active_connections_;
  const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
  const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);

public:
  Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { }

  void Run() {
    std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
    DoAccept();
    io_service_.run();
  }

  void Stop() {
    acceptor_.close();
    {
      std::vector<std::shared_ptr<Connection>> sessionsToClose;
      copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
      for (auto& s : sessionsToClose)
        s->Shutdown();
    }
    active_connections_.clear();
    io_service_.stop();
  }

};

int main() {
  try {
    Server srv(8888);
    srv.Run();
  }
  catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << "\n";
  }
}
Intercept answered 31/12, 2017 at 14:18 Comment(1)
This model assumes you are doing reads. What if you are just posting stuff? And that these are messages with headers so you do need to send them as single messages.Leek

© 2022 - 2024 — McMap. All rights reserved.