asio: How to change the executor inside an awaitable?

I have read this question and tried to replicate the answer with the following code:

#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_single.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread/thread.hpp>

inline std::osyncstream tout() {
  auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
  return std::osyncstream(std::cout) << "T" << hash << " ";
}

namespace asio = boost::asio;

asio::awaitable<void> mainCo(asio::io_context &appIO, asio::io_context &prodIO) {
  // the thread should also change when using the IO contexts directly.
  auto astrand = asio::io_context::strand{appIO};
  auto pstrand = asio::io_context::strand{prodIO};

  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable);
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable);
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(pstrand, asio::use_awaitable); // nop - no operation because we are already on the correct execution_context
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
}

int main() {
  asio::io_context prodIO;
  boost::thread prodThread;
  {
    // ensure the producer io context doesn't exit
    auto prodWork = asio::make_work_guard(prodIO);

    prodThread = boost::thread{[&prodIO] {
      tout() << "ProdThread run start" << std::endl;
      prodIO.run(); // if this call is removed the mainCo is stuck as expected
      tout() << "ProdThread run done" << std::endl;
    }};
    asio::io_context appIO;

    asio::co_spawn(appIO, mainCo(appIO, prodIO), asio::detached);

    tout() << "MainThread run start" << std::endl;
    appIO.run();
    tout() << "MainThread run done" << std::endl;
  }
  prodThread.join();
  return 42;
}

Current output:

/tmp/tmp.wz38MWkttM/cmake-build-debug-remote-host/CoroContextSwitching
T14386720116392206644 MainThread run start
T8726023523478668610 ProdThread run start
T14386720116392206644 MC on APPIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MainThread run done
T8726023523478668610 ProdThread run done

Process finished with exit code 42

Expected output:

/tmp/tmp.wz38MWkttM/cmake-build-debug-remote-host/CoroContextSwitching
T14386720116392206644 MainThread run start
T8726023523478668610 ProdThread run start
T14386720116392206644 MC on APPIO
T8726023523478668610 MC on PRODIO
T14386720116392206644 MC on APPIO
T8726023523478668610 MC on PRODIO
T8726023523478668610 MC on PRODIO
T14386720116392206644 MC on APPIO
T14386720116392206644 MainThread run done
T8726023523478668610 ProdThread run done

Process finished with exit code 42

I expect the thread id to change according to the cout statements. However, all cout statements are executed on the MainThread.

How can I get the desired behaviour?

Edit 2:

The original question still stands; this just adds more information to the problem.

It seems like asio::use_awaitable is binding a default executor from somewhere. Is there a documented default?

With the following edited function I can achieve what I want:

asio::awaitable<void> mainCo(asio::io_context &appIO, asio::io_context &prodIO) {
  // the thread should also change when using the IO contexts directly.
  auto astrand = asio::io_context::strand{appIO};
  auto pstrand = asio::io_context::strand{prodIO};

  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable)); // so use_awaitable is binding a default executor from somewhere.
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable));
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(pstrand, asio::bind_executor(pstrand, asio::use_awaitable)); // nop - no operation because we are already on the correct execution_context
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
  co_await asio::post(astrand, /* the first parameter in post doesn't even matter (can be astrand, pstrand, appIO, prodIO) same result */
                      asio::bind_executor(pstrand, asio::use_awaitable));
  tout() << "MC on PRODIO" << std::endl;
  co_await asio::post(astrand, asio::use_awaitable);
  tout() << "MC on APPIO" << std::endl;
}

HOWEVER, it seems that this is not the proper way to switch executor as the first argument of asio::post doesn't matter. So what is the correct way to do this?

Edit: The question was closed and pointed me to "Does boost::asio co_spawn create an actual thread?". I am aware that co_spawn does not spawn a new thread. That's why I spawn a new thread myself, called prodThread. I expect the execution_context to switch after awaiting the post statements. The linked question does not answer my question.

