The following code simulates short bursts of work by sleeping 1ms at a time in order to achieve a cancellable task that takes a total of 2s. That work is then launched in three different ways.

  1. The first co_spawn (labelled Task-1) explicitly uses bind_cancellation_slot and successfully cancels the work after 1s.
  2. The second attempt (WorkWithTimeout, labelled Task-3 in the code) uses awaitable_operators and steady_timer::async_wait to deliver the cancellation after 200ms, but the cancellation never takes effect, so the work runs for the full 2s.
  3. The third attempt (DeferredWorkWithTimeout, labelled Task-2 in the code) uses a variation of the work that explicitly calls boost::asio::defer before starting the work, which resolves the issue with the previous attempt.

Is the third attempt at using co_spawn correct? It seems wrong to me that my task's implementation needs to defensively defer at the very beginning in order to work with the awaitable_operators. Is there a better way to implement cancellation of such a long running task?

As a sidenote: I am aware that using co_await on an asynchronous task inside SyncWork would enable cancellation at suspension points. But I really am looking for a way to be more proactive with cancellation in between suspension points. I believe this issue is mostly relevant to the period of time when the coroutine starts and before the first suspension point is eventually hit.

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <chrono>
#include <iostream>
#include <thread>

// Simulate work that lasts for 2s, and that checks for cancellation at every 1ms
boost::asio::awaitable<void>
SyncWork()
{
    using namespace std::chrono_literals;
    auto before = std::chrono::steady_clock::now();
    auto limit = before + 2s;
    while (std::chrono::steady_clock::now() < limit) {
        auto cs = co_await boost::asio::this_coro::cancellation_state;
        if (cs.cancelled() != boost::asio::cancellation_type::none) break;
        std::this_thread::sleep_for(1ms);
    }
    std::cout << "SyncWork - elapsed: " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before).count() << "ms\n";
}

boost::asio::awaitable<void>
DeferredSyncWork()
{
    auto executor = co_await boost::asio::this_coro::executor;
    co_await boost::asio::defer(executor, boost::asio::use_awaitable);
    
    co_await SyncWork();
}

boost::asio::awaitable<void>
timeout()
{
    auto executor = co_await boost::asio::this_coro::executor;
    boost::asio::steady_timer timer{executor};
    timer.expires_after(std::chrono::milliseconds{200}); // expires_from_now is deprecated
    co_await timer.async_wait(boost::asio::use_awaitable);
    std::cout << "timeout\n";
}

boost::asio::awaitable<void>
WorkWithTimeout()
{
    using namespace boost::asio::experimental::awaitable_operators;
    co_await (SyncWork() || timeout());
}

boost::asio::awaitable<void>
DeferredWorkWithTimeout()
{
    using namespace boost::asio::experimental::awaitable_operators;
    co_await (DeferredSyncWork() || timeout());
}

void HandleException(std::exception_ptr p)
{
    if (p) {
        try {
            std::rethrow_exception(p);
        } catch (const std::exception& e) {
            std::cout << e.what() << '\n';
        } catch (...) {
            std::cout << "catch\n";
        }
    } else {
        std::cout << "success\n";
    }
}

int main()
{
    boost::asio::thread_pool pool{8};
    
    boost::asio::cancellation_signal signal;
    boost::asio::co_spawn(pool, SyncWork(), boost::asio::bind_cancellation_slot(signal.slot(), HandleException)); // Task-1
    boost::asio::co_spawn(pool, DeferredWorkWithTimeout(), HandleException); // Task-2
    boost::asio::co_spawn(pool, WorkWithTimeout(), HandleException); // Task-3
    
    std::this_thread::sleep_for(std::chrono::seconds{1});
    signal.emit(boost::asio::cancellation_type_t::terminal);
    
    pool.join();
}

Here is the program output:

timeout
SyncWork - elapsed: 202ms
success
SyncWork - elapsed: 1005ms
success
SyncWork - elapsed: 2000ms
success
Program ended with exit code: 0

1 Answer

Is the third attempt at using co_spawn correct? It seems wrong to me that my task's implementation needs to defensively defer at the very beginning in order to work with the awaitable_operators.

No, that has no effect. You simply incur some runtime overhead to land back on the same executor you were already on.

The Problem

Your problem is that you are ... running a blocking function. Also, some other thread is emit()-ing a signal, but there's no synchronization in place. With optimization enabled, since the await-transform of cancellation_state_t is always ready, it looks like everything inlines away into a tight loop: https://godbolt.org/z/bPGE4rna9

