具有 C++11 多线程的特征库

Eigen library with C++11 multithreading

本文关键字:特征 多线程 C++11 具有      更新时间:2023-10-16

我有一个代码来计算具有期望最大化的高斯混合模型,以便从给定的输入数据样本中识别聚类。

一段代码是重复计算这种模型的多个试验Ntrials(一个独立的,但使用相同的输入数据),以便最终获得最佳解决方案(最大化模型总可能性的解决方案)。这个概念可以推广到许多其他聚类算法(例如k-means)。

我想使用 C++11 通过多线程并行化必须重复Ntrials次的代码部分,以便每个线程将执行一次试验。

一个代码示例,假设输入Eigen::ArrayXXd sample为 (Ndimension x Npoints) 可以是以下类型:

double bestTotalModelProbability = 0;
Eigen::ArrayXd clusterIndicesFromSample(Npoints);
clusterIndicesFromSample.setZero();
for (int i=0; i < Ntrials; i++)
{
totalModelProbability = computeGaussianMixtureModel(sample);

// Check if this trial is better than the previous one.
// If so, update the results (cluster index for each point
// in the sample) and keep them.
if totalModelProbability > bestTotalModelProbability
{
bestTotalModelProbability = totalModelProbability;
...
clusterIndicesFromSample = obtainClusterMembership(sample);
}
}

其中我将样本的参考值(Eigen::Ref)传递给函数computeGaussianMixModel()和gettainClusterMember()

我的代码在很大程度上基于特征数组,我采取的 N 维问题可以解释 10-100 个维度和 500-1000 个不同的采样点。我正在寻找一些示例来使用 Eigen 数组和 C++11 的 std:thread 创建此代码的多线程版本,但找不到任何内容,我正在努力制作一些简单的例子来操作特征数组。

我甚至不确定 Eigen 是否可以在 C++11 中的 std::thread 中使用。 有人可以帮助我,即使是一些简单的例子来理解合成器? 我在 Mac OSX 中使用 clang++ 作为编译器,在具有 6 个内核(12 个线程)的 CPU 上。

OP的问题引起了我的注意,因为通过多线程获得的加速数字处理是我个人清单上最重要的待办事项之一。

我必须承认,我对本征库的经验非常有限。(我曾经使用过将 3×3 个旋转矩阵分解为欧拉角,这在本征库中以一般方式非常聪明地解决了。

因此,我定义了另一个示例任务,该任务由示例数据集中的值的愚蠢计数组成。

这是多次完成的(使用相同的评估函数):

  1. 单线程(获取用于比较的值)
  2. 在一个额外的线程中启动每个子任务(以一种公认的相当愚蠢的方法)
  3. 通过交错访问示例数据启动线程
  4. 启动对示例数据具有分区访问的线程。

test-multi-threading.cc

#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>
// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
size_t n, const T data[], const T &begin, const T &end)
{
size_t result = 0;
for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
return result;
}
typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;
Time duration(const Clock::time_point &t0)
{
return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}
std::vector<Time> makeTest()
{
const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
const size_t NThreads = std::thread::hardware_concurrency();
// make a test sample
std::vector<Value> sample(N);
for (Value &value : sample) value = (Value)rand();
// prepare result vectors
std::vector<size_t> results4[4] = {
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0)
};
// make test
std::vector<Time> times{
[&]() { // single threading
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[0];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment single-threaded
for (size_t i = 0; i < NGroups; ++i) {
results[i] = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - stupid aproach
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[1];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value i = 0; i < NGroups;) {
size_t nT = 0;
for (; nT < NThreads && i < NGroups; ++nT, ++i) {
threads[nT] = std::move(std::thread(
[i, &results, &data, SizeGroup]() {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}));
}
for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - interleaved
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[2];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT; i < NGroups; i += NThreads) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}(),
[&]() { // multi-threading - grouped
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[3];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT * NGroups / NThreads,
iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}()
};
// check results (must be equal for any kind of computation)
const unsigned nResults = sizeof results4 / sizeof *results4;
for (unsigned i = 1; i < nResults; ++i) {
size_t nErrors = 0;
for (Value j = 0; j < NGroups; ++j) {
if (results4[0][j] != results4[i][j]) {
++nErrors;
#ifdef _DEBUG
std::cerr
<< "results4[0][" << j << "]: " << results4[0][j]
<< " != results4[" << i << "][" << j << "]: " << results4[i][j]
<< "!n";
#endif // _DEBUG
}
}
if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!n";
}
// done
return times;
}
int main()
{
std::cout << "std::thread::hardware_concurrency(): "
<< std::thread::hardware_concurrency() << 'n';
// heat up
std::cout << "Heat up...n";
for (unsigned i = 0; i < 3; ++i) makeTest();
// repeat NTrials times
const unsigned NTrials = 10;
std::cout << "Measuring " << NTrials << " runs...n"
<< "   "
<< " | " << std::setw(10) << "Single"
<< " | " << std::setw(10) << "Multi 1"
<< " | " << std::setw(10) << "Multi 2"
<< " | " << std::setw(10) << "Multi 3"
<< 'n';
std::vector<double> sumTimes;
for (unsigned i = 0; i < NTrials; ++i) {
std::vector<Time> times = makeTest();
std::cout << std::setw(2) << (i + 1) << ".";
for (const Time &time : times) {
std::cout << " | " << std::setw(10) << time.count();
}
std::cout << 'n';
sumTimes.resize(times.size(), 0.0);
for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
}
std::cout << "Average Values:n   ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(1)
<< sumTime / NTrials;
}
std::cout << 'n';
std::cout << "Ratio:n   ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(3)
<< sumTime / sumTimes.front();
}
std::cout << 'n';
// done
return 0;
}

