pthread执行时间比顺序执行时间差

pthread execution time worse than sequential

本文关键字:执行时间 时间差 执行 pthread 顺序      更新时间:2023-10-16

我正在学习使用pthread,希望它能帮助我的一些最慢的代码片段 走得快一点。我试图(作为一个预热的例子(编写一个蒙特卡洛积分器,使用 线程。我写了一个比较三种方法的代码:

  • 单线程 pthread 的积分与 NEVALS 积分和评估。
  • 积分 NTHREADS 的多个线程评估乘以每个 NEVALS 积分评估。
  • 多个线程提交到我的 CPU 中的不同内核,再次总计 NEVALS*NTHREADS 积分评估。

在运行时,最快的每个积分评估是单核,比其他核快 2 到 3 倍。另外两个似乎有些等同,除了 CPU使用率非常不同,第二个将线程分布在所有(8(个内核上 在我的 CPU 中,而第三个(不出所料(将工作集中在 NTHREADS 中,其余的则留给 空置。

这是来源:

#include <iostream>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <thread>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <unistd.h>
using namespace std;
double aleatorio(double a, double b){
double r = double(rand())/RAND_MAX;
return a + r * (b - a);
}
double funct(double* a){
return pow(a[0],6);
}
void EstimateBounds(int ndim, double (*f)(double*), double* bounds){
double x[ndim];
for(int i=1;i<=1000;i++){
for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
if ( f(x) > bounds[1]) bounds[1] = f(x);
if ( f(x) < bounds[0]) bounds[0] = f(x);
}
}
void Integrate(double (*f)(double*), int ndim, double* integral, int verbose, int seed){
int nbatch = 5000000;
const int maxeval = 25*nbatch;
double x[ndim];
srand(seed);


/// Algorithm to estimate the maxima and minima ///
for(int j=0;j<ndim;j++) x[j] = 0.5;
double bounds[2] = {f(x),f(x)};
EstimateBounds(ndim,f,bounds);
/// Integral initialization ///
int niter = int(maxeval/nbatch);

for(int k=1;k<=niter;k++)
{

double loc_min = bounds[0];
double loc_max = bounds[1];
int count = 0;
for (int i=1; i<=nbatch; i++)
{
for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
double y = aleatorio(bounds[0],bounds[1]);
if ( f(x) > loc_max )   loc_max = f(x);
if ( f(x) < loc_min )   loc_min = f(x);
if ( f(x) > y && y > 0 ) count++;
if ( f(x) < y && y < 0 ) count--;
}
double delta = (bounds[1]-bounds[0])*double(count)/nbatch;
integral[0]  += delta;
integral[1] += pow(delta,2);
bounds[0] = loc_min;
bounds[1] = loc_max;
if(verbose>0){
cout << "Iteration["<<k<<"]: " << k*nbatch;
cout << " integrand evaluations so far" <<endl;
if(verbose>1){
cout << "The bounds for this iteration were = ["<<bounds[0]<<","<<bounds[1]<<"]"<<endl;}
cout << "Integral = ";
cout << integral[0]/k << " +- "; 
cout << sqrt((integral[1]/k - pow(integral[0]/k,2)))/(k) << endl;
cout << endl;
}
}
integral[0] /= niter;
integral[1] = sqrt((integral[1]/niter - pow(integral[0],2)))/niter;
}
struct IntegratorArguments{
double (*Integrand)(double*);
int NumberOfVariables;
double* Integral;
int VerboseLevel;
int Seed;
};
void LayeredIntegrate(IntegratorArguments IA){
Integrate(IA.Integrand,IA.NumberOfVariables,IA.Integral,IA.VerboseLevel,IA.Seed);
}
void ThreadIntegrate(void * IntArgs){
IntegratorArguments *IA = (IntegratorArguments*)IntArgs;
LayeredIntegrate(*IA);
pthread_exit(NULL);
}
#define NTHREADS 5
int main(void)
{
cout.precision(16);
bool execute_single_core = true;
bool execute_multi_core = true;
bool execute_multi_core_2 = true;
///////////////////////////////////////////////////////////////////////////
///
///          Single Thread Execution 
///
///////////////////////////////////////////////////////////////////////////
if(execute_single_core){
pthread_t thr0;
double integral_value0[2] = {0,0};
IntegratorArguments IntArg0;
IntArg0.Integrand = funct;
IntArg0.NumberOfVariables = 2;
IntArg0.VerboseLevel = 0;
IntArg0.Seed = 1;
IntArg0.Integral = integral_value0;
int t = time(NULL);
cout << "Now Attempting to create thread "<<0<<endl;
int rc0 = 0;
rc0 = pthread_create(&thr0, NULL, ThreadIntegrate,&IntArg0);   
if (rc0) {
cout << "Error:unable to create thread," << rc0 << endl;
exit(-1);
}
else cout << "Thread "<<0<<" has been succesfuly created" << endl;
pthread_join(thr0,NULL);
cout << "Thread 0 has finished, it took " << time(NULL)-t <<" secs to finish"  << endl;
cout << "Integral Value = "<< integral_value0[0] << "+/-" << integral_value0[1] <<endl;
}

////////////////////////////////////////////////////////////////////////////////
///
///             Multiple Threads Creation 
///
/////////////////////////////////////////////////////////////////////////////// 
if(execute_multi_core){
pthread_t threads[NTHREADS];
double integral_value[NTHREADS][2];
IntegratorArguments IntArgs[NTHREADS];
int rc[NTHREADS];
for(int i=0;i<NTHREADS;i++){
integral_value[i][0]=0;
integral_value[i][1]=0;
IntArgs[i].Integrand = funct;
IntArgs[i].NumberOfVariables = 2;
IntArgs[i].VerboseLevel = 0;
IntArgs[i].Seed = i;
IntArgs[i].Integral = integral_value[i];        
}
int t = time(NULL);
for(int i=0;i<NTHREADS;i++){
cout << "Now Attempting to create thread "<<i<<endl;
rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
if (rc[i]) {
cout << "Error:unable to create thread," << rc[i] << endl;
exit(-1);
}
else cout << "Thread "<<i<<" has been succesfuly created" << endl;
}
/// Thread Waiting Phase ///
for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
cout << "All threads have now finished" <<endl;
cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
for(int i = 0; i < NTHREADS; i++ ) {
cout << "Thread " << i << " has as the value for the integral" << endl;
cout << "Integral = ";
cout << integral_value[i][0] << " +- "; 
cout << integral_value[i][1] << endl;
}
}
////////////////////////////////////////////////////////////////////////
///
///             Multiple Cores Execution 
///
///////////////////////////////////////////////////////////////////////

if(execute_multi_core_2){
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
pthread_t threads[NTHREADS];
double integral_value[NTHREADS][2];
IntegratorArguments IntArgs[NTHREADS];
int rc[NTHREADS];
for(int i=0;i<NTHREADS;i++){
integral_value[i][0]=0;
integral_value[i][1]=0;
IntArgs[i].Integrand = funct;
IntArgs[i].NumberOfVariables = 2;
IntArgs[i].VerboseLevel = 0;
IntArgs[i].Seed = i;
IntArgs[i].Integral = integral_value[i];        
}
int t = time(NULL);
for(int i=0;i<NTHREADS;i++){
cout << "Now Attempting to create thread "<<i<<endl;
rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
if (rc[i]) {
cout << "Error:unable to create thread," << rc[i] << endl;
exit(-1);
}
else cout << "Thread "<<i<<" has been succesfuly created" << endl;
CPU_SET(i, &cpuset);
}
cout << "Now attempting to commit different threads to different cores" << endl;
for(int i=0;i<NTHREADS;i++){
const int set_result = pthread_setaffinity_np(threads[i], sizeof(cpu_set_t), &cpuset);
if(set_result) cout << "Error: Thread "<<i<<" could not be commited to a new core"<<endl;
else cout << "Thread reassignment succesful" << endl;
}
/// Thread Waiting Phase ///
for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
cout << "All threads have now finished" <<endl;
cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
for(int i = 0; i < NTHREADS; i++ ) {
cout << "Thread " << i << " has as the value for the integral" << endl;
cout << "Integral = ";
cout << integral_value[i][0] << " +- "; 
cout << integral_value[i][1] << endl;
}
}

pthread_exit(NULL);
}

我编译 g++ -std=c++11 -w -fpermissive -O3 SOURCE.cpp -lpthread

在我看来,我的线程实际上是按顺序执行的,因为 时间似乎随着 NTHREADS 的增长而增长,实际上它需要大约 NTHREADS 的时间更长 而不是单个线程。

有没有人知道瓶颈在哪里?

您正在使用rand(),这是一个全局随机数生成器。首先,它不是线程安全的,因此在多个线程中使用它(可能并行使用(会导致未定义的行为。

即使我们撇开这一点,rand()也使用一个由所有线程共享的全局实例。如果一个线程想要调用它,处理器内核需要检查其他内核是否修改了其状态,并且每次使用它时都需要从主内存或其他缓存中重新获取该状态。这就是您观察到性能下降的原因。

请改用伪随机数生成器的<random>工具。它们提供了质量更好的随机数生成器、随机数分布以及创建多个独立随机数生成器实例的能力。使这些thread_local,以便线程不会相互干扰:

double aleatorio(double a, double b){
thread_local std::mt19937 rng{/*seed*/};
return std::uniform_real_distribution<double>{a, b}(rng);
}

请注意,虽然这没有对std::mt19937使用正确的种子设定,但有关详细信息,请参阅此问题,并且uniform_real_distribution<double>{a, b}将返回a包含和b排除之间的均匀分布的数字。您的原始代码给出了一个介于ab(含(之间的数字(拂开潜在的舍入错误(。我认为两者都与你特别相关。

另请注意我在您的问题下不相关的评论,以了解您应该改进的其他内容。