使用pthreads实现线程池

Thread pool implementation using pthreads

本文关键字:线程 实现 pthreads 使用      更新时间:2023-10-16

我试图使用pthreads了解以下线程池的实现。当我在主机中评论for循环时,该程序卡住了,在放置日志后,它似乎陷入了螺旋池destructor中的JOIN函数。

我无法理解为什么会发生这种情况,会发生任何僵局吗?

这可能是天真的,但有人可以帮助我理解为什么会发生这种情况以及如何纠正这种情况。

非常感谢!

#include <stdio.h>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <malloc.h>
#include <stdlib.h>
// Base class for Tasks
// run() should be overloaded and expensive calculations done there
// showTask() is for debugging and can be deleted if not used
class Task {
public:
    Task() {}
    virtual ~Task() {}
    virtual void run()=0;
    virtual void showTask()=0;
};
// Wrapper around std::queue with some mutex protection
class WorkQueue {
public:
WorkQueue() {
    // Initialize the mutex protecting the queue
    pthread_mutex_init(&qmtx,0);
    // wcond is a condition variable that's signaled
    // when new work arrives
    pthread_cond_init(&wcond, 0);
}
~WorkQueue() {
    // Cleanup pthreads
    pthread_mutex_destroy(&qmtx);
    pthread_cond_destroy(&wcond);
}
// Retrieves the next task from the queue
Task *nextTask() {
    // The return value
    Task *nt = 0;
    // Lock the queue mutex
    pthread_mutex_lock(&qmtx);
    // Check if there's work
    if (finished && tasks.size() == 0) {
        // If not return null (0)
        nt = 0;
    } else {
        // Not finished, but there are no tasks, so wait for
        // wcond to be signalled
        if (tasks.size()==0) {
            pthread_cond_wait(&wcond, &qmtx);
        }
        // get the next task
        nt = tasks.front();
        if(nt){
        tasks.pop();
    }
        // For debugging
        if (nt) nt->showTask();
    }
    // Unlock the mutex and return
    pthread_mutex_unlock(&qmtx);
    return nt;
}
// Add a task
void addTask(Task *nt) {
    // Only add the task if the queue isn't marked finished
    if (!finished) {
        // Lock the queue
        pthread_mutex_lock(&qmtx);
        // Add the task
        tasks.push(nt);
        // signal there's new work
        pthread_cond_signal(&wcond);
        // Unlock the mutex
        pthread_mutex_unlock(&qmtx);
    }
}
// Mark the queue finished
void finish() {
    pthread_mutex_lock(&qmtx);
    finished = true;
    // Signal the condition variable in case any threads are waiting
    pthread_cond_signal(&wcond);
    pthread_mutex_unlock(&qmtx);
}
// Check if there's work
bool hasWork() {
//printf("task queue size is %dn",tasks.size());
    return (tasks.size()>0);
}
private:
std::queue<Task*> tasks;
bool finished;
pthread_mutex_t qmtx;
pthread_cond_t wcond;
};
// Function that retrieves a task from a queue, runs it and deletes it
void *getWork(void* param) {
Task *mw = 0;
WorkQueue *wq = (WorkQueue*)param;
while (mw = wq->nextTask()) {
    mw->run();
    delete mw;
}
pthread_exit(NULL);
}
class ThreadPool {
public:
// Allocate a thread pool and set them to work trying to get tasks
ThreadPool(int n) : _numThreads(n) {
int rc;
    printf("Creating a thread pool with %d threadsn", n);
    threads = new pthread_t[n];
    for (int i=0; i< n; ++i) {
        rc = pthread_create(&(threads[i]), 0, getWork, &workQueue);
    if (rc){
     printf("ERROR; return code from pthread_create() is %dn", rc);
     exit(-1);
        }
    }
}
// Wait for the threads to finish, then delete them
~ThreadPool() {
    workQueue.finish();
    //waitForCompletion();
    for (int i=0; i<_numThreads; ++i) {
        pthread_join(threads[i], 0);
    }
    delete [] threads;
}
// Add a task
void addTask(Task *nt) {
    workQueue.addTask(nt);
}
// Tell the tasks to finish and return
void finish() {
    workQueue.finish();
}
// Checks if there is work to do
bool hasWork() {
    return workQueue.hasWork();
}
private:
pthread_t * threads;
int _numThreads;
WorkQueue workQueue;
};
// stdout is a shared resource, so protected it with a mutex
static pthread_mutex_t console_mutex = PTHREAD_MUTEX_INITIALIZER;
// Debugging function
void showTask(int n) {
pthread_mutex_lock(&console_mutex);
pthread_mutex_unlock(&console_mutex);
}
// Task to compute fibonacci numbers
// It's more efficient to use an iterative algorithm, but
// the recursive algorithm takes longer and is more interesting
// than sleeping for X seconds to show parrallelism
class FibTask : public Task {
public:
FibTask(int n) : Task(), _n(n) {}
~FibTask() {
    // Debug prints
    pthread_mutex_lock(&console_mutex);
    printf("tid(%d) - fibd(%d) being deletedn", pthread_self(), _n);
    pthread_mutex_unlock(&console_mutex);        
}
virtual void run() {
    // Note: it's important that this isn't contained in the console mutex lock
    long long val = innerFib(_n);
    // Show results
    pthread_mutex_lock(&console_mutex);
    printf("Fibd %d = %lldn",_n, val);
    pthread_mutex_unlock(&console_mutex);

    // The following won't work in parrallel:
    //   pthread_mutex_lock(&console_mutex);
    //   printf("Fibd %d = %lldn",_n, innerFib(_n));
    //   pthread_mutex_unlock(&console_mutex);
}
virtual void showTask() {
    // More debug printing
    pthread_mutex_lock(&console_mutex);
    printf("thread %d computing fibonacci %dn", pthread_self(), _n);
    pthread_mutex_unlock(&console_mutex);        
}
private:
// Slow computation of fibonacci sequence
// To make things interesting, and perhaps imporove load balancing, these
// inner computations could be added to the task queue
// Ideally set a lower limit on when that's done
// (i.e. don't create a task for fib(2)) because thread overhead makes it
// not worth it
long long innerFib(long long n) {
    if (n<=1) { return 1; }
    return innerFib(n-1) + innerFib(n-2);
}
long long _n;
};
int main(int argc, char *argv[]) 
{
// Create a thread pool
ThreadPool *tp = new ThreadPool(10);
// Create work for it
/*for (int i=0;i<100; ++i) {
    int rv = rand() % 40 + 1;
    showTask(rv);
    tp->addTask(new FibTask(rv));
}*/
delete tp;
printf("nnnnnDone with all work!n");
}

