运行固定数量的线程

Running fixed number of threads

本文关键字:线程 运行      更新时间:2023-10-16

随着c++17的新标准,我想知道是否有一种好方法可以使用固定数量的线程启动进程,直到一批作业完成。

你能告诉我如何实现这段代码的所需功能吗:

std::vector<std::future<std::string>> futureStore;
const int batchSize             = 1000;
const int maxNumParallelThreads = 10;
int threadsTerminated           = 0;
while(threadsTerminated < batchSize)
{
const int& threadsRunning = futureStore.size();
while(threadsRunning < maxNumParallelThreads)
{
futureStore.emplace_back(std::async(someFunction));
}
for(std::future<std::string>& readyFuture: std::when_any(futureStore.begin(), futureStore.end()))
{
auto retVal = readyFuture.get(); 
// (possibly do something with the ret val)
threadsTerminated++;
}
} 

我读到,曾经有一个std::when_any功能,但它确实是一个功能,它确实进入了std功能。

当前标准库中是否支持此功能(不一定是std::future-s)?有没有办法轻松实现它,或者我必须解决这样的事情?

在我看来,这不是理想的方法:

  1. 您的主线程所做的只是等待您的其他线程完成,轮询您未来的结果。几乎以某种方式浪费了这个线程...

  2. 我不知道 std::async 以任何合适的方式重用线程的基础设施的程度,所以你每次都有创建全新线程的风险......(除此之外,您可能不会创建任何线程,如果您没有明确指定std::launch::async,请参阅此处。

我个人更喜欢另一种方法:

  1. 一次创建要使用的所有线程。
  2. 让每个线程运行一个循环,重复调用 someFunction(),直到达到所需任务的数量。

实现可能类似于以下示例:

const int BatchSize = 20;
int tasksStarted = 0;
std::mutex mutex;
std::vector<std::string> results;
std::string someFunction()
{
puts("worker started"); fflush(stdout);
sleep(2);
puts("worker done"); fflush(stdout);
return "";
}
void runner()
{
{
std::lock_guard<std::mutex> lk(mutex);
if(tasksStarted >= BatchSize)
return;
++tasksStarted;
}
for(;;)
{
std::string s = someFunction();
{
std::lock_guard<std::mutex> lk(mutex);
results.push_back(s);
if(tasksStarted >= BatchSize)
break;
++tasksStarted;
}
}
}
int main(int argc, char* argv[])
{
const int MaxNumParallelThreads = 4;
std::thread threads[MaxNumParallelThreads - 1]; // main thread is one, too!
for(int i = 0; i < MaxNumParallelThreads - 1; ++i)
{
threads[i] = std::thread(&runner);
}
runner();
for(int i = 0; i < MaxNumParallelThreads - 1; ++i)
{
threads[i].join();
}
// use results...
return 0;
}

这样,您就不会重新创建每个线程,而只是继续,直到所有任务都完成。

如果这些任务与上面的示例并不完全相同,则可以使用纯虚函数(例如"execute"或"运算符()")创建一个基类Task,并使用所需的实现(并保存任何必要的数据)创建子类。

然后,您可以将实例作为指针放入 std::vector 或 std::list(好吧,我们不会迭代,列表在这里可能合适......)中(否则,你会得到类型擦除!)并让每个线程在完成前一个任务时删除其中一个任务(不要忘记防止竞争条件!)并执行它。一旦没有更多的任务,就返回...

如果您不关心确切的线程数,最简单的解决方案是:

std::vector<std::future<std::string>> futureStore(
batchSize
);
std::generate(futureStore.begin(), futureStore.end(), [](){return std::async(someTask);});

for(auto& future : futureStore) {
std::string value = future.get();
doWork(value);
}

根据我的经验,std::async将在生成一定数量的线程后重用线程。它不会生成 1000 个线程。此外,在使用线程池时,您不会获得太多的性能提升(如果有的话)。我过去做过测量,整体运行时间几乎相同。

我现在使用线程池的唯一原因是为了避免在计算循环中创建线程的延迟。如果您有时间限制,则在第一次使用 std::async 时可能会错过截止时间,因为它将在第一次调用时创建线程。

对于这些应用程序,有一个很好的线程池库。看看这里: https://github.com/vit-vit/ctpl

#include <ctpl.h>
const unsigned int numberOfThreads = 10;
const unsigned int batchSize = 1000;
ctpl::thread_pool pool(batchSize /* two threads in the pool */);
std::vector<std::future<std::string>> futureStore(
batchSize
);
std::generate(futureStore.begin(), futureStore.end(), [](){ return pool.push(someTask);});
for(auto& future : futureStore) {
std::string value = future.get();
doWork(value);
}