如何编写异步操作?
How to compose asynchronous operations?
我正在寻找一种编写异步操作的方法。最终目标是执行异步操作,并使其运行完成,或在用户定义的超时后返回。
出于示例目的,假设我正在寻找一种组合以下协程1的方法:
IAsyncOperation<IBuffer> read(IBuffer buffer, uint32_t count)
{
auto&& result{ co_await socket_.InputStream().ReadAsync(buffer, count, InputStreamOptions::None) };
co_return result;
}
socket_
是一个StreamSocket
实例。
和超时协程:
IAsyncAction timeout()
{
co_await 5s;
}
我正在寻找一种方法来组合这些协程,以便在读取数据或超时过期后尽快返回。
这些是我到目前为止评估的选项:
- C++20 协程:据我P1056R0了解,目前没有库或语言功能"支持协程的创建和组合"。
- Windows 运行时提供了异步任务类型,最终派生自 IAsyncInfo: 同样,我没有找到任何允许我按照所需方式组合任务的工具。
- 并发运行时:这看起来很有希望,特别是
when_any
函数模板看起来正是我所需要的。
由此看来我需要使用并发运行时。但是,我很难将所有部分组合在一起。我对如何处理异常以及是否需要取消相应的其他并发任务感到特别困惑。
问题是双重的:
- 并发运行时是唯一的选项(UWP 应用程序)吗?
- 实现会是什么样子?
1这些方法是应用程序内部的。不需要让它们返回 Windows 运行时兼容类型。
我认为最简单的方法是使用concurrency
库。 您需要修改超时以返回与第一个方法相同的类型,即使它返回 null。
(我意识到这只是部分答案...
我的C++很糟糕,但我认为这很接近...
array<task<IBuffer>, 2> tasks =
{
concurrency::create_task([]{return read(buffer, count).get();}),
concurrency::create_task([]{return modifiedTimeout.get();})
};
concurrency::when_any(begin(tasks), end(tasks)).then([](IBuffer buffer)
{
//do something
});
正如 Lee McPherson 在另一个答案中所建议的那样,并发运行时看起来是一个可行的选择。它提供的任务可与其他任务结合使用,使用延续链接,并与 Windows 运行时异步模型无缝集成(请参阅在 C++ 中为 UWP 应用创建异步操作)。作为奖励,包括<pplawait.h>
标头为concurrency::task
类模板实例化提供了适配器,以用作 C++20 协程 awaitable。
我无法回答所有的问题,但这就是我最终想出的。为了简单起见(并且易于验证),我使用 Sleep 代替实际的读取操作,并返回一个int
而不是一个IBuffer
。
任务的组成
ConcRT 提供了几种组合任务的方法。给定要求,concurrency::when_any
可用于创建在提供的任何任务完成时返回的任务。当仅提供 2 个任务作为输入时,还有一个方便的运算符 (operator||
) 可用。
异常传播
从任一输入任务引发的异常不计为成功完成。与when_any
任务一起使用时,引发异常不足以满足等待条件。因此,不能使用异常来分解组合任务。为了解决这个问题,我选择返回一个std::optional
,并在then
延续中提出适当的异常。
任务取消
这对我来说仍然是一个谜。看来,一旦任务满足when_any
任务的等待条件,就不需要取消相应的其他未完成任务。一旦这些完成(成功与否),它们就会被默默地处理。
以下是使用前面提到的简化的代码。它创建一个由实际工作负载和一个超时任务组成的任务,两者都返回一个std::optional
。then
延续检查返回值,并在没有返回值的情况下引发异常(即timeout_task
首先完成)。
#include <Windows.h>
#include <cstdint>
#include <iostream>
#include <optional>
#include <ppltasks.h>
#include <stdexcept>
using namespace concurrency;
task<int> read_with_timeout(uint32_t read_duration, uint32_t timeout)
{
auto&& read_task
{
create_task([read_duration]
{
::Sleep(read_duration);
return std::optional<int>{42};
})
};
auto&& timeout_task
{
create_task([timeout]
{
::Sleep(timeout);
return std::optional<int>{};
})
};
auto&& task
{
(read_task || timeout_task)
.then([](std::optional<int> result)
{
if (!result.has_value())
{
throw std::runtime_error("timeout");
}
return result.value();
})
};
return task;
}
以下测试代码
int main()
{
try
{
auto res1{ read_with_timeout(3000, 5000).get() };
std::cout << "Succeeded. Result = " << res1 << std::endl;
auto res2{ read_with_timeout(5000, 3000).get() };
std::cout << "Succeeded. Result = " << res2 << std::endl;
}
catch( std::runtime_error const& e )
{
std::cout << "Failed. Exception = " << e.what() << std::endl;
}
}
生成以下输出:
Succeeded. Result = 42 Failed. Exception = timeout
- 无法在 WinRT 中获取异步操作结果 (Windows::Foundation::IAsyncOperating 接口
- 异步操作的 Asio 处理程序在其同步对应项正常工作时不会调用
- 我不明白异步操作如何使HTTP服务器并发
- 提升::Asio 使用异步操作时传输的字节
- 如何编写异步操作?
- 发生哪些线程异步操作
- C 11异步操作在树上的操作
- 在 boost::asio 中启动异步操作和运行io_service的正确顺序
- 如何实现多线程异步操作
- FTP异步操作[Wininet C++]上的ERROR_IO_PENDING
- 使用带有boost::asio异步操作的自定义streambuf
- 提升 ASIO,如何取消异步操作
- 提升异步操作不起作用(对我来说)
- 在同一条链中执行异步操作
- 将异步操作的处理程序绑定到另一个类的非静态成员
- 如果一个basic_waitable_timer在仍有异步操作等待的情况下被销毁,该怎么办
- boost::asio中异步操作的处理程序要求
- 设计一个结合同步和异步操作的c++ API
- 如何用c++ boost::asio顺序执行异步操作
- 如何将函数作为参数传递给异步操作处理程序