C++ MPI Send Receive Issues

I am working on a parallel computing project, but I am running into problems with communication between the cores. The program runs, but it hits trouble at various points, and the point it reaches depends on the number of cores used. I am computing an integral of a highly oscillatory function by summing several quadrature methods. Each computation should be split up among the cores and the partial results summed. It should demonstrate the behavior across a range of wavenumbers, so I need to break each computation into pieces, send them out to the cores, bring them back, and sum them. After that, I need to repeat the process for the next wavenumber, with the wavenumbers running over [100, 10000]. The integration region runs from 0 to pi and, for each wavenumber, needs to be divided into equal parts according to that wavenumber.

With this code, if I run it on one core I can get up to wavenumber 120; with 2 cores it goes up to about wavenumber 240; with 3 cores, 360; and with 4 cores, 477. This trend more or less continues.

Any insight into what I need to fix would be greatly appreciated.

#include <iostream>
#include <fstream>
#include <vector>
#include <typeinfo>
#include "mpi.h"
#include "math.h"

using namespace std;

double fun_val_points(int kw, double x);
vector<double> integrand(double (fn)(int wn, double x), double wn,
                         vector<double> pts, int qn);

const double PI = 3.141592653589793;

int main(int argc, char* argv[]) {
    MPI::Init(argc, argv);
    for (int wavenum = 100; wavenum <= 10000; wavenum++) {
        // Declare variables
        int rank, numcores, tag = 123;
        int div, rem, qn = 101;  // qn represents the number of quadrature points
        // Initialize variables
        rank = MPI::COMM_WORLD.Get_rank();
        numcores = MPI::COMM_WORLD.Get_size();
        div = int((wavenum + 1) / numcores);
        rem = (wavenum + 1) % numcores;
        vector<double> points(div + rem, 0.0);
        if (rank == 0) {
            // quadraturePoints needs to be k+1 equally spaced points between
            // 0 and pi. It then needs to be distributed to each core in a
            // load-balanced way.
            vector<double> quadraturePoints(wavenum + 1, 0.0);
            for (int i = 0; i < wavenum + 1; i++) {
                quadraturePoints[i] = PI * (i) / (wavenum);  // Initialize quadraturePoints
            }
            for (int j = 0; j < numcores - 1; j++) {
                vector<double> sendpoints = vector<double>(div + rem, 0.0);
                int index = 0;
                // Get the relevant quadrature points for this core
                for (int n = j * div; (n <= (j + 1) * div) && (index < div + rem); n++) {
                    sendpoints[index++] = quadraturePoints[n];
                }
                // Send the data to the core
                MPI::COMM_WORLD.Send(&sendpoints[0], sendpoints.size(), MPI_DOUBLE, j, tag);
            }
            // Send data to the last core, which needs to have any remaining
            // quadrature points
            vector<double> sendpoints = vector<double>(div + rem, 0.0);
            int index = 0;
            for (int n = (numcores - 1) * div; n < wavenum + 1; n++) {
                sendpoints[index++] = quadraturePoints[n];
            }
            MPI::COMM_WORLD.Send(&sendpoints[0], sendpoints.size(), MPI_DOUBLE, numcores - 1, tag);
        }
        vector<double> localQuads(div + rem, 0.0);
        // Receive the quadrature points for local calculation
        MPI::COMM_WORLD.Recv(&localQuads[0], div + rem + 1, MPI_DOUBLE, 0, tag);
        // Remove any spare 0's on the end of the quad points. This is here
        // because some quadrature points were sent as longer lists than
        // necessary so that they would all be the same length.
        while (!localQuads.empty() && localQuads.back() <= .00000001) {
            localQuads.pop_back();
        }

        // To keep track of the integrals across the local quad points
        vector<double> localWTS(3, 0.0);
        for (int i = 0; i < localQuads.size() - 1; i++) {
            vector<double> partition(qn + 1, 0.0);  // Partition the quadrature
            for (double j = 0; j < qn + 1; j++) {
                partition[j] = localQuads[i] + (j / qn) * (localQuads[i + 1] - localQuads[i]);
            }
            // Integrate the partition
            vector<double> temp = integrand(fun_val_points, wavenum, partition,
                                            partition.size());
            for (int j = 0; j < 3; j++) {
                localWTS[j] += temp[j];  // Add the integrated values to the running total
            }
        }
        // Send the local integration values back to master
        MPI::COMM_WORLD.Send(&localWTS[0], 3, MPI_DOUBLE, 0, tag);

        if (rank == 0) {
            vector<double> totalWTS(3, 0.0);
            for (int commRank = 0; commRank < numcores; commRank++) {
                MPI::COMM_WORLD.Recv(&localWTS[0], 3, MPI_DOUBLE, commRank, tag);
                // Gather all the integral values and add them to the running total
                for (int index = 0; index < 3; index++) {
                    totalWTS[index] += localWTS[index];
                }
            }
            ofstream out;  // Open a text file for output to disk
            out.open("data.txt", ios_base::app);
            if (!out.is_open()) {  // In case there was an error opening the file
                cout << "Error opening output file." << endl;
            }
            out << wavenum << " ";
            out.precision(16);
            out << totalWTS[0] << " " << totalWTS[1] << " " << totalWTS[2] << endl;
            out.close();
        }
    }
    MPI::Finalize();
    return 0;
}

double fun_val_points(int kw, double x) {
    return cos(100 * x - kw * sin(x));
}

vector<double> integrand(double (fn)(int wn, double x), double wn, vector<double> pts, int qn) {
    double M, T, S;
    M = 0;
    T = 0;
    for (int j = 1; j < qn; j++) {
        double len = pts[j] - pts[j - 1];
        double mpts = (pts[j] + pts[j - 1]) / 2;
        M += fn(wn, mpts) * len;
        T += (fn(wn, pts[j - 1]) + fn(wn, pts[j])) * len / 2;
    }
    S = M * 2 / 3 + T * 1 / 3;
    return {M, T, S};
}

Your program is incorrect with respect to the MPI standard.

The reason is that task 0 MPI_Send()s to itself, and no matching receive has been posted.

Pragmatically speaking, this "works" for small messages but fails for large ones. Note that what counts as small versus large depends on your MPI library, the interconnect you are using, and other factors, so, long story short: assume this pattern will lead to a hang, and do not do it.

This is typically avoided by using MPI_Irecv() or MPI_Sendrecv().
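
For illustration, here is a minimal sketch of the nonblocking-receive option, using the C bindings and the variable names from the code above (the chunk-filling logic is elided): rank 0 pre-posts a receive for its own chunk, so the send-to-self always has a matching receive.

MPI_Request req;
if (rank == 0) {
    // Pre-post the receive for rank 0's own chunk before any send.
    MPI_Irecv(&localQuads[0], div + rem, MPI_DOUBLE, 0, tag,
              MPI_COMM_WORLD, &req);
    for (int j = 0; j < numcores; j++) {
        vector<double> sendpoints(div + rem, 0.0);
        // ... copy the quadrature points for rank j, as in the loops above ...
        MPI_Send(&sendpoints[0], div + rem, MPI_DOUBLE, j, tag,
                 MPI_COMM_WORLD);
    }
    MPI_Wait(&req, MPI_STATUS_IGNORE);  // complete rank 0's own receive
} else {
    MPI_Recv(&localQuads[0], div + rem, MPI_DOUBLE, 0, tag,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}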

That being said, your communication pattern is begging for MPI collective operations: MPI_Scatterv(quadraturePoints, ...) followed by MPI_Reduce(localWTS, ...).
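
A minimal sketch of that collective shape, again with the C bindings and the names from the code above (quadraturePoints would have to be declared on every rank, even though only rank 0 fills it; for brevity this scatters disjoint chunks and ignores the one-point overlap between neighboring chunks that the hand-rolled distribution had):

vector<int> sendcounts(numcores), displs(numcores);
for (int j = 0; j < numcores; j++) {
    displs[j] = j * div;                                      // start of rank j's chunk
    sendcounts[j] = (j == numcores - 1) ? (div + rem) : div;  // last rank takes the remainder
}
vector<double> localQuads(sendcounts[rank], 0.0);
MPI_Scatterv(rank == 0 ? &quadraturePoints[0] : NULL,
             &sendcounts[0], &displs[0], MPI_DOUBLE,
             &localQuads[0], sendcounts[rank], MPI_DOUBLE,
             0, MPI_COMM_WORLD);
// ... each rank integrates its localQuads into localWTS as before ...
vector<double> totalWTS(3, 0.0);
MPI_Reduce(&localWTS[0], &totalWTS[0], 3, MPI_DOUBLE, MPI_SUM,
           0, MPI_COMM_WORLD);  // rank 0 ends up with the summed integrals

Besides eliminating the send-to-self problem, this removes the need to pad every chunk to a common length and strip trailing zeros on the receiving side.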