如何编写异步操作?

How to compose asynchronous operations?

本文关键字:异步操作 何编写      更新时间:2023-10-16

我正在寻找一种编写异步操作的方法。最终目标是执行异步操作,并使其运行完成,或在用户定义的超时后返回。

出于示例目的,假设我正在寻找一种组合以下协程1的方法:

IAsyncOperation<IBuffer> read(IBuffer buffer, uint32_t count)
{
auto&& result{ co_await socket_.InputStream().ReadAsync(buffer, count, InputStreamOptions::None) };
co_return result;
}

socket_是一个StreamSocket实例。

和超时协程:

IAsyncAction timeout()
{
co_await 5s;
}

我正在寻找一种方法来组合这些协程,以便在读取数据或超时过期后尽快返回。

这些是我到目前为止评估的选项:

  • C++20 协程:据我P1056R0了解,目前没有库或语言功能"支持协程的创建和组合"。
  • Windows 运行时提供了异步任务类型,最终派生自 IAsyncInfo: 同样,我没有找到任何允许我按照所需方式组合任务的工具。
  • 并发运行时:这看起来很有希望,特别是when_any函数模板看起来正是我所需要的。

由此看来我需要使用并发运行时。但是,我很难将所有部分组合在一起。我对如何处理异常以及是否需要取消相应的其他并发任务感到特别困惑。

问题是双重的:

  • 并发运行时是唯一的选项(UWP 应用程序)吗?
  • 实现会是什么样子?

1这些方法是应用程序内部的。不需要让它们返回 Windows 运行时兼容类型。

我认为最简单的方法是使用concurrency库。 您需要修改超时以返回与第一个方法相同的类型,即使它返回 null。

(我意识到这只是部分答案...

我的C++很糟糕,但我认为这很接近...

array<task<IBuffer>, 2> tasks =
{
concurrency::create_task([]{return read(buffer, count).get();}),
concurrency::create_task([]{return modifiedTimeout.get();})
};
concurrency::when_any(begin(tasks), end(tasks)).then([](IBuffer buffer)
{ 
//do something 
});

正如 Lee McPherson 在另一个答案中所建议的那样,并发运行时看起来是一个可行的选择。它提供的任务可与其他任务结合使用,使用延续链接,并与 Windows 运行时异步模型无缝集成(请参阅在 C++ 中为 UWP 应用创建异步操作)。作为奖励,包括<pplawait.h>标头为concurrency::task类模板实例化提供了适配器,以用作 C++20 协程 awaitable。

我无法回答所有的问题,但这就是我最终想出的。为了简单起见(并且易于验证),我使用 Sleep 代替实际的读取操作,并返回一个int而不是一个IBuffer

任务的组成

ConcRT 提供了几种组合任务的方法。给定要求,concurrency::when_any可用于创建在提供的任何任务完成时返回的任务。当仅提供 2 个任务作为输入时,还有一个方便的运算符 (operator||) 可用。

异常传播

从任一输入任务引发的异常不计为成功完成。与when_any任务一起使用时,引发异常不足以满足等待条件。因此,不能使用异常来分解组合任务。为了解决这个问题,我选择返回一个std::optional,并在then延续中提出适当的异常。

任务取消

这对我来说仍然是一个谜。看来,一旦任务满足when_any任务的等待条件,就不需要取消相应的其他未完成任务。一旦这些完成(成功与否),它们就会被默默地处理。

以下是使用前面提到的简化的代码。它创建一个由实际工作负载和一个超时任务组成的任务,两者都返回一个std::optionalthen延续检查返回值,并在没有返回值的情况下引发异常(即timeout_task首先完成)。

#include <Windows.h>
#include <cstdint>
#include <iostream>
#include <optional>
#include <ppltasks.h>
#include <stdexcept>
using namespace concurrency;
task<int> read_with_timeout(uint32_t read_duration, uint32_t timeout)
{
auto&& read_task
{
create_task([read_duration]
{
::Sleep(read_duration);
return std::optional<int>{42};
})
};
auto&& timeout_task
{
create_task([timeout]
{
::Sleep(timeout);
return std::optional<int>{};
})
};
auto&& task
{
(read_task || timeout_task)
.then([](std::optional<int> result)
{
if (!result.has_value())
{
throw std::runtime_error("timeout");
}
return result.value();
})
};
return task;
}

以下测试代码

int main()
{
try
{
auto res1{ read_with_timeout(3000, 5000).get() };
std::cout << "Succeeded. Result = " << res1 << std::endl;
auto res2{ read_with_timeout(5000, 3000).get() };
std::cout << "Succeeded. Result = " << res2 << std::endl;
}
catch( std::runtime_error const& e )
{
std::cout << "Failed. Exception = " << e.what() << std::endl;
}
}

生成以下输出:

Succeeded. Result = 42
Failed. Exception = timeout