在 C++11 中使用期货、异步和线程实现搜索

Implement search using futures, async, and thread in C++11

本文关键字:线程 实现 搜索 异步 C++11 使用期      更新时间:2023-10-16

我想以多线程方式实现分支和绑定搜索。特别是,我想使用 async 来包装每个分支的搜索调用,然后等到某个线程提出答案,然后退出。(理想情况下,我想取消其他线程,但线程取消不在标准中)。以下是一些简化的代码:

#include <iostream>
#include <random>
#include <future>
#include <thread>
using namespace std;
mt19937 rng;
uniform_int_distribution<unsigned> random_binary(0, 1);
bool search() {
  return static_cast<bool>(random_binary(rng));
}
#define N 10000
int main()
{
  rng.seed(42);
  std::vector<future<bool>> tasks;
  for (unsigned i=0; i<N; ++i)
    tasks.push_back(async(launch::async, search));
  // Don't want to wait sequentially here.
  for (unsigned i=0; i<N; ++i) {
    tasks[i].wait();
    if (tasks[i].get()) {
      cout << "i = " << i << "n";
      break;
    }
  }
  return 0;
}

search()是搜索功能。它根据是否找到答案返回真/假。我返回一个随机答案进行说明。但问题的症结在于调用tasks[i].wait()的 for 循环。现在,我正在按顺序等待任务完成。相反,我想做这样的事情:

auto x = wait_for_any(tasks.begin(), tasks.end());
x.get();
// cancel other threads.
// Profit?

实现这一目标的好方法是什么?

>std::future提供了一个valid()函数,让你检查结果是否可用而不会阻塞,所以你可以使用它,例如在忙碌等待循环中:

std::future<bool>* res_future = 0;
for(size_t i = 0; ; i==tasks.size()?i=0:++i){
  // could add a timeout period to not completely busy-wait the CPU
  if(tasks[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready){
    res = &tasks[i];
    break;
  }
}
bool res = res_future->get();

建议添加到 std::future ,使这样的任务更容易,是一种.then(func_obj)方法,该方法在结果可用时异步调用func_obj,您可以在其中设置标志或其他东西。

遗憾的是,我不知道除了上述方式之外,还有什么方法可以以任何其他方式实现wait_for_any

template<class FwdIt>
std::future<bool> wait_for_any(FwdIt first, FwdIt last)
{
  return std::async([=]{
    for(FwdIt cur(first); ; cur==last?cur=first:++cur){
    // could add a timeout period to not completely busy-wait the CPU
    if(cur->wait_for(std::chrono::seconds(0)) == std::future_status::ready)
      return cur->get();
  });
}

线程销毁通常通过协作取消来完成。

P. S.:如果结果不可用,std::future<T>::get()将自动wait()

安排所有任务访问相同的condition_variablemutexbool。 这可以通过创建这些全局变量或每个任务运行成员函数的成员数据来完成,或者您可以通过std::ref将它们作为任务创建函数中的参数传递。

在开始任何任务之前,将bool初始化为not_found。 然后,主线程启动任务并等待condition_variable。 然后,搜索器任务进行搜索。 当他们搜索时,他们偶尔会检查bool(可能带有原子负载),看看它是否已设置为found。 如果有,搜索器线程将立即返回。

当一个线程找到结果时,它将bool设置为 found 并发出condition_variable信号。 这将唤醒主线程并有效地取消其余的搜索器任务。 然后,主线程可以加入、分离、放弃等所有搜索器任务。 如果您没有 main 显式加入搜索器任务,最好可以安排所有搜索器任务在 main 退出之前结束。

没有轮询。 无需等待死胡同搜索。 唯一的临时部分是弄清楚搜索者任务检查bool的方式和频率。 我建议对这部分进行性能测试。

我要采取的方法是让 main 函数创建一个std::promise,然后将其作为对将要完成工作的各种线程的引用传递。然后,每个工作线程将使用它们共享的std::promise,以便在获得结果后使用set_value()

哪个工作线程首先到达那里将能够发送结果,而其他人在尝试使用 set_value() 并且已经设置时将引发异常。

在下面的源代码框架中,我正在使用std::async但是如果我们可以在不创建不用于任何内容的额外std::promise的情况下剥离自治线程,那就太好了。

因此,代码框架将如下所示:

#include <iostream>
#include <thread>  // std::thread is defined here
#include <future>  // std::future and std::promise defined here
bool search(int args, std::promise<int> &p) {
    // do whatever needs to be done
    try {
        p.set_value(args);
    }
    catch (std::future_error & e) {
        // std::future_errc::promise_already_satisfied
    }
    return true;
}
int main(int argc, char * argv[])
{
    std::promise<int> myProm;    
    auto fut = myProm.get_future();
    for (int i = 1; i < 4; i++) {
        std::async(std::launch::async, search, i, std::ref(myProm));
    }
    auto retVal = fut.get();
    return 0;
}