设计或多或少是正常的,但是实现它包含了几件事,这些内容有些过于复杂,并且可能引入不稳定性。我想当您发表评论for循环时,您会进行僵局,因为您应该在WorkQueue::finish()方法中使用pthread_cond_broadcast而不是pthread_cond_signal

注意:我通常通过将num_threads数量的空项目数量纳入工作时间来实现ThreadPool终止,并且我设置了一个finished标志,只是能够在我的addTask()方法中检查某些东西,因为finish()之后,我通常不让添加新任务和新任务和我从addTask()返回false或有时我断言。

另一个注意事项:最好将线程封装成类,这具有多种好处,并使多个平台的效果更加容易。

我还没有执行您的程序,也可能还有其他错误,只需浏览您的代码。

编辑:这是一个重新设计的版本,我对您的代码发布了一些修改,但我不保证它有效。手指交叉...: - )

#include <stdio.h>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <malloc.h>
#include <stdlib.h>
#include <assert.h>
// Reusable thread class
class Thread
{
public:
    Thread()
    {
        state = EState_None;
        handle = 0;
    }
    virtual ~Thread()
    {
        assert(state != EState_Started);
    }
    void start()
    {
        assert(state == EState_None);
        // in case of thread create error I usually FatalExit...
        if (pthread_create(&handle, NULL, threadProc, this))
            abort();
        state = EState_Started;
    }
    void join()
    {
        // A started thread must be joined exactly once!
        // This requirement could be eliminated with an alternative implementation but it isn't needed.
        assert(state == EState_Started);
        pthread_join(handle, NULL);
        state = EState_Joined;
    }
protected:
    virtual void run() = 0;
private:
    static void* threadProc(void* param)
    {
        Thread* thread = reinterpret_cast<Thread*>(param);
        thread->run();
        return NULL;
    }
private:
    enum EState
    {
        EState_None,
        EState_Started,
        EState_Joined
    };
    EState state;
    pthread_t handle;
};

// Base task for Tasks
// run() should be overloaded and expensive calculations done there
// showTask() is for debugging and can be deleted if not used
class Task {
public:
    Task() {}
    virtual ~Task() {}
    virtual void run()=0;
    virtual void showTask()=0;
};