在Windows 10上的cygwin64上编译和测试:

$ g++ --version
g++ (GCC) 7.3.0
$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc
$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
|     Single |    Multi 1 |    Multi 2 |    Multi 3
1. |     384008 |    1052937 |     130662 |     138411
2. |     386500 |    1103281 |     133030 |     132576
3. |     382968 |    1078988 |     137123 |     137780
4. |     395158 |    1120752 |     138731 |     138650
5. |     385870 |    1105885 |     144825 |     129405
6. |     366724 |    1071788 |     137684 |     130289
7. |     352204 |    1104191 |     133675 |     130505
8. |     331679 |    1072299 |     135476 |     138257
9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
|   372939.9 |  1086042.6 |   136648.3 |   136144.6
Ratio:
|      1.000 |      2.912 |      0.366 |      0.365

我在 coliru.com 上也做了同样的事情。(我不得不减少加热周期和样本量,因为我超过了原始值的时间限制。

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
|     Single |    Multi 1 |    Multi 2 |    Multi 3
1. |     224684 |     297729 |      48334 |      39016
2. |     146232 |     337222 |      66308 |      59994
3. |     195750 |     344056 |      61383 |      63172
4. |     198629 |     317719 |      62695 |      50413
5. |     149125 |     356471 |      61447 |      57487
6. |     155355 |     322185 |      50254 |      35214
7. |     140269 |     316224 |      61482 |      53889
8. |     154454 |     334814 |      58382 |      53796
9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
|   169387.5 |   329891.5 |    59428.2 |    51406.0
Ratio:
|      1.000 |      1.948 |      0.351 |      0.303

科里鲁的现场演示

我有点想知道 coliru(只有 4 个线程)的比率甚至比我的 PC 上的比率(有 8 个线程)还要好。实际上,我不知道如何解释这一点。 但是,这两种设置中还有许多其他差异,这些差异可能是也可能不是原因。至少,两次测量都显示第 3和第4 种方法的粗略加速为 3,其中第 2方法唯一地消耗了每个潜在的加速(可能是通过启动和加入所有这些线程)。

查看示例代码,您将认识到没有互斥锁或任何其他显式锁定。这是故意的。正如我所了解到的(很多很多年前),每次并行化的尝试都可能导致一定的额外通信开销(对于必须交换数据的并发任务)。如果通信开销变得很大,它只会消耗并发的速度优势。因此,可以通过以下方式实现最佳加速:

  • 最少的通信开销,即并发任务在独立数据上运行
  • 合并并发计算结果后工作量最小。

在我的示例代码中,我

  1. 在启动线程之前准备好每个数据和存储,
  2. 读取的共享数据在线程运行时永远不会更改,
  3. 以线程本地方式写入的数据(没有两个线程写入相同的数据地址)
  4. 在联接所有线程后评估计算结果。

关于 3.我有点纠结这是否合法,即是否允许在线程中写入的数据在加入后正确显示在主线程中。(某些东西似乎工作正常这一事实通常是虚幻的,但对于多线程尤其虚幻。

cppreference.com 提供以下说明

  • 对于std::thread::thread()

    构造函数调用的完成与(如std::memory_order中定义)在新执行线程上调用f副本的开始同步

  • 对于std::thread::join()

    *this标识的线程的完成与join()的相应成功返回同步

在Stack Overflow中,我找到了以下相关的问答:

  • 松弛的内存顺序效果是否可以扩展到执行线程的生命周期之后?
  • 这里需要内存围栏吗?
  • 线程::join上是否存在与同步关系的隐式内存屏障?

这让我相信,没关系。

但是,缺点是

  • 线程的创建和连接会导致额外的工作量(而且它并不便宜)。

另一种方法是使用线程池来克服此问题。我用谷歌搜索了一下,在github上找到了Jakob Progsch的ThreadPool。但是,我想,使用线程池,锁定问题又回到了"游戏中"。

这是否也适用于特征函数,取决于特征函数的实现方式。如果其中有对全局变量的访问(当同时调用同一函数时,这些变量将变为共享),这将导致数据竞争。

谷歌搜索了一下,我找到了以下文档。

特征和多线程 – 在多线程应用程序中使用特征

如果您自己的应用程序是多线程的,并且多个线程调用 Eigen,则必须在创建线程之前通过调用以下例程来初始化 Eigen:

#include <Eigen/Core>
int main(int argc, char** argv)
{
Eigen::initParallel();
...
}

注意

对于 Eigen 3.3 和完全兼容 C++11 的编译器(即线程安全的静态局部变量初始化),调用initParallel()是可选的。

警告

请注意,生成随机矩阵的所有函数都不是可重入的,也不是线程安全的。其中包括DenseBase::Random()和DenseBase::setRandom(),尽管调用了Eigen::initParallel()。这是因为这些函数基于 std::rand 的,它不是可重入的。对于线程安全的随机生成器,我们建议使用 boost::random 或 c++11 随机功能。