MPI_Reduce选择前k个结果

MPI_Reduce select first k results

本文关键字:结果 选择 Reduce MPI      更新时间:2023-10-16

我想使用MPI找出所有节点上的前k个结果。为此,我想在MPI_Reduce中使用自定义的归约函数。然而,我的代码不起作用,因为传给该函数的len参数与传给MPI_Reduce的count参数并不相同。
我在这里发现,MPI实现可以这样做(把数据分块传给归约函数),以便对计算进行流水线处理。

我的代码类似于这个:

inline void MPI_user_select_top_k(int *invec, acctbal_pair *inoutvec, int *len, MPI_Datatype *dtpr) {
    std::vector<acctbal_pair> temp;
    for(int i = 0; i < *len; ++i) {
        acctbal_pair p1 = {invec[i].suppkey, invec[i].acctbal};
        acctbal_pair p2 = {inoutvec[i].suppkey, inoutvec[i].acctbal};
        temp.push_back(p1);
        temp.push_back(p2);
    }
    std::sort(temp.begin(), temp.end(), [&](acctbal_pair a, acctbal_pair b) { return a.acctbal > b.acctbal;});
    for(int i = 0; i < *len; ++i) {
        inoutvec[i].suppkey = temp[i].suppkey;
        inoutvec[i].acctbal = temp[i].acctbal;
    }
}

其中acctbal_pair是一个结构体,包含字段suppkey和acctbal。
我这样调用MPI_Reduce。其中localResults和globalResults是大小为k的向量。

/* Reduce k elements of type mpi_suppkey_acctbal onto ROOT with the user op
   select_top_k. As described below, MPI may apply the op to sub-ranges of
   these k elements, so the op cannot assume it sees all k at once. */
MPI_Reduce(localResults.data(), globalResults.data(), k, mpi_suppkey_acctbal, select_top_k, ROOT, MPI_COMM_WORLD);

然而,对于稍大的k值,计数被分成更小的块,使我的函数失败。

是否有任何方法告诉Reduce不要管道计算?或者你知道另一种(有效的)实现方法吗?我真的不想使用MPI_Gather并在根上查找前k个结果,因为通信开销很大。

我不能简单地用固定的参数k来编写该函数(并把全部k个元素视为1个MPI类型的元素),因为k是在运行时才计算出来的。

我知道这不是MPI_Reduce的目的(它应该只是计算一些操作元素),但如果计数没有分块,它就可以完美地工作。

注:我使用的MPI实现是OpenMPI。

当然,您可以这样做-您只需要创建一个大小为k的类型(这在运行时很容易做到)并进行选择。唯一的技巧是,您没有办法将状态(例如,k)传递给选择操作,因此您需要通过全局变量进行通信—这显然不是很好,但是可以做需要做的事情。如果需要使用不同大小的k重复运行算法,只需根据需要创建类型并重置全局变量。

(如果您以其他方式将k的值偷偷地放入选择操作中,例如,在每个数组中传递给它的数据的第一个元素是值k,则可以避免使用全局变量)

下面是实现这一思路的代码;它也能处理某个处理器上的值少于k个的情况。每个处理器选出自己的k个最小值并填入本地数组,然后选择操作执行部分排序的归并,只保留k个最小元素。

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>
/* Sentinel marking an unused slot in a k-list. The generated data is never
   negative, so -1 cannot collide with a real value. */
static const int invalid_data = -1;
/* k for the reduction op: an MPI_User_function has no user-data argument,
   so k is passed through this file-scope variable (set in main). */
static int globalselectk;        /* yuk */
/* Return the smaller of two ints. */
int min2(int a, int b) {
    return (a <= b) ? a : b;
}
/* Build and commit a datatype of `selectk` contiguous ints, so that a whole
   list travels (and is reduced) as a single MPI element. The caller must
   release the result with MPI_Type_free. */
MPI_Datatype createtype(int selectk) {
    MPI_Datatype ktype;
    MPI_Type_contiguous(selectk, MPI_INT, &ktype);
    MPI_Type_commit(&ktype);
    return ktype;
}
/* Mark all `selectk` slots of the k-list d as empty (invalid_data). */
void initselectk(int *d, size_t selectk) {
    for (size_t slot = 0; slot < selectk; ++slot) {
        d[slot] = invalid_data;
    }
}
/* Print the `selectk` slots of d as "[ a b ... ] " with no trailing
   newline; empty slots print as the sentinel value. */
void printselectk(int *d, size_t selectk) {
    fputs("[", stdout);
    for (size_t slot = 0; slot < selectk; ++slot) {
        printf("%3d ", d[slot]);
    }
    fputs("] ", stdout);
}
/* Count the leading valid entries of a k-list: the number of slots before
   the first invalid_data, capped at selectk. The bound is tested before the
   slot is read, so a completely full list is not read past its end (the
   original tested d[count] first and read d[selectk]). */
