防止一段代码在协程中并发执行

Guard a section of code from being executed concurrently in a coroutine

本文关键字:程中 并发 执行 代码 一段      更新时间:2023-10-16

我需要保护一段代码不被并发执行在协程中。防止在多线程环境中并发执行很简单,只需使用 std::lock_guard 类模板即可。但是,我的协程是从单个线程调用的,因此该解决方案不适用。

以下是我试图完成的(伪(代码:

future<http_response> send_req_async(http_request req) {
while (true) {
// Attempt to send an HTTP request
auto const& access_token{ token_store::access_token() };
auto const response{ co_await impl::send_req_async(req, access_token) };
if (response.status_code() == http_code::ok) {
co_return response;
}
// Attempt to refresh access token
if (response.status_code() == http_code::unauthorized) {
// The following scope needs to be guarded against concurrent execution.
// To guard against concurrent execution from multiple threads I would use:
// lock_guard<mutex> guard(refresh_token_mutex);
if (access_token != token_store::access_token()) {
continue;
}
auto const& token{ co_await refresh_token(token_store::refresh_token()) };
token_store::save_access_token(token);
// End of section that needs to be guarded.
}
}
}

该代码旨在允许并行发出多个请求,同时仅允许尝试刷新过期访问令牌的单个协程调用。理想情况下,解决方案应在令牌刷新操作进行时暂停并发协程调用,并在之后自动恢复它(即在多线程环境中使用相同的std::lock_guard语义(。

协程机制或C++标准库中是否内置了任何内容,允许我以干净的方式实现它,或者我必须自己滚动?


注意:我使用的是Visual Studio 2017 15.7.2,因此您可以假设完全支持C++17及其协程TS实现。

C++ 或标准库没有提供基础设施来获取所需的功能。但是,协程 TS 提供了实现可co_await异步互斥锁的构建基块。

一般的想法是实现一个可等待的,它试图在评估await_suspend表达式时获取合适的互斥锁。如果无法获取锁,则协程将挂起并添加到等待者队列中,否则执行将立即继续(保持锁(。

互斥锁的unlock方法从队列中恢复等待者,除非等待者队列为空。

网络上有预先构建的解决方案。我选择了Lewis Baker的async_mutex实现,原因有很多:

  • 没有内部依赖项的外部。只需将编译单元和头文件放入您的项目中即可完成。
  • 锁归协程所有,而不是线程。该实现允许在其他线程上恢复协程。
  • 这是一个无锁的实现。

此实现的使用与std::lock_guard非常相似:

#include <cppcoro/async_mutex.hpp>
namespace {
cppcoro::async_mutex refresh_mutex;
}
future<http_response> send_req_async(http_request req) {
while (true) {
// Attempt to send an HTTP request
auto const& access_token{ token_store::access_token() };
auto const response{ co_await impl::send_req_async(req, access_token) };
if (response.status_code() == http_code::ok) {
co_return response;
}
// Attempt to refresh access token
if (response.status_code() == http_code::unauthorized) {
// The following scope needs to be guarded against concurrent execution.
auto const refresh_guard{ co_await refresh_mutex.scoped_lock_async() };
if (access_token != token_store::access_token()) {
continue;
}
auto const& token{ co_await refresh_token(token_store::refresh_token()) };
token_store::save_access_token(token);
// refresh_guard falls out of scope, unlocking the mutex.
// If there are any suspended coroutines, the oldest one gets resumed.
}
}
}