Kellen answered 24/4, 2022 at 9:16 Comment(2)
Could it be that because pstrand is doing no work it always falls back to the main thread? I'm just guessing here. I think that is what I'm seeing by tracing this. Like pstrand is just waiting? – Chrissie
pstrand waiting is expected. It waits until it gets a 'time slot' on the prodIO. But usually with asio::post the handler is invoked on the specified executor unless the completion token is bound to a different executor. What is confusing me is that in this case the default executor of asio::use_awaitable is not what I would expect. – Kellen

You want to associate your executor with the completion token, and then let post/dispatch/defer figure it out from there:

co_await asio::post(bind_executor(pstrand, asio::use_awaitable));

See also e.g. "When must you pass io_context to boost::asio::spawn? (C++)" or "boost::asio::bind_executor does not execute in strand" for more details on how it works, why it works, and when it works.

Those answers explain that get_associated_executor falls back to a default executor when the handler has none bound, which might explain why explicitly posting to an executor didn't seem to work here (I haven't checked the associated executor implementation for use_awaitable just now).

Here's my take:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_single.hpp>
#include <boost/bind/bind.hpp>
#include <coroutine>
#include <iostream>
#include <mutex> // std::mutex, std::lock_guard used in tout()
#include <thread>

inline void tout(auto const& msg) {
    static std::mutex mx;
    std::lock_guard   lk(mx);

    static const std::hash<std::thread::id> h{};

    std::cout << "T" << (h(std::this_thread::get_id()) % 100) << " " << msg
            << std::endl;
}

namespace asio = boost::asio;

asio::awaitable<void> mainCo(asio::io_context& appIO,
                            asio::io_context& prodIO) {
    auto to_app = bind_executor(make_strand(appIO), asio::use_awaitable);
    auto to_prod = bind_executor(make_strand(prodIO), asio::use_awaitable);

    tout("MC on APPIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_app);  tout("MC on APPIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_prod); tout("MC on PRODIO");
    co_await asio::post(to_app);  tout("MC on APPIO");
}

int main() {
    asio::io_context prodIO, appIO;
    auto             prodWork = asio::make_work_guard(prodIO);

    std::thread prodThread{[&prodIO] {
        tout("ProdThread run start");
        prodIO.run(); // if this call is removed, mainCo is stuck as expected
        tout("ProdThread run done");
    }};

    asio::co_spawn(appIO, mainCo(appIO, prodIO), asio::detached);

    tout("MainThread run start");
    appIO.run();
    tout("MainThread run done");

    prodWork.reset();
    prodThread.join();
}

Prints e.g.

T49 ProdThread run start
T31 MainThread run start
T31 MC on APPIO
T49 MC on PRODIO
T31 MC on APPIO
T49 MC on PRODIO
T49 MC on PRODIO
T31 MC on APPIO
T31 MainThread run done
T49 ProdThread run done

BONUS

I'd suggest passing executors, not execution context references. It's cleaner and more flexible: https://godbolt.org/z/Tr54vf8PM

That makes it trivial to replace the prodIO + thread with a simple thread_pool execution context. This removes the need for work guards and also fixes the missing exception handling: https://godbolt.org/z/a3GT61qdh

Ramble answered 24/4, 2022 at 19:55 Comment(7)
Could you tell me why you decided not to use std::osyncstream? – Kellen
@Kellen Yeah. I need to upgrade my main compiler and I'm lazy :) – Ramble
use_awaitable might default to asio::this_coro::executor, which shouldn't change after co_spawn. – Kellen
That makes a lot of sense. When I have a spare minute I might go and verify. – Ramble
Thank you for your amazing answer. The bonus was exactly what I was aiming for. – Kellen
Oh, I forgot. But I have another bonus. If the entire reason is to put some load on different executors, consider just expressing that with something like co_await co_spawn(thread_pool_exec, work(), use_awaitable);. That's much safer and more versatile. – Ramble
Yes, I am aware of that. I did this to avoid nesting lambdas/function calls as much as possible. Exception handling gets tricky with how I did it. – Kellen
