Boost Beast Async Server Failing with Assertion failed: (id_ != T::id) on multiple async calls
Asked Answered
L

3

6

Assertion failed: (id_ != T::id), function try_lock, file /usr/local/include/boost/beast/websocket/detail/stream_base.hpp, line 91.

       // Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
    websocket::stream<tcp::socket> ws_;
    boost::asio::strand<
            boost::asio::io_context::executor_type> strand_;
    boost::beast::multi_buffer buffer_;

public:
    // Take ownership of the socket
    explicit
    session(tcp::socket socket)
            : ws_(std::move(socket))
            , strand_(ws_.get_executor())
    {
    }

    // Start the asynchronous operation
    void
    run()
    {
        // Accept the websocket handshake
        ws_.async_accept(
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_accept,
                                shared_from_this(),
                                std::placeholders::_1)));
    }

    void
    on_accept(boost::system::error_code ec)
    {
        std::cout << std::this_thread::get_id() <<" : DO ACCEPT" << std::endl;

        if(ec)
            return fail(ec, "accept");

        std::ifstream ifs("tweetsample.txt");
        if(ifs.good())
        {
            auto tweet = std::string{};
            while(std::getline(ifs,tweet))
            {
                try
                {
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    auto t = json::parse(tweet);
                    std::string tweet_text = t["text"];
                    auto n = boost::asio::buffer_copy(buffer_.prepare(tweet_text.size()), 
boost::asio::buffer(tweet_text));
                buffer_.commit(n);
                    do_write();
                }
                catch(nlohmann::detail::type_error& ex)
                {
                       std::cout << ex.what() << std::endl;
                }
            }
        }
    }

    void
    do_read()
    {
        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_read,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void do_write()
    {
        std::cout << std::this_thread::get_id() << "do_write" << std::endl;
        ws_.async_write(
                buffer_.data(),
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_write,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void
    on_read(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the session was closed
        if(ec == websocket::error::closed)
            return;

        if(ec)
            fail(ec, "read");

        std::cout << boost::beast::buffers(buffer_.data()) << std::endl;

        // Echo the message
        ws_.text(ws_.got_text());
        ws_.async_write(
                buffer_.data(),
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_write,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void
    on_write(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        std::cout << "on_write" << std::endl;
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Clear the buffer
        buffer_.consume(buffer_.size());

        // Do another read
        do_read();
    }
};

//------------------------------------------------------------------------------

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
    tcp::acceptor acceptor_;
    tcp::socket socket_;

public:
    // Opens, configures, binds and starts listening on `endpoint`. The first
    // step that fails is reported through fail() and the remaining steps are
    // skipped.
    listener(
            boost::asio::io_context& ioc,
            tcp::endpoint endpoint)
            : acceptor_(ioc)
            , socket_(ioc)
    {
        boost::system::error_code ec;

        // Reports the error of the step that just ran, if any.
        // Returns true when that step failed.
        auto const failed = [&ec](char const* step)
        {
            if(! ec)
                return false;
            fail(ec, step);
            return true;
        };

        // Open the acceptor
        acceptor_.open(endpoint.protocol(), ec);
        if(failed("open"))
            return;

        // Allow address reuse
        acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
        if(failed("set_option"))
            return;

        // Bind to the server address
        acceptor_.bind(endpoint, ec);
        if(failed("bind"))
            return;

        // Start listening for connections
        acceptor_.listen(
                boost::asio::socket_base::max_listen_connections, ec);
        if(failed("listen"))
            return;
    }

    // Begins accepting connections; does nothing when the acceptor is not open.
    void
    run()
    {
        if(acceptor_.is_open())
            do_accept();
    }

    // Queues one asynchronous accept into socket_.
    void
    do_accept()
    {
        acceptor_.async_accept(
                socket_,
                std::bind(
                        &listener::on_accept,
                        shared_from_this(),
                        std::placeholders::_1));
    }

    // Accept completion handler: on success hands the connected socket to a
    // new session and runs it; in either case queues the next accept.
    void
    on_accept(boost::system::error_code ec)
    {
        if(! ec)
        {
            // Create the session and run it
            std::make_shared<session>(std::move(socket_))->run();
        }
        else
        {
            fail(ec, "accept");
        }

        // Accept another connection
        do_accept();
    }
};  


    // Entry point: creates the io_context, launches the listener and runs the
    // event loop on `threads` threads (the calling thread plus threads - 1
    // workers).
    int main(int argc, char* argv[])
    {
        auto const address = boost::asio::ip::make_address("XXX.XX.XX.X");
        auto const port = static_cast<unsigned short>(std::atoi("XXXX"));
        auto const threads = 1;

        // The io_context is required for all I/O
        boost::asio::io_context ioc{threads};

        // Create and launch a listening port
        std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();

        // Run the I/O service on the requested number of threads
        std::vector<std::thread> v;
        v.reserve(threads - 1);
        for(auto i = threads - 1; i > 0; --i)
            v.emplace_back(
                    [&ioc]
                    {
                        ioc.run();
                    });
        ioc.run();

        // Destroying a std::thread that is still joinable calls
        // std::terminate, so wait for every worker before leaving main.
        for(auto& worker : v)
            worker.join();

        return EXIT_SUCCESS;
    }

I am experimenting with WebSocket. Currently, when I receive the accept, I want to send all the data I read from a file, line by line.

The program crashes after the first line is sent. Please advise what I am doing wrong.

Leopold answered 8/9, 2018 at 15:39 Comment(1)
I tried increasing the number of threads from which the io_service's run() is called. The issue still persists. – Leopold
U
9

I had the same issue, so looking into the failed assertion I found this comment:

    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
Untangle answered 22/4, 2019 at 19:40 Comment(0)
B
4

This is a concurrency problem. The answer is mentioned here

When you call websocket::stream::async_write, you must wait until the operation completes (your completion handler is invoked) before calling async_write again. This is explained here: https://www.boost.org/doc/libs/1_67_0/libs/beast/doc/html/beast/using_websocket/notes.html#beast.using_websocket.notes.thread_safety

If you want to send more than one message at a time, you need to implement your own write queue. An example may be found here: https://github.com/vinniefalco/CppCon2018/blob/8fc8528571561d2acee5c05f4b7a51861d1d496d/websocket_session.cpp#L87

In my case I used a synchronous call:

 ws_.write(net::buffer(std::string(message)));
Boddie answered 15/4, 2021 at 23:18 Comment(0)
B
0
    std::cout << "do_write" << std::endl;
    // Echo the message
    ws_.async_write(boost::asio::buffer(tweet),
                    boost::asio::bind_executor(strand_, std::bind(&session::on_write, shared_from_this(),
                                                                  std::placeholders::_1, std::placeholders::_2)));

This does an asynchronous operation using a local buffer (the tweet argument goes out of scope before the operation will start/complete).

In the read side, you solved it by using data_, which is a member variable. You should consider a solution like that.

Boland answered 8/9, 2018 at 17:14 Comment(6)
Thanks, let me check that. But why should a local variable going out of scope trigger an assertion about a thread id not being equal to id? After reading some docs, it seems this usually happens when trying to perform two concurrent async operations. Isn't that the case here? – Leopold
When you use variables after their lifetime has ended, that causes Undefined Behaviour. Literally anything can happen, and often surprising effects occur. You cannot reason about a program that invokes UBBoland
Edited the Code Snippet. Tried changing the implementation to use buffer_copy to internal buffer . Still getting the same assertion. I have a feeling its something else which is going wrong.Leopold
Does a buffer copy also not own the content? I mean, does buffer_copy not do a deep copy? Kindly advise. – Leopold
One thing I noticed is that on_write is not getting called even after writing the data, and another do_write is called before it. What can be the issue? I am really confused. – Leopold
"The boost::asio::buffer_copy function is used to copy bytes from a source buffer (or buffer sequence) to a target buffer (or buffer sequence)."Boland

© 2022 - 2024 — McMap. All rights reserved.