MPI随机漫步:为什么我的消息会丢失

Random Walk with MPI: Why are my messages getting lost?

本文关键字:消息 我的 为什么 随机 漫步 MPI      更新时间:2023-10-16

我正在尝试使用MPI和C++开发一个并行随机walker模拟。

在我的模拟中,每个过程都可以被认为是一个可以包含粒子(随机步行者)的细胞。单元在具有周期性边界条件(即环形拓扑)的一维中对齐。

在每个时间步长中,粒子都可以以一定的概率停留在其单元中或进入左或右相邻单元。为了更容易一点,只有每个单元列表中的最后一个粒子可以行走。如果粒子行走,则必须将其发送到具有相应秩的进程(MPI_Isend+MPI_Probe+MPI_Recv+MPI_Waitall)。

然而,在第一步之后,我的粒子开始消失,即消息不知何故"丢失"了。

下面是一个很小的例子(很抱歉,如果它还很长的话)。为了更好地跟踪粒子的运动,每个粒子都有一个ID,该ID对应于它开始的过程的级别。在每一步之后,每个细胞都会打印存储在其中的粒子的ID

#include <mpi.h>
#include <vector>
#include <iostream>
#include <random>
#include <string>
#include <sstream>
#include <chrono>
#include <algorithm>
using namespace std;
class Particle
{
public:
    int ID;     // this is the rank of the process which initialized the particle
    Particle () : ID(0) {};
    Particle (int ID) : ID(ID) {};
};
stringstream msg;
string msgString;
int main(int argc, char** argv)
{
    // Initialize the MPI environment
    MPI_Init(NULL, NULL);
    // Get the number of processes
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    // Get the rank of the process
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    // communication declarations
    MPI_Status status;
    // get the ranks of neighbors (periodic boundary conditions)
    int neighbors[2];
    neighbors[0] = (world_size + world_rank - 1) % world_size;  // left neighbor
    neighbors[1] = (world_size + world_rank + 1) % world_size;  // right neighbor
    // declare particle type
    MPI_Datatype type_particle;
    MPI_Type_contiguous (1, MPI_INT, &type_particle);
    MPI_Type_commit (&type_particle);
    // every process inits 1 particle with ID = world_rank
    vector<Particle> particles;
    particles.push_back (Particle(world_rank));
    // obtain a seed from the timer
  typedef std::chrono::high_resolution_clock myclock;
  myclock::time_point beginning = myclock::now();
  myclock::duration d = myclock::now() - beginning;
  unsigned seed2 = d.count();
    default_random_engine generator (seed2);
    uniform_real_distribution<double> distribution (0, 1);
    // ------------------------------------------------------------------
    // begin time loop
    //-------------------------------------------------------------------
    for (int t=0; t<10; t++)
    {
        // ------------------------------------------------------------------
        // 1) write a message string containing the current list of particles
        //-------------------------------------------------------------------
        // write the rank and the particle IDs into the msgString
        msg << "rank " << world_rank << ": ";
        for (auto& i : particles)
        {
            msg << i.ID << " ";
        }
        msg << "n";
        msgString = msg.str();
        msg.str (string()); msg.clear ();
        // to print the messages in order, the messages are gathered by root (rank 0) and then printed
        // first, gather nums to root
    int num = msgString.size();
    int rcounts[world_size];
    MPI_Gather( &num, 1, MPI_INT, rcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);
    // root now has correct rcounts, using these we set displs[] so
    // that data is placed contiguously (or concatenated) at receive end
    int displs[world_size];
    displs[0] = 0;
    for (int i=1; i<world_size; ++i)
    {
        displs[i] = displs[i-1]+rcounts[i-1]*sizeof(char);
    }
    //  create receive buffer
    int rbuf_size = displs[world_size-1]+rcounts[world_size-1];
    char *rbuf = new char[rbuf_size];
        // gather the messages
    MPI_Gatherv( &msgString[0], num, MPI_CHAR, rbuf, rcounts, displs, MPI_CHAR,
                                                               0, MPI_COMM_WORLD);
        // root prints the messages
        if (world_rank == 0)
        {
            cout << endl << "step " << t << endl;
            for (int i=0; i<rbuf_size; i++)
                cout << rbuf[i];
        }

        // ------------------------------------------------------------------
        // 2) send particles randomly to neighbors
        //-------------------------------------------------------------------
        Particle snd_buf;
        int sndDest = -1;
        // 2a) if there are particles left, prepare a message. otherwise, proceed to step 2b)
        if (!particles.empty ())
        {
            // write the last particle in the list to a buffer
            snd_buf = particles.back ();
            // flip a coin. with a probability of 50 %, the last particle in the list gets sent to a random neighbor
            double rnd = distribution (generator);
            if (rnd <= .5)
            {
                particles.pop_back ();
                // pick random neighbor
                if (rnd < .25)
                {
                    sndDest = neighbors[0];     // send to the left
                }
                else
                {
                    sndDest = neighbors[1];     // send to the right
                }
            }
        }
        // 2b) always send a message to each neighbor (even if it's empty)
        MPI_Request requests[2];
        for (int i=0; i<2; i++)
        {
            int dest = neighbors[i];
            MPI_Isend (
                &snd_buf,                               // void* data
                sndDest==dest ? 1 : 0,  // int count            <---------------- send 0 particles to every neighbor except the one specified by sndDest
                type_particle,                  // MPI_Datatype
                dest,                                       // int destination
                0,                                          // int tag
                MPI_COMM_WORLD,                 // MPI_Comm
                &requests[i]
            );
        }

        // ------------------------------------------------------------------
        // 3) probe and receive messages from each neighbor
        //-------------------------------------------------------------------
        for (int i=0; i<2; i++)
        {
            int src = neighbors[i];
            // probe to determine if the message is empty or not
            MPI_Probe (
                src,                                // int source,
                0,                                  // int tag,
                MPI_COMM_WORLD,         // MPI_Comm comm,
                &status                         // MPI_Status* status
            );
            int nRcvdParticles = 0;
            MPI_Get_count (&status, type_particle, &nRcvdParticles);
            // if the message if non-empty, receive it
            if (nRcvdParticles > 0) // this proc can receive max. 1 particle from each neighbor
            {
                Particle rcv_buf;
                MPI_Recv (
                    &rcv_buf,                   // void* data
                    1,                              // int count
                    type_particle,      // MPI_Datatype
                    src,                            // int source
                    0,                                              // int tag
                    MPI_COMM_WORLD,                     // MPI_Comm comm
                    MPI_STATUS_IGNORE                   // MPI_Status* status
                );
                // add received particle to the list
                particles.push_back (rcv_buf);
            }
        }
        MPI_Waitall (2, requests, MPI_STATUSES_IGNORE);
    }
    // ------------------------------------------------------------------
    // end time loop
    //-------------------------------------------------------------------

    // Finalize the MPI environment.
    MPI_Finalize();
    if (world_rank == 0)
        cout << "nMPI_Finalize()n";
    return 0;
}

