具有 C++11 多线程的特征库

Eigen library with C++11 multithreading

我想使用 C++11 通过多线程并行化必须重复Ntrials次的代码部分,以便每个线程将执行一次试验。

一个代码示例,假设输入Eigen::ArrayXXd sample为 (Ndimension x Npoints) 可以是以下类型:

double bestTotalModelProbability = 0;
Eigen::ArrayXd clusterIndicesFromSample(Npoints);
for (int i=0; i < Ntrials; i++)
totalModelProbability = computeGaussianMixtureModel(sample);

// Check if this trial is better than the previous one.
// If so, update the results (cluster index for each point
// in the sample) and keep them.
if totalModelProbability > bestTotalModelProbability
bestTotalModelProbability = totalModelProbability;
clusterIndicesFromSample = obtainClusterMembership(sample);


我的代码在很大程度上基于特征数组,我采取的 N 维问题可以解释 10-100 个维度和 500-1000 个不同的采样点。我正在寻找一些示例来使用 Eigen 数组和 C++11 的 std:thread 创建此代码的多线程版本,但找不到任何内容,我正在努力制作一些简单的例子来操作特征数组。

我甚至不确定 Eigen 是否可以在 C++11 中的 std::thread 中使用。 有人可以帮助我,即使是一些简单的例子来理解合成器? 我在 Mac OSX 中使用 clang++ 作为编译器,在具有 6 个内核(12 个线程)的 CPU 上。


