使用openMP进行并行求和-当我可以´不要使用减少条款

Parallel summation with openMP - what to do when i can´t use the reduction clause?

本文关键字:#180 openMP 并行 求和 我可以 使用      更新时间:2023-10-16

我在一个类中有一个函数,我在代码中调用了数百万次。这个函数中有一些要求很高的循环可以并行化。我的问题是,它们执行存储在非sclar变量中的求和。这是代码。

void Forces::ForRes(vector<vector<double> > & Posicoes,double epsilon,double sigma,double rc,double L)
{    
double rij_2,rij_6,rij_12,rijx,rijy,rijz;
double t;
double sigma6 = pow(sigma,6);
double sigma12 = pow (sigma6,2);              
for ( unsigned int i = 0 ; i < Posicoes.size() - 1 ; i++ )
{          
    for (unsigned int j = i + 1 ; j < Posicoes.size() ; j++)
    {
            rijx = (Posicoes[i][0]-Posicoes[j][0]) - L*round((Posicoes[i][0]-Posicoes[j][0])/L);
            rijy = (Posicoes[i][1]-Posicoes[j][1]) - L*round((Posicoes[i][1]-Posicoes[j][1])/L);
            rijz = (Posicoes[i][2]-Posicoes[j][2]) - L*round((Posicoes[i][2]-Posicoes[j][2])/L);
            rij_2 = rijx*rijx + rijy*rijy + rijz*rijz;
            rij_6 = pow(rij_2,3);
            rij_12 = pow(rij_6,2);
        if (rij_2 <= rc*rc)
        {
              U += 4*epsilon*((sigma12)/(rij_12)- (sigma6)/(rij_6));                                
            for (int k =0 ; k <3 ; k++)
            {         
                t = ((24*epsilon)/(rij_2))*(2*(sigma12)/(rij_12)- (sigma6)/(rij_6))*((Posicoes[i][k]-Posicoes[j][k]) 
                    - L*round((Posicoes[i][k]-Posicoes[j][k])/L));
                F[i][k] += t;           
                    F[j][k] -= t;
            }
       }            
    }          
}

}

下面是我在代码的另一部分中做的一个例子:

    #pragma omp parallel for default(shared) reduction(+:K) private(pi_2)
    for (int i = 0 ; i < Nparticulas;i++)
        {  
         for (int k = 0 ; k < 3 ; k++)
             {
                pi_2 += Momentos.M[i][k]*Momentos.M[i][k];
             }
             K += pi_2/2;
             pi_2 = 0;  
        }  

提前谢谢。

@phadjido建议后的代码:

 void Forces::ForRes(vector<vector<double> > &    Posicoes,double epsilon,double sigma,double rc,double L)
 {
  double rij_2,rij_6,rij_12,rijx,rijy,rijz;
  double t;
  double sigma6 = pow(sigma,6);
  double sigma12 = pow (sigma6,2);
   U = 0;
   unsigned int j;
   for ( unsigned int i = 0 ; i < Posicoes.size() - 1 ; i++ )
   {   
       #pragma omp parallel  private (rij_2,rij_6,rij_12,j) 
        {
          double Up = 0; 
          vector <vector <double> > Fp(Posicoes.size() , vector<double>(Posicoes[0].size(),0));    
          #pragma omp for  
          for ( j = i + 1 ; j < Posicoes.size() ; j++)
              {
                 rijx = (Posicoes[i][0]-Posicoes[j][0]) - L*round((Posicoes[i][0]-Posicoes[j][0])/L);
                 rijy = (Posicoes[i][1]-Posicoes[j][1]) - L*round((Posicoes[i][1]-Posicoes[j][1])/L);
                 rijz = (Posicoes[i][2]-Posicoes[j][2]) - L*round((Posicoes[i][2]-Posicoes[j][2])/L);
                 rij_2 = rijx*rijx + rijy*rijy + rijz*rijz;
                 rij_6 = pow(rij_2,3);
                 rij_12 = pow(rij_6,2);
      if (rij_2 <= rc*rc)
         {
          Up += 4*epsilon*((sigma12)/(rij_12)- (sigma6)/(rij_6));

        for (int k =0 ; k <3 ; k++)
            {         
                 t = ((24*epsilon)/(rij_2))*(2*(sigma12)/(rij_12)- (sigma6)/(rij_6))*((Posicoes[i][k]-Posicoes[j][k]) 
                - L*round((Posicoes[i][k]-Posicoes[j][k])/L));
                Fp[i][k] += t;
                Fp[j][k] -= t;
                }
             }
         }
      #pragma omp atomic
      U += Up;
      for(j = i + 1 ; j < Posicoes.size() ; j++)
      {    
          for ( int k = 0 ; k < 3; k++)
              {            
            #pragma omp atomic
            F[i][k] += Fp[i][j];
            #pragma omp atomic
            F[j][k]  -= Fp[j][k];
              }
       }         
     }
   }

}

如果编译器不支持用户定义的归约,您可以自己简单地实现归约操作。下面的代码显示了如何为您的第二个示例做到这一点。请注意,pi_2是在循环开始时初始化的。在您的示例中,pi_2是一个私有变量,可能尚未初始化为零。在并行区域之前,您需要对pi_2进行firstprivate和适当的初始化。

K = 0;
#pragma omp parallel private(pi_2)
{
    double local_K = 0;  /* initialize here */
    #pragma omp for
    for (int i = 0 ; i < Nparticulas;i++)
    {
        pi_2 = 0;  /* be careful */
        for (int k = 0 ; k < 3 ; k++)
        {
            pi_2 += Momentos.M[i][k]*Momentos.M[i][k];
        }
        local_K += pi_2/2;
    }
    #pragma omp atomic
        K += local_K;
}