理解建议N3650中C++1y的可恢复函数示例

Understanding an example about resumable functions in proposal N3650 for C++1y

本文关键字:可恢复 函数 C++1y N3650      更新时间:2023-10-16

考虑以下取自N3650的示例:

int cnt = 0;
do {
   cnt = await streamR.read(512, buf);
   if (cnt == 0)
      break;
   cnt = await streamW.write(cnt, buf);
} while (cnt > 0);

我可能遗漏了一些东西,但如果我很好地理解了asyncawait,那么当效果相当于写作时,用上面的例子来展示这两个结构的有用性有什么意义:

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

其中CCD_ 3和CCD_?

await关键字不等于对future调用get。你可能会更像这样看待它,假设你从这个开始:

future<T> complex_function()
{
     do_some_stuff();
     future<Result> x = await some_async_operation();
     return do_some_other_stuff(x);
}

这在功能上或多或少与相同

future<T> complex_function()
{
     do_some_stuff();
     return some_async_operation().then([=](future<Result> x) {
         return do_some_other_stuff(x);
     });
}

或多或少要注意的是,因为存在一些资源管理含义,所以不应该像lambda版本那样复制do_some_stuff中创建的变量来执行do_some_other_stuff

第二种变体使调用时会发生什么更加清楚。

  1. 当您调用complex_function时,do_some_stuff()将被同步调用
  2. CCD_ 9被异步调用并在将来产生结果。执行此操作的确切时刻取决于您实际的异步调用实现,当您使用线程时,它可能是即时的,当使用延迟执行时,它可以是调用.get()的任何时候
  3. 我们不会立即执行do_some_other_stuff,而是将其链接到步骤2中获得的未来。这意味着它可以在来自some_async_operation的结果准备好后立即执行,但不能在之前执行。除此之外,它的执行时刻是由运行时决定的。如果实现仅包含then提案,这意味着它将继承父未来的执行器/启动策略(根据N3558)
  4. 函数返回表示最终结果的最后一个future。请注意,这需要成为未来,因为函数体的一部分是异步执行的

一个更完整的例子(希望是正确的):

future<void> forwardMsgs(istream& streamR, ostream& streamW) async
{
    char buf[512];
    int cnt = 0;
    do {
       cnt = await streamR.read(512, buf);
       if (cnt == 0)
          break;
       cnt = await streamW.write(cnt, buf);
    } while (cnt > 0);
}
future<void> fut = forwardMsgs(myStreamR, myStreamW);
/* do something */
fut.get();

重要的一点是(引用草案):

挂起后,可恢复函数可以由运行时的调度逻辑恢复,并最终完成其逻辑,此时它执行返回语句(显式或隐式),并在占位符中设置函数的结果值。

和:

可恢复的函数在其执行暂停后恢复后,可以在另一个线程上继续执行。

也就是说,最初调用forwardMsgs的线程可以在任何挂起点返回。如果是这样,在/* do something */行期间,forwardMsgs内部的代码可以由另一个线程执行,即使函数已被"同步"调用。


这个例子与非常相似

future<void> fut = std::async(forwardMsgs, myStreamR, myStreamW);
/* do something */
fut.get();

不同之处在于可恢复函数可以由不同的线程执行:在每个恢复/挂起点之后,不同的线程可以恢复(可恢复函数的)执行。

我认为streamR.read()streamW.write()调用是异步I/O操作和返回期货,由await表达式自动等待。

因此,等效同步版本必须调用future::get()才能获得结果,例如

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

您指出这里没有并发性是正确的。然而,在可恢复函数的上下文中,await使行为与上面的代码段不同。当到达await时,函数将返回future,因此即使在等待某些其他结果(例如,在这种情况下,read()write()调用完成)时,可恢复函数在await被阻塞,函数的调用方也可以在不阻塞的情况下继续,因此,当调用者正在做其他事情时,结果在后台变得可用。

以下是不使用await的示例函数的正确翻译:

struct Copy$StackFrame {
  promise<void> $result;
  input_stream& streamR;
  output_stream& streamW;
  int cnt;
  char buf[512];
};
using Copy$StackPtr = std::shared_ptr<Copy$StackFrame>;
future<void> Copy(input_stream& streamR, output_stream& streamW) {
  Copy$StackPtr $stack{ new Copy$StackFrame{ {}, streamR, streamW, 0 } };
  future<int> f$1 = $stack->streamR.read(512, stack->buf);
  f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  return $stack->$result.get_future();
}
void Copy$Cont1(Copy$StackPtr $stack, future<int> f$1) {
  try {
    $stack->cnt = f$1.get();
    if ($stack->cnt == 0) {
      // break;
      $stack->$result.set_value();
      return;
    }
    future<int> f$2 = $stack->streamW.write($stack->cnt, $stack->buf);
    f$2.then([$stack](future<int> f) { Copy$Cont2($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}
void Copy$Cont2(Copy$StackPtr $stack, future<int> f$2) {
  try {
    $stack->cnt = f$2.get();
    // while (cnt > 0)
    if (cnt <= 0) {
      $stack->$result.set_value();
      return;
    }
    future<int> f$1 = $stack->streamR.read(512, stack->buf);
    f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}

正如您所看到的,这里的编译器转换相当复杂。这里的关键点是,与get()版本不同,一旦进行了第一个异步调用,原始Copy就会返回其未来。

我对这两个代码示例之间的差异的含义有同样的问题。让我们重写一点,使它们更完整。

    // Having two functions
    future<void> f (istream&streamR, ostream&streamW) async
    {  int cnt = 0;
       do {
          cnt = await streamR.read(512, buf);
          if (cnt == 0)
             break;
          cnt = await streamW.write(cnt, buf);
       } while (cnt > 0);
    }
    void g(istream&streamR, ostream&streamW)
    {  int cnt = 0;
       do {
          cnt = streamR.read(512, buf).get();
          if (cnt == 0)
             break;
          cnt = streamW.write(cnt, buf).get();
       } while (cnt > 0);
    }
    // what is the difference between
    auto a = f(streamR, streamW);
    // and 
    auto b = async(g, streamR, streamW);

您仍然需要至少三个堆栈。在这两种情况下,主线程都没有被阻塞。是否假设等待将由编译器比未来<>更有效地实现:get()?。好吧,没有等待的现在可以使用了。

谢谢Adam Zielinski