使用MPI临时接收数据,然后返回结果

Using MPI to take in data temporarily, use it, and then return a result

本文关键字:然后 返回 结果 数据 MPI 使用      更新时间:2023-10-16

我有一个起始位置数组,程序中的每个子节点(baby node)据此确定从文件的哪个位置开始读取。我试图让头节点(head node)向每个子节点发送开始读取文件的位置,然后让它们发回结果。
困难之处在于,文件的行数不能被节点数整除,因此必须循环地重复使用这些节点。为了实现这一点,我尝试用for循环进行发送和接收:头节点发送的消息数等于文件的行数,每个子节点接收的消息数等于文件行数除以子节点数。

简单地说,这对我来说不可行,我真的不知道该怎么办。

if(qNum == 1){  // query number 1 was selected
    if(firstSource == 1){ // and source number 1 was selected
        if(my_rank == 0){ // this process is the head node (rank 0)
            int startVal = 0; // value sent to a worker for the current index
            int z = 1; // destination rank used by MPI_Send below
            for(int i = 1; i <= enronInfo[0]; i++){  // one iteration per index 1..enronInfo[0]
                if(z == world_size){ // once z reaches world_size (one past the last rank), wrap back to rank 1
                    z = 1;
                }
                startVal = getFseekVal(i, firstSource); // look up the value stored for index i of the selected source
                MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD); // send startVal to rank z, using i as the message tag
                // NOTE(review): z is never incremented inside this loop, so every message is addressed to rank 1.
            }
            MPI_Barrier(MPI_COMM_WORLD); // author's note: unsure whether this barrier helps
            // NOTE(review): this test is still inside the my_rank == 0 branch opened above,
            // so its body can never execute as written.
            if(my_rank != 0){ // intended for every process other than the head node
                int startVal; // receives the value sent by the head node
                for(int i = 0; i<=babyLoopSize; i++){ // runs babyLoopSize + 1 times (i = 0..babyLoopSize)
                    MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive one int from rank 0, any tag, status discarded
                }
            }
            MPI_Barrier(MPI_COMM_WORLD); // author's note: unsure whether this barrier helps
        }
    }



#include <stdio.h>
#include <stdlib.h>

#include <fstream>
#include <iostream>
#include <string>

#include <mpi.h>
using namespace std;
// Per-dataset integer tables; the get*DS functions in this file fill them from the docstart files.
int enronDSarr[39859], nipsDSarr[1499], kosDSarr[3429], nytDSarr[299999]; // one table each for Enron, NIPS, KOS and NYT
// Paths of the vocabulary files, one per dataset.
string enronV = "/home/mcconnel/BagOfWords/vocab.enron.txt";
string nipsV = "/home/mcconnel/BagOfWords/vocab.nips.txt";
string kosV = "/home/mcconnel/BagOfWords/vocab.kos.txt";
string nytV = "/home/mcconnel/BagOfWords/vocab.nytimes.txt";
// Paths of the docword files, one per dataset.
string enronDW = "/home/mcconnel/BagOfWords/docword.enron.txt";
string nipsDW = "/home/mcconnel/BagOfWords/docword.nips.txt";
string kosDW = "/home/mcconnel/BagOfWords/docword.kos.txt";             // (paths are absolute and machine-specific)
string nytDW = "/home/mcconnel/BagOfWords/docword.nytimes.txt";
// Paths of the docstart files, one per dataset.
string enronDS = "/home/mcconnel/BagOfWords/docstart.enron.txt";
string nipsDS = "/home/mcconnel/BagOfWords/docstart.nips.txt";
string kosDS = "/home/mcconnel/BagOfWords/docstart.kos.txt";
string nytDS = "/home/mcconnel/BagOfWords/docstart.nytimes.txt";
int enronInfo[3], nipsInfo[3], kosInfo[3], nytInfo[3];                  // first three integers of each docword file, filled by importFiles()
// Answers typed by the user in getUserResponse(); main() broadcasts qNum, firstSource and secondSource.
// NOTE(review): wordLength is not referenced anywhere else in the visible code.
int firstSource, secondSource, numTimes, qNum, wordLength;
string enteredWord;          // word typed by the user for queries 3 and 4
// Fixed-size buffer that main() broadcasts with MPI_CHAR.
// NOTE(review): nothing in the visible code writes to this buffer.
char enteredWordChar[50];

