使用 MPI_Bcast 发送具有动态大小的动态数组

Send dynamic array with dynamic size using MPI_Bcast

本文关键字:动态 数组 MPI Bcast 使用      更新时间:2023-10-16

OpenMPI:我想在根节点上读取一个文件,并将该文件的内容发送到所有其他节点。我发现 MPI_Bcast 可以做到这一点:

// Broadcasts `count` elements of type `datatype`, starting at `buffer`, from
// the process whose rank is `root` to every process in communicator `comm`.
// All processes in `comm` make the same call: on `root` the buffer holds the
// data being sent, on every other rank it is where the data is received.
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
    int root, MPI_Comm comm)

我找到的所有示例中 count 值都是事先已知的,但在我的情况下,count 值一开始只有根进程知道。另一些示例说明,其他节点是通过同样的 MPI_Bcast 调用来接收数据的。

我添加了这个:

// One descriptor is a fixed array of 128 shorts.
typedef short Descriptor[128];
// MPI datatype intended to describe one whole Descriptor.
// NOTE(review): the element count passed here is sizeof(Descriptor), a size
// in bytes, while the element type is MPI_SHORT — so the committed type
// covers sizeof(Descriptor) shorts, not 128. Confirm against the intended
// layout; this mismatch is what the answer below addresses.
MPI_Datatype descriptorType;
MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);
MPI_Type_commit(&descriptorType);

 // Rank 0 only: derive the descriptor count from the file size, then load the file.
 if(world_rank == 0)   {
  struct stat finfo;
  // Number of whole descriptors in the file named by argv[1].
  // If stat() fails, querySize is not assigned in this branch.
  if(stat(argv[1], &finfo) == 0) {
        querySize = finfo.st_size/sizeof(Descriptor);
  }
 {
  //read binary query
  queryDescriptors = new Descriptor[querySize];
  fstream qFile(argv[1], ios::in | ios::binary);
  qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
  qFile.close();
  }
}
  // Step 1: every rank learns the descriptor count from rank 0.
  MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
  // Step 2: the other ranks can only size their receive buffer after step 1.
  if (world_rank != 0)
  {
        queryDescriptors = new Descriptor[querySize];
  }
  // Step 3: broadcast the descriptors themselves, querySize elements of descriptorType.
  MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);

当我这样调用它时:mpirun -np 2 ./mpi_hello_world,它工作正常;但是当我用超过 2 个进程运行它时,会得到如下错误:

mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed.
mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed.

如果 qFile.read(...) 没有放在 if(rank==0){} 判断之内,那么所有进程都会去读取该文件。另外,在除 0 号之外的所有进程上,queryDescriptors = new Descriptor[querySize]; 应该在第一个 MPI_Bcast() 之后再调用:在此之前,querySize 在这些进程中还没有有效的值。

进程 0 必须

  • 读取元素个数
  • 分配内存
  • 读取数组
  • 广播元素个数
  • 广播数组

其他进程必须:

  • 接收元素个数
  • 分配内存
  • 接收数组

下面是如何读取浮点数组并使用动态分配的示例:

#include <stdio.h>
#include <iostream>
#include <fstream>
#include <mpi.h>
using namespace std;
// Demo program: rank 0 writes a small binary file (an int element count
// followed by that many floats), reads it back, and shares the contents with
// every rank through two broadcasts — first the count, then the data.
int main (int argc,  char *argv[])
{
    MPI_Init(&argc, &argv);

    int size;
    int rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const bool isRoot = (rank == 0);

    if (isRoot)
    {
        // Produce the sample file: the count header, then 0, 2, 4, ...
        std::ofstream out;
        out.open("example.txt", std::ios::out | std::ios::binary);
        int itemsToWrite = 42;
        out.write((char*)&itemsToWrite, sizeof(int));
        float value = 0;
        int written = 0;
        while (written < itemsToWrite)
        {
            out.write((char*)&value, sizeof(float));
            value += 2;
            ++written;
        }
        out.close();
    }

    int itemCount = 0;
    float* data;

    if (isRoot)
    {
        // Only the root touches the file: count first, then the payload.
        std::ifstream in("example.txt", std::ios::in | std::ios::binary);
        in.read((char*)&itemCount, sizeof(int));
        data = new float[itemCount];
        in.read((char*)data, itemCount * sizeof(float));
        in.close();
    }

    // Every rank takes part in the same two broadcasts, in the same order.
    MPI_Bcast(&itemCount, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (!isRoot)
    {
        // itemCount is only meaningful on the other ranks from this point on.
        data = new float[itemCount];
    }
    MPI_Bcast(data, itemCount, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Report the middle element as a quick check that the data arrived.
    const int middle = itemCount / 2;
    std::cout << "on rank " << rank << " rode " << data[middle]
              << " on position " << middle << std::endl;

    delete[] data;
    MPI_Finalize();
    return 0;
}

使用 mpiCC main.cpp -o main 编译它并按 mpirun -np 2 main 运行

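代码中的另一个问题是 MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);。sizeof(Descriptor) 是以字节为单位的大小(256),而不是 short 的个数(128),因此这个类型描述的数据量是一个 Descriptor 实际大小的两倍,广播时会越界读写缓冲区。应该写成 MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType);(或者保留 MPI_SHORT,把个数改为 128)。下面是一段基于你的代码修改的程序,应该可以解决问题: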

#include <stdio.h>
#include <iostream>
#include <fstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <mpi.h>
using namespace std;
int main (int argc,  char *argv[])
{
    int world_rank;
    int size;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    int querySize;

    typedef short Descriptor[128];
    MPI_Datatype descriptorType;
    MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType);
    MPI_Type_commit(&descriptorType);

    Descriptor* queryDescriptors;

    if(world_rank == 0)   {
        struct stat finfo;
        if(stat(argv[1], &finfo) == 0) {
            cout<<"st_size "<<finfo.st_size<<" descriptor "<<sizeof(Descriptor)<< endl;
            querySize = finfo.st_size/sizeof(Descriptor);
            cout<<"querySize "<<querySize<<endl;
        }else{
            cout<<"stat error"<<endl;
        }
        {
            //read binary query
            queryDescriptors = new Descriptor[querySize];
            fstream qFile(argv[1], ios::in | ios::binary);
            qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
            qFile.close();
        }
    }
    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (world_rank != 0)
    {
        queryDescriptors = new Descriptor[querySize];
    }
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);
    cout<<"on rank "<<world_rank<<" rode "<<queryDescriptors[querySize/2][12]<<" on position "<<querySize/2<<endl;
    delete[] queryDescriptors;
    MPI_Finalize();
    return 0;
}