boost asio ssl async_read + async_write within a strand is failing

I consider myself reasonably experienced with asio but can't figure out how to correctly perform async_read and async_write on a boost::asio::ssl::stream<boost::asio::ip::tcp::socket>. I have created the following minimal example https://github.com/ladnir/asio-ssl-stackoverflow which I explain next.

My goal is quite simple: perform full-duplex async read and write on an ssl_stream. The documentation is clear that you need to perform the async_read and async_write calls from within a strand, which I do.

My setup is an io_context with multiple threads. Data is continuously sent and received on the socket. Sending and receiving each have their own callback chain: in the completion handler for each, I simply schedule another send or receive operation. All of this is performed within a strand. Below is the main bit of code:

 std::function<void(bool, ssl::stream<tcp::socket>&, io_context::strand&, u64)> f =
        [&](bool send, ssl::stream<tcp::socket>& sock, io_context::strand& strand, u64 t) {
        
        strand.dispatch([&, send, t]() {
            std::vector<u8> buffer(10000);
            auto bb = mutable_buffer(buffer.data(), buffer.size());
            auto callback = [&, send, t, buffer = std::move(buffer), moveOnly = std::unique_ptr<int>{}](boost::system::error_code error, std::size_t n) mutable {
                    if (error) {
                        std::cout << error.message() << std::endl;
                        std::terminate();
                    }

                    // perform another operation or complete.
                    if (t)
                        f(send, sock, strand, t - 1);
                    else
                        --spinLock;
                };

            if (send)
                async_write(sock, bb, std::move(callback));
            else
                async_read(sock, bb, std::move(callback));

        });
};

A send and receive callback chain is then started for the server and client sockets.

    // launch our callback chains.
    f(true, srvSocket, srvStrand, trials);
    f(false, srvSocket, srvStrand, trials);
    f(true, cliSocket, cliStrand, trials);
    f(false, cliSocket, cliStrand, trials);

It seems that despite the use of the strand, something inside OpenSSL is not being performed in a thread-safe manner. When I run the code I sometimes get a decryption failure and sometimes it just crashes somewhere in OpenSSL.

If I use a plain tcp::socket this code works fine. If I make the io_context single-threaded it also works fine. I have tested this on Ubuntu and Windows.

It seems from related questions, e.g. this, that full duplex should work as long as you wrap it in a strand.

Does anyone see what I'm doing wrong? Maybe it is simply not safe to perform an async_read/async_write when the other is already scheduled?


Notes on Sehe's solution: After comparing my code to Sehe's solution I determined that my code has one major bug. The execution context of the ssl::stream<tcp::socket>s is the multi-threaded io_context. As such, there is no guarantee that the ssl::stream performs its work on my strand. Moreover, how could it? The ssl::stream is unaware of my strand and only happens to be executed on it during the initial call. When the ssl::stream gets called back from the underlying socket, it will/might schedule more read/write operations, but they won't be on my chosen strand.

The only thing guaranteed is that the ssl::stream runs on the execution context it was constructed with. A simple fix is to construct the stream with a strand as its execution context.

I updated the github repo above to contain my solution.

Thanks, Sehe for the insight.

Ergener answered 24/10, 2022 at 20:46 Comment(3)
What related questions say that "this" (?) results in unsynchronized access to the underlying socket? (The big question is what "this" is, and likely it is always subtly (or completely) different from what you're doing)Vina
The unsynchronized access comment (removed) was based on my own debugging. This was in fact the case but the root cause of this is explained in my notes on your solution.Ergener
Your edit doesn't seem accurate. You can bind completion handlers to a strand executor (asio::bind_executor or the deprecated strand's wrap member function). I agree that the new executor interface is much more ergonomicVina

It's been one of those days. I felt completely silly that I couldn't find the issue with your code. I kept running into variations of use-after-free and stack thrashing even with simplified code (way simplified).

The obvious culprit looked to me to be the use of the vector's data after moving the vector. I have not been able to prove that (manually asserting that the values of data() and size() are stable across the move didn't fail), but see below.

So... I went to square one and just wrote the entire program the way I'd write it. That is to say,

  • using executors instead of service reference + explicit handler binding
  • asio::strand<> instead of io_context::strand (which is deprecated)
  • asio::thread_pool instead of io_context plus raw threads (and a work guard)

I don't think any of these really mattered, but it does simplify the code.

While doing the mental gymnastics to work out whether a function<> can hold a copy of itself, I decided it would need to have shared ownership.

auto make_loop = [&](auto& stream, bool sending) {
    auto shared_loop = std::make_shared<std::function<void()>>();
    auto port        = stream.lowest_layer().remote_endpoint().port();
    auto mode        = [=](auto t) {
        return std::string(sending ? "write" : "read") +
            " t=" + std::to_string(t) + " port:" + std::to_string(port);
    };
    auto s = std::make_shared<sentinel>("shared_loop " + mode(trials));

    *shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
                    t = trials, s_ = s]() mutable {
        if (!t--) {
            return;
        }

        auto data = std::make_shared<std::vector<uint8_t>>(10'000);
        auto buf  = asio::buffer(*data);

        auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
            (error_code ec, size_t n) {
                trace("Handler ", mode(t), " n=", n, " ", ec.message());
                if (t && sl)
                    (*sl)();
            };

        trace("Initiating ", mode(t));

        if (sending)
            async_write(stream, buf, handler);
        else
            async_read(stream, buf, handler);
    };
    return *shared_loop;
};

