下面的c++代码可以用OpenMP并行化,以获得更好的性能吗?

Can the following C++ code be parallelized, with OpenMP, for better performance?

本文关键字:更好 性能 OpenMP c++ 代码 并行化      更新时间:2023-10-16

我有一个代码,做奇异值分解(SVD)的方阵。代码完成了这项工作,但是,它相当慢,当矩阵大小增加时,它变得难以忍受。由于我不熟悉并行编程,因此,在我开始深入挖掘并最终意识到我想要实现的动作甚至是不可能实现之前,我向专家征求意见。

提前谢谢你。

void SVD::decompose() {
bool flag;
int i, its, j, jj, k, l, nm;
double anorm, c, f, g, h, s, scale, x, y, z;
Row rv1(n);
g = scale = anorm = 0.0;            //Householder reduction to bidiagonal form.
for (i = 0; i < n; i++) {
    l = i + 2;
    rv1[i] = scale*g;
    g = s = scale = 0.0;
    if (i < m) {
        for (k = i; k < m; k++) scale += abs(u[k][i]);
        if (scale != 0.0) {
            for (k = i; k < m; k++) {
                u[k][i] /= scale;
                s += u[k][i] * u[k][i];
            }
            f = u[i][i];
            g = -SIGN(sqrt(s), f);
            h = f*g - s;
            u[i][i] = f - g;
            for (j = l - 1; j < n; j++) {
                for (s = 0.0, k = i; k < m; k++) s += u[k][i] * u[k][j];
                f = s / h;
                for (k = i; k < m; k++) u[k][j] += f*u[k][i];
            }
            for (k = i; k < m; k++) u[k][i] *= scale;
        }
    }
    w[i] = scale *g;
    g = s = scale = 0.0;
    if (i + 1 <= m && i + 1 != n) {
        for (k = l - 1; k < n; k++) scale += abs(u[i][k]);
        if (scale != 0.0) {
            for (k = l - 1; k < n; k++) {
                u[i][k] /= scale;
                s += u[i][k] * u[i][k];
            }
            f = u[i][l - 1];
            g = -SIGN(sqrt(s), f);
            h = f*g - s;
            u[i][l - 1] = f - g;
            for (k = l - 1; k < n; k++) rv1[k] = u[i][k] / h;
            for (j = l - 1; j < m; j++) {
                for (s = 0.0, k = l - 1; k < n; k++) s += u[j][k] * u[i][k];
                for (k = l - 1; k < n; k++) u[j][k] += s*rv1[k];
            }
            for (k = l - 1; k < n; k++) u[i][k] *= scale;
        }
    }
    anorm = MAX(anorm, (abs(w[i]) + abs(rv1[i])));
}
for (i = n - 1; i >= 0; i--) {          //Accumulation of right-hand tranformations.
    if (i < n - 1) {
        if (g != 0.0) {
            for (j = l; j < n; j++)         // Double division to avoid possible underflow.
                v[j][i] = (u[i][j] / u[i][l]) / g;
            for (j = l; j < n; j++) {
                for (s = 0.0, k = l; k < n; k++) s += u[i][k] * v[k][j];
                for (k = l; k < n; k++) v[k][j] += s*v[k][i];
            }
        }
        for (j = l; j < n; j++) v[i][j] = v[j][i] = 0.0;
    }
    v[i][i] = 1.0;
    g = rv1[i];
    l = i;
}
for (i = MIN(m, n) - 1; i >= 0; i--) {          //Accumulation of left-hand transformations.
    l = i + 1;
    g = w[i];
    for (j = l; j < n; j++) u[i][j] = 0.0;
    if (g != 0.0) {
        g = 1.0 / g;
        for (j = l; j < n; j++) {
            for (s = 0.0, k = l; k < m; k++) s += u[k][i] * u[k][j];
            f = (s / u[i][i])*g;
            for (k = i; k < m; k++) u[k][j] += f*u[k][i];
        }
        for (j = i; j < m; j++) u[j][i] *= g;
    }
    else for (j = i; j < m; j++) u[j][i] = 0.0;
    ++u[i][i];
}
for (k = n - 1; k >= 0; k--) {          //Diagonalization of the bidiagonal form: Loop over
    for (its = 0; its < 30; its++) {            //singular values, and over allowed iterations.
        flag = true;
        for (l = k; l >= 0; l--) {          //Test ofr splitting.
            nm = l - 1;
            if (l == 0 || abs(rv1[l]) <= eps*anorm) {
                flag = false;
                break;
            }
            if (abs(w[nm]) <= eps*anorm) break;
        }
        if (flag) {
            c = 0.0;            //Cancellatin of rv[l], if l>0.
            s = 1.0;
            for (i = l; i < k + 1; i++) {
                f = s*rv1[i];
                rv1[i] = c*rv1[i];
                if (abs(f) <= eps*anorm) break;
                g = w[i];
                h = pythag(f, g);
                w[i] = h;
                h = 1.0 / h;
                c = g*h;
                s = -f*h;
                for (j = 0; j < m; j++) {
                    y = u[j][nm];
                    z = u[j][i];
                    u[j][nm] = y*c + z*s;
                    u[j][i] = z*c - y*s;
                }
            }
        }
        z = w[k];
        if (l == k) {           //Convergence.
            if (z < 0.0) {          //Singular value is made nonnegative.
                w[k] = -z;
                for (j = 0; j < n; j++) v[j][k] = -v[j][k];
            }
            break;
        }
        x = w[l];               //Shift from bottom 2-by-2 minor.
        nm = k - 1;
        y = w[nm];
        g = rv1[nm];
        h = rv1[k];
        f = ((y - z)*(y + z) + (g - h)*(g + h)) / (2.0*h*y);
        g = pythag(f, 1.0);
        f = ((x - z)*(x + z) + h*((y / (f + SIGN(g, f))) - h)) / x;
        c = s = 1.0;            //Next QR transformation:
        for (j = l; j <= nm; j++) {
            i = j + 1;
            g = rv1[i];
            y = w[i];
            h = s*g;
            g = c*g;
            z = pythag(f, h);
            rv1[j] = z;
            c = f / z;
            s = h / z;
            f = x*c + g*s;
            g = g*c - x*s;
            h = y*s;
            y *= c;
            for (jj = 0; jj < n; jj++) {
                x = v[jj][j];
                z = v[jj][i];
                v[jj][j] = x*c + z*s;
                v[jj][i] = z*c - x*s;
            }
            z = pythag(f, h);
            w[j] = z;           //Rotation can be arbitrary if z = 0.
            if (z) {
                z = 1.0 / z;
                c = f*z;
                s = h*z;
            }
            f = c*g + s*y;
            x = c*y - s*g;
            for (jj = 0; jj < m; jj++) {
                y = u[jj][j];
                z = u[jj][i];
                u[jj][j] = y*c + z*s;
                u[jj][i] = z*c - y*s;
            }
        }
        rv1[l] = 0.0;
        rv1[k] = f;
        w[k] = x;
    }
}

}

部分代码当然可以并行化。你能得到多少,那是另一个问题。

更简单的方法是使用通用的数学库。
更有趣的方法可能是使用OpenMP自己做。

但是在考虑OpenMP之前,请考虑重新排列索引。你倾向于对第一个索引进行循环,就像在for (k = i; k < m; k++) u[k][i] *= scale;中一样。这在c++中有一个非常糟糕的缓存命中率,因为u[k][i]基本上是u[k*second_index_size+i]。如果您交换索引,您将得到for (k = i; k < m; k++) u[i][k] *= scale;,这可以完美地利用缓存。
通过实现这个,您应该可以看到相当大的加速。

现在是OpenMP部分。
找出代码中的热点区域在哪里。也许可以使用Visual Studio来实现。然后你可以使用OpenMP来并行化某些for循环,比如
#pragma omp parallel for for (k = i; k < m; k++) u[i][k] *= scale;

你将得到什么取决于热点区域在哪里以及你的矩阵有多大。