我必须承认,我对本征库的经验非常有限。(我曾经使用过将 3×3 个旋转矩阵分解为欧拉角,这在本征库中以一般方式非常聪明地解决了。



  1. 单线程(获取用于比较的值)
  2. 在一个额外的线程中启动每个子任务(以一种公认的相当愚蠢的方法)
  3. 通过交错访问示例数据启动线程
  4. 启动对示例数据具有分区访问的线程。


#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>
// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
size_t n, const T data[], const T &begin, const T &end)
size_t result = 0;
for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
return result;
typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;
Time duration(const Clock::time_point &t0)
return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
std::vector<Time> makeTest()
const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
const size_t NThreads = std::thread::hardware_concurrency();
// make a test sample
std::vector<Value> sample(N);
for (Value &value : sample) value = (Value)rand();
// prepare result vectors
std::vector<size_t> results4[4] = {
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0)
// make test
std::vector<Time> times{
[&]() { // single threading
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[0];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment single-threaded
for (size_t i = 0; i < NGroups; ++i) {
results[i] = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
// done
return duration(t0);
[&]() { // multi-threading - stupid aproach
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[1];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value i = 0; i < NGroups;) {
size_t nT = 0;
for (; nT < NThreads && i < NGroups; ++nT, ++i) {
threads[nT] = std::move(std::thread(
[i, &results, &data, SizeGroup]() {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
// done
return duration(t0);
[&]() { // multi-threading - interleaved
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[2];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT; i < NGroups; i += NThreads) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
[&]() { // multi-threading - grouped
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[3];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT * NGroups / NThreads,
iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
// check results (must be equal for any kind of computation)
const unsigned nResults = sizeof results4 / sizeof *results4;
for (unsigned i = 1; i < nResults; ++i) {
size_t nErrors = 0;
for (Value j = 0; j < NGroups; ++j) {
if (results4[0][j] != results4[i][j]) {
#ifdef _DEBUG
<< "results4[0][" << j << "]: " << results4[0][j]
<< " != results4[" << i << "][" << j << "]: " << results4[i][j]
<< "!n";
#endif // _DEBUG
if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!n";
// done
return times;
int main()
std::cout << "std::thread::hardware_concurrency(): "
<< std::thread::hardware_concurrency() << 'n';
// heat up
std::cout << "Heat up...n";
for (unsigned i = 0; i < 3; ++i) makeTest();
// repeat NTrials times
const unsigned NTrials = 10;
std::cout << "Measuring " << NTrials << " runs...n"
<< "   "
<< " | " << std::setw(10) << "Single"
<< " | " << std::setw(10) << "Multi 1"
<< " | " << std::setw(10) << "Multi 2"
<< " | " << std::setw(10) << "Multi 3"
<< 'n';
std::vector<double> sumTimes;
for (unsigned i = 0; i < NTrials; ++i) {
std::vector<Time> times = makeTest();
std::cout << std::setw(2) << (i + 1) << ".";
for (const Time &time : times) {
std::cout << " | " << std::setw(10) << time.count();
std::cout << 'n';
sumTimes.resize(times.size(), 0.0);
for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
std::cout << "Average Values:n   ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(1)
<< sumTime / NTrials;
std::cout << 'n';
std::cout << "Ratio:n   ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(3)
<< sumTime / sumTimes.front();
std::cout << 'n';
// done
return 0;

在Windows 10上的cygwin64上编译和测试:

$ g++ --version
g++ (GCC) 7.3.0
$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc
$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
|     Single |    Multi 1 |    Multi 2 |    Multi 3
1. |     384008 |    1052937 |     130662 |     138411
2. |     386500 |    1103281 |     133030 |     132576
3. |     382968 |    1078988 |     137123 |     137780
4. |     395158 |    1120752 |     138731 |     138650
5. |     385870 |    1105885 |     144825 |     129405
6. |     366724 |    1071788 |     137684 |     130289
7. |     352204 |    1104191 |     133675 |     130505
8. |     331679 |    1072299 |     135476 |     138257
9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
|   372939.9 |  1086042.6 |   136648.3 |   136144.6
|      1.000 |      2.912 |      0.366 |      0.365

我在 coliru.com 上也做了同样的事情。(我不得不减少加热周期和样本量,因为我超过了原始值的时间限制。

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
|     Single |    Multi 1 |    Multi 2 |    Multi 3
1. |     224684 |     297729 |      48334 |      39016
2. |     146232 |     337222 |      66308 |      59994
3. |     195750 |     344056 |      61383 |      63172
4. |     198629 |     317719 |      62695 |      50413
5. |     149125 |     356471 |      61447 |      57487
6. |     155355 |     322185 |      50254 |      35214
7. |     140269 |     316224 |      61482 |      53889
8. |     154454 |     334814 |      58382 |      53796
9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
|   169387.5 |   329891.5 |    59428.2 |    51406.0
|      1.000 |      1.948 |      0.351 |      0.303


我有点想知道 coliru(只有 4 个线程)的比率甚至比我的 PC 上的比率(有 8 个线程)还要好。实际上,我不知道如何解释这一点。 但是,这两种设置中还有许多其他差异,这些差异可能是也可能不是原因。至少,两次测量都显示第 3和第4 种方法的粗略加速为 3,其中第 2方法唯一地消耗了每个潜在的加速(可能是通过启动和加入所有这些线程)。


  • 最少的通信开销,即并发任务在独立数据上运行
  • 合并并发计算结果后工作量最小。


  1. 在启动线程之前准备好每个数据和存储,
  2. 读取的共享数据在线程运行时永远不会更改,
  3. 以线程本地方式写入的数据(没有两个线程写入相同的数据地址)
  4. 在联接所有线程后评估计算结果。

关于 3.我有点纠结这是否合法,即是否允许在线程中写入的数据在加入后正确显示在主线程中。(某些东西似乎工作正常这一事实通常是虚幻的,但对于多线程尤其虚幻。

cppreference.com 提供以下说明

  • 对于std::thread::thread()


  • 对于std::thread::join()


在Stack Overflow中,我找到了以下相关的问答:

  • 松弛的内存顺序效果是否可以扩展到执行线程的生命周期之后?
  • 这里需要内存围栏吗?
  • 线程::join上是否存在与同步关系的隐式内存屏障?



  • 线程的创建和连接会导致额外的工作量(而且它并不便宜)。

另一种方法是使用线程池来克服此问题。我用谷歌搜索了一下,在github上找到了Jakob Progsch的ThreadPool。但是,我想,使用线程池,锁定问题又回到了"游戏中"。



特征和多线程 – 在多线程应用程序中使用特征

如果您自己的应用程序是多线程的,并且多个线程调用 Eigen,则必须在创建线程之前通过调用以下例程来初始化 Eigen:

#include <Eigen/Core>
int main(int argc, char** argv)


对于 Eigen 3.3 和完全兼容 C++11 的编译器(即线程安全的静态局部变量初始化),调用initParallel()是可选的。


请注意,生成随机矩阵的所有函数都不是可重入的,也不是线程安全的。其中包括DenseBase::Random()和DenseBase::setRandom(),尽管调用了Eigen::initParallel()。这是因为这些函数基于 std::rand 的,它不是可重入的。对于线程安全的随机生成器,我们建议使用 boost::random 或 c++11 随机功能。