C++ 中的细粒度锁定队列

fine-grained locking queue in c++

本文关键字:锁定 队列 细粒度 C++      更新时间:2023-10-16

这是 Anthony Williams 在第 6.2.3 章中引入的细粒度锁定队列 C++ 并发在行动中。

/*
    pop only need lock head_mutex and a small section of tail_mutex,push only need
    tail_mutex mutex.maximum container concurrency.
*/
template<typename T> class threadsafe_queue
{
    private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    }
    std::mutex head_mutex;   //when change the head lock it.
    std::unique_ptr<node> head;  
    std::mutex tail_mutex;   //when change the tail lock it.
    node* tail;
    std::condition_variable data_cond;
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    public:
    /* 
        create a dummy node
    */
    threadsafe_queue():
        head(new node),tail(head.get())
    {}
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> head_lock;
        data_cond.wait(head_lock,[&]{return head.get()!=get_tail();}); //#1
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }
    void push(T new_value)
    {
        std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data=new_data;
            node* const new_tail=p.get();
            tail->next=std::move(p);
            tail=new_tail;
        }
        data_cond.notify_one();
    }
}

情况是这样的:有两个线程(thread1thread2 (。 thread1正在做wait_and_popthread2正在做push。队列为空。

thread1在#2中,在data_cond.wait()之前已经检查过head.get()!=get_tail()。此时它的 CPU 周期已经用完了。 thread2开始了。

thread2完成了push功能并做了data_cond.notify_one()thread1又开始了。

现在thread1开始data_cond.wait(),但它永远等待。

这种情况可能发生吗?如果是这样,如何修复此容器?

是的

,OP 中描述的情况是可能的,会导致通知丢失。在谓词函数中注入一个很好的大时间延迟使其易于触发。这是科利鲁的演示。请注意程序需要 10 秒才能完成(超时长度到 wait_for (,而不是 100 毫秒(生成者在队列中插入项目的时间(。通知丢失。

条件

变量的设计中隐含着一个假设,即在关联的互斥锁被锁定时,条件的状态(谓词的返回值(不能更改。对于此队列实现,情况并非如此,因为push可以在不持有head_mutex的情况下更改队列的"空"。

§30.5p3 指定wait有三个原子部分:

  1. 释放互斥锁,并进入等待状态;
  2. 解除等待的阻塞;和
  3. 重新获取锁。

请注意,这些都没有提到谓词的检查,如果有的话传递给wait。带有谓词的wait的行为在 §30.5.1p15 中描述:

影响:

而 (!pred(((      等待(锁定(;

请注意,这里也不能保证谓词检查和wait是以原子方式执行的。有一个前提条件,即lock被锁定,并且它与调用线程持有的关联互斥锁。

至于修复容器以避免丢失通知,我会将其更改为单个互斥实现并完成它。当pushpop最终都采用相同的互斥锁(tail_mutex(时,将其称为细粒度锁定有点牵强。

data_cond.wait()每次唤醒时都会检查状况。 因此,即使它可能已经被检查过,也会在data_cond.notify_one()后再次检查。 此时,有数据要弹出(因为线程 2 刚刚完成了推送(,因此它会返回。 在此处阅读更多内容。

您唯一应该担心的是,当您在空队列上调用wait_and_pop,然后再也不将任何数据推送到该队列上时。 此代码没有用于超时等待并返回错误(或引发异常(的机制。