C++中的线程池/排队系统

Threadpool / Queueing system in C++

本文关键字:排队 系统 线程 C++      更新时间:2023-10-16

我有一种情况,需要做一些繁重的计算。我发现细分数据然后将其合并在一起是最快的(随着大小的增加,时间增加得更快,所以拆分是合乎逻辑的)。

它应该能够为应用程序提供数据大小,例如一百万个双值。

我现在所做的是,将基于这个大小创建的数据发送到某个函数,在计算后返回,然后循环返回,将这些数据卸载到主向量中。

我想寄200的部分,还有一个"最后"部分。例如,给定size=1000005,最初将执行此函数5000次,然后最后一次执行大小为5的数据。

int size = 1000000;
int times = size / 200; // 5000
int leftover = size % 200; // 0, this not performed
QVector<double> x(size);
QVector<double> y(size);
x = createData(size);
x = createData(size);
for (int i = 0; i < times; i++)
{
    holder = createData(200);
    QVector<double> tempx = x.mid(i*200, 200);
    QVector<double> tempy = y.mid(i*200, 200);
    holder = myfunction(tempx, tempy, 200);  // let it now just return `tempy`
    for (int j = 0; j < 200; j++)
    {
        y[i*200 + j] = holder[j];
    }
}
// leftover function here, really similar to this part before.
// plotting function here

最后,x将保持初始化状态,y将进行计算。

由于这些代码部分可以分开运行,而且速度至关重要,所以我想使用几个核心。

以下是这种情况的进一步特征:

  • 这些函数调用是相互独立的,只有在向量完成后,我才想绘制结果
  • 每次通话的完成时间会有很大的变化
  • times的数量应该是可变的

我读到一些关于建议最大线程数为核心数量的文章(至少作为一个起点),因为使用太多线程可能会减慢进程。考虑到这种情况,排队系统/线程池似乎是有意义的,因为当一个线程有一些简单的作业,而其他线程则因较难的作业而减慢了速度时,它不会浪费时间。

虽然在十几个教程中,使用一些(通常是2个)线程打印一些消息似乎很容易,但有人能提供更详细的帮助吗?如何返回向量并将这些线程安全地卸载到主函数中,以及如何创建线程池以避免浪费时间?

使用Ubuntu 13.04、Qt和C++11x,尽管这并不重要。

首先,编写一个线程池是很困难的。如果你真的想学习如何写一个,Antony Williams写的《C++并发在行动》一书会教你如何做到这一点。

然而,您的情况似乎是一个简单的parallel_fo非常适合的情况。因此,我建议使用"英特尔线程构建块"库。该库的优点是它有一个非常好的线程池,并且可以很好地与C++11功能配合使用。

示例代码:

#include "tbb/task_scheduler_init.h"
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/tbb_thread.h"
#include <vector>
int main() {
  tbb::task_scheduler_init init(tbb::tbb_thread::hardware_concurrency());
  std::vector<double> a(1000);
  std::vector<double> c(1000);
  std::vector<double> b(1000);
  std::fill(b.begin(), b.end(), 1);
  std::fill(c.begin(), c.end(), 1);
  auto f = [&](const tbb::blocked_range<size_t>& r) {
    for(size_t j=r.begin(); j!=r.end(); ++j) a[j] = b[j] + c[j];    
  };
  size_t hint_number_iterations_per_thread = 100;
  tbb::parallel_for(tbb::blocked_range<size_t>(0, 1000, hint_number_iterations_per_thread), f);
  return 0;
}

完成!英特尔TBB有一个非常好的线程池,可以尝试调整每个线程的工作负载。只要hint_number_iterations_per_thread不是一个疯狂的数字,它就会非常接近最优解

顺便说一句:intel TBB是一个可与大多数编译器配合使用的开源库!

您不需要创建任何东西。如果您使用的是Qt,那么您的问题已经解决。您可以从QRunnable派生一个类,然后将其传递给QThreadPool执行。

您可以指示QThreadPool应该同时运行多少个线程(任何额外的线程只需在队列中等待,直到插槽打开),但这不应该是必要的,因为QThreadPool根据您的体系结构设置了通常足够好的限制。

QThreadPool

Q可运行

您可以使用QtConcurrent库,这比创建QThreadPool和扩展QRunabble更简单。具体使用QtConcurrent::mapped函数,它采用一个开始迭代器和一个结束迭代器,以及一个函数(可以是lambda),并在内部为您处理线程池的创建和执行。

有两种变体:"mapped"返回结果的QFuture,但不阻塞当前线程,而"blockingMapped"直接返回结果列表。

要对一个大的整数向量求平方,可以执行以下操作:

std::vector<int> myInts = ....
QVector<int> result = QtConcurrent::blockingMapped(myInts.begin(), myInts.end(), [](int x) { return x*x}; });