我用8个过程运行了模拟,下面是输出的示例。在步骤1中,它似乎仍然运行良好,但从步骤2开始,粒子开始消失。

step 0
rank 0: 0 
rank 1: 1 
rank 2: 2 
rank 3: 3 
rank 4: 4 
rank 5: 5 
rank 6: 6 
rank 7: 7 
step 1
rank 0: 0 
rank 1: 1 
rank 2: 2 3 
rank 3: 
rank 4: 4 5 
rank 5: 
rank 6: 6 7 
rank 7: 
step 2
rank 0: 0 
rank 1: 
rank 2: 2 
rank 3: 
rank 4: 4 
rank 5: 
rank 6: 6 7 
rank 7: 
step 3
rank 0: 0 
rank 1: 
rank 2: 2 
rank 3: 
rank 4: 
rank 5: 
rank 6: 6 
rank 7: 
step 4
rank 0: 0 
rank 1: 
rank 2: 2 
rank 3: 
rank 4: 
rank 5: 
rank 6: 6 
rank 7: 

我不知道代码出了什么问题。。。不知怎么的,MPI_Isend+MPI_Probe+MPI_Recv+MPI_Waitall组合似乎不起作用。。。非常感谢您的帮助!

您的代码中有一个错误。以下逻辑(省略了不相关的代码和参数)是错误的:

MPI_Probe(..., &status);
MPI_Get_count (&status, type_particle, &nRcvdParticles);
// if the message if non-empty, receive it
if (nRcvdParticles > 0)
{
    MPI_Recv();
}

MPI_Probe不会从消息队列中删除零大小的消息。唯一这样做的MPI调用是MPI_RecvMPI_Irecv+MPI_Test/MPI_Wait的组合。您必须接收所有消息,包括零大小的消息,否则它们将阻止接收具有相同(源、标记)组合的其他消息。虽然零大小消息的接收不会向接收缓冲区写入任何内容,但它会从队列中删除消息信封,从而可以接收到下一个匹配的消息。

解决方案:将调用移动到条件运算符之前的MPI_Recv