Merge Sort (pthreads) C++

Keywords: C++, pthreads, sorting, merge      Updated: 2023-10-16

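I'm fairly new to pthreads, and I'm trying to write a program that sorts 1 million randomly generated integers. I seem to have lost control over the threads. On the first run the code spawns only a single thread, but on subsequent runs the number of threads gets out of control. Since I don't really know where the problem is, I've provided the code below.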

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#define N           8          /* # of thread */
#define NUM_INTS    10000      //ideally should be able to sort 1,000,000
int int_list[NUM_INTS];
/* structure for array index
 * used to keep low/high end of sub arrays
 */
typedef struct Arr {
    int low;
    int high;
} ArrayIndex;
void merge(int low, int high) {
    int mid = (low+high)/2;
    int left = low;
    int right = mid+1;
    int list_b[high-low+1];
    volatile int i, cur = 0;
    while((left <= mid) && (right <= high)) {
        if (int_list[left] > int_list[right])
            list_b[cur++] = int_list[right++];
        else
            list_b[cur++] = int_list[right++];
    }
    while(left <= mid)
        list_b[cur++] = int_list[left++];
    while(right <= high)
        list_b[cur++] = int_list[left++];
    for (i = 0; i < (high-low+1) ; i++)
        int_list[low+i] = list_b[i];
}
void * mergesort(void *a){
    ArrayIndex *pa = (ArrayIndex *)int_list;
    int mid = (pa->low + pa->high)/2;
    ArrayIndex aIndex[N];
    pthread_t thread[N];
    aIndex[0].low = pa->low;
    aIndex[0].high = mid;
    aIndex[1].low = mid+1;
    aIndex[1].high = pa->high;
    if (pa->low >= pa->high)
        return 0;
    volatile int i;
    for(i = 0; i < N; i++)
        pthread_create(&thread[i], NULL, mergesort, &aIndex[i]);
    for(i = 0; i < N; i++)
        pthread_join(thread[i], NULL);
    merge(pa->low, pa->high);
    pthread_exit(NULL);
}
int main(){
    volatile int i;
    struct timeval start_time, end_time;
    srand(getpid());
    for(i=0; i<NUM_INTS; i++)
        int_list[i] = rand();
    ArrayIndex ai;
    ai.low = 0;
    ai.high = NUM_INTS/sizeof(int_list[0])-1;
    pthread_t thread;
    pthread_create(&thread, NULL, mergesort, &ai);
    pthread_join(thread, NULL);
    return 0;
}

gdb output:

(gdb) run
Starting program: /.../sort.o 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff6fd5700 (LWP 25801)]
[Thread 0x7ffff6fd5700 (LWP 25801) exited]
Computation Time: 38006 micro-seconds.
[Inferior 1 (process 25797) exited normally]
(gdb) run
Starting program: /.../sort.o 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff6fd5700 (LWP 25804)]
[New Thread 0x7ffff67d4700 (LWP 25805)]
[New Thread 0x7ffff5fd3700 (LWP 25806)]
[New Thread 0x7ffff57d2700 (LWP 25807)]
[New Thread 0x7ffff4fd1700 (LWP 25808)]
[New Thread 0x7fffef7fe700 (LWP 25811)]
[New Thread 0x7fffeeffd700 (LWP 25810)]
...
[New Thread 0x7ffeca6ec700 (LWP 26148)]
Program received signal SIGINT, Interrupt.
[Switching to Thread 0x7ffee8728700 (LWP 26088)]
__GI___nptl_create_event () at events.c:25
25  events.c: No such file or directory.

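The problem is that you are trying to implement recursive divide-and-conquer parallelism by starting a new thread for each sub-problem, until each thread is given a single array item to "sort". This approach is simply wrong, for a number of reasons. To name just one: sorting an array of 1 million items requires 1 million threads at the leaf calls of the recursion, and about another million across all the recursion levels above them. Even if you introduce some granularity (a threshold below which the recursion becomes serial), the total number of threads may still be huge, unless the threshold is something like NUM_INTS/N.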
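To make that count concrete, here is a small sketch (not part of the original post) that counts how many threads a one-thread-per-sub-problem scheme needs, assuming each call splits its range in two the way the question's `mid = (low+high)/2` does. `count_threads` is a hypothetical helper name.

```cpp
#include <cstdint>

// Counts the calls (one thread each) made when sorting n items if every
// split hands each half to a new thread and recursion stops at one item.
// Assumption: halves are sized like [low, mid] and [mid+1, high] with
// mid = (low + high) / 2, as in the question.
std::uint64_t count_threads(std::uint64_t n) {
    if (n <= 1) return 1;                    // leaf call: a single item
    std::uint64_t left = (n + 1) / 2;        // size of [low, mid]
    return 1 + count_threads(left) + count_threads(n - left);
}
```

For n items this works out to 2n - 1, so 1,000,000 items need 1,999,999 threads: one million leaves plus 999,999 calls above them.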

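Even setting the above aside, your implementation has some bugs:

  • At every level of recursion you start N threads, even though the work is only split in two. For i>1, aIndex[i] is uninitialized, so the corresponding threads receive garbage in their input argument
  • You cast int_list, which is a pointer to int, to a pointer to ArrayIndex, instead of using the thread argument a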

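There are several ways to fix the design:

  • The simple way is to introduce a suitable threshold below which the recursion becomes serial, as I said above
  • A more complex, but also more general and flexible, way is to implement a thread pool together with a pool/queue of tasks for the threads to process. When you split the given array in two, you create two tasks, one per half, and submit them to the work queue from which the threads take their work. Note that for good performance you still need some granularity so that each task has enough work to do, but that threshold can be much smaller than the one needed to limit the number of threads
  • The proper way, especially for production code, is to use a library or parallel technology that has suitable primitives for recursive parallelism, such as Intel's Threading Building Blocks (TBB) or Microsoft's Parallel Patterns Library (PPL)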
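The first option (a serial threshold) could look roughly like the sketch below. It is one possible shape under stated assumptions, not a definitive implementation. `SortTask`, `sort_task` and `threshold_merge_sort` are hypothetical names. The threshold is taken as the array size divided by the desired thread count, in the spirit of NUM_INTS/N. The serial leaves use `std::sort` and the merge uses `std::inplace_merge` as stand-ins for a hand-written merge sort.

```cpp
#include <pthread.h>
#include <algorithm>
#include <cstddef>
#include <vector>

struct SortTask {
    int* first;                 // start of the range
    int* last;                  // one past the end of the range
    std::size_t serial_below;   // ranges of at most this size are sorted serially
};

static void* sort_task(void* arg) {
    SortTask* t = static_cast<SortTask*>(arg);
    std::size_t n = static_cast<std::size_t>(t->last - t->first);
    if (n <= t->serial_below) {             // small enough: no more threads
        std::sort(t->first, t->last);
        return nullptr;
    }
    int* mid = t->first + n / 2;
    SortTask left  = {t->first, mid, t->serial_below};
    SortTask right = {mid, t->last, t->serial_below};
    pthread_t child;
    // One new thread takes the left half; this thread keeps the right half.
    bool spawned = pthread_create(&child, nullptr, sort_task, &left) == 0;
    if (!spawned) sort_task(&left);         // fall back to a direct call
    sort_task(&right);
    if (spawned) pthread_join(child, nullptr);
    std::inplace_merge(t->first, mid, t->last);
    return nullptr;
}

void threshold_merge_sort(std::vector<int>& v, std::size_t max_threads) {
    if (v.empty()) return;
    if (max_threads == 0) max_threads = 1;
    // Threshold ~ size / max_threads (rounded up), so it is always >= 1.
    std::size_t threshold = (v.size() + max_threads - 1) / max_threads;
    SortTask root = {v.data(), v.data() + v.size(), threshold};
    sort_task(&root);
}
```

With 1,000,000 items and `max_threads = 8` the threshold is 125,000, so the recursion stops after three levels of splitting and only 7 extra threads are created alongside the caller.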

See also some links (in general, search Google for "parallel merge sort C++"):

  • Performance problems in parallel merge sort C++
  • https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp
  • http://www.drdobbs.com/parallel/parallel-merge-sort/229400239