post(srv.get_executor(), make_loop(srv, true));
post(srv.get_executor(), make_loop(srv, false));
post(cli.get_executor(), make_loop(cli, true));
post(cli.get_executor(), make_loop(cli, false));

The good news is that this program runs without problems (under ASAN+UBSAN):

(almost) Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl  = asio::ssl;
using asio::ip::tcp;
using ssl::context;
using stream = ssl::stream<tcp::socket>;
using boost::system::error_code;

static constexpr auto trials = 10;

static inline void trace(auto const&... args) {
    static std::mutex mx;
    std::lock_guard   lk(mx);
    (std::cout << ... << args) << std::endl;
}

struct sentinel {
    std::string msg;
    ~sentinel() {
        trace("~sentinel: ", msg);
    } // trace when shared loop is freed
};

template <typename... Ts> static inline auto my_lock(std::shared_ptr<Ts...> sp) { return sp; }
template <typename... Ts> static inline auto my_lock(std::weak_ptr<Ts...> wp) { return wp.lock(); }

int main() try {
    asio::thread_pool ioc;
    ssl::context      sctx{ssl::context::tlsv13_server};
    ssl::context      cctx{ssl::context::tlsv13_client};

    sctx.set_default_verify_paths();
    sctx.set_password_callback([](auto&&...) { return "test"; });
    sctx.use_certificate_file("server.pem", context::pem);
    sctx.use_private_key_file("server.pem", context::pem);

    tcp::acceptor acc(make_strand(ioc), tcp::v4());
    trace("before set_option");
    acc.set_option(tcp::acceptor::reuse_address(true));
    acc.bind({{}, 7878});
    acc.listen();
    trace("listening");

    stream srv(acc.get_executor(), sctx);
    auto   fut = std::async([&] {
        acc.accept(srv.lowest_layer());
        trace("Handshaking ", srv.lowest_layer().remote_endpoint());

        srv.handshake(stream::server);
    });

    stream cli(make_strand(ioc), cctx);
    cli.lowest_layer().connect({{}, 7878});
    cli.handshake(stream::client);
    fut.get();
    trace("connected");

    auto make_loop = [&](auto& stream, bool sending) {
        auto shared_loop = std::make_shared<std::function<void()>>();
        auto port        = stream.lowest_layer().remote_endpoint().port();
        auto mode        = [=](auto t) {
            return std::string(sending ? "write" : "read") +
                " t=" + std::to_string(t) + " port:" + std::to_string(port);
        };
        auto s = std::make_shared<sentinel>("shared_loop " + mode(trials));

        *shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
                        t = trials, s_ = s]() mutable {
            if (!t--) {
                return;
            }

            auto data = std::make_shared<std::vector<uint8_t>>(10'000);
            auto buf  = asio::buffer(*data);

            auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
                (error_code ec, size_t n) {
                    trace("Handler ", mode(t), " n=", n, " ", ec.message());
                    if (t && sl)
                        (*sl)();
                };

            trace("Initiating ", mode(t));

            if (sending)
                async_write(stream, buf, handler);
            else
                async_read(stream, buf, handler);
        };
        return *shared_loop;
    };

    post(srv.get_executor(), make_loop(srv, true));
    post(srv.get_executor(), make_loop(srv, false));
    post(cli.get_executor(), make_loop(cli, true));
    post(cli.get_executor(), make_loop(cli, false));

    trace("waiting");
    ioc.join();
    sentinel atdone{"done"};
} catch(boost::system::system_error const& se) {
    std::cout << se.what() << " from " << se.code().location() << "\n";
}

Running it locally for demonstration purposes: https://i.imgur.com/q5e7ENI.mp4

The Troubles

There are two notable problems:

  • as you can see, the shared loop function is leaked. This is because it circularly holds on to itself. If we attempt to break the cycle by using a weak_ptr (e.g. as commented), the async chain cannot proceed beyond the first iteration. I don't think there is a solution here, except to move ownership out of the handler completely.

  • If in my example I replace the buffer with the std::vector as you had it:

         auto data = std::vector<uint8_t>(10'000);
         auto buf  = asio::buffer(data);
    

    (no further changes required), ASAN will object with a heap-use-after-free, regardless of how stable the data address ought to be... I would not bet on it.

Vina answered 25/10, 2022 at 1:57 Comment(4)
Perhaps I should have added a TL;DR: it's not about your strands, it's about the unique ownership of the handler chain (and maybe the buffer as well)Vina
Thanks so much! With your code, I was able to fix mine. It's clear that I don't understand asio as well as I should, ha. Especially the executor model. The fix appears to be to construct the stream using a strand... In hindsight, this makes perfect sense, but I didn't even know that was an option.Ergener
That's a "recent" (less than 5 years) interface change, so yes you do see the old ways around still. I love how it can simplify routine tasksVina
Are you aware of a reference for the new interface?Ergener

© 2022 - 2024 — McMap. All rights reserved.