从接收方的角度来看,如何确保已收到使用 MPI_Isend 发送的所有消息?

From the receiver's end, how do I make sure that all messages sent using MPI_Isend have been received?

本文关键字:MPI Isend 消息 确保 何确保      更新时间:2023-10-16

我正在研究一个应用程序,其中每个处理器使用MPI_Isend向一些其他处理器发送一堆消息,然后接收一些未知数字的消息。

在我的小示例程序中(下面的代码)我有4个处理器向其余3个处理器发送4条消息,因此每个处理器应该接收12条消息。

我看到的问题是,当我的机器运行正常时,我的程序输出如下

Rank 2 receives 12 msgs; Global count = 48
Rank 1 receives 12 msgs; Global count = 48
Rank 3 receives 12 msgs; Global count = 48
Rank 0 receives 12 msgs; Global count = 48

但是有时候,一些处理器接收不到足够的消息:

Rank 1 receives 9 msgs; Global count = 37
Rank 3 receives 12 msgs; Global count = 37
Rank 2 receives 4 msgs; Global count = 37
Rank 0 receives 12 msgs; Global count = 37

我知道问题可能与while-loop有关,其中我使用MPI_Iprobe检查传入消息并在检查返回false后立即退出循环。但我不知道该怎么做。换句话说,我如何确保所有处理器在到达MPI_Allreduce语句时接收到它们应该接收的所有消息?

我的程序是这样的:

#include "mpi.h"
#include <stdbool.h>
#include "stdio.h"
#include "stdlib.h"
int main(int argc, char* argv[]) 
{
    MPI_Init(&argc, &argv);
    int rank, p;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &p);
    //ASSUMPTION: using 4 procs
    //Don't worry about this part.
    //just some stupid way to determine the receivers.
    // Irrelevant to the question.
    int recvs[3];   
    int i = 0, nei = 0; 
    for (; nei < 4; ++nei)
    {
        if (nei != rank)        
        {
            recvs[i] = nei;     
            ++i;        
        }
    }
    //Proc sending msgs to its neighbors.
    //In this case, it's all other procs. (but in my real app, it's almost never the case)
    int TAG = 0;
    int buff[4] = {555, 666, 777, 888};
    int local_counts[4] = {0, 0, 0, 0}; //EDIT 1
    for (nei = 0; nei < 3; ++nei)
    {
        for (i = 0; i < 4; ++i)
        { 
            MPI_Request req;
            MPI_Isend(&buff[i], 1, MPI_INT, recvs[nei], TAG, MPI_COMM_WORLD, &req);             
            local_counts[recvs[nei]] += 1; //EDIT 1
        }
    }
    //EDIT 1: tell processors how many msgs they're supposed to get
    int global_counts[4];
    int expectedRecvCount;
    MPI_Reduce(local_counts, global_counts, 4, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Scatter(global_counts, 1, MPI_INT, &expectedRecvCount, 1, MPI_INT, 0, MPI_COMM_WORLD);
    //Receiving
    int recvCount = 0;      
    MPI_Status status;
    int hasMsg = 0;     
    int num; 
    do  
    {
        MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &hasMsg, &status);             
        if (hasMsg)
        {   
            MPI_Recv(&num, 1, MPI_INT, status.MPI_SOURCE, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            ++recvCount;
            printf("nRank %d got %d from %d", rank, num, status.MPI_SOURCE);       
        }    
    }
    while (recvCount < expectedRecvCount); //EDIT 1
    //while (hasMsg);
    //Total number msgs received by all procs.
    //Now here's where I see the problem!!!
    MPI_Allreduce(&recvCount, &global_count, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); 
    printf("nRank %d receives %d msgs; Global count = %d", rank, recvCount, global_count);
    MPI_Finalize();     
    return 0; 

}

===========================================

编辑1

