在 C++ 中使用 pthreads 实现线程池

Pool of threads implementation using pthreads in C++

本文关键字:实现 线程 pthreads C++      更新时间:2023-10-16

我在设计具有线程池的程序时遇到麻烦。

我遇到的主要问题是,当线程完成工作时,父线程必须等待 threadId(这就是父线程使用 pthread_join 等待线程的方式)。因为我们不知道哪个线程将首先完成,所以我无法找到解决问题的编程方法。

任何带有一小段代码的解释都是值得赞赏的。

谢谢。

好吧,我不知道您的设置或要求,但是您遇到了pthread_join()只等待一个线程的问题,并且您实际上想要等待任何线程。

因此,最明显的结论是pthread_join对你没有帮助。很抱歉陈述显而易见的事情,但我需要建立我的情况:-)

相反,你可能不得不想出另一个想法。例如,您可以等待条件变量;在线程退出之前,它会将条件设置为"退出"。主线程可以等待该条件,遍历线程以找出哪些线程已终止(可以有多个),并最终重置条件。条件互斥锁通常足以防止种族。

除了设置条件之外,线程还可以将一些 ID 添加到退出线程列表中(您可以使用条件互斥锁来保护该列表),因此主线程只需遍历该列表,而不是检查每个线程。

伪代码:

initialize condition variable with status "not exited"
...
...
launch your threads
...
...
while (some threads are still running) do
   lock condition variable on "exited"
   iterate through threads, remove the ones that have exited
   unlock condition variable with new condition "not exited"

在您的线程中:

 ...
 do whatever it needs to do
 ...
 ...
 lock condition variable
 unlock condition variable with new condition "exited"
 /* end of thread */

池通常实现为在生产者-消费者队列上等待任务项的多个线程。 任务是从具有"run()"方法的 Task 类派生的对象。 无论哪个线程获得任务,它都会调用 run(),当它返回时,线程循环从队列中获取另一个任务对象。

这消除了任何线程微观管理,并在我尝试过的每种系统/语言上可靠安全地工作。

我知道的完成通知最灵活的方法是让线程在 run() 返回时调用任务的"OnComplete"事件,或者可能是虚拟的"已完成"方法,就在循环回去获取下一个任务之前,任务作为参数。例如,此方法/事件可以发出事件/condvar/sema的信号,任务发起线程正在等待该事件/condvar/sema,可以将已完成的任务排队到原始线程或其他线程,甚至只是delete()任务,(也许它的工作完全在线程池中完成)。

对于错误通知,我捕获 run() 抛出的任何未捕获的异常,并在调用完成方法/事件之前将异常存储在 tast 字段中。

除了那些保护生产者-消费者队列的锁之外,没有锁(它们只需要足够长的时间来推送/弹出 *任务)。

无论您使用哪种设计,请非常努力地不要:

1) continually create/terminate/destroy threads - avoidable overhead and tricky to manage
2) wait with Join() for any thread to terminate - just don't :)
3) loop around some 'poll thread status' to see if they're finished yet - gets it wrong
4) Move 'working/finished' threads into and out of containers with complicated locks - deadlock-in-the-making
5) use any other sort of micro-management - difficult, messy, error-prone, too many locks, unnecesary, avoidable

理想的线程池是您不知道哪个线程完成了工作的地方。 事实上,通常甚至不需要保留对线程的任何引用。 双行伪线程池:

TblockingQueue *inQueue=new TblockingQueue();
for(int i=0;i<CpoolDepth,i++) new Thread(inQueue);

如果你正在尝试实现一个线程池,看看我几年前玩过的这个架构。我在这里写了它:C++线程池

有关非阻塞pthread_join,请参阅此 SO 讨论:非阻塞pthread_join

如果你只是在等待所有的线程完成它们的工作,你可以一个接一个地等待它们:顺序无关紧要,没有理由使用条件。此示例演示:

#include <memory.h>
#include <pthread.h>
#include <iostream>
using namespace std;
#define NTHREADS 10
void *thread(void *arg) {
    int *n = (int *) arg;
    sleep(10 - *n);
    cout << "Thread " << (*n) << endl;
    delete n;
    return NULL;
}
int main(int argc, char **argv) {
    pthread_t threads[NTHREADS];
    pthread_attr_t attr;
    memset(&attr, 0, sizeof(attr));
    int i;
    for (i=0; i<NTHREADS; i++) {
        int *p = new int;
        *p = i;
        pthread_create(threads + i, &attr, thread, p);
    }
    void *rval;
    for (i=0; i<NTHREADS; i++) {
        pthread_join(threads[i], &rval);
        cout << "Joined thread " << i << endl;
    }
    return 0;
}
尽管线程的完成顺序与等待顺序

相反(即线程 0 最后完成,但我们首先等待线程 0),但主线程在所有线程完成之前不会退出。无需任何条件。