int countselectk(int *d, size_t selectk) {
    size_t count = 0;
    while (count < selectk && d[count] != invalid_data)
        count++;
    return (int)count;
}
/* Merge two ascending k-lists d1 and d2 into dout, keeping only the
   smallest `selectk` values overall. On equal values the element from d1
   is taken first. Returns the number of values written to dout. */
int mergeselect(int *d1, int *d2, int *dout, size_t selectk) {
    int n1 = countselectk(d1, selectk);
    int n2 = countselectk(d2, selectk);
    int want = n1 + n2;
    if ((size_t)want > selectk)
        want = (int)selectk;

    int a = 0, b = 0, written = 0;
    while (written < want) {
        /* d1 is exhausted, or d2's head is strictly smaller (ties go to d1). */
        if (a >= n1 || (b < n2 && d2[b] < d1[a]))
            dout[written++] = d2[b++];
        else
            dout[written++] = d1[a++];
    }
    return written;
}
/* MPI_User_function for the reduction. Each of the *len MPI elements is one
   k-list of globalselectk ints; every in/inout pair is merged and the merged
   values are written back into inout. The datatype argument is unused. */
void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    const int k = globalselectk;
    int *src = (int *)in;
    int *dst = (int *)inout;
    int merged[k];

    for (int elem = 0; elem < *len; elem++, src += k, dst += k) {
        initselectk(merged, k);
        int n = mergeselect(src, dst, merged, k);
        for (int j = 0; j < n; j++)
            dst[j] = merged[j];
    }
}
/* qsort comparator for ints: negative, zero or positive as *v1 is less
   than, equal to or greater than *v2. Uses comparisons instead of
   subtraction, because *v1 - *v2 overflows (undefined behavior) for large
   operands of opposite sign, e.g. INT_MIN and 1. */
int intcompar(const void *v1, const void *v2) {
    const int a = *(const int *)v1;
    const int b = *(const int *)v2;
    return (a > b) - (a < b);
}
/* Parse s as a decimal int in [1, max]. Returns the value, or -1 if s is
   empty, has trailing characters, or is out of range. */
static int parse_positive(const char *s, long max) {
    char *end;
    long v = strtol(s, &end, 10);
    if (end == s || *end != '\0' || v < 1 || v > max)
        return -1;
    return (int)v;
}

/* Driver: `kselect localn k`. Each rank generates localn pseudo-random ints
   in [0, 500], keeps its k smallest, and MPI_Reduce with the custom op
   leaves the global k smallest on rank 0. */
int main(int argc, char **argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    /* Both values size stack arrays below, so they must be positive. */
    int locn    = (argc >= 3) ? parse_positive(argv[1], INT_MAX) : -1;
    int selectk = (argc >= 3) ? parse_positive(argv[2], INT_MAX) : -1;
    if (locn < 1 || selectk < 1) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    globalselectk = selectk;     /* yuk: the reduction op reads k from here */
    int localdata[locn];
    int local[selectk], global[selectk];
    /* One MPI element holds a whole k-list, so MPI cannot split a list. */
    MPI_Datatype mpi_datatype = createtype(selectk);
    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);   /* 1: the op is commutative */
    srand(rank*37);
    for (int i=0; i<locn; i++)
        localdata[i] = floor(500.*rand()/RAND_MAX);
    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */
    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++)
        local[i] = localdata[i];
    /* Let each rank print its data in turn, with a barrier between ranks. */
    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }
    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global,selectk);
        printf("\n");
    }
    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();
    return 0;
}

编译和运行得到:

$ mpicc kselect.c -o kselect -Wall -std=c99 
$ mpirun -np 10 kselect 12 5
Rank  0 has values:  98 138 167 197 238 276 314 384 391 399 420 455 
Rank  1 has values:  16  87 119 134 156 164 225 299 321 380 409 441 
Rank  2 has values:  22  81 155 219 285 295 330 342 364 399 435 499 
Rank  3 has values:   3   7  75 164 181 271 285 358 379 438 466 491 
Rank  4 has values:   7  63  74 132 173 178 197 244 304 337 352 457 
Rank  5 has values:  21  62 104 138 240 346 377 382 411 446 455 482 
Rank  6 has values:  19  90 142 231 246 269 281 307 331 380 413 451 
Rank  7 has values:  43 191 193 232 236 331 399 429 439 445 446 457 
Rank  8 has values:  10 111 128 165 277 277 371 394 413 438 443 470 
Rank  9 has values:   2   2  34  57  97 105 128 187 265 329 344 409 
Result is: 
[  2   2   3   7   7 ] 

(不带全局变量的版本如下:)

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>
#include <assert.h>
/* Sentinel for an unused data slot in a k-list (slot 0 of each list holds k
   itself). The generated data is never negative, so -1 cannot be a value. */
static const int invalid_data = -1;
/* Return the smaller of two ints. */
int min2(int a, int b) {
    return (a <= b) ? a : b;
}
/* Build and commit a datatype of `selectk` contiguous ints, so that a whole
   list travels (and is reduced) as a single MPI element. The caller must
   release the result with MPI_Type_free. */
