Boost.Beast: multiple async_write calls fire an assertion failure
I'm writing tests for my full-duplex server, and when I make multiple sequential async_write calls (even though they're wrapped in a strand), I get the following assertion failure from Boost.Beast, in the file boost/beast/websocket/detail/stream_base.hpp:

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
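
If I read that comment right, it forbids exactly the pattern sketched below (my own illustration, not from the Beast docs; ws, msg1 and msg2 stand in for my real objects):

auto noop = [](boost::system::error_code, std::size_t) {};

// Not allowed: a second write initiated while the first is still in flight
ws.async_write(boost::asio::buffer(msg1), noop);
ws.async_write(boost::asio::buffer(msg2), noop); // fires the assert

// Allowed: the second write is only initiated from the first one's handler
ws.async_write(boost::asio::buffer(msg1),
               [&](boost::system::error_code ec, std::size_t) {
                   if (!ec)
                       ws.async_write(boost::asio::buffer(msg2), noop);
               });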

To reproduce the problem on your machine: a full client (MCVE) that reproduces this issue can be found here. It won't run from the link alone because you need a server on your own machine (sorry, it's not possible to do this conveniently online, and keeping the server out of the post shows objectively that the problem is in the client, not the server). I used websocketd to create a server with the command ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py, where ./prog.py is a simple Python program that prints and flushes (I got it from the websocketd home page).

The call that does the writing in the client looks like this:

  std::vector<std::vector<std::future<void>>> clients_write_futures(
      clients_count);
  for (int i = 0; i < clients_count; i++) {
    clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
    for (int j = 0; j < num_of_messages; j++) {
      clients_write_futures[i][j] =
          clients[i]->write_data_async_future("Hello"); // writing here
    }
  }

Notice that I'm using only 1 client in the example. The array of clients is just a generalization for more stress on the server when testing.

My comments on the problem:

  1. The loop is sequential; it's not like I'm doing this from multiple threads.
  2. It should be possible to communicate in full-duplex form, where an indefinite number of messages are sent to the server. How else could full-duplex communication be done?
  3. I'm using strands to wrap my async calls to prevent any clash on the socket through io_service/io_context.
  4. Investigating this with a debugger shows that the second iteration of the loop fails consistently, which means I'm doing something fundamentally wrong, but I don't know what it is. In other words, this is apparently a deterministic problem.

What am I doing wrong here? How can I write an indefinite number of messages to my websocket server?


EDIT:

Sehe, I want to start by apologizing for the mess in the code (I didn't realize it was that bad) and thanking you for your effort on this. I wish you had asked me why it's structured in this (probably) simultaneously organized and chaotic way, because the answer is simple: the main is gtest code to check whether the generic, versatile websocket client I use to stress-test my server works (the server uses lots of multithreaded io_service objects, which I consider sensitive and in need of broad testing). I'm planning to bombard my server with many simultaneous clients during real production tests. I posted this question because I don't understand the client's behavior. What I did in this file is create the MCVE that people on SO consistently request. It took me two hours to strip my code down to it; eventually I copied my gtest fixture code (a fixture around the server), pasted it into main, verified that the problem still exists against another server, and cleaned up a little (which obviously turned out not to be enough).

So why don't I catch exceptions? Because gtest would catch them and deem the test failed. The main is not production code, but the client is. I learned a lot from what you mentioned, and I have to say it's silly to throw just to catch, but I didn't know about std::make_exception_ptr(), so I found my own (dumb) way to achieve the same result :-). Why so many apparently useless functions? They're useless in this test/example, but in general I could use them for other things later, as this client is not only for this case.

Now back to the problem: something I don't understand is why we have to wrap async_write itself in strand_ when it's called sequentially in a loop on the main thread (I expressed myself poorly before; I had wrapped only the handler). I understand why the handler is wrapped: the socket is not thread-safe, and a multithreaded io_service would create a race there. We also know that io_service::post itself is thread-safe (which is why I thought wrapping async_write was unnecessary). Could you explain what exactly is not thread-safe here, such that async_write itself needs to be wrapped? I know you know this already, but the same assert still fires. We serialized both the handler and the queuing of the async operation, and the client is still not happy about multiple write calls. What else could be missing?

(Btw, if you write, then get the future, then read, then write again, it works. That's exactly why I'm using futures: to define test cases precisely and pin down the time order of my tests. I'm being paranoid here.)

Ulotrichous answered 6/6, 2018 at 19:21. Comments (1):
I can't even... – Lieutenancy

You said you cover your async_write with a strand. But you do no such thing: all you can be seen doing is wrapping the completion handlers in that strand. The async operations themselves you're initiating directly.

What's worse, you're doing it from the main thread, while there are async operations underway on the threads associated with your WSClient instances, which means you're concurrently accessing object instances that aren't thread-safe.

That's a data race, so you get Undefined Behaviour.

A naive fix might be:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();

    // Hop onto the strand first, so that initiating the async_write is serialized too
    post(strand_, [=,self=shared_from_this()] {
        websock.async_write(
            boost::asio::buffer(*data_ptr),
            boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
                                                          std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                          write_promise)));
    });

    return write_promise->get_future();
}

But that's not enough. Now you can be sure that none of your async operations or the completions thereof will run at the same time, but you can still post the next async-operation before the completion handler for the first is invoked.

To fix that, you'll just need to queue; a minimal sketch follows.
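
For illustration, here's what such a queue could look like (the names queue_write and do_write are mine, and the per-message promise plumbing is left out for brevity). It assumes the same strand_, websock, and shared_from_this() as your session class:

std::deque<std::shared_ptr<std::string>> write_queue_; // only touched on strand_

void queue_write(std::shared_ptr<std::string> data_ptr) {
    post(strand_, [this, self = shared_from_this(), data_ptr] {
        write_queue_.push_back(data_ptr);
        if (write_queue_.size() == 1) // no write in flight, so start one
            do_write();
    });
}

void do_write() {
    // Invariant: the front of the queue is the message currently being written
    websock.async_write(
        boost::asio::buffer(*write_queue_.front()),
        boost::asio::bind_executor(
            strand_,
            [this, self = shared_from_this()](boost::system::error_code ec, std::size_t) {
                write_queue_.pop_front();
                if (!ec && !write_queue_.empty())
                    do_write(); // chain the next queued write
            }));
}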

To be honest, I'm not sure why you are focusing so much on synchronization using the futures. That just makes it very hard to achieve this. If you can describe what it is that you functionally want to achieve, I can propose a solution that is probably a lot shorter.

Code Review Notes

Before it dawned on me what the code was all about, I spent quite a lot of time reading your code. I wouldn't want to rob you of the notes I made along the way.

Warning: This was quite a protracted code dive. I provide it because some of the insights might help you see how you need to restructure your code.

I started out reading the async code chains up until on_handshake (which sets the started_promise value).

Then I headed into the maelstrom that is your main function. Your main function is 50 lines of code?! With several parallel containers and repeated manual nested loops over them?

This is what I got after some refactoring:

int main() {
    std::vector<actor> actors(1);

    for (auto& a : actors) {
        a.client = std::make_shared<WSClient>();
        a.session_start_future = a.client->start("127.0.0.1", "8085");
        a.messages.resize(50);
    }

    for (auto& a : actors) { a.session_start_future.get(); }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future = a.client->write_data_async_future("Hello");
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.read_future = a.client->read_data_async_future();
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future.get();
        std::string result = m.read_future.get();
    } }
}

All the data structures have been folded into a small helper, actor:

struct actor {
    std::shared_ptr<WSClient> client;
    std::future<void> session_start_future;

    struct message {
        std::string message = GenerateRandomString(20);
        std::future<void> write_future;
        std::future<std::string> read_future;
    };

    std::vector<message> messages;
};

We're approximately one hour of code review down the road now, with no gain, except that we can now TELL what main is doing, and have some confidence that there isn't some trivial mistake with a loop variable or something.

Picking Back Up

At the start of writing: write_data_async_future. Wait, there's also write_data_async and write_data_sync. Why?

What's worse, WSClient only relayed these to the assumed single session. Why is there a distinction between WSClient and WSClientSession at all at this point? I say, there is none.

Evaporating 30 more lines of not-so-useful code, we still have the same failure, so that's good.

Where were we. write_data_async_future. Oh yeah, do we need the non-future versions? No. So, 40 more lines of code gone.

Now, for real: write_data_async_future:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
    websock.async_write(
        boost::asio::buffer(*data_ptr),
        boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
                                                      std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                      write_promise)));
    return write_promise->get_future();
}

Looks... okayish. Wait, there's on_write_future? This probably means we need to vaporize more lines of unused code. Looking... Yep. Poof, gone.

By now, the diffstat looks like this:

  test.cpp | 683 +++++++++++++++++++++++----------------------------------------
  1 file changed, 249 insertions(+), 434 deletions(-)

Back to that function, so let's look at on_write_future:

void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
                     std::shared_ptr<std::string> data_posted,
                     std::shared_ptr<std::promise<void> > write_promise) {
    boost::ignore_unused(bytes_transferred);
    boost::ignore_unused(data_posted);

    if (ec) {
        try {
            throw std::runtime_error("Error thrown while performing async write: " + ec.message());
        } catch (...) {
            write_promise->set_exception(std::current_exception());
        }
        return;
    }
    write_promise->set_value();
}

A few issues. Everything passed in is being ignored. I know what you pass the shared_ptrs for, but maybe you should pass them as part of a single operation object, so as to avoid having so many separate shared_ptrs (see the sketch below).
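
Something like this hypothetical bundle (write_op is my name, not from your code) would let one shared_ptr keep both alive:

struct write_op {
    std::string data;          // keeps the buffer alive until completion
    std::promise<void> done;   // fulfilled (or failed) from the handler
};

// then: auto op = std::make_shared<write_op>(); capture just `op` in the
// async_write completion handler instead of two separate shared_ptrs.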

Throwing an exception just to catch it? Mmm. I'm not sure about that. Perhaps just set a fresh exception:

if (ec) {
    write_promise->set_exception(
            std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
    write_promise->set_value();
}
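
On the consuming side, get() will then rethrow that std::system_error; for example:

try {
    write_future.get(); // rethrows the exception stored above, if any
} catch (const std::system_error& e) {
    std::cerr << "async write failed: " << e.what() << "\n";
}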

Even with that, there's a conceptual problem now. The way you liberally use get() in main without catching means that any error on any connection will just abort all operations. It would be quite useful to have an error abort only the one connection/session/client, which in your code are all pretty much synonymous (and synonymous with io_context and thread, too).

Sidenote: You store the thread as a member, but you always detach it. That means the member is useless from then on.
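
If the member is meant to stay useful, one alternative (a sketch with a hypothetical helper, not your code) is to keep the thread joinable and join on destruction:

struct session_runner {
    std::thread io_thread; // kept joinable instead of detached

    ~session_runner() {
        if (io_thread.joinable())
            io_thread.join(); // wait for the io_context loop to end
    }
};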

At this point I took a break from reviewing, and as it happens I got the brainwave that showed me the issue. The half-baked result of my exercise is here. Note that you cannot use it, because it doesn't actually fix the problem. But it might help in other ways?

Betony answered 6/6, 2018 at 23:39. Comments (11):
Please check my edit that explains what's going on :-) – Ulotrichous
"because the socket is not thread-safe, and a multithreaded io_service would create a race there" – well, the async_write operation needs to be on the strand for the exact same reason. – Betony
"when it's being used sequentially in a loop in the main-thread" – firstly, you're not "using" the method; you're "using" the object. And the object already has pending async operations being serviced from other threads. – Betony
"What else can be missing?" – making sure you don't overlap write (or read) calls. With only the "naive" addition of post, you can still be executing the next async_write before the completion handler has run. – Betony
"This is why I'm using futures, to exactly define test cases and define the time order of my tests." – in fact you do very little in that way. If you send 50 messages to a single client, the way your test code is set up, you will read any response. This might make sense, but not if the response is supposed to belong to the message sent. The way you multiplied the read_futures to match the write_futures gave me the suspicion that you do want them matching. – Betony
So, if you describe functionally what you want your client class to achieve, we can try to show/suggest a simpler (and correct) approach. – Betony
(PS. The irony in all this is that, with the code you have, you don't even need the strand in the first place, since each session is on its own io_context instance. That means everything is always on an implicit strand.) – Betony
(Let's see if it suggests chat now.) What confuses me is that the pending operations in that object all live in the io_service, and the interface of io_service is (by definition, as a semi thread-pool) thread-safe. If I understand correctly, even if I post from 10 threads to the io_service, it should be safe, right? Also remember that my write_data_async_future doesn't touch any members of this (except for websock, which simply posts to the io_service). Shouldn't this mechanism be thread-safe? Does websock.async_write do anything other than post to the io_service? – Ulotrichous
True that! I'm using a single io_context here, but in general that may change, so I'm treating this as if I had multiple threads running on that io_context. – Ulotrichous
The fact that it's thread-safe merely means you don't get a data race (Undefined Behaviour) when you do. The websocket stream is expressly not thread-aware. – Betony
Let us continue this discussion in chat. – Betony
