在C++中用MPI解析大文件

Parsing large file with MPI in C++

本文关键字:文件 MPI C++ 中用      更新时间:2023-10-16

我有一个C++程序,我想在其中解析一个巨大的文件,寻找我实现的一些正则表达式。程序按顺序执行时工作正常,但后来我想使用MPI运行它。

我在main函数中通过区分master(协调执行的人)和worker(并行解析文件的人)开始了对MPI的适应。类似这样的东西:

MPI::Init(argc, argv);
...
if(rank == 0) {
    ...
    // Master sends initial and ending byte to every worker
    for(int i = 1; i < total_workers; i++) {
        array[0] = (i-1) * first_worker_file_part;
        array[1] = i * first_worker_file_part;
        MPI::COMM_WORLD.Send(array, 2, MPI::INT, i, 1);
    }
}
if(rank != 0)
    readDocument();
...
MPI::Finalize();

master将向每个worker发送一个具有2个位置的数组,该数组包含将在位置0开始读取文件的字节和需要在位置1停止读取的字节。

readDocument()函数现在看起来是这样的(不是解析,只是每个工作人员读取自己的文件部分):

void readDocument()
{
    array = new int[2];
    MPI::COMM_WORLD.Recv(array, 10, MPI::INT, 0, 1, status);
    int read_length = array[1] - array[0];
    char* buffer = new char [read_length];
    if (infile)
    {
        infile.seekg(array[0]); // Start reading in supposed byte
        infile.read(buffer, read_length);
    }
}

我尝试过不同的例子,从将读取的输出写入文件到用不同数量的进程运行它。例如,当我用20个进程而不是10个进程运行程序时,读取文件的时间会延长两倍。我预计会有将近一半的时间,但我不明白为什么会发生这种情况。

此外,在另一件事中,我想让主程序等待所有工作者完成执行,然后打印最终时间。在工人们加工的时候,有没有办法"阻止"他?就像C pthreads中的cond_wait一样?

根据我的经验,在具有并行文件系统的计算机系统上工作的人往往了解这些并行文件系统,所以你的问题最初会把你标记为不在这样的系统上工作。

在没有特定硬件支持的情况下,从单个文件读取归结为系统定位单个读取头并从磁盘读取一系列字节到内存。许多现代文件系统(如RAID)的复杂现实并没有实质性地改变这种情况,RAID实际上可能会将文件存储在多个磁盘上。当多个进程同时要求操作系统访问文件时,o/s会根据一些概念(可能是公平的)分配磁盘访问权限,这样就不会有进程出现饥饿。在最坏的情况下,o/s在将磁盘访问从一个进程切换到另一个进程时花费了太多时间,以至于读取速率显著下降。就吞吐量而言,最有效的方法是单个进程一次性读取整个文件,而其他进程则执行其他操作。

这种情况下,多个进程争夺稀缺的磁盘i/o资源,无论这些进程是否是并行MPI(或类似)程序的一部分,或者是同时运行的完全独立的程序,都适用。

影响是你观察到的——不是10个进程每个都在等待获得文件的1/10共享,而是20个进程每个在等待它们的1/20共享。哦,你哭了,但每个过程只读取一半的数据,所以整个团队应该花同样的时间来获取文件。不,我回应,您忘记添加o/s在访问之间定位和重新定位读/写磁头所需的时间。读取时间包括延迟(发出请求后开始读取需要多长时间)和吞吐量(i/o系统来回传递字节的速度)。

应该很容易对延迟和带宽做出一些合理的估计,以解释20个进程的读取时间是10个进程的两倍。

你怎么能解决这个问题?如果没有并行文件系统,你就无法做到这一点。但您可能会发现,让主进程读取整个文件,然后将其打包,这比您当前的方法更快。你可能不会,你可能会发现目前的方法是整个计算中最快的。如果读取时间是总计算时间的10%,您可能会认为这是一个合理的开销。

要添加高性能标记的正确答案,可以使用MPI-IO进行文件读取,向IO例程提供(在这种情况下)不要从每个处理器读取的提示;但是,如果您移动到具有并行文件系统的集群,那么具有修改(或空)MPI_Info的同一代码也应该能够利用并行文件系统。对于MPI-IO最常见的实现,Romio,这里有描述可用提示的手册;特别是,我们使用

