boost::asio::async_write - ensure only one outstanding call
Asked Answered
S

4

11

According to the documentation:

"The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes."

Does this mean, I cannot call boost::asio::async_write a second time until the handler for the first is called? How does one achieve this and still be asynchronous?

If I have a method Send:

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred);
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback);
}

Do I have to change it to something like:

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    // Issue a send
    std::lock_guard<std::mutex> lock(m_numPostedSocketIOMutex);
    ++m_numPostedSocketIO;

    m_numPostedSocketIOConditionVariable.wait(lock, [this]() {return m_numPostedSocketIO == 0; });

    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred);
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback);
}

and if so, then aren't I blocking after the first call again?

Singer answered 20/12, 2017 at 19:24 Comment(1)
"The usual approach to fixing this is to have a queue of outgoing buffers, instead of a single one, and send them in succession, e.g. boost asio async_write : how to not interleaving async_write calls?" (quoted from this previously deleted answer)Fossick
S
2

Here is a complete, compilable, and tested, example, that I researched and got to work through trial and error after reading the answer and subsequent edits from RustyX.

Connection.h

#pragma once

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:

    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);

    //
    static std::string ErrorCodeToString(const boost::system::error_code & errorCode);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection & operator = (const Connection &) = delete;
    Connection & operator = (Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we share_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> & data);

private:

    static size_t                                           m_nextClientId;

    size_t                                                  m_clientId;
    ConnectionManager *                                     m_owner;
    boost::asio::ip::tcp::socket                            m_socket;
    std::atomic<bool>                                       m_stopped;
    boost::asio::streambuf                                  m_receiveBuffer;
    mutable std::mutex                                      m_sendMutex;
    std::vector<char>                                       m_sendBuffers[2];         // Double buffer
    int                                                     m_activeSendBufferIndex;
    bool                                                    m_sending;

    std::vector<char>                                       m_allReadData;            // Strictly for test purposes

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"

#include <boost/bind.hpp>

#include <algorithm>
#include <cstdlib>

//--------------------------------------------------------------------
size_t Connection::m_nextClientId(0);

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------------------------------------
std::string Connection::ErrorCodeToString(const boost::system::error_code & errorCode)
{
    std::ostringstream debugMsg;
    debugMsg << " Error Category: " << errorCode.category().name() << ". "
             << " Error Message: "  << errorCode.message() << ". ";

    // IMPORTANT - These comparisons only work if you dynamically link boost libraries
    //             Because boost chose to implement boost::system::error_category::operator == by comparing addresses
    //             The addresses are different in one library and the other when statically linking.
    //
    // We use make_error_code macro to make the correct category as well as error code value.
    // Error code value is not unique and can be duplicated in more than one category.
    if (errorCode == boost::asio::error::make_error_code(boost::asio::error::connection_refused))
    {
        debugMsg << " (Connection Refused)";
    }
    else if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
    {
        debugMsg << " (Remote host has disconnected)";
    }
    else
    {
        debugMsg << " (boost::system::error_code has not been mapped to a meaningful message)";
    }

    return debugMsg.str();
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
    :
    m_clientId                          (m_nextClientId++)
  , m_owner                             (connectionManager)
  , m_socket                            (std::move(socket))
  , m_stopped                           (false)
  , m_receiveBuffer                     ()
  , m_sendMutex                         ()
  , m_sendBuffers                       ()
  , m_activeSendBufferIndex             (0)
  , m_sending                           (false)
  , m_allReadData                       ()
{
    printf("Client connection with id %zd has been created.", m_clientId);
}

//--------------------------------------------------------------------
Connection::~Connection()
{
    // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business
    printf("Client connection with id %zd has been destroyed.", m_clientId);
}

//--------------------------------------------------------------------
void Connection::Start()
{
    DoReceive();
}

//--------------------------------------------------------------------
void Connection::Stop()
{
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner release any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // Append to the inactive buffer
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex ^ 1];
    inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end());

    //
    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend()
{
    // Check if there is an async send in progress
    // An empty active buffer indicates there is no outstanding send
    if (m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;

        std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex];
        auto self(shared_from_this());

        boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer),
            [self](const boost::system::error_code & errorCode, size_t bytesTransferred)
            {
                std::lock_guard<std::mutex> lock(self->m_sendMutex);

                self->m_sendBuffers[self->m_activeSendBufferIndex].clear();

                if (errorCode)
                {
                    printf("An error occured while attemping to send data to client id %zd. %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());

                    // An error occurred
                    // We do not stop or close on sends, but instead let the receive error out and then close
                    return;
                }

                // Check if there is more to send that has been queued up on the inactive buffer,
                // while we were sending what was on the active buffer
                if (!self->m_sendBuffers[self->m_activeSendBufferIndex ^ 1].empty())
                {
                    self->DoSend();
                }
            });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive()
{
    auto self(shared_from_this());

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
        [self](const boost::system::error_code & errorCode, size_t bytesRead)
        {
            if (errorCode)
            {
                // Check if the other side hung up
                if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
                {
                    // This is not really an error. The client is free to hang up whenever they like
                    printf("Client %zd has disconnected.", self->m_clientId);
                }
                else
                {
                    printf("An error occured while attemping to receive data from client id %zd. Error Code: %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());
                }

                // Notify our masters that we are ready to be destroyed
                self->m_owner->OnConnectionClosed(self);

                // An error occured
                return;
            }

            // Grab the read data
            std::istream stream(&self->m_receiveBuffer);
            std::string data;
            std::getline(stream, data, '#');
            data += "#";

            printf("Received data from client %zd: %s", self->m_clientId, data.c_str());

            // Issue the next receive
            if (!self->m_stopped)
            {
                self->DoReceive();
            }
        });
}

