How to resume the execution of a stackful coroutine in the context of its strand?
A

4

4
using Yield = asio::yield_context;
using boost::system::error_code;
int Func(Yield yield) {
  error_code ec;
  asio::detail::async_result_init<Yield, void(error_code, int)> init(yield[ec]);
  std::thread th(std::bind(Process, init.handler));
  th.detach();  // a joinable std::thread calls std::terminate when destroyed
  int result = init.result.get();  // <--- yield at here
  return result;
}

How can Process be implemented so that Func is resumed in the context of the strand that Func was originally spawned on?

Auvergne answered 1/11, 2014 at 23:39 Comment(0)
T
9

Boost.Asio uses a helper function, asio_handler_invoke, to provide a customization point for invocation strategies. For example, when a Handler has been wrapped by a strand, the invocation strategy will cause the handler to be dispatched through the strand upon invocation. As noted in the documentation, asio_handler_invoke should be invoked via argument-dependent lookup.

using boost::asio::asio_handler_invoke;
asio_handler_invoke(nullary_functor, &handler);
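To illustrate why the unqualified call matters, here is a minimal sketch of an ADL-based customization point written in plain standard C++. It does not use Boost.Asio: the namespace toylib, the hook handler_invoke, and the types strand_like_handler and plain_handler are hypothetical stand-ins chosen for this example, and the real library's overload set is more involved.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a library namespace (not Boost.Asio itself).
namespace toylib {

// Records which hook ran so the selection can be inspected.
inline std::vector<std::string>& trace() {
    static std::vector<std::string> log;
    return log;
}

// Default hook: invoke directly. The ellipsis makes it the worst match.
template <typename Function>
void handler_invoke(Function function, ...) {
    trace().push_back("default");
    function();
}

namespace detail {

// A handler type that wants its own invocation strategy.
struct strand_like_handler {
    void operator()() const {}
};

// Custom hook, only reachable through argument-dependent lookup.
template <typename Function>
void handler_invoke(Function function, strand_like_handler*) {
    trace().push_back("custom");
    function();
}

} // namespace detail
} // namespace toylib

// A handler with no custom hook.
struct plain_handler {
    void operator()() const {}
};

// Using-declaration plus unqualified call: ADL can add the custom hook.
template <typename Handler>
void invoke_via_adl(Handler& handler) {
    using toylib::handler_invoke;
    handler_invoke(handler, &handler);
}

// Qualified call: only the default hook in toylib is considered.
template <typename Handler>
void invoke_qualified(Handler& handler) {
    toylib::handler_invoke(handler, &handler);
}

// Usage sketch: checks which hook each call style selects.
inline void check_hook_selection() {
    toylib::detail::strand_like_handler wrapped;
    plain_handler plain;

    invoke_via_adl(wrapped);
    assert(toylib::trace().back() == "custom");

    invoke_qualified(wrapped);
    assert(toylib::trace().back() == "default");

    invoke_via_adl(plain);
    assert(toylib::trace().back() == "default");
}
```

In this toy model the qualified call silently bypasses the handler's own strategy, which is the same kind of mistake as calling a specific asio_handler_invoke overload by its full name.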

For stackful coroutines, there are various important details to take into consideration when yielding the coroutine and when invoking the handler_type associated with a yield_context to resume the coroutine:

  • If code is currently running in the coroutine, then it is within the strand associated with the coroutine. Essentially, a simple handler is wrapped by the strand that resumes the coroutine, causing execution to jump to the coroutine, blocking the handler currently in the strand. When the coroutine yields, execution jumps back to the strand handler, allowing it to complete.
  • While spawn() adds work to the io_service (a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent the io_service event loop from ending while a coroutine is outstanding, it may be necessary to add work to the io_service before yielding.
  • Stackful coroutines use a strand to help guarantee that the coroutine yields before resume is invoked. As of Asio 1.10.6 / Boost 1.58, it is safe to invoke the completion handler from within the initiating function. Prior versions required that the completion handler not be invoked from within the initiating function, because its invocation strategy would dispatch(), causing the coroutine to attempt resumption before it had been suspended.
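The ordering issue in the last bullet can be modelled without Boost. The sketch below is a deliberately simplified, single-threaded toy: toy_strand and simulate are hypothetical names, and the class only imitates the post/dispatch distinction rather than reproducing Asio's strand. It assumes that dispatch runs the function immediately when called from inside the strand and queues it otherwise.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a strand; not Asio's implementation.
class toy_strand {
public:
    // Always queue; the function runs later from run().
    void post(std::function<void()> f) { queue_.push_back(std::move(f)); }

    // Run immediately when already inside the strand, otherwise queue.
    void dispatch(std::function<void()> f) {
        if (running_) f();
        else post(std::move(f));
    }

    // Drain the queue; the strand counts as running while a handler executes.
    void run() {
        while (!queue_.empty()) {
            auto f = std::move(queue_.front());
            queue_.pop_front();
            running_ = true;
            f();
            running_ = false;
        }
    }

private:
    std::deque<std::function<void()>> queue_;
    bool running_ = false;
};

// Simulates an initiating function that triggers "resume" from inside the
// strand, either by posting or by dispatching, and then "yields".
inline std::vector<std::string> simulate(bool resume_with_post) {
    std::vector<std::string> events;
    toy_strand strand;
    strand.post([&] {
        events.push_back("coroutine starts");
        auto resume = [&] { events.push_back("resume"); };
        if (resume_with_post) strand.post(resume);
        else strand.dispatch(resume);
        events.push_back("yield");
    });
    strand.run();
    return events;
}

// Usage sketch: posting defers the resume until after the yield, while
// dispatching from inside the strand runs it too early.
inline void check_ordering() {
    assert(simulate(true).back() == "resume");
    assert(simulate(false).back() == "yield");
}
```

In this model a handler invoked from another thread would not be inside the strand, so dispatch would queue it just like post. That corresponds to the case the example relies on.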

Here is a complete example that accounts for these details:

#include <iostream>    // std::cout, std::endl
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind
#include <thread>      // std::thread
#include <utility>     // std::forward
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

template <typename CompletionToken, typename Signature>
using handler_type_t = typename boost::asio::handler_type<
  CompletionToken, Signature>::type;

template <typename Handler>
using async_result = boost::asio::async_result<Handler>;

/// @brief Helper type used to initialize the async_result with the handler.
template <typename CompletionToken, typename Signature>
struct async_completion
{
  typedef handler_type_t<CompletionToken, Signature> handler_type;

  async_completion(CompletionToken&& token)
    : handler(std::forward<CompletionToken>(token)),
      result(handler)
  {}

  handler_type handler;
  async_result<handler_type> result;
};

template <typename Signature, typename CompletionToken>
typename async_result<
  handler_type_t<CompletionToken, Signature>
>::type
async_func(CompletionToken&& token, boost::asio::io_service& io_service)
{
  // The coroutine itself is not work, so guarantee the io_service has
  // work.
  boost::asio::io_service::work work(io_service);

  // Initialize the async completion handler and result.
  async_completion<CompletionToken, Signature> completion(
      std::forward<CompletionToken>(token));

  auto handler = completion.handler;
  std::cout << "Spawning thread" << std::endl;
  std::thread([](decltype(handler) handler)
    {
      // The handler will be dispatched to the coroutine's strand.
      // As this thread is not running within the strand, the handler
      // will actually be posted, guaranteeing that yield will occur
      // before the resume.
      std::cout << "Resume coroutine" << std::endl;
      using boost::asio::asio_handler_invoke;
      asio_handler_invoke(std::bind(handler, 42), &handler);
    }, handler).detach();

  // Demonstrate that the handler is serialized through the strand by
  // allowing the thread to run before suspending this coroutine.
  std::this_thread::sleep_for(std::chrono::seconds(2));

  // Yield the coroutine.  When this yields, execution transfers back to
  // a handler that is currently in the strand.  The handler will complete
  // allowing other handlers that have been posted to the strand to run.
  std::cout << "Suspend coroutine" << std::endl;
  return completion.result.get();
}

int main()
{
  boost::asio::io_service io_service;

  boost::asio::spawn(io_service,
    [&io_service](boost::asio::yield_context yield)
    {
      auto result = async_func<void(int)>(yield, io_service);
      std::cout << "Got: " << result << std::endl;
    });

  std::cout << "Running" << std::endl;
  io_service.run();
  std::cout << "Finish" << std::endl;
}

Output:

Running
Spawning thread
Resume coroutine
Suspend coroutine
Got: 42
Finish

For more details, consider reading Library Foundations for Asynchronous Operations. It covers the composition of asynchronous operations, how Signature affects async_result, and the overall design of async_result, handler_type, and async_completion in much greater depth.

Tamtam answered 4/11, 2014 at 5:9 Comment(6)
Why not just add an io_service::work work(io_service); right after io_service is defined? - Auvergne
@Auvergne The point was to accentuate that the coroutine is not considered work. The caller may not be aware of the coroutine function's implementation, but may expect the io_service to not run out of work while the coroutine is still alive. - Tamtam
The example is problematic! The resume could possibly occur before the yield! Just add a line std::this_thread::sleep_for(std::chrono::seconds(2)); before completion.result.get(). The program will crash. - Auvergne
@updogliu: What version are you using? I am observing different behavior between some versions. - Tamtam
Boost 1.57 and 1.55. Didn't try with other versions yet. - Auvergne
@Auvergne It should be fixed. ADL was not selecting the proper asio_handler_invoke() hook when handler was captured by the lambda. This resulted in the handler running outside of the strand context. With the proper path being selected, I was also able to migrate the work object into async_func(), which is a bit cleaner in my opinion. - Tamtam
S
5

Here's an updated example for Boost 1.66.0 based on Tanner's great answer:

#include <iostream>    // std::cout, std::endl
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind
#include <thread>      // std::thread
#include <utility>     // std::forward
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

template <typename Signature, typename CompletionToken>
auto async_add_one(CompletionToken token, int value) {
    // Initialize the async completion handler and result
    // Careful to make sure token is a copy, as completion's handler takes a reference
    using completion_type = boost::asio::async_completion<CompletionToken, Signature>;
    completion_type completion{ token };

    std::cout << "Spawning thread" << std::endl;
    std::thread([handler = completion.completion_handler, value]() {
        // The handler will be dispatched to the coroutine's strand.
        // As this thread is not running within the strand, the handler
        // will actually be posted, guaranteeing that yield will occur
        // before the resume.
        std::cout << "Resume coroutine" << std::endl;

        // separate using statement is important
        // as asio_handler_invoke is overloaded based on handler's type
        using boost::asio::asio_handler_invoke;
        asio_handler_invoke(std::bind(handler, value + 1), &handler);
    }).detach();

    // Demonstrate that the handler is serialized through the strand by
    // allowing the thread to run before suspending this coroutine.
    std::this_thread::sleep_for(std::chrono::seconds(2));

    // Yield the coroutine.  When this yields, execution transfers back to
    // a handler that is currently in the strand.  The handler will complete
    // allowing other handlers that have been posted to the strand to run.
    std::cout << "Suspend coroutine" << std::endl;
    return completion.result.get();
}

int main() {
    boost::asio::io_context io_context;

    boost::asio::spawn(
        io_context,
        [&io_context](boost::asio::yield_context yield) {
            // Here is your coroutine

            // The coroutine itself is not work, so guarantee the io_context
            // has work while the coroutine is running
            const auto work = boost::asio::make_work_guard(io_context);

            // add one to zero
            const auto result = async_add_one<void(int)>(yield, 0);
            std::cout << "Got: " << result << std::endl; // Got: 1

            // add one to forty-one
            const auto result2 = async_add_one<void(int)>(yield, 41);
            std::cout << "Got: " << result2 << std::endl; // Got: 42
        }
    );

    std::cout << "Running" << std::endl;
    io_context.run();
    std::cout << "Finish" << std::endl;
}

Output:

Running
Spawning thread
Resume coroutine
Suspend coroutine
Got: 1
Spawning thread
Resume coroutine
Suspend coroutine
Got: 42
Finish

Remarks:

  • Greatly leverages Tanner's answer
  • Prefer Networking TS naming (e.g., io_context)
  • boost::asio provides an async_completion class which encapsulates the handler and async_result. Be careful, as the handler takes a reference to the CompletionToken, which is why the token is now explicitly copied. Yielding via async_result (completion.result.get) makes the associated CompletionToken give up its underlying strong reference, which can eventually lead to unexpected early termination of the coroutine.
  • A separate using boost::asio::asio_handler_invoke declaration followed by an unqualified call is really important. An explicitly qualified call can prevent the correct overload from being invoked.

-

I'll also mention that our application ended up with two io_context objects that a coroutine may interact with: one context for I/O-bound work, the other for CPU-bound work. Using an explicit strand with boost::asio::spawn gave us well-defined control over the context in which the coroutine would run/resume. This helped us avoid sporadic BOOST_ASSERT( ! is_running() ) failures.

Creating a coroutine with an explicit strand:

auto strand = std::make_shared<strand_type>(io_context.get_executor());
boost::asio::spawn(
    *strand,
    [&io_context, strand](yield_context_type yield) {
        // coroutine
    }
);

with invocation explicitly dispatching to the strand (multi io_context world):

boost::asio::dispatch(*strand, [handler = completion.completion_handler, value] {
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(std::bind(handler, value), &handler);
});

-

We also found that using futures in the async_result signature allows for exception propagation back to the coroutine on resumption.

using bound_function = void(std::future<RETURN_TYPE>);
using completion_type = boost::asio::async_completion<yield_context_type, bound_function>;

with yield being:

auto future = completion.result.get();
return future.get(); // may rethrow exception in your coroutine's context
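
The exception-propagation idea can be shown in isolation with only the standard library. This is a sketch under assumptions, not the code from our application: produce and consume are hypothetical names, the handler is an ordinary lambda rather than a coroutine completion handler, and int stands in for RETURN_TYPE.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical producer: packages a value or an exception into a ready
// future and passes it to the handler, mirroring void(std::future<int>).
template <typename Handler>
void produce(int value, Handler handler) {
    std::promise<int> promise;
    try {
        if (value < 0) throw std::invalid_argument("negative input");
        promise.set_value(value + 1);
    } catch (...) {
        // Store the exception instead of letting it escape the producer.
        promise.set_exception(std::current_exception());
    }
    handler(promise.get_future());
}

// Stand-in for the code that runs once the coroutine has been resumed.
inline std::string consume(int value) {
    std::string outcome;
    produce(value, [&outcome](std::future<int> future) {
        try {
            // get() returns the value or rethrows the stored exception.
            outcome = "value " + std::to_string(future.get());
        } catch (const std::invalid_argument& e) {
            outcome = std::string("error: ") + e.what();
        }
    });
    return outcome;
}

// Usage sketch: one successful result and one propagated exception.
inline void check_propagation() {
    assert(consume(41) == "value 42");
    assert(consume(-1) == "error: negative input");
}
```

The producer never lets the exception escape on its own thread of execution. The exception surfaces only where future.get() is called, which in the coroutine case is after resumption.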
Sherysherye answered 3/1, 2018 at 2:10 Comment(1)
Thank you so much for the write-up for 1.66. This is exactly what should be in their docs. - Hereld
L
0

You are complicating things by creating threads outside of the executor framework provided by Boost Asio.

For this reason you shouldn't assume that what you want is possible. I strongly suggest just adding more threads to the io_service and letting it manage the strands for you.

Or, you can extend the library and add the new feature you apparently want. If so, it's a good idea to contact the developer mailing list for advice. Perhaps they welcome this feature¹?


¹ (that you, interestingly, have not described, so I won't ask what the purpose of it is)

Lovage answered 2/11, 2014 at 0:17 Comment(3)
My purpose is to write an extension (a new kind of async operation, but it is quite job specific so I didn't describe it). Using a new thread is not the point; I can instead post the job via the io_service. But the question is still there: how to make it stay in the same strand when the coroutine resumes. Apparently the existing async functions in asio, like async_read, respect that. And this question is asking how. - Auvergne
That's a different question though. Asio uses generic programming to select different dispatch policies. I think I've seen this described in the docs near strands. Search for asio_handler_invoke. - Lovage
@Auvergne Have you seen en.highscore.de/cpp/boost/asio.html#asio_extensions by the way? Much recommended. - Lovage
A
0
using CallbackHandler = boost::asio::handler_type<Yield, void (error_code, int)>::type;

void Process(CallbackHandler handler) {
  int the_result = 81;
  boost::asio::detail::asio_handler_invoke(
      std::bind(handler, error_code(), the_result), &handler);
}

Following a hint from @sehe, I came up with the working solution above. I am not sure whether this is the right/idiomatic/best way to do it, though. Comments and edits to this answer are welcome.

Auvergne answered 2/11, 2014 at 20:31 Comment(0)