// Wrapper around std::queue with some mutex protection
class WorkQueue
{
public:
    WorkQueue() {
        pthread_mutex_init(&qmtx,0);
        // wcond is a condition variable that's signaled
        // when new work arrives
        pthread_cond_init(&wcond, 0);
    }
    ~WorkQueue() {
        // Cleanup pthreads
        pthread_mutex_destroy(&qmtx);
        pthread_cond_destroy(&wcond);
    }
    // Retrieves the next task from the queue
    Task *nextTask() {
        // The return value
        Task *nt = 0;
        // Lock the queue mutex
        pthread_mutex_lock(&qmtx);
        while (tasks.empty())
            pthread_cond_wait(&wcond, &qmtx);
        nt = tasks.front();
        tasks.pop();
        // Unlock the mutex and return
        pthread_mutex_unlock(&qmtx);
        return nt;
    }
    // Add a task
    void addTask(Task *nt) {
        // Lock the queue
        pthread_mutex_lock(&qmtx);
        // Add the task
        tasks.push(nt);
        // signal there's new work
        pthread_cond_signal(&wcond);
        // Unlock the mutex
        pthread_mutex_unlock(&qmtx);
    }
private:
    std::queue<Task*> tasks;
    pthread_mutex_t qmtx;
    pthread_cond_t wcond;
};
// Thanks to the reusable thread class implementing threads is
// simple and free of pthread api usage.
class PoolWorkerThread : public Thread
{
public:
    PoolWorkerThread(WorkQueue& _work_queue) : work_queue(_work_queue) {}
protected:
    virtual void run()
    {
        while (Task* task = work_queue.nextTask())
            task->run();
    }
private:
    WorkQueue& work_queue;
};
class ThreadPool {
public:
    // Allocate a thread pool and set them to work trying to get tasks
    ThreadPool(int n) {
        printf("Creating a thread pool with %d threadsn", n);
        for (int i=0; i<n; ++i)
        {
            threads.push_back(new PoolWorkerThread(workQueue));
            threads.back()->start();
        }
    }
    // Wait for the threads to finish, then delete them
    ~ThreadPool() {
        finish();
    }
    // Add a task
    void addTask(Task *nt) {
        workQueue.addTask(nt);
    }
    // Asking the threads to finish, waiting for the task
    // queue to be consumed and then returning.
    void finish() {
        for (size_t i=0,e=threads.size(); i<e; ++i)
            workQueue.addTask(NULL);
        for (size_t i=0,e=threads.size(); i<e; ++i)
        {
            threads[i]->join();
            delete threads[i];
        }
        threads.clear();
    }
private:
    std::vector<PoolWorkerThread*> threads;
    WorkQueue workQueue;
};
// stdout is a shared resource, so protected it with a mutex
static pthread_mutex_t console_mutex = PTHREAD_MUTEX_INITIALIZER;
// Debugging function
void showTask(int n) {
    pthread_mutex_lock(&console_mutex);
    pthread_mutex_unlock(&console_mutex);
}
// Task to compute fibonacci numbers
// It's more efficient to use an iterative algorithm, but
// the recursive algorithm takes longer and is more interesting
// than sleeping for X seconds to show parrallelism
class FibTask : public Task {
public:
    FibTask(int n) : Task(), _n(n) {}
    ~FibTask() {
        // Debug prints
        pthread_mutex_lock(&console_mutex);
        printf("tid(%d) - fibd(%d) being deletedn", (int)pthread_self(), (int)_n);
        pthread_mutex_unlock(&console_mutex);        
    }
    virtual void run() {
        // Note: it's important that this isn't contained in the console mutex lock
        long long val = innerFib(_n);
        // Show results
        pthread_mutex_lock(&console_mutex);
        printf("Fibd %d = %lldn",(int)_n, val);
        pthread_mutex_unlock(&console_mutex);

        // The following won't work in parrallel:
        //   pthread_mutex_lock(&console_mutex);
        //   printf("Fibd %d = %lldn",_n, innerFib(_n));
        //   pthread_mutex_unlock(&console_mutex);
        // this thread pool implementation doesn't delete
        // the tasks so we perform the cleanup here
        delete this;
    }
    virtual void showTask() {
        // More debug printing
        pthread_mutex_lock(&console_mutex);
        printf("thread %d computing fibonacci %dn", (int)pthread_self(), (int)_n);
        pthread_mutex_unlock(&console_mutex);        
    }
private:
    // Slow computation of fibonacci sequence
    // To make things interesting, and perhaps imporove load balancing, these
    // inner computations could be added to the task queue
    // Ideally set a lower limit on when that's done
    // (i.e. don't create a task for fib(2)) because thread overhead makes it
    // not worth it
    long long innerFib(long long n) {
        if (n<=1) { return 1; }
        return innerFib(n-1) + innerFib(n-2);
    }
    long long _n;
};
int main(int argc, char *argv[]) 
{
    // Create a thread pool
    ThreadPool *tp = new ThreadPool(10);
    // Create work for it
    for (int i=0;i<100; ++i) {
        int rv = rand() % 40 + 1;
        showTask(rv);
        tp->addTask(new FibTask(rv));
    }
    delete tp;
    printf("nnnnnDone with all work!n");
}

我认为你在那里有比赛状态...

当您删除for循环时,池创建后立即破坏池,因此没有时间开始等待队列。尝试在那里睡觉,您会看到的。

我实施了一个ThreadPool库,该库在我们所有的服务中广泛使用,因此有一些建议:

  • 您正在使用C ,因此无需使用pthreads,只需使用boost或std:线程:如果可用
  • 不要发出信号,而是推开空任务(当然,推动任务需要发出信号)
  • 使用boost ::函数或std:功能而不是基类
  • 应对虚假的唤醒(您的代码似乎无法处理它们)
  • pthread_cond_signal仅醒来一个线程,如果要通知所有线程,则必须使用pthread_cond_broadcast