//--------------------------------------------------------------------

ConnectionManager.h

#pragma once

#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager
{
public:

    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager & operator = (const ConnectionManager &) = delete;
    ConnectionManager & operator = (ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

protected:

    boost::asio::io_service            m_io_service;
    boost::asio::ip::tcp::acceptor     m_acceptor;
    boost::asio::ip::tcp::socket       m_listenSocket;
    std::vector<std::thread>           m_threads;

    mutable std::mutex                 m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    boost::asio::deadline_timer        m_timer;

    void IoServiceThreadProc();

    void DoAccept();
    void DoTimer();
};

//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"

#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <system_error>
#include <cstdio>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
    :
    m_io_service  ()
  , m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
  , m_listenSocket(m_io_service)
  , m_threads     (numThreads)
  , m_timer       (m_io_service)
{
}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
    Stop();
}

//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
    if (m_io_service.stopped())
    {
        m_io_service.reset();
    }

    DoAccept();

    for (auto & thread : m_threads)
    {
        if (!thread.joinable())
        {
            thread.swap(std::thread(&ConnectionManager::IoServiceThreadProc, this));
        }
    }

    DoTimer();
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed?
    //        Because remember they have outstanding ref count to thier shared_ptr in the async handlers
    m_io_service.stop();

    for (auto & thread : m_threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
    try
    {
        // Log that we are starting the io_service thread
        {
            printf("io_service socket thread starting.");
        }

        // Run the asynchronous callbacks from the socket on this thread
        // Until the io_service is stopped from another thread
        m_io_service.run();
    }
    catch (std::system_error & e)
    {
        printf("System error caught in io_service socket thread. Error Code: %d", e.code().value());
    }
    catch (std::exception & e)
    {
        printf("Standard exception caught in io_service socket thread. Exception: %s", e.what());
    }
    catch (...)
    {
        printf("Unhandled exception caught in io_service socket thread.");
    }

    {
        printf("io_service socket thread exiting.");
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
    m_acceptor.async_accept(m_listenSocket,
        [this](const boost::system::error_code errorCode)
        {
            if (errorCode)
            {
                printf("An error occured while attemping to accept connections. Error Code: %s", Connection::ErrorCodeToString(errorCode).c_str());
                return;
            }

            // Create the connection from the connected socket
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            m_connections.push_back(connection);
            connection->Start();

            DoAccept();
        });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end())
    {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
    if (!m_io_service.stopped())
    {
        // Send messages every second
        m_timer.expires_from_now(boost::posix_time::seconds(30));
        m_timer.async_wait(
            [this](const boost::system::error_code & errorCode)
            {
                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                for (auto connection : m_connections)
                {
                    connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
                }

                DoTimer();
            });
    }
}

main.cpp

#include "ConnectionManager.h"

#include <cstring>
#include <iostream>
#include <string>

int main()
{
    // Start up the server
    ConnectionManager connectionManager(5000, 2);
    connectionManager.Start();

    // Pretend we are doing other things or just waiting for shutdown
    std::this_thread::sleep_for(std::chrono::minutes(5));

    // Stop the server
    connectionManager.Stop();

    return 0;
}
Singer answered 29/12, 2017 at 17:46 Comment(0)
A
4

The async in async_write() refers to the fact that the function returns immediately while the writing happens in background. There should still be only one outstanding write at any given time.

You need to use a buffer if you have an asynchronous producer to set aside the new chunk of data until the currently active write completes, then issue a new async_write in the completion handler.

That is, Connection::Send must only call async_write once to kick off the process, in subsequent calls it should instead buffer its data, which will be picked up in the completion handler of the currently executing async_write.

For performance reasons you want to avoid copying the data into the buffer, and instead append the new chunk to a list of buffers and use the scatter-gather overload of async_write that accepts a ConstBufferSequence. It is also possible to use one large streambuf as a buffer and append directly into it.

Of course the buffer needs to be synchronized unless both Connection::Send and the io_service run in the same thread. An empty buffer can be reused as an indication that no async_write is in progress.

Here's some code to illustrate what I mean:

struct Connection
{
    void Connection::Send(std::vector<char>&& data)
    {
        std::lock_guard<std::mutex> lock(buffer_mtx);
        buffers[active_buffer ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
        doWrite();
    }

private:

    void Connection::doWrite()
    {
        if (buffer_seq.empty()) { // empty buffer sequence == no writing in progress
            active_buffer ^= 1; // switch buffers
            for (const auto& data : buffers[active_buffer]) {
                buffer_seq.push_back(boost::asio::buffer(data));
            }
            boost::asio::async_write(m_socket, buffer_seq, [this] (const boost::system::error_code& ec, size_t bytes_transferred) {
                std::lock_guard<std::mutex> lock(buffer_mtx);
                buffers[active_buffer].clear();
                buffer_seq.clear();
                if (!ec) {
                    if (!buffers[active_buffer ^ 1].empty()) { // have more work
                        doWrite();
                    }
                }
            });
        }
    }

    std::mutex buffer_mtx;
    std::vector<std::vector<char>> buffers[2]; // a double buffer
    std::vector<boost::asio::const_buffer> buffer_seq;
    int active_buffer = 0;
    . . .
};

The complete working source can be found in this answer.

Amazon answered 20/12, 2017 at 21:9 Comment(10)
That's a lot of talk about fancy buffers and locking mechanisms around them :) What does it look like? I've got no idea how to use asio::streambuf, or it I am allowed to modify it after posting a call, or if I need another seperate buffer of my own with locks around it and then grab things from it to put into the streambuf when I call send yet again from my handler. Also, I have no idea what is meant by an input sequence and an output sequence. It contains a contiguous array of characters, no? What is the in vs the out? Sure would be neat if they had any of this in their examples!Singer
In the code you posted after edit, if I follow correctly, we are allocating new buffers every time send gets called after an async send gets posted. However, we are accumulating on the last buffer that was allocated until the aync send completes. It is most likely, also allocating internal space when the emplace_back is called (because that forwards arguments to constructor of const_buffer). I think I see how this is working, but isn't that expensive performance wise?Singer
Almost got it all ready to test in my actual demo project. However, I am getting an error on the line 'buffers->emplace_back(data);' that says, "'boost::asio::const_buffer::const_buffer(boost::asio::const_buffer &&)': cannot convert argument 1 from 'const std::vector<char,std::allocator<char>>' to 'const boost::asio::mutable_buffer &'"Singer
The code is just to demonstrate the idea, is definitely not the most optimal. I've just updated it to use a double buffer. The most efficient way though would be to use a circular buffer with an async_write_some loop.Amazon
Unfortunately, after I implemented this and started testing it, I am still getting errors. I thought maybe it was unrelated, so I asked another question here: https://mcmap.net/q/1019564/-boost-asio-double-buffering/… and it seems that at least one person is indicating that the vector of buffers do not contain their own copy of the data, but instead that I must maintain the lifetime of the input vector<char> that was a parameter in Send. Now I am more confused. Are the asio buffers that are members not the storage?Singer
Yes the asio buffer is non-owning. But you should also avoid copying if at all possible. So I've edited the answer to have the sender own the data, but do a move instead. So call it as Send(std::move(data)); data.clear();.Amazon
This latest listing does not work either. I appreciate the help and all, but can we please take the time to get an example that compiles and works, not only for my sake, but for the sake of others in the future?Singer
Got it to work and posted in answer below for people looking in the future. Comment there is anything is incorrect.Singer
"does not work" isn't a constructive feedback and SO isn't a code writing service. I think I've provided more than sufficient information to answer the question. Nevertheless, the code I showed works. See the link to the full example.Amazon
"does not work, because it will not compile" is about as constructive as it gets. I gave you credit in my answer after waiting for several days after alerting you that it wasn't working. No, SO is not a code writing service, but answers should compile. Because SO is not a code writing service, I took what information I could gleam from your answer and created my own working example to assist others in the future who face the same roadblock that I faced. I thank you for your assistance.Singer
P
3

Yes you need to wait for completion handler before calling async_write again. Are you sure you'll be blocked? Of course it depends on how fast you generate your data, but even if yes there's no way to send it faster than your network can handle it. If it's really an issue consider sending bigger chunks.

Plenish answered 20/12, 2017 at 19:47 Comment(0)
S
2

Here is a complete, compilable, and tested, example, that I researched and got to work through trial and error after reading the answer and subsequent edits from RustyX.

Connection.h

#pragma once

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:

    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);

    //
    static std::string ErrorCodeToString(const boost::system::error_code & errorCode);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection & operator = (const Connection &) = delete;
    Connection & operator = (Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we share_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> & data);

private:

    static size_t                                           m_nextClientId;

    size_t                                                  m_clientId;
    ConnectionManager *                                     m_owner;
    boost::asio::ip::tcp::socket                            m_socket;
    std::atomic<bool>                                       m_stopped;
    boost::asio::streambuf                                  m_receiveBuffer;
    mutable std::mutex                                      m_sendMutex;
    std::vector<char>                                       m_sendBuffers[2];         // Double buffer
    int                                                     m_activeSendBufferIndex;
    bool                                                    m_sending;

    std::vector<char>                                       m_allReadData;            // Strictly for test purposes

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"

#include <boost/bind.hpp>

#include <algorithm>
#include <cstdlib>

//--------------------------------------------------------------------
size_t Connection::m_nextClientId(0);

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------------------------------------
std::string Connection::ErrorCodeToString(const boost::system::error_code & errorCode)
{
    std::ostringstream debugMsg;
    debugMsg << " Error Category: " << errorCode.category().name() << ". "
             << " Error Message: "  << errorCode.message() << ". ";

    // IMPORTANT - These comparisons only work if you dynamically link boost libraries
    //             Because boost chose to implement boost::system::error_category::operator == by comparing addresses
    //             The addresses are different in one library and the other when statically linking.
    //
    // We use make_error_code macro to make the correct category as well as error code value.
    // Error code value is not unique and can be duplicated in more than one category.
    if (errorCode == boost::asio::error::make_error_code(boost::asio::error::connection_refused))
    {
        debugMsg << " (Connection Refused)";
    }
    else if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
    {
        debugMsg << " (Remote host has disconnected)";
    }
    else
    {
        debugMsg << " (boost::system::error_code has not been mapped to a meaningful message)";
    }

    return debugMsg.str();
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
    :
    m_clientId                          (m_nextClientId++)
  , m_owner                             (connectionManager)
  , m_socket                            (std::move(socket))
  , m_stopped                           (false)
  , m_receiveBuffer                     ()
  , m_sendMutex                         ()
  , m_sendBuffers                       ()
  , m_activeSendBufferIndex             (0)
  , m_sending                           (false)
  , m_allReadData                       ()
{
    printf("Client connection with id %zd has been created.", m_clientId);
}

//--------------------------------------------------------------------
Connection::~Connection()
{
    // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business
    printf("Client connection with id %zd has been destroyed.", m_clientId);
}

//--------------------------------------------------------------------
void Connection::Start()
{
    DoReceive();
}

//--------------------------------------------------------------------
void Connection::Stop()
{
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner release any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // Append to the inactive buffer
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex ^ 1];
    inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end());

    //
    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend()
{
    // Check if there is an async send in progress
    // An empty active buffer indicates there is no outstanding send
    if (m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;

        std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex];
        auto self(shared_from_this());

        boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer),
            [self](const boost::system::error_code & errorCode, size_t bytesTransferred)
            {
                std::lock_guard<std::mutex> lock(self->m_sendMutex);

                self->m_sendBuffers[self->m_activeSendBufferIndex].clear();

                if (errorCode)
                {
                    printf("An error occured while attemping to send data to client id %zd. %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());

                    // An error occurred
                    // We do not stop or close on sends, but instead let the receive error out and then close
                    return;
                }

                // Check if there is more to send that has been queued up on the inactive buffer,
                // while we were sending what was on the active buffer
                if (!self->m_sendBuffers[self->m_activeSendBufferIndex ^ 1].empty())
                {
                    self->DoSend();
                }
            });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive()
{
    auto self(shared_from_this());

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
        [self](const boost::system::error_code & errorCode, size_t bytesRead)
        {
            if (errorCode)
            {
                // Check if the other side hung up
                if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
                {
                    // This is not really an error. The client is free to hang up whenever they like
                    printf("Client %zd has disconnected.", self->m_clientId);
                }
                else
                {
                    printf("An error occured while attemping to receive data from client id %zd. Error Code: %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());
                }

                // Notify our masters that we are ready to be destroyed
                self->m_owner->OnConnectionClosed(self);

                // An error occured
                return;
            }

            // Grab the read data
            std::istream stream(&self->m_receiveBuffer);
            std::string data;
            std::getline(stream, data, '#');
            data += "#";

            printf("Received data from client %zd: %s", self->m_clientId, data.c_str());

            // Issue the next receive
            if (!self->m_stopped)
            {
                self->DoReceive();
            }
        });
}

//--------------------------------------------------------------------

ConnectionManager.h

#pragma once

#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager
{
public:

    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager & operator = (const ConnectionManager &) = delete;
    ConnectionManager & operator = (ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

protected:

    boost::asio::io_service            m_io_service;
    boost::asio::ip::tcp::acceptor     m_acceptor;
    boost::asio::ip::tcp::socket       m_listenSocket;
    std::vector<std::thread>           m_threads;

    mutable std::mutex                 m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    boost::asio::deadline_timer        m_timer;

    void IoServiceThreadProc();

    void DoAccept();
    void DoTimer();
};

//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"

#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <system_error>
#include <cstdio>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
    :
    m_io_service  ()
  , m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
  , m_listenSocket(m_io_service)
  , m_threads     (numThreads)
  , m_timer       (m_io_service)
{
}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
    Stop();
}

//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
    if (m_io_service.stopped())
    {
        m_io_service.reset();
    }

    DoAccept();

    for (auto & thread : m_threads)
    {
        if (!thread.joinable())
        {
            thread.swap(std::thread(&ConnectionManager::IoServiceThreadProc, this));
        }
    }

    DoTimer();
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed?
    //        Because remember they have outstanding ref count to thier shared_ptr in the async handlers
    m_io_service.stop();

    for (auto & thread : m_threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
    try
    {
        // Log that we are starting the io_service thread
        {
            printf("io_service socket thread starting.");
        }

        // Run the asynchronous callbacks from the socket on this thread
        // Until the io_service is stopped from another thread
        m_io_service.run();
    }
    catch (std::system_error & e)
    {
        printf("System error caught in io_service socket thread. Error Code: %d", e.code().value());
    }
    catch (std::exception & e)
    {
        printf("Standard exception caught in io_service socket thread. Exception: %s", e.what());
    }
    catch (...)
    {
        printf("Unhandled exception caught in io_service socket thread.");
    }

    {
        printf("io_service socket thread exiting.");
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
    m_acceptor.async_accept(m_listenSocket,
        [this](const boost::system::error_code errorCode)
        {
            if (errorCode)
            {
                printf("An error occured while attemping to accept connections. Error Code: %s", Connection::ErrorCodeToString(errorCode).c_str());
                return;
            }

            // Create the connection from the connected socket
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            m_connections.push_back(connection);
            connection->Start();

            DoAccept();
        });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end())
    {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
    if (!m_io_service.stopped())
    {
        // Send messages every second
        m_timer.expires_from_now(boost::posix_time::seconds(30));
        m_timer.async_wait(
            [this](const boost::system::error_code & errorCode)
            {
                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                for (auto connection : m_connections)
                {
                    connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
                }

                DoTimer();
            });
    }
}

main.cpp

#include "ConnectionManager.h"

#include <cstring>
#include <iostream>
#include <string>

int main()
{
    // Start up the server
    ConnectionManager connectionManager(5000, 2);
    connectionManager.Start();

    // Pretend we are doing other things or just waiting for shutdown
    std::this_thread::sleep_for(std::chrono::minutes(5));

    // Stop the server
    connectionManager.Stop();

    return 0;
}
Singer answered 29/12, 2017 at 17:46 Comment(0)
C
-1

Could we use 2 strands for this question by posting write(...) as an asynchronous operation to strand1 and handler(...) to strand2? Your advices on the code would be highly appreciated.

boost::asio::strand<boost::asio::io_context::executor_type> strand1, strand2;
std::vector<char> empty_vector(0);

void Connection::Send(const std::vector<char> & data)
{
    boost::asio::post(boost::asio::bind_executor(strand1, std::bind(&Connection::write, this, true, data)));
}

void Connection::write(bool has_data, const std::vector<char> & data)
{
     // Append to the inactive buffer
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex ^ 1];

    if (has_data)
    {            
        inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end());
    }

    //
    if (!inactiveBuffer.empty() && m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;
        std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex];
        boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer), boost::asio::bind_executor(strand2, std::bind(&Connection::handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }
} 

void Connection::handler(const boost::system::error_code & errorCode, size_t bytesTransferred)
{
    self->m_sendBuffers[self->m_activeSendBufferIndex].clear();

    if (errorCode)
    {
        printf("An error occured while attemping to send data to client id %zd. %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());

        // An error occurred
        // We do not stop or close on sends, but instead let the receive error out and then close
        return;
    }

    boost::asio::post(boost::asio::bind_executor(strand1, std::bind(&Connection::write, this, false, empty_vector)));
    }
}
Conventional answered 20/6, 2019 at 7:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.