有效地找到最小的大数组使用Opencl

Efficiently find minimum of large array using Opencl

本文关键字:数组 Opencl 有效地      更新时间:2023-10-16

我正在opencl中实现分层聚类算法。对于每一步,我都要在一个非常大的数组中找到最小的值(大约。10^8个条目),这样我就知道哪些元素必须组合到一个新的簇中。最小值的识别必须进行9999次。使用我目前的内核,需要大约200秒才能找到最小值(在所有迭代中累积)。我解决这个问题的方法是将数组分成2560个大小相等的片段(我的Radeon 7970上有2560个流处理器),并分别找到每个片段的最小值。然后运行第二个内核,将这些最小值组合成全局最小值。

有没有更有效的方法来解决这个问题?最初的想法是通过使用OpenCL来加速HCA,但由于识别最小值需要的时间比CPU上的matlab HCA长得多。我做错了什么?

__kernel void findMinValue(__global float * myArray, __global double * mins, __global int * elementsToWorkOn, __global int * arraysize){
int gid = get_global_id(0);
int minloc = 0;
float mymin = INFINITY;
int eltoWorkOn = *elementsToWorkOn;
int offset = gid*eltoWorkOn;
int target = offset + eltoWorkOn;
if (offset<*arraysize){
    //make sure the array size is not exceeded
    if (target > *arraysize){
        target = *arraysize;
    }
    //find minimum for the kernel
    for (int i = offset; i < target; i++){
        if (*(myArray + i) < mymin){
            mymin = *(myArray + i);
            minloc = i;
        }
    }
}
*(mins + gid * 2) = minloc;
*(mins + gid * 2 + 1) = mymin;
}

__kernel void getGlobalMin(__global double * mins, __global double * gmin, __global int * pixelsInImage){
    int nWorkitems = 2560;
    float globalMin = INFINITY;
    double globalMinLoc;
    float tempMin;
    for (int i = 0; i < nWorkitems; i++){
        tempMin = *(mins + 2 * i + 1);
        if (tempMin < globalMin){
            globalMin = tempMin;
            globalMinLoc = *(mins + 2 * i);
        }
    }
    *(gmin + 0) = globalMinLoc;
    *(gmin + 1) = globalMin;
}

我根据你的建议重新设计了findMinValue内核。内存访问现在是聚合的,我将工作划分为工作组,这样我就可以减少全局内存访问的数量。在此之前,每个内核都将其最小值写入全局mins缓冲区。现在每个工作组只有一个内核写入一个值(即组最小值)。此外,我增加了全局工作大小,以隐藏内存延迟。

这些更改允许将识别最小值所需的时间从>200秒减少到59秒!非常感谢您的帮助!

在优化内核时,还有什么我可能错过的吗?你还有什么建议吗?我不知道如何使用setArg()。我必须传递一个指向int值的指针给它(像这样:err = clSetKernelArg(kernel[2], 3, sizeof(int), &variable);)。在这种情况下,内核声明是怎样的呢?

这是我的新内核:

__kernel void findMinValue(__global float * myArray, __global double * mins, __global int * arraysize,__global int * elToWorkOn,__global int * dummy){
int gid = get_global_id(0);
int lid = get_local_id(0);
int groupID = get_group_id(0);
int lsize = get_local_size(0);
int gsize = get_global_id(0);
int minloc = 0;
int arrSize = *arraysize;
int elPerGroup = *elToWorkOn;
float mymin = INFINITY;

__local float lmins[128];
//initialize local memory
*(lmins + lid) = INFINITY;
__local int lminlocs[128];
//this private value will reduce global memory access in the for loop (temp = *(myArray + i);)
float temp;
//ofset and target of the for loop
int offset = elPerGroup*groupID + lid;
int target = elPerGroup*(groupID + 1);
//prevent that target<arrsize (may happen due to rounding errors or arrSize not a multiple of elPerGroup
target = min(arrSize, target);
//find minimum for the kernel
//offset is different for each lid, leading to sequential memory access
if (offset < arrSize){
    for (int i = offset; i < target; i += lsize){
        temp = *(myArray + i);
        if (temp < mymin){
            mymin = temp;
            minloc = i;
        }
    }
    //store kernel minimum in local memory
    *(lminlocs + lid) = minloc;
    *(lmins + lid) = mymin;
    //find work group minimum (reduce global memory accesses)
    lsize = lsize >> 1;
    while (lsize > 0){
        if (lid < lsize){
            if (*(lmins + lid)> *(lmins + lid + lsize)){
                *(lmins + lid) = *(lmins + lid + lsize);
                *(lminlocs + lid) = *(lminlocs + lid + lsize);
            }
        }
        lsize = lsize >> 1;
    }
}
//write group minimum to global buffer
if (lid == 0){
    *(mins + groupID * 2 + 0) = *(lminlocs + 0);
    *(mins + groupID * 2 + 1) = *(lmins + 0);
}
}

如果每个工作项遍历全局数组,则读取合并为零。如果您更改它,使每个工作项都按翘曲或波前大小大步前进,那么您将获得巨大的速度增益。

WI访问连续内存比访问分散内存效率高得多。此外,您应该首先在工作组中求和,然后将其传递给全局内存。并使用单个setArg()的int,而不是缓冲区的目的。至少,你应该这样做:

__kernel void findMinValue(__global float * myArray, __global double * mins, __global int arraysize){
    int gid = get_global_id(0);
    int minloc = 0;
    float mymin = INFINITY;
    //find minimum for the kernel
    for (int i = gid ; i < arraysize; i+= get_global_size(0)){
        if (*(myArray + i) < mymin){
            mymin = *(myArray + i);
            minloc = i;
        }
    }
    *(mins + gid * 2) = minloc;
    *(mins + gid * 2 + 1) = mymin;
}

聚合内存访问将计算速度提高了大约4倍。然而,对于我们的目的来说,这还是太慢了。重新计算所有条目的最小值的蛮力方法是不合适的。

因此,我更改了算法,以便它只保留每行的最小值(+其位置)。在每次迭代中更改了2行和2列后,如果需要,则更新行最小值,然后通过查找行最小值的最小值来获得全局最小值。因此,如果我们有一个22500*22500矩阵,我只需要得到22500元素的最小值,而不是506250000。当然,这种实现需要额外的计算,但最终我们可以减少从200 s(非凝聚)到59 s(凝聚)一直到8 s搜索最小值的时间。

我希望这对将来的人有帮助:-)