C++ STL 算法(列表排序)OpenMP/多线程实现

C++ STL algorithm (list sort) OpenMP/multithreaded implementations

本文关键字:OpenMP 多线程 实现 排序 STL 算法 列表 C++      更新时间:2023-10-16

我试图加速我的代码的一个内核,它的本质归结为一种排序(我在具有多个内核的CPU上运行它(。我从这篇文章(STL算法和并发编程(中发现,其中一些算法可以加速,例如,使用OpenMP(见下文(。

我使用 __gnu_parallel::sort 得到了相当不错的加速

例如

__gnu_parallel::sort(std::begin(X), std::end(X), [](X a, X b){ return a.member > b.member;});

事实证明,std::list 是我的数据的更好容器。但这似乎没有用于排序的并行/多线程实现。

上面链接的帖子日期为2010年。我想知道对此的最新智慧是什么。

对于 Microsoft(Visual Studio 2015 之前(,std::list::sort 使用列表数组,其中 array[i] 要么是空列表,要么是大小为 2 的列表 i (1,2,4,8, ...(。节点一次从原始列表中获取一个并合并到数组中,然后将数组合并以形成单个排序列表。假设比较开销不大,这是一个内存绑定过程,由于扫描列表以拆分列表的开销,多线程将无济于事,几乎使内存读取操作的数量翻了一番。下面是此类列表排序的示例代码,其限制是比较<而不是><=,因此必须反转操作数以保持排序稳定。

#define NUMLISTS 32                     /* number of lists */
NODE * SortList(NODE *pList)
{
NODE * aList[NUMLISTS];                 /* array of lists */
NODE * pNode;
NODE * pNext;
int i;
    if(pList == NULL)                   /* check for empty list */
        return NULL;
    for(i = 0; i < NUMLISTS; i++)       /* zero array */
        aList[i] = NULL;
    pNode = pList;                      /* merge nodes into array */
    while(pNode != NULL){
        pNext = pNode->next;
        pNode->next = NULL;
        for(i = 0; (i < NUMLISTS) && (aList[i] != NULL); i++){
            pNode = MergeLists(aList[i], pNode);
            aList[i] = NULL;
        }
        if(i == NUMLISTS)
            i--;
        aList[i] = pNode;
        pNode = pNext;
    }
    pNode = NULL;                       /* merge array into one list */
    for(i = 0; i < NUMLISTS; i++)
        pNode = MergeLists(aList[i], pNode);
    return pNode;
}
NODE * MergeLists(NODE *pSrc1, NODE *pSrc2)
{
NODE *pDst = NULL;                      /* destination head ptr */
NODE **ppDst = &pDst;                   /* ptr to head or prev->next */
    if(pSrc1 == NULL)
        return pSrc2;
    if(pSrc2 == NULL)
        return pSrc1;
    while(1){
        if(pSrc2->data < pSrc1->data){  /* if src2 < src1 */
            *ppDst = pSrc2;
            pSrc2 = *(ppDst = &pSrc2->next);
            if(pSrc2 == NULL){
                *ppDst = pSrc1;
                break;
            }
        } else {                        /* src1 <= src2 */
            *ppDst = pSrc1;
            pSrc1 = *(ppDst = &pSrc1->next);
            if(pSrc1 == NULL){
                *ppDst = pSrc2;
                break;
            }
        }
    }
    return pDst;
}

update - Visual Studio 2015 及更高版本切换到使用迭代器而不是列表进行合并排序,这消除了分配问题,例如没有默认分配器,并且由于合并是通过 splice(( 完成的,因此它提供了异常安全性(如果用户比较抛出异常,列表将重新排序,但所有节点都在那里, 假设拼接永远不会引发异常(。VS2015也切换到自上而下的合并排序,但可以使用基于迭代器的自下而上的合并排序。我不确定为什么要切换到自上而下,因为对于具有随机分散节点的大型列表(远远超过缓存大小(来说,它的速度要慢约 40%。基于迭代器的代码示例。数组中的每个迭代器都指向大小为 2 的运行的第一个节点的 power i,或者它等于 list.end((,以指示空运行。运行的结束将是数组或局部变量迭代器中第一个先前的非"空"条目(数组中的所有运行都是相邻的运行(。所有合并都涉及相邻运行。合并函数有 3 个参数,一个迭代器到左运行的第一个节点,一个迭代器到右运行的第一个节点,这也是左运行的结束,以及一个迭代器到右运行结束(可能是下一个运行的第一个节点的迭代器或 list.end(((。

template <typename T>
typename std::list<T>::iterator Merge(std::list<T> &ll,
                    typename std::list<T>::iterator li,
                    typename std::list<T>::iterator ri,
                    typename std::list<T>::iterator ei);
// iterator array size
#define ASZ 32
template <typename T>
void SortList(std::list<T> &ll)
{
    if (ll.size() < 2)                  // return if nothing to do
        return;
    std::list<T>::iterator ai[ASZ];     // array of iterators
    std::list<T>::iterator li;          // left   iterator
    std::list<T>::iterator ri;          // right  iterator
    std::list<T>::iterator ei;          // end    iterator
    size_t i;
    for (i = 0; i < ASZ; i++)           // "empty" array
        ai[i] = ll.end();
    // merge nodes into array
    for (ei = ll.begin(); ei != ll.end();) {
        ri = ei++;
        for (i = 0; (i < ASZ) && ai[i] != ll.end(); i++) {
            ri = Merge(ll, ai[i], ri, ei);
            ai[i] = ll.end();
        }
        if(i == ASZ)
            i--;
        ai[i] = ri;
    }
    // merge array into single list
    ei = ll.end();                              
    for(i = 0; (i < ASZ) && ai[i] == ei; i++);
    ri = ai[i++];
    while(1){
        for( ; (i < ASZ) && ai[i] == ei; i++);
        if (i == ASZ)
            break;
        li = ai[i++];
        ri = Merge(ll, li, ri, ei);
    }
}
template <typename T>
typename std::list<T>::iterator Merge(std::list<T> &ll,
                             typename std::list<T>::iterator li,
                             typename std::list<T>::iterator ri,
                             typename std::list<T>::iterator ei)
{
    std::list<T>::iterator ni;
    (*ri < *li) ? ni = ri : ni = li;
    while(1){
        if(*ri < *li){
            ll.splice(li, ll, ri++);
            if(ri == ei)
                return ni;
        } else {
            if(++li == ri)
                return ni;
        }
    }
}

从 17 C++开始,您可以考虑并行排序算法 std::sort(std::execution::p ar, ...