/**
 * Finds the 1-based position of a word in a whitespace-separated text file.
 *
 * @param fileLocation path of the file to scan
 * @param input        word to look for (exact, case-sensitive comparison)
 * @return the 1-based index of the first token equal to input, or -1 when
 *         the file cannot be opened or no token matches
 */
int word2int(std::string fileLocation, std::string input){
        std::ifstream file(fileLocation.c_str());
        std::string word;
        int position = 1;
        // Testing the extraction itself ends the loop on the first failed read,
        // so a stale or empty token is never compared after end of file.
        while(file >> word){
                if(word == input){
                        return position;
                }
                position++;
        }
        // Previously an uninitialised local was returned on this path.
        return -1;
}
/**
 * Returns the stored value for a document of the selected source.
 *
 * @param docNumber    1-based document number; entry docNumber - 1 of the
 *                     source's table is returned
 * @param sourceNumber 1 = Enron, 2 = NIPS, 3 = KOS, any other value = NYT
 * @return the table entry, or -1 when docNumber lies outside the table
 */
int getFseekVal(int docNumber, int sourceNumber){
        const int* table;
        int tableLength;
        if(sourceNumber == 1){
                table = enronDSarr;
                tableLength = static_cast<int>(sizeof(enronDSarr) / sizeof(enronDSarr[0]));
        }
        else if(sourceNumber == 2){
                table = nipsDSarr;
                tableLength = static_cast<int>(sizeof(nipsDSarr) / sizeof(nipsDSarr[0]));
        }
        else if(sourceNumber == 3){
                table = kosDSarr;
                tableLength = static_cast<int>(sizeof(kosDSarr) / sizeof(kosDSarr[0]));
        }
        else{
                table = nytDSarr;
                tableLength = static_cast<int>(sizeof(nytDSarr) / sizeof(nytDSarr[0]));
        }
        // The document number ultimately comes from a count read out of a data
        // file, so refuse anything that would index outside the table.
        if(docNumber < 1 || docNumber > tableLength){
                return -1;
        }
        return table[docNumber - 1];
}

/**
 * Returns the token at a given 1-based position of a whitespace-separated
 * text file.
 *
 * @param fileLocation path of the file to scan
 * @param wordInt      1-based position of the wanted token
 * @return the token, or an empty string when the file cannot be opened or
 *         has fewer than wordInt tokens (including wordInt < 1)
 */
std::string int2word(std::string fileLocation, int wordInt){
        std::ifstream file(fileLocation.c_str());
        std::string word;
        int position = 1;
        // Loop on the extraction result so a failed read is never treated as a token.
        while(file >> word){
                if(position == wordInt){
                        return word;
                }
                position++;
        }
        // The old "return 0;" built a std::string from a null pointer, which is
        // undefined behaviour; an empty string is the safe "not found" value.
        return std::string();
}

/**
 * Reads one of the leading integers of a file.
 *
 * importFiles() calls this with pos = 0, 1 and 2 to fetch the first three
 * integers of each docword file; any non-negative pos is accepted.
 *
 * @param fileLocation path of the file to read
 * @param pos          0-based index of the wanted integer
 * @return the integer at that index, or -1 when pos is negative, the file
 *         cannot be opened, or fewer than pos + 1 integers can be read
 */
int getInfoDW(std::string fileLocation, int pos){
        if(pos < 0){
                return -1;
        }
        std::ifstream file(fileLocation.c_str());
        int value = 0;
        // The extraction is the loop condition, so reading stops at end of file
        // or at the first token that is not an integer.
        for(int index = 0; file >> value; index++){
                if(index == pos){
                        return value;
                }
        }
        // Previously an uninitialised local was returned on this path.
        return -1;
}
/**
 * Shared loader behind the four get*DS functions.
 *
 * Reads up to tokenCount whitespace-separated tokens from the file and stores
 * every second one (token indices 1, 3, 5, ...) into dest, converted with atoi.
 *
 * @param fileLocation path of the docstart file
 * @param tokenCount   maximum number of tokens to read
 * @param dest         table that receives the converted values
 * @param capacity     number of ints dest can hold
 * @return number of entries written to dest, or -1 if the file cannot be opened
 */