MPI_Datatype createtype(int selectk) {
    MPI_Datatype ktype;
    MPI_Type_contiguous(selectk, MPI_INT, &ktype);
    MPI_Type_commit(&ktype);
    return ktype;
}
/* Initialise a self-describing k-list: d[0] stores k and the k data slots
   d[1..k] are marked empty. d must have room for selectk+1 ints. */
void initselectk(int *d, int selectk) {
    d[0] = selectk;
    for (int slot = selectk; slot >= 1; slot--)
        d[slot] = invalid_data;
}
/* Print the data slots of a self-describing k-list as "[ a b ... ] " with
   no trailing newline; k is read from d[0]. */
void printselectk(int *d) {
    const int k = *d++;
    fputs("[", stdout);
    for (int i = 0; i < k; i++)
        printf("%3d ", d[i]);
    fputs("] ", stdout);
}
/* Count the leading valid data entries of a self-describing k-list (k is
   read from d[0]; data occupies d[1..k]). Stops at the first invalid_data
   or after k entries. The bound is tested before the slot is read, so a
   full list is not read one element past its end. */
int countselectk(int *d) {
    int selectk = d[0];
    int count = 0;
    d++;
    while ( (count < selectk) && (d[count] != invalid_data) )
        count++;
    return count;
}
/* Merge two ascending self-describing k-lists d1 and d2 into dout, keeping
   the smallest k values (k from d1[0], which must equal d2[0]). dout[0] is
   set to k and the merged values go to dout[1..]. On equal values d1's
   element is taken first. Returns the number of merged values written. */
int mergeselect(int *d1, int *d2, int *dout) {
    const int k = d1[0];
    assert(k == d2[0]);
    dout[0] = k;

    const int n1 = countselectk(d1);
    const int n2 = countselectk(d2);
    const int want = (n1 + n2 >= k) ? k : n1 + n2;

    /* Skip each list's header slot; the data starts at index 1. */
    const int *p1 = d1 + 1;
    const int *p2 = d2 + 1;
    int *out = dout + 1;
    int a = 0, b = 0, written = 0;
    while (written < want) {
        /* d1 is exhausted, or d2's head is strictly smaller (ties go to d1). */
        if (a >= n1 || (b < n2 && p2[b] < p1[a]))
            out[written++] = p2[b++];
        else
            out[written++] = p1[a++];
    }
    return written;
}
/* MPI_User_function for the global-free variant. Each of the *len MPI
   elements is a self-describing k-list (k in slot 0, then k values). Every
   in/inout pair is merged and the merged values are written back into
   inout. The datatype argument is unused. */
void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    int *src = (int *)in;
    int *dst = (int *)inout;

    for (int left = *len; left > 0; left--) {
        const int k = src[0];
        assert(k == dst[0]);
        int merged[k + 1];
        initselectk(merged, k);
        const int n = mergeselect(src, dst, merged);
        for (int j = n; j >= 1; j--)
            dst[j] = merged[j];
        src += k + 1;
        dst += k + 1;
    }
}
/* qsort comparator for ints: negative, zero or positive as *v1 is less
   than, equal to or greater than *v2. Uses comparisons instead of
   subtraction, because *v1 - *v2 overflows (undefined behavior) for large
   operands of opposite sign, e.g. INT_MIN and 1. */
int intcompar(const void *v1, const void *v2) {
    const int a = *(const int *)v1;
    const int b = *(const int *)v2;
    return (a > b) - (a < b);
}
/* Parse s as a decimal int in [1, max]. Returns the value, or -1 if s is
   empty, has trailing characters, or is out of range. */
static int parse_positive(const char *s, long max) {
    char *end;
    long v = strtol(s, &end, 10);
    if (end == s || *end != '\0' || v < 1 || v > max)
        return -1;
    return (int)v;
}

/* Driver: `kselect localn k`. Each rank generates localn pseudo-random ints
   in [0, 500], keeps its k smallest in a self-describing list (k stored in
   slot 0), and MPI_Reduce with the custom op leaves the global k smallest
   on rank 0. No global variable is needed because each list carries k. */
int main(int argc, char **argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    /* Both values size stack arrays below, so they must be positive; each
       list needs selectk+1 ints, so selectk must also stay below INT_MAX. */
    int locn    = (argc >= 3) ? parse_positive(argv[1], INT_MAX) : -1;
    int selectk = (argc >= 3) ? parse_positive(argv[2], INT_MAX - 1) : -1;
    if (locn < 1 || selectk < 1) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    int localdata[locn];
    int local[selectk+1], global[selectk+1];
    /* One MPI element holds a whole k-list plus its header slot. */
    MPI_Datatype mpi_datatype = createtype(selectk+1);
    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);   /* 1: the op is commutative */
    srand(rank*37);
    for (int i=0; i<locn; i++) 
        localdata[i] = floor(500.*rand()/RAND_MAX);
    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */
    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++) 
        local[i+1] = localdata[i];
    /* Let each rank print its data in turn, with a barrier between ranks. */
    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }
    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global);
        printf("\n");
    }
    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();
    return 0;
}