Some strategies to fix the situation:

  1. replace sleep_for with an async wait

    //sleep_for(1ms);
    co_await delay(1ms);
    

    where delay is a simple

    asio::awaitable<void> delay(auto dur) {
        co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
    }
    

    As a bonus, you no longer have to check the cancellation state manually at all, because awaitable<> has throw_if_cancelled on by default:

    assert(co_await asio::this_coro::throw_if_cancelled());
    
  2. of course, if sleep_for was actually a stand-in for real blocking work, you could follow it with a post:

    sleep_for(1ms);
    co_await post(co_await asio::this_coro::executor);
    

Both of these fix the problem without needing "magic" defer calls.

DEMO

Restructured to show the above, with some simplifications, and removing the unneeded DeferredWorkWithTimeout version:

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
auto constexpr now = std::chrono::steady_clock::now;

asio::awaitable<void> delay(auto dur) {
    co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
}

asio::awaitable<void> timeout(auto dur) {
    co_await delay(dur);
    std::cout << "timeout" << std::endl;
}

// Simulate 2s of work, checking cancellation every 1ms
boost::asio::awaitable<void> SyncWork() try {
    assert(co_await asio::this_coro::throw_if_cancelled());

    for (auto limit = now() + 2s; now() < limit;) {
        sleep_for(1ms);
        co_await post(co_await asio::this_coro::executor);

        //// or, combined:
        // co_await delay(1ms);
    }

    std::cout << "SyncWork - elapsed" << std::endl;
} catch (boost::system::system_error const& se) {
    std::cout << "SyncWork - " << se.code().message() << std::endl;
}

auto HandleException() {
    return [before = now()](std::exception_ptr p) {
        try {
            std::cout << "Completion in " << (now() - before) / 1ms << "ms" << std::endl;
            if (p)
                std::rethrow_exception(p);
            else
                std::cout << "success" << std::endl;
        } catch (std::exception const& e) {
            std::cout << e.what() << std::endl;
        } catch (...) {
            std::cout << "catch" << std::endl;
        }
    };
}

void task1() {
    std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
    asio::thread_pool pool{8};

    asio::cancellation_signal signal;
    co_spawn(pool, SyncWork(), bind_cancellation_slot(signal.slot(), HandleException()));

    sleep_for(1s);
    signal.emit(asio::cancellation_type_t::terminal);

    pool.join();
    std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
}

static asio::awaitable<void> WorkWithTimeout() { co_await (SyncWork() || timeout(200ms)); }

void task3() {
    std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
    asio::thread_pool pool{8};

    co_spawn(pool, WorkWithTimeout(), HandleException());

    pool.join();
    std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
}

int main() {
    task1();
    task3();
}

Printing e.g.

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp && ./a.out
 ----- task1 start 
SyncWork - Operation canceled
Completion in 1001ms
success
 ----- task1 exit

 ----- task3 start 
timeout
SyncWork - Operation canceled
Completion in 202ms
success
 ----- task3 exit

Side Notes

Perhaps a more natural way to control purely blocking work is e.g. std::stop_token. That way you don't even need to make it a coroutine. This could be important if, for example, you cannot afford to have SyncWork resumed on a different thread.

5 Comments

You are correct that the sleep_for call was a stand-in for actual blocking work. The co_await post at the end of every loop iteration is an elegant way to implicitly check for cancellation. On the other hand, the defer does have an effect, as shown by the 2s execution duration without it. It seems that the synchronous work executes inline because the parallel_group uses co_spawn under the hood. Also, these two variations produce different outputs: co_await (SyncWork() || timeout()); // 2s duration and co_await (timeout() || SyncWork()); // prints "TIMEOUT" + 2s duration
In other words, task1() || task2() has an undocumented, unintuitive quirk: task2() may only start after task1() has reached its first suspension point if task1() has executed inline.
Would it be a reasonable option to spawn such a coroutine using an Executor adapter that implements dispatch using post?
I'm not behind a computer any time soon. Two things: I believe I've seen an executor with similar semantics somewhere in the past. It might have been in the Asio code base itself... Perhaps you can beat me to finding what I vaguely remember now. Second: a common pitfall with coros is exceptions suddenly evaporating coro frames which can easily go unnoticed. I'm not sure that was at play here, but it's a general thing to keep in mind (eg when the order of disjunction matters for observed effects)
Thank you! Your hint sent me digging through the code. I ended up finding boost::asio::execution::blocking_adaptation_t and from there, I ended up on previous replies you've given in the past concerning boost::asio::prefer vs boost::asio::require. I can now successfully force a non-blocking co_spawn this way: boost::asio::co_spawn(boost::asio::require(pool.get_executor(), boost::asio::execution::blocking_t::never), WorkWithTimeout(), HandleException); and it gets cancelled properly after 200ms as expected!
