
Gathering data from different node using MPI and C++

Updated: 2023-10-16

I'm working on a project with several slave nodes and one master node. At some point I need to gather data from the various slave nodes (the master can also be treated as a slave) onto the master node. The data can be of any type, but let's assume it is unsigned int. This is how the data looks on the slave nodes:

node0: |chunk01|chunk02|chunk03|chunk04|....

node1: |chunk11|chunk12|chunk13|chunk14|....

noden: |chunkn1|chunkn2|chunkn3|chunkn4|....

All of the data should be gathered onto node0, like this:

node0: |chunk01|chunk11|chunk21|....|chunkn1|chunk02|chunk12|...|chunkn2|...

That is, we concatenate the first chunk from every node, then the second chunk from every node, and so on...

I don't know how to achieve this with MPI_Gatherv, because each chunkij has a different size, and each node only knows its own chunk sizes and start indices, not those of the other nodes.

I'm not very familiar with MPI, so I'm wondering: is there any API that can gather data of different sizes from different nodes onto one node?

Here's an example you can edit that should work. It is almost certainly not the optimal way to solve this problem; I'd need more details of your code to comment on that. I haven't checked that it compiles, but if you fix any typos I'll happily try to fix any remaining bugs.

I also don't know how important efficiency is to you: is this operation done hundreds of times per second, or once a day? If the latter, this code is probably fine. I'm also assuming C/C++.

#include <mpi.h>
// Call MPI_Init(&argc, &argv) before this code and MPI_Finalize() after it.
// Rank of this process.
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
// Total number of ranks.
int P;
MPI_Comm_size(MPI_COMM_WORLD, &P);
// Number of chunks per rank.
const int M = 4;
// I'm assuming rank 0 is the master.
const int masterNodeRank = 0;
// Populate this: sizeOfEachChunkOnEachRank[m][r] is the size of chunk m
// on rank r. It only needs to hold meaningful data on the master node;
// if the master doesn't have it, fill it first with MPI_Gather.
int* sizeOfEachChunkOnEachRank[M];
// Populate this. It needs to exist on every 'slave' node.
int sizeOfMyChunks[M];
// Assuming you already have this array:
// the contiguous store of this rank's data, chunk after chunk.
unsigned* myData;
// This is what we'll gather all the data into, on the master node only.
// totalDataSize must be computed beforehand (e.g. by summing the
// gathered chunk sizes); only the master needs it.
unsigned* gatheredData = new unsigned[totalDataSize];
// This array will keep all of the displacements from each sending node.
int* displacements = new int[P];
// This keeps track of how many unsigneds we've received so far.
int totalCountSoFar = 0;
// We'll work through all the first chunks on each node at once, then all
// the second chunks, etc.
for(int localChunkNum = 0; localChunkNum < M; ++localChunkNum)
{
  // On the receiving node we need to calculate all the displacements
  // for the received data to go into the array
  if (myRank == masterNodeRank)
  {
    displacements[0] = 0;
    for(int otherCore = 1; otherCore < P; ++otherCore)
    {
      displacements[otherCore] = displacements[otherCore-1] + sizeOfEachChunkOnEachRank[localChunkNum][otherCore-1];
    }
  }
  // On all cores, we'll need to calculate how far into our local array
  // to start the sending from.      
  int myFirstIndex = 0;
  for(int previousChunk=0; previousChunk < localChunkNum; previousChunk++)
  {
    myFirstIndex += sizeOfMyChunks[previousChunk];
  }
  // Do the variable gather. The four receive arguments (recvbuf,
  // recvcounts, displs, recvtype) are only significant on the root rank;
  // MPI ignores them everywhere else.
  MPI_Gatherv(&myData[myFirstIndex], // Start address to send from
              sizeOfMyChunks[localChunkNum], // Number to send
              MPI_UNSIGNED, // Type to send
              &gatheredData[totalCountSoFar], // Start address to receive into
              sizeOfEachChunkOnEachRank[localChunkNum], // Number expected from each rank
              displacements, // Displacements to receive into from each rank
              MPI_UNSIGNED, // Type to receive
              masterNodeRank, // Receiving rank
              MPI_COMM_WORLD); // MPI communicator
  // If this is the receiving rank, update the count we've received so far
  // so that we don't overwrite data the next time we do the gather.
  // Note that the total received is the displacement to the receive from the
  // last core + the total received from that core.
  if(myRank == masterNodeRank)
  {
    totalCountSoFar += displacements[P-1] + sizeOfEachChunkOnEachRank[localChunkNum][P-1];
  }
}
// (On the master, use gatheredData before freeing it.)
delete[] displacements;
delete[] gatheredData;