c++ 11无锁单生产者单消费者:如何避免忙碌等待

C++11 lockfree single producer single consumer: how to avoid busy wait

本文关键字:何避免 等待 消费者 生产者 c++      更新时间:2023-10-16

我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者。当前实现不使用锁:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;
class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }
    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }
    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }
private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

应用程序需要将工作项排队一段时间,然后休眠等待事件。这是一个模拟行为的最小main:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

我很确定我的实现是有bug的:如果工作线程完成并在执行working_ = false之前,另一个enqueue来了呢?是否有可能使我的代码线程安全不使用锁?

解决方案要求:

  • 快速排队
  • 即使队列不为空,析构函数也必须退出
  • 没有忙等待,因为工作线程有很长一段时间是空闲的
  • 如果可能的话没有锁
<标题>编辑

根据您的建议,我做了另一个Worker类的实现。这是我的第二次尝试:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 
    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }
    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }
    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }
private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

我在enqueue方法中引入了worker_.join()。这可能会影响性能,但在极少数情况下(当队列为空并且线程退出之前,会出现另一个enqueue)。working_变量现在是atomic_flag,在enqueue中设置,在work中清除。在working_.clear()之后需要额外的while,因为如果在clear之前,但在while之后推送另一个值,则不处理该值。

这个实现正确吗?

我做了一些测试,实现似乎可以工作。

OT:这是一个编辑,还是一个回答?

如果工作线程完成并且在执行working_ = false之前,另一个队列来了怎么办?

则该值将被推入队列,但不会被处理,直到另一个值在设置标志后加入队列。您(或您的用户)可以决定这是否可以接受。可以使用锁来避免这种情况,但是它们违背了您的要求。

如果正在运行的线程即将完成并且设置了working_ = false;,但是在next值进入队列之前还没有停止运行,代码可能会失败。在这种情况下,你的代码将在运行的线程上调用operator=,根据链接的文档调用std::terminate

在将worker分配给新线程之前添加worker_.join()应该可以防止这种情况。

另一个问题是,如果队列已满,queue_.push可能会失败,因为它的大小是固定的。目前,您只是忽略了这种情况,并且该值不会被添加到完整队列中。如果等待队列有空间,则无法获得快速队列(在边缘情况下)。您可以接受push返回的bool值(告诉它是否成功),并从enqueue返回它。这样调用者就可以决定是等待还是丢弃该值。

或者使用非固定大小的队列。Boost对这个选择是这样说的:

可以用来在push期间完全禁用动态内存分配,以确保无锁行为。如果数据结构配置为固定大小,则内部节点存储在数组中并对其进行寻址通过数组索引。这将队列的可能大小限制为索引可以寻址的元素数量类型(通常是2**16-2),但在缺乏双宽度比较和交换指令的平台上,这是最好的方法实现锁自由。

你的工作线程需要超过2个状态。

  • 未运行
  • <
  • 做任务/gh>
  • 空闲关闭
  • 关闭

如果强制关机,它会跳过空闲关机。如果您的任务用完,它将转换为空闲关机。在空闲关机时,它清空任务队列,然后进入关机状态。

设置关机,然后离开工作任务的末端。

生产者首先把东西放到队列中。然后检查工作器状态。如果Shutdown或Idle Shutdown,首先join它(并将其转换为未运行),然后启动一个新的worker。如果没有运行,就启动一个新的worker。

如果生产者想要启动一个新的工作者,它首先要确保我们处于未运行状态(否则,逻辑错误)。然后切换到Doing tasks状态,然后启动工作线程。

如果生产者想要关闭helper任务,它会设置done标志。然后检查工作状态。如果它不是在运行,它就加入它。

这会导致一个工作线程无缘无故地启动。

在少数情况下,上面的语句可以阻塞,但在此之前也有一些情况。

然后,我们写一个正式或半正式的证明,证明上面的代码不会丢失消息,因为当你写无锁的代码时,直到你有一个证明,你才完成。

这是我对这个问题的解决方案。我不太喜欢回答自己,但我认为展示实际的代码可能会帮助别人。

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"
using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;
class Worker
{
public:
    // the worker thread starts in the constructor
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
        , worker_([this]{ work(); })
    { } 
    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        semaphore_.signal();
        worker_.join();
    }
    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set())
            // signal to the worker thread to wake up
            semaphore_.signal();
        return enqueued;
    }
    void work() {
        int value;
        // the worker thread continue to live
        while (!done_) {
            // wait the start signal, sleeping
            semaphore_.wait();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
        }
    }
private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    binsem semaphore_;
    Queue queue_;
    std::thread worker_;
};

我尝试了@Cameron的建议,不要关闭线程并添加一个信号量。这实际上只在第一个enqueue和最后一个work中使用。这不是无锁的,但只有在这两种情况下。

我做了一些性能比较,在我以前的版本(见我编辑的问题),这一个。没有明显的差别,开始和停止的时候也没有很多。然而,当enqueue必须对工作线程进行signal而不是启动新线程时,它的速度要快10倍。这是一种罕见的情况,所以它不是很重要,但无论如何这是一个改进。

这个实现满足:

  • 一般情况下无锁(enqueuework忙时);
  • 请勿忙等待,以防长时间没有enqueue
  • 析构函数尽快退出
  • 正确性?:)

非常片面的回答:我认为所有这些原子、信号量和状态都是一个反向通信通道,从"线程"到"Worker"。为什么不使用另一个队列呢?