用于并行计算的C++代码优化示例

Example of C++ code optimization for parallel computing

本文关键字:代码优化 C++ 并行计算 用于      更新时间:2023-10-16

我正在努力理解优化例程。我专注于我的代码中最关键的部分(代码有一些长度为"nc"的循环和一个长度为"np"的循环,其中数字"np"比"nc"大得多)。我在这里展示部分代码。代码的其余部分在%的计算时间中不是很重要,所以我更喜欢在算法的其余部分中进行代码净化。然而,具有"np"长度的临界循环是一段非常简单的代码,并且它可以并行化。因此,如果我把这部分重写成更有效、更不清晰的版本(可能是SSE指令),也不会有什么坏处。我使用的是gcc编译器、c++代码和OpenMP并行化。

这个代码是众所周知的细胞内粒子算法的一部分(这个也是基本算法)。我正在尝试在这个版本上学习代码优化(所以我的目标不是只有有效的PIC算法,因为它已经有一千种变体,但我也想为代码优化带来一些示范性的例子)。我正在尝试做一些工作,但我不太确定我是否正确地解决了所有优化属性。

const int NT = ...; // number of threads (in two versions: about 6 or about 30)
const int np = 10000000; // np is about 1000-10000 times larger than nc commonly
const int nc = 10000;
const int step = 1000;
float u[np], x[np];
float a[nc], a_lin[nc], rho_full[NT][nc], rho_diff[NT][nc] , weight[nc];
int p,num;
for ( i = 0 ; i<step ;  i++) {
// *** 
// *** some not very time consuming code for calculation 
// *** a, a_lin from values of rho_full and rho_diff
#pragma omp for private(p,num)
for ( k = np ; --k ; ) {
    num = omp_get_thread_num();
    p = (int) x[k];
    u[k] += a[p] + a_lin[p] * (x[k] - p);
    x[k] += u[k];
    if (x[k]<0 ) {x[k]+=nc;} else
    if (x[k]>nc) {x[k]-=nc;};
    p = (int) x[k];
    rho_full[num][p] += weight[k];
    rho_diff[num][p] += weight[k] * (x[k] - p);
    }
};

我意识到这有问题:

1)(主要问题)我使用一组数组rho_full[num][p],其中num是每个线程的索引。经过计算,我只是总结了这些数组(rho_full[0][p] + rho_full[1][p] + rho_full[2][p] ...)。原因是避免使用两个不同的线程写入数组的同一部分。我不太确定这种方式是否是一种有效的解决方案(请注意,数字"nc"相对较小,因此使用"np"的操作次数可能仍然是最重要的)

2)(也是一个重要的问题)我需要读很多次x[k],它也改了很多次。也许最好将这个值读入某个寄存器,然后忘记整个x数组,或者在这里修复一些指针。经过所有的计算,我可以再次调用x[k]数组并存储获得的值。我相信编译器为我做了这项工作,但我不太确定,因为我在算法的中心使用了对x[k]的修改。所以编译器可能会自己做一些有效的工作,但在这个版本中,它调用的次数可能比我调用和存储这个值的次数多。

3)(可能不相关)该代码适用于整数部分和小数点以下的余数。它需要这两个值。我将整数部分标识为p = (int) x,将余数标识为x - p。我在循环开始时和循环结束时计算这个例程。可以看出,这种拆分可以存储在某个地方,并在下一步使用(我的意思是i索引处的步骤)。你认为下面的版本更好吗?我将积分和余数部分存储在x的数组中,而不是整数值x。

int x_int[np];
float x_rem[np];
//...
for ( k = np ; --k ; ) {
    num = omp_get_thread_num();
    u[k] += a[x_int[k]] + a_lin[x_int[k]] * x_rem[k];
    x_rem[k] += u[k];
    p = (int) x_rem[k];   // *** This part is added into code for simplify the rest.
    x_int[k] += p;        // *** And maybe there is a better way how to realize
    x_rem[k] -= p;        // *** this "pushing correction".
    if (x_int[k]<0 ) {x_int[k]+=nc;} else
    if (x_int[k]>nc) {x_int[k]-=nc;};
    rho_full[num][x_int[k]] += weight[k];
    rho_diff[num][x_int[k]] += weight[k] * x_rem[k];
    }
};

您可以对for循环使用OMP缩减:

int result = 0;
#pragma omp for nowait reduction(+:result) 
for ( k = np ; --k ; ) {
    num = omp_get_thread_num();
    p = (int) x[k];
    u[k] += a[p] + a_lin[p] * (x[k] - p);
    x[k] += u[k];
    if (x[k]<0 ) {x[k]+=nc;} else
    if (x[k]>nc) {x[k]-=nc;};
    p = (int) x[k];
    result += weight[k] + weight[k] * (x[k] - p);
}