MPI_Info_set(info, "cb_config_list","*:1");

将读卡器的数量设置为每个节点一个。下面的代码将允许您尝试使用MPI-IO或POSIX(例如,seek)读取文件。

#include <iostream>
#include <fstream>
#include <mpi.h>
void partitionFile(const int filesize, const int rank, const int size,  
                   const int overlap, int *start, int *end) {
    int localsize = filesize/size;
    *start = rank * localsize;
    *end   = *start + localsize-1;
    if (rank != 0)      *start -= overlap;
    if (rank != size-1) *end   += overlap;
}
void readdataMPI(MPI_File *in, const int rank, const int size, const int overlap,
               char **data, int *ndata) {
    MPI_Offset filesize;
    int start;
    int end;
    // figure out who reads what 
    MPI_File_get_size(*in, &filesize);
    partitionFile((int)filesize, rank, size, overlap, &start, &end);
    *ndata =  end - start + 1;
    // allocate memory
    *data = new char[*ndata + 1];
    // everyone reads in their part 
    MPI_File_read_at_all(*in, (MPI_Offset)start, *data, 
                         (MPI_Offset)(*ndata), MPI_CHAR, MPI_STATUS_IGNORE);
    (*data)[*ndata] = '';
}
void readdataSeek(std::ifstream &infile, int array[2], char *buffer)
{
    int read_length = array[1] - array[0];
    if (infile)
    {
        infile.seekg(array[0]); // Start reading in supposed byte
        infile.read(buffer, read_length);
    }
}
int main(int argc, char **argv) {
    MPI_File in;
    int rank, size;
    int ierr;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (argc != 3) {
        if (rank == 0) 
            std::cerr << "Usage: " << argv[0] << " infilename [MPI|POSIX]" << std::endl;
        MPI_Finalize();
        return -1;
    }
    std::string optionMPI("MPI");
    if ( !optionMPI.compare(argv[2]) ) {
        MPI_Info info;
        MPI_Info_create(&info);
        MPI_Info_set(info, "cb_config_list","*:1"); // ROMIO: one reader per node
                                                    // Eventually, should be able to use io_nodes_list or similar
        ierr = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, info, &in);
        if (ierr) {
            if (rank == 0) 
                std::cerr << "Usage: " << argv[0] << " Couldn't open file " << argv[1] << std::endl;
            MPI_Finalize();
            return -1;
        }
        const int overlap=1;
        char *data;
        int ndata;
        readdataMPI(&in, rank, size, overlap, &data, &ndata);
        std::cout << "MPI: Rank " << rank << " has " << ndata << " characters." << std::endl;
        delete [] data;
        MPI_File_close(&in);
        MPI_Info_free(&info);
    } else {
        int fsize;
        if (rank == 0) {
            std::ifstream file( argv[1], std::ios::ate );
            fsize=file.tellg();
            file.close();
        }
        MPI_Bcast(&fsize, 1, MPI_INT, 0, MPI_COMM_WORLD);
        int start, end;
        partitionFile(fsize, rank, size, 1, &start, &end);
        int array[2] = {start, end};
        char *buffer = new char[end-start+2];
        std::ifstream infile;
        infile.open(argv[1], std::ios::in);
        readdataSeek(infile, array, buffer);
        buffer[end-start+1] = '';
        std::cout << "Seeking: Rank " << rank << " has " << end-start+1 << " characters." << std::endl;
        infile.close() ;
        delete [] buffer;
    }
    MPI_Finalize();
    return 0;
}

在我的桌面上,我没有得到太大的性能差异,甚至超额订阅了内核(例如,使用大量查找):

$ time mpirun -np 20 ./read-chunks moby-dick.txt POSIX
Seeking: Rank 0 has 62864 characters.
[...]
Seeking: Rank 8 has 62865 characters.
real    0m1.250s
user    0m0.290s
sys 0m0.190s
$ time mpirun -np 20 ./read-chunks moby-dick.txt MPI
MPI: Rank 1 has 62865 characters.
[...]
MPI: Rank 4 has 62865 characters.
real    0m1.272s
user    0m0.337s
sys 0m0.265s