从接收方的角度来看,如何确保已收到使用 MPI_Isend 发送的所有消息?
From the receiver's end, how do I make sure that all messages sent using MPI_Isend have been received?
我正在研究一个应用程序,其中每个处理器使用MPI_Isend
向一些其他处理器发送一堆消息,然后接收一些未知数字的消息。
在我的小示例程序中(下面的代码)我有4个处理器向其余3个处理器发送4条消息,因此每个处理器应该接收12条消息。
我看到的问题是,当我的机器运行正常时,我的程序输出如下
Rank 2 receives 12 msgs; Global count = 48
Rank 1 receives 12 msgs; Global count = 48
Rank 3 receives 12 msgs; Global count = 48
Rank 0 receives 12 msgs; Global count = 48
但是有时候,一些处理器接收不到足够的消息:
Rank 1 receives 9 msgs; Global count = 37
Rank 3 receives 12 msgs; Global count = 37
Rank 2 receives 4 msgs; Global count = 37
Rank 0 receives 12 msgs; Global count = 37
我知道问题可能与while-loop
有关,其中我使用MPI_Iprobe
检查传入消息并在检查返回false后立即退出循环。但我不知道该怎么做。换句话说,我如何确保所有处理器在到达MPI_Allreduce
语句时接收到它们应该接收的所有消息?
我的程序是这样的:
#include "mpi.h"
#include <stdbool.h>
#include "stdio.h"
#include "stdlib.h"
int main(int argc, char* argv[])
{
MPI_Init(&argc, &argv);
int rank, p;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
//ASSUMPTION: using 4 procs
//Don't worry about this part.
//just some stupid way to determine the receivers.
// Irrelevant to the question.
int recvs[3];
int i = 0, nei = 0;
for (; nei < 4; ++nei)
{
if (nei != rank)
{
recvs[i] = nei;
++i;
}
}
//Proc sending msgs to its neighbors.
//In this case, it's all other procs. (but in my real app, it's almost never the case)
int TAG = 0;
int buff[4] = {555, 666, 777, 888};
int local_counts[4] = {0, 0, 0, 0}; //EDIT 1
for (nei = 0; nei < 3; ++nei)
{
for (i = 0; i < 4; ++i)
{
MPI_Request req;
MPI_Isend(&buff[i], 1, MPI_INT, recvs[nei], TAG, MPI_COMM_WORLD, &req);
local_counts[recvs[nei]] += 1; //EDIT 1
}
}
//EDIT 1: tell processors how many msgs they're supposed to get
int global_counts[4];
int expectedRecvCount;
MPI_Reduce(local_counts, global_counts, 4, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Scatter(global_counts, 1, MPI_INT, &expectedRecvCount, 1, MPI_INT, 0, MPI_COMM_WORLD);
//Receiving
int recvCount = 0;
MPI_Status status;
int hasMsg = 0;
int num;
do
{
MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &hasMsg, &status);
if (hasMsg)
{
MPI_Recv(&num, 1, MPI_INT, status.MPI_SOURCE, TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
++recvCount;
printf("nRank %d got %d from %d", rank, num, status.MPI_SOURCE);
}
}
while (recvCount < expectedRecvCount); //EDIT 1
//while (hasMsg);
//Total number msgs received by all procs.
//Now here's where I see the problem!!!
MPI_Allreduce(&recvCount, &global_count, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
printf("nRank %d receives %d msgs; Global count = %d", rank, recvCount, global_count);
MPI_Finalize();
return 0;
}
===========================================
编辑1
我能想到的一种方法是每个处理器跟踪它向其他处理器发送的消息数量。然后在发送操作完成后,我将对这些消息计数执行MPI_Reduce
和MPI_Scatter
。这样,每个处理器将知道它应该接收多少消息。(参见代码)有人能评论一下这种方法的表现吗?它会潜在地严重地阻碍性能吗?
从接收方端,我如何确保使用MPI_Isend发送的所有消息都已收到?- MPI不提供这个功能,你只能知道所有的MPI_Isend操作已经完成。
换句话说,基本上接收者不知道发送者会发送多少信息。但是发送者知道什么时候他们没有更多的消息要发送。那么,你能不能用一条消息通知接收方从秩n开始的消息将不会再收到任何消息?
你的代码正在处理另一个问题,你如何确保所有的MPI_Isend操作已经完成?
下面是基于您的示例的代码。我没有使用MPI_Iprobe,因为在MPI_Iprobe和if语句之间没有计算。相反,我使用了MPI_Probe。
下面的代码,确保所有的消息都已经发送,并且当一个进程从所有其他进程接收到stopTAG
消息时,它将停止接收消息。
#include "mpi.h"
#include <stdbool.h>
#include "stdio.h"
#include "stdlib.h"
int main(int argc, char* argv[])
{
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
//ASSUMPTION: using 4 procs
//Don't worry about this part.
//just some stupid way to determine the receivers.
// Irrelevant to the question.
int recvs[3];
int i = 0, nei = 0;
for (; nei < 4; ++nei)
{
if (nei != rank)
{
recvs[i] = nei;
++i;
}
}
//Proc sending msgs to its neighbors.
//In this case, it's all other procs. (but in my real app, it's almost never the case)
int TAG = 0;
int stopTAG = 1;
int buff[4] = {555, 666, 777, 888};
MPI_Request req[3*5];
for (nei = 0; nei < 3; ++nei)
{
for (i = 0; i < 4; ++i)
{
MPI_Isend(&buff[i], 1, MPI_INT, recvs[nei], TAG,
MPI_COMM_WORLD, &req[nei * 5 + i]);
}
}
for (nei = 0; nei < 3; ++nei) {
MPI_Isend(NULL, 0, MPI_CHAR, recvs[nei], stopTAG, MPI_COMM_WORLD,
&req[nei * 5 + 4]);
}
//Receiving
int recvCount = 0;
MPI_Status status;
int hasMsg = 0;
int num;
char stopArray[size];
for (i = 0; i < size; i++) {
stopArray[i] = 0;
}
stopArray[rank] = 1;
char stop;
int completedSends = 0;
do
{
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG)
{
MPI_Recv(&num, 1, MPI_INT, status.MPI_SOURCE, TAG,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
++recvCount;
printf("Rank %d got %d from %dn", rank, num,
status.MPI_SOURCE);
}
else if (status.MPI_TAG == stopTAG) {
MPI_Recv(NULL, 0, MPI_CHAR, status.MPI_SOURCE, stopTAG,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
stopArray[status.MPI_SOURCE] = 1;
}
stop = 1;
for (i = 0; i < size; i++) {
stop &= stopArray[i];
}
if (completedSends < (3*5)) {
int indx;
MPI_Status status;
MPI_Waitany(3*5, req, &indx, &status);
completedSends++;
}
}
while (!stop && (completedSends <= 15));
//Total number msgs received by all procs.
//Now here's where I see the problem!!!
int global_count;
MPI_Allreduce(&recvCount, &global_count, 1, MPI_INT, MPI_SUM,
MPI_COMM_WORLD);
printf("nRank %d receives %d msgs;nGlobal count = %dn", rank,
recvCount, global_count);
MPI_Finalize();
return 0;
}
- 用MacOS Mojave编译C++:致命错误:mpi.h:没有这样的文件或目录
- MPI突然停止了对多个核心的操作
- 设置 Visual Studio for MPI: 找不到标识符错误
- 使用 make 编译 MPI,几个命名空间错误,例如"错误:未知类型名称'使用'?
- 如何使用 MPI 的远程内存访问 (RMA) 功能并行化数据聚合?
- 重载 MPI 中的运算符 ()
- MPI:检查是否有任何进程已终止
- 使用 pybind11 共享 MPI 通信器
- 使用 CMake,Microsoft MPI 和 Visual Studio 2017 找不到 mpi.h
- 在具有 MPI 的超立方体中广播
- 通过 mpi 发送 c++ 标准::矢量<bool>
- 使用 MPI 的 C++ 中的并行 for 循环
- 如何将 OpenMP 和 MPI 导入到大型 CLion CMake 项目中?
- 如何通过Boost.MPI发送2d Boost.MultiArray的子阵列?
- HDF5 构建了并行支持,但找不到特定于 mpi 的功能
- MPI 集合通信中的指针分配
- 仅特定内核计数上的 MPI 内存损坏
- 如何在 Mac OS 上安装 boost-mpi 及其对 clang 的依赖关系?
- MPI Isend and Irecv problems
- boost::mpi在具有相同标记的多个isend/irecv传输上抛出mpi_ERR_TRUNCATE