如何在循环中使用 ppl 任务和 .then

Howto use ppl tasks and .then in a loop?

本文关键字:ppl 任务 then 循环      更新时间:2023-10-16

我试图学习ppl。而不是使用线程。让我们从头开始。我有一个简单的问题:

v1:

            while(true)
            {
                auto important_msg = ReceiveImportantMessage_Blocking(); //Blocks for about 20s until there is a new message
                //do processing on important_msg
                auto unimportant_msg = ReceiveUnimportantMessage_Blocking(); //Blocks for about 60s until there is a new message
                //do processing on unimportant_msg
            }

v1 显然很糟糕,因为这两个调用都会阻塞并最终相互等待。

v2:

            while(true)
            {
                auto important_msg = ReceiveImportantMessage_Blocking(); //Blocks for about 20s until there is a new message
                //do processing on important_msg
                auto unimportant_msg = CheckForUnimportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(unimportant_msg) {
                    //do processing on unimportant_msg
                }
            }

v2 更好,因为不重要的消息不会阻止重要消息。此外,重要消息在收到实例后会对其进行处理。不过,在我们收到重要消息之前,不会检查不重要的消息。因此,当我到达它时,这条不重要的信息可能已经 20 多岁了。

v3:

            while(true)
            {
                auto important_msg = CheckForImportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(important_msg) {
                    //do processing on important_msg
                }   
                auto unimportant_msg = CheckForUnimportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(unimportant_msg) {
                    //do processing on unimportant_msg
                }
                sleep(10); //just so we don't busy wait.    
            }

v3 可以更快地获取不重要的消息。但对于重要消息,它也变慢了。重要消息的处理不会立即收到。但只有当我四处检查它时。由于我添加了睡眠以避免忙于等待(并消耗过多的 CPU 时间),因此与 v2 相比,接收和处理重要消息所需的时间更长。

v4:

            {
                auto important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //ppl::task
                auto unimportant_msg_task = ReceiveUnimportantMessageTask_NonBlocking(); //ppl::task
                while(true)
                {
                    if(important_msg_task.is_done()) {
                        auto important_msg = important_msg_task.get();
                        //do processing on important_msg
                        important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //listen for new message
                    }
                    if(unimportant_msg_task.is_done()) {
                        auto unimportant_msg = unimportant_msg_task.get();
                        //do processing on important_msg
                        unimportant_msg_task = ReceiveUnimportantMessageTask_NonBlocking(); //listen for new message
                    }
                    sleep(10); //just so we don't busy wait.    
                }
            }

V4 与 v3 相同。只是用 ppl 任务代替。它存在无法立即处理重要消息的问题。

v5)我想删除睡眠并使用important_msg_task".then"在收到后启动处理,并在处理旧消息后使用".then"侦听新消息,".then"处理新消息等等(并对unimportant_msg_task执行相同的操作)。我不知道如何在循环中做到这一点。似乎我最终会永远得到一个接一个不断增长的连接任务链。

那么如何使用 ppl(或至少没有原始线程)解决这个问题呢?

使用这种惯用代码:

template<typename Func>
concurrency::task<void> doWhile(Func func)
{
  static_assert(
    std::is_same_v<decltype(func()), bool> ||
    std::is_same_v<decltype(func()), concurrency::task<bool>>);
  return concurrency::create_task(func)
    .then([func](bool needToContinue)
    {
      if (needToContinue)
        return doWhile(func);
      return concurrency::task_from_result();
    });
}

对于一个任务,您可以编写如下内容:

concurrency::task<bool> process() {
  // launching initial task
  auto important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //ppl::task
  // adding continuation with processing of the result
  auto continuation = important_msg_task.then([](const Message &msg)
  {
    // do processing on important msg
    // decide whether to continue or stop processing
    return stopProcessing() ? false : true;
  });
  return continuation;
}
auto loop = doWhile([] {
  return process();
});

使用loop.get()等待处理完成。

要有两个并行处理"循环",只需为另一个"循环"触发第二个doWhile,例如:

auto loop2 = doWhile([] {
  return ReceiveUnimportantMessageTask_NonBlocking()
    .then([](const Message &msg)
    {
      //do processing on unimportant msg
      return !stopProcessing();
    });
});

由于所有任务都在线程池上执行,因此具有多个doWhile可以有效地使处理并行。