static int loadDocStarts(const std::string& fileLocation, int tokenCount, int* dest, int capacity){
    FILE* docStartFile = fopen(fileLocation.c_str(), "r");
    if(docStartFile == NULL){
        fprintf(stderr, "Could not open %s\n", fileLocation.c_str());
        return -1;
    }
    char token[25];
    int stored = 0;
    for(int i = 0; i < tokenCount; i++){
        // %24s bounds the read to the buffer (24 characters plus the terminator).
        // Stop at end of file instead of re-converting a stale buffer.
        if(fscanf(docStartFile, "%24s", token) != 1){
            break;
        }
        if(i % 2 == 1 && stored < capacity){
            dest[stored] = atoi(token);
            stored++;
        }
    }
    fclose(docStartFile);
    return stored;
}

/**
 * Loads the Enron docstart file into enronDSarr (reads up to 39861 tokens).
 *
 * @param fileLocation path of the Enron docstart file
 * @return number of entries stored, or -1 if the file cannot be opened
 */
int getEnronDS(std::string fileLocation){                            // imports array from Enron docstart file
    return loadDocStarts(fileLocation, 39861, enronDSarr,
                         static_cast<int>(sizeof(enronDSarr) / sizeof(enronDSarr[0])));
}

/**
 * Loads the NIPS docstart file into nipsDSarr (reads up to 1500 tokens).
 *
 * @param fileLocation path of the NIPS docstart file
 * @return number of entries stored, or -1 if the file cannot be opened
 */
int getNipsDS(std::string fileLocation){                             // imports array from NIPS docstart file
    return loadDocStarts(fileLocation, 1500, nipsDSarr,
                         static_cast<int>(sizeof(nipsDSarr) / sizeof(nipsDSarr[0])));
}

/**
 * Loads the KOS docstart file into kosDSarr (reads up to 3430 tokens).
 *
 * @param fileLocation path of the KOS docstart file
 * @return number of entries stored, or -1 if the file cannot be opened
 */
int getKosDS(std::string fileLocation){                              // imports array from KOS docstart file
    return loadDocStarts(fileLocation, 3430, kosDSarr,
                         static_cast<int>(sizeof(kosDSarr) / sizeof(kosDSarr[0])));
}

/**
 * Loads the NYT docstart file into nytDSarr (reads up to 300000 tokens).
 *
 * @param fileLocation path of the NYT docstart file
 * @return number of entries stored, or -1 if the file cannot be opened
 */
int getNytDS(std::string fileLocation){                              // imports array from NYT docstart file
    return loadDocStarts(fileLocation, 300000, nytDSarr,
                         static_cast<int>(sizeof(nytDSarr) / sizeof(nytDSarr[0])));
}
int getCurrentDS(int fileNumber, int documentNum){              //Will be used to return docstart byte value at document location
        if(fileNumber == 0){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 1){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 2){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 3){
                return enronDSarr[documentNum - 1];
        }
        else{
                printf("Something definitely went wrong");
        }
}
/**
 * Prompts for a word bag and reads the user's choice from standard input.
 *
 * The value is not range-checked here.
 *
 * @return the number the user typed, or 0 when nothing could be read
 */
int getSourceNumber(){
        // Start from a defined value: when the stream is already at end of
        // input the extraction leaves the variable untouched.
        int source = 0;
        // The line breaks had degraded to a literal 'n'; restore the escapes.
        printf("Select a wordbag:\n 1. Enron \n 2. NIPS \n 3. KOS\n 4. NYT\n");
        std::cin >> source;
        return source;
}
int getUserResponse(){
   int i = 1;
   while(i){
        printf("Choose a query(1-4) and press enter:n");
        printf("1. What percent of documents in X use any one word more than ____ times?n");
        printf("2. What words in X are used more than ____ times in any document?n");
        printf("3. In which data set does the word ____ appear most frequently?n");
        printf("4. Does ____ appear more frequently in X or Y?n");
        cin >> qNum;
        if(qNum < 5 && qNum > 0){
                i = 0;
                printf("%d", qNum);
        }
        else{
                printf("Invalid Response, Please Try Again n");
        }
        if(qNum == 1){
                firstSource = getSourceNumber();
                printf("and how many times?n");
                cin >> numTimes;
               // query1(firstSource, numTimes);
        }
        else if(qNum == 2){
                firstSource = getSourceNumber();
                printf("and how many times?n");
                cin >> numTimes;
        }
        else if(qNum == 3){
                printf("What word would you like to use?n");
                cin >> enteredWord;
        }
        else if(qNum == 4){
                printf("What word would you like to use?n");
                cin >> enteredWord;
                printf("Select your first source...n");
                firstSource = getSourceNumber();
                printf("Select your second source...n");
                secondSource = getSourceNumber();
        }
    }
}
void importFiles(){
        getEnronDS(enronDS);
        getNipsDS(nipsDS);
        getKosDS(kosDS);                                                        // Functions to read in arrays for each docstart fil
        getNytDS(enronDS);
        for(int a = 0; a <= 2; a++){
                 enronInfo[a] = getInfoDW(enronDW, a);
                 nipsInfo[a] = getInfoDW(nipsDW, a);
                 kosInfo[a] = getInfoDW(kosDW, a);
                 nytInfo[a] = getInfoDW(nytDW, a);
        }
}


