确定删除并发队列的安全性

Determining safety of deleting concurrent queue

本文关键字:安全性 队列 并发 删除      更新时间:2023-10-16

我正在用c++为实时系统编写一个无锁的单消费者单生产者可增长队列。内部队列可以工作,但它需要是可增长的。生产者线程是实时的,因此任何操作都需要是确定性的(因此没有等待、锁、内存分配),而消费者线程则不是。

因此,如果需要的话,消费者线程偶尔会增加队列的大小。队列的实现使得消费者端无法增长。因此,实际的队列被间接地封装在一个对象中,该对象分配调用,实际的增长是通过将对内部队列的引用交换到一个新的来实现的,同时保留旧的队列,以防生产者线程正在使用它。

问题是,然而,我无法弄清楚如何证明生产者线程何时停止使用旧队列,因此删除是安全的,而不必诉诸锁。下面是代码的伪表示:

template<typename T>
class queue
{
public:
    queue()
        : old(nullptr)
    {
        current.store(nullptr);
        grow();
    }
    bool produce(const T & data)
    {
        qimpl * q = current.load();
        return q->produce(data);
    }
    bool consume(T & data)
    {
        // the queue has grown? if so, a new and an old queue exists. consume the old firstly.
        if (old)
        {
            // here is the problem. we never really know when the producer thread stops using 
            // the old queue and starts using the new. it could be concurrently halfway-through inserting items
            // now, while the following consume call fails meanwhile.
            // thus, it is not safe yet to delete the old queue.
            // we know however, that it will take at most one call to produce() after we called grow()
            // before the producer thread starts using the new queue.
            if (old->consume(data))
            {
                return true;
            }
            else
            {
                delete old;
                old = nullptr;
            }
        }
        if (current.load()->consume(data))
        {
            return true;
        }
        return false;
    }
    // consumer only as well
    void grow()
    {
        old = current.load();
        current.store(new qimlp());
    }
private:
    class qimpl
    {
    public:
        bool produce(const T & data);
        bool consume(const T & data);
    };
    std::atomic<qimpl *> current;
    qimpl * old;
};

注意ATOMIC_POINTER_LOCK_FREE == 2是代码编译的条件。我看到的唯一可证明的条件是,如果调用grow(),下一次调用produce()将使用新的内部队列。因此,如果每次调用都增加produce中的原子计数,那么在N + 1处删除旧队列是安全的,其中N是grow()调用时的计数。然而,问题是,您需要自动交换新指针并存储计数,这似乎是不可能的。

欢迎大家有任何想法,作为参考,以下是系统的工作原理:

queue<int> q;
void consumer()
{
    while (true)
    {
        int data;
        if (q.consume(data))
        {
            // ..
        }
    }
}
void producer()
{
    while (true)
    {
        q.produce(std::rand());
    }
}
int main()
{
    std::thread p(producer); std::thread c(consumer);
    p.detach(); c.detach();
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

编辑:好了,现在问题解决了。我突然意识到,当一个项目被推到新队列中时,旧队列显然已经过时了。因此,代码片段现在看起来像这样:

bool pop(T & data)
{
    if (old)
    {
        if (old->consume(data))
        {
            return true;
        }
    }
    // note that if the old queue is empty, and the new has an enqueued element, we can conclusively
    // prove that it is safe to delete the old queue since it is (a) empty and (b) the thread state 
    // for the producer is updated such that it uses all the new entities and will never use the old again.
    // if we successfully dequeue an element, we can delete the old (if it exists).
    if (current.load()->consume(data))
    {
        if (old)
        {
            delete old;
            old = nullptr;
        }
        return true;
    }
    return false;
}

我不完全理解grow()在你的算法中的使用,但似乎你需要某种Read-Copy-Update (RCU)机制来安全删除不需要的队列。

这篇文章描述了与Linux相关的这种机制的不同风格,但你可以谷歌RCU风格,适合其他平台。