我能想到的一种方法是每个处理器跟踪它向其他处理器发送的消息数量。然后在发送操作完成后,我将对这些消息计数执行MPI_ReduceMPI_Scatter。这样,每个处理器将知道它应该接收多少消息。(参见代码)有人能评论一下这种方法的表现吗?它会潜在地严重地阻碍性能吗?

从接收方端,我如何确保使用MPI_Isend发送的所有消息都已收到?- MPI不提供这个功能,你只能知道所有的MPI_Isend操作已经完成。

换句话说,基本上接收者不知道发送者会发送多少信息。但是发送者知道什么时候他们没有更多的消息要发送。那么,你能不能用一条消息通知接收方从秩n开始的消息将不会再收到任何消息?

你的代码正在处理另一个问题,你如何确保所有的MPI_Isend操作已经完成?

下面是基于您的示例的代码。我没有使用MPI_Iprobe,因为在MPI_Iprobe和if语句之间没有计算。相反,我使用了MPI_Probe。

下面的代码,确保所有的消息都已经发送,并且当一个进程从所有其他进程接收到stopTAG消息时,它将停止接收消息。

#include "mpi.h"
#include <stdbool.h>
#include "stdio.h"
#include "stdlib.h"
int main(int argc, char* argv[]) 
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    //ASSUMPTION: using 4 procs
    //Don't worry about this part.
    //just some stupid way to determine the receivers.
    // Irrelevant to the question.
    int recvs[3];   
    int i = 0, nei = 0; 
    for (; nei < 4; ++nei)
    {
        if (nei != rank)        
        {
            recvs[i] = nei;     
            ++i;        
        }
    }
    //Proc sending msgs to its neighbors.
    //In this case, it's all other procs. (but in my real app, it's almost never the case)
    int TAG = 0;
    int stopTAG = 1;
    int buff[4] = {555, 666, 777, 888};
    MPI_Request req[3*5];
    for (nei = 0; nei < 3; ++nei)
    {
        for (i = 0; i < 4; ++i)
        { 
            MPI_Isend(&buff[i], 1, MPI_INT, recvs[nei], TAG, 
                        MPI_COMM_WORLD, &req[nei * 5 + i]);
        }
    }
    for (nei = 0; nei < 3; ++nei) {
        MPI_Isend(NULL, 0, MPI_CHAR, recvs[nei], stopTAG, MPI_COMM_WORLD, 
                  &req[nei * 5 + 4]);
    }
    //Receiving
    int recvCount = 0;      
    MPI_Status status;
    int hasMsg = 0;     
    int num; 
    char stopArray[size];
    for (i = 0; i < size; i++) {
        stopArray[i] = 0;
    }
    stopArray[rank] = 1;
    char stop;
    int completedSends = 0;
    do  
    {
        MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        if (status.MPI_TAG == TAG)
        {   
            MPI_Recv(&num, 1, MPI_INT, status.MPI_SOURCE, TAG, 
                        MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            ++recvCount;
            printf("Rank %d got %d from %dn", rank, num, 
                        status.MPI_SOURCE);
        }
        else if (status.MPI_TAG == stopTAG) {
            MPI_Recv(NULL, 0, MPI_CHAR, status.MPI_SOURCE, stopTAG, 
                        MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            stopArray[status.MPI_SOURCE] = 1;
        }
        stop = 1;
        for (i = 0; i < size; i++) {
            stop &= stopArray[i];
        }
        if (completedSends < (3*5)) {
            int indx;
            MPI_Status status;
            MPI_Waitany(3*5, req, &indx, &status);
            completedSends++;
        }
    }
    while (!stop && (completedSends <= 15));
    //Total number msgs received by all procs.
    //Now here's where I see the problem!!!
    int global_count;
    MPI_Allreduce(&recvCount, &global_count, 1, MPI_INT, MPI_SUM, 
                    MPI_COMM_WORLD); 
    printf("nRank %d receives %d msgs;nGlobal count = %dn", rank, 
                recvCount, global_count);
    MPI_Finalize();
    return 0;
}