int main(int argc, char** argv){
        int world_size, my_rank, numDocs,fseekVal, babyLoopSize, babyLoopSize2, babyLoopSize3, babyLoopSize4;
        MPI_Init(NULL,NULL);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        cout << " my rank is " << my_rank << "n";
        importFiles();
        cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << "n";
        if(my_rank == 0){
                getUserResponse();
        }
        MPI_Barrier(MPI_COMM_WORLD);
        babyLoopSize = enronInfo[0] / (world_size - 1);
        babyLoopSize2 = nipsInfo[0] / (world_size - 1);
        babyLoopSize3 = kosInfo[0] / (world_size - 1);
        babyLoopSize4 = nytInfo[0] / (world_size - 1);
        //cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << " and that my babyLoopSize for enron is " << babyLoopSize << "n";
        //cout << " my rank is " << my_rank << " I know that qNum is  " << qNum << "n";
        MPI_Bcast(&qNum, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&firstSource, 2, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&secondSource,3, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&enteredWordChar,4, MPI_CHAR, 0, MPI_COMM_WORLD);

        if(qNum == 1){
                if(firstSource == 1){
                        if(my_rank == 0){
                                int startVal = 0;
                                int z = 1;
                                for(int i = 1; i <= enronInfo[0]; i++){
                                        if(z == world_size){
                                                z = 1;
                                        }
                                        startVal = getFseekVal(i, firstSource);
                                        MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD);
                                }
                        if(my_rank != 0){
                                int startVal;
                                for(int i = 0; i<=babyLoopSize; i++){
                                        MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                                }
                        }
                        MPI_Barrier(MPI_COMM_WORLD);
                }
        }


        MPI_Finalize();
}

MPI_Send通常是一个阻塞命令,请参阅此处:

此例程可能会阻塞,直到消息被目标进程接收。

这意味着,每次在主进程上运行MPI_Send时,相应的从进程必须运行MPI_Recv,主进程才能继续。通过包含第一行MPI_Barrier,您告诉每个从机在让从机接收数据之前等待主机完成发送所有数据,但由于MPI_Send的阻塞性,主机将永远不会从第一次调用MPI_Send返回。

对于你的问题,我建议先把它分解成小块。编写代码回答每个子问题:

// Determine how many workers we have
int nWorkers = ...;
// Determine where in the file each worker should start.
// Storing each location in an array lets us hand them out
// with a single MPI collective call later.
int aStartingLocs[nWorkers] = {...};
// Distribute starting locations to each worker
int nMyStart;
MPI_Scatter(aStartingLocs, 1, MPI_INT,   //< Values to send (one per process)
            &nMyStart, 1, MPI_INT,       //< Received value
            ...);
// Our starting location is stored in nMyStart
// TODO: Use starting location to compute results
double dResult = ...;
// We can use MPI_Gather to send the values back to master
double aResults[nWorkers];
MPI_Gather(&dResult, 1, MPI_DOUBLE,    //< What we are sending
           aResults, 1, MPI_DOUBLE,    //< Where to store the results
           ...);
// Now we can use the results however we choose
if (bIAmMaster)
{
   // TODO: Use the results
}

以上只是一个大纲(半伪代码/注释),但希望您能够将其用作指南。

我已经有一段时间没有使用MPI了,所以您应该仔细核对 MPI_Scatter 和 MPI_Gather 的语法。看看谷歌上出现的第一个链接。

如果您需要分散的项目数量不能被工作进程的数量整除,可以使用 MPI_Scatterv 和 MPI_Gatherv,请参阅手册页。

如果您非常倾向于尝试实现自己版本的MPI_Scatter(v),我记得HPC的一项任务就是这样做的。然而,大多数情况下,仅仅使用库函数可能更容易/更好。