具有pthread实现的并发队列

Concurrent queue with pthread implementation

本文关键字:并发 队列 实现 pthread 具有      更新时间:2023-10-16

我正在编写一个使用pthread实现的多线程队列。由于我根据互联网上的几个教程提出了代码,我想确保我的代码中没有逻辑错误:

template <typename T>
class ThreadQueue {
public:
    ThreadQueue() {
        pthread_mutex_init(&m_qmtx, NULL);
        pthread_cond_init(&m_condv, NULL);
    }
    ~ThreadQueue() {
        pthread_mutex_lock(&m_qmtx);
        m_queue.clear();
        pthread_mutex_unlock(&m_qmtx);
    }
    void push(T t_data) {
        pthread_mutex_lock(&m_qmtx);
        m_queue.push_back(t_data);
        pthread_mutex_unlock(&m_qmtx);
        pthread_cond_signal(&m_condv);
    }
    T front() {
        T ret;
        pthread_mutex_lock(&m_qmtx);
        while (m_queue.empty()) {
            pthread_cond_wait(&m_condv, &m_qmtx);
        }
        ret = m_queue.front();
        pthread_mutex_unlock(&m_qmtx);
        return ret;
    }
    void pop() {
        pthread_mutex_lock(&m_qmtx);
        if (!m_queue.empty())
            m_queue.pop_front();
        pthread_mutex_unlock(&m_qmtx);
    }
private:
    std::deque<T> m_queue;
    pthread_mutex_t m_qmtx;
    pthread_cond_t m_condv;
};

我可以看到你的代码的大问题是,它不是异常安全。只要你的deque操作不抛出,这不是一个问题,但如果(或者更确切地说,这只是时间问题)他们会抛出,那么你的互斥锁将保持锁定,然后你将被卡住。

的例子:

void push(T t_data) {
    pthread_mutex_lock(&m_qmtx);
    m_queue.push_back(t_data);
    pthread_mutex_unlock(&m_qmtx);
    pthread_cond_signal(&m_condv);
}

这里,push_back可能因为各种原因抛出(内存不足,T的复制构造函数抛出,…),如果发生了,pthread_mutex_unlock永远不会被调用。

使代码异常安全的最佳解决方案是为pthread_mutex_(un)lock编写RAII包装器。比如:

class MutexLock {
public:
    MutexLock(pthread_mutex_t& mutex)
        : m_mutex(mutex)
    {
        if (pthread_mutex_lock(m_mutex))
            throw std::runtime_error("Could not lock the mutex.");
    }
    ~MutexLock() { pthread_mutex_unlock(m_mutex); }
private:
    pthread_mutex_t& m_mutex;
}

然后你可以重写你的push函数(和其他)像这样:

void push(T t_data) {
    {
        MutexLock lock(m_qmtx);
        m_queue.push_back(t_data);
    } // note: braces to enforce *lock* scope, for identical results to your code
    pthread_cond_signal(&m_condv);
}

注意:如您所见,我还在包装器的构造函数中为pthread_mutex_lock添加了错误处理(当前代码中的另一个问题:当函数返回错误代码时,您需要来处理它!)。在析构函数中,这并不重要,因为(a)如果析构函数运行,则意味着包装器构造成功,因此锁可以保存互斥锁(并且您可以安全地解锁它),并且(b)析构函数不应该抛出,因此即使解锁失败,您也无能为力。

更多阅读:有关异常安全(这是c++中的一个基本概念)的更多信息,请参阅优秀的Herb Sutter的"每周大师"系列,他有许多关于异常安全的文章(具体来说,第8、21、56、59、60、61、65期,可能还有其他我错过的)。您可能还想了解RAII。

另外:c++ 11

正如@qdii所提到的,如果你可以使用c++ 11,那么你可能会有兴趣用新的标准等效(std::thread, std::mutex, std::condition_variable,…)替换所有pthreads的东西,这至少有两个优点:(a)与pthreads不同,它们是可移植的,并且(b)您不必为正确实现异常安全而操心太多,因为STL会照顾它的大部分(但您仍然必须使用正确的习惯用法,例如std::unique_lock用于保存互斥锁——它相当于我蹩脚的MutexLock RAII包装器,除了标准的包装器实际上是经过深思熟虑的)。