带有共享缓冲区的两个等待线程(生产者/消费者)

Two waiting threads (producer/consumer) with a shared buffer

本文关键字:两个 等待 生产者 消费者 线程 共享 缓冲区      更新时间:2023-10-16

我试图拥有一堆生产者线程,直到启用缓冲区有一个物品的空间没有更多的空间。

同时,应该有一堆消费者线程等待,直到缓冲区中有一些东西,然后将其从缓冲区中取出东西,如果它为空,则可以回去睡觉。

在伪代码中,这是IAM在做的,但是所有IAM都陷入僵局。

condition_variable cvAdd;
condition_variable cvTake;
mutex smtx;
ProducerThread(){
    while(has something to produce){
         unique_lock<mutex> lock(smtx);
         while(buffer is full){
            cvAdd.wait(lock);
         }
         AddStuffToBuffer();
         cvTake.notify_one();
    }
}
ConsumerThread(){
     while(should be taking data){
        unique_lock<mutex> lock(smtx);
        while( buffer is empty ){
            cvTake.wait(lock);
        }   
        TakeStuffFromBuffer();
        if(BufferIsEmpty)
        cvAdd.notify_one();
     }
}

值得一提的另一个错误是,您的消费者只有在缓冲区变为空的时才通知等待生产者。

只有在队列已满时,通知消费者的最佳方法才是。

例如:

template<class T, size_t MaxQueueSize>
class Queue
{
    std::condition_variable consumer_, producer_;
    std::mutex mutex_;
    using unique_lock = std::unique_lock<std::mutex>;
    std::queue<T> queue_;
public:
    template<class U>
    void push_back(U&& item) {
        unique_lock lock(mutex_);
        while(MaxQueueSize == queue_.size())
            producer_.wait(lock);
        queue_.push(std::forward<U>(item));
        consumer_.notify_one();
    }
    T pop_front() {
        unique_lock lock(mutex_);
        while(queue_.empty())
            consumer_.wait(lock);
        auto full = MaxQueueSize == queue_.size();
        auto item = queue_.front();
        queue_.pop();
        if(full)
            producer_.notify_all();
        return item;
    }
};

您的生产者和消费者都试图锁定静音,但两种线程都不会解锁哑光。这意味着获取锁的第一个线程将其固定,而另一个线程永远不会运行。

考虑将静音锁定呼叫移动直到每个线程执行操作之前,然后在每个线程执行其操作后解锁(AddStuffTobuffer((或takeStuffFrombuffer(((。

#include "conio.h" #include <thread> #include <mutex> #include <queue> #include <chrono> #include <iostream> #include <condition_variable> using namespace std; mutex smtx; condition_variable cvAdd; bool running ; queue<int> buffer; void ProducerThread(){ static int data = 0; while(running){ unique_lock<mutex> lock(smtx); if( !running) return; buffer.push(data++); lock.unlock(); cvAdd.notify_one(); this_thread::sleep_for(chrono::milliseconds(300)); } } void ConsumerThread(){ while(running){ unique_lock<mutex> lock(smtx); cvAdd.wait(lock,[](){ return !running || !buffer.empty(); }); if( !running) return; while( !buffer.empty() ) { auto data = buffer.front(); buffer.pop(); cout << data <<" n"; this_thread::sleep_for(chrono::milliseconds(300)); } } } int main() { running = true; thread producer = thread([](){ ProducerThread(); }); thread consumer = thread([](){ ConsumerThread(); }); while(!getch()) { } running = false; producer.join(); consumer.join(); }

我以前已经回答了这个问题,但是我目前对mutexlock_guard等的基本机制和行为有所了解,因为我一直在观看有关该主题的一些视频,以及我目前正在观看的视频实际上与locking相反,因为该视频显示了如何实现使用圆形缓冲区或环形缓冲区,两个指针的LockFreeQueue,并且使用了atomic而不是mutex。现在,对于您当前的情况,atomicLockFreeQueue将无法回答您的问题,但是我从该视频中获得的是循环缓冲区的想法。

由于您的两个生产者/消费者线程都将共享同一内存池。如果您具有生产者的1到1比例 - 消费者线程很容易跟踪每个数端或每个指针中的每个索引。但是,当您有很多事情要花很多时间时,事情确实会变得有些复杂。

可以做的一件事是,如果将缓冲区的大小限制为n个对象,则实际上可能希望将其创建为n 1。一个额外的空白空间,这将有助于减轻多个生产商和消费者共享的环缓冲区结构中的某些复杂性。


以下面的插图:

p =生产者的索引,c =消费者的索引,n代表[]索引空间的数量。n = 5。

一对一

 p                N = 5
[ ][ ][ ][ ][ ]
 c

这里p和c == 0。这表示缓冲区为空。假设生产商在C收到任何东西之前填充缓冲液

             p    N = 5
[x][x][x][x][x]
 c

在这种情况下,缓冲区已满,P必须等待空白空间。C现在能够获取。

             p     N = 5
[ ][x][x][x][x]
    c         

在这里c在[0]处获得c,并将其索引提高到1。P现在能够绕圈缓冲区圆圈。

这很容易通过单个P&amp跟踪。C。现在,让我们与多个消费者和一个生产者一起探索

一对一

 p                 N = 5
[ ][ ][ ][ ][ ]
c1
c2

在这里p index = 0,c1&amp;C2索引= 0,环缓冲区为空

             p     N = 5
[x][x][x][x][x]
c1
c2

现在P必须等待C1或C2在[0]上以[0]的方式获取

             p     N = 5
[ ][ ][x][x][x]
    c1 c2

在这里,C1或C2是否获得[0]或1,但两者都成功地获取了一个项目并不明显。两者都增加了索引计数器。上面似乎表明C1从[0]增加到1。然后[0]的C2也必须增加索引计数器,但它已经从0更改为1,因此C2将其增加到2。

>

如果我们假设p == 0 && c1 || c2 == 0是缓冲区为空时,这里存在死锁的情况。在这里查看这种情况。

 p               N = 5  // P hasn't written yet but has advanced 
[ ][ ][ ][ ][x]  // 1 Item is left
           c1  // Both c1 & c2 have index to same item.
           c2  // c1 acquires it and so does c2 but one of them finishes first and then increments the counter. Now the buffer is empty and looks like this:
 p                N = 5
[ ][ ][ ][ ][ ]
c1          c2    // p index = 0 and c1 = 0 represents empty buffer.
                  // c2 is trying to read [4]

这可能会导致死锁。

许多到一到

 p1
 p2                N = 5
[ ][ ][ ][ ][ ]
 c1

在这里,您有多个生产商可以写入一个消费者的缓冲区。如果他们交织在一起:

p1 writes to [0] increments counter
p2 writes to [0] increments counter
   p1 p2
[x][ ][ ][ ][ ]
c1

这将导致缓冲区中的空白空间。生产者彼此干扰。您需要在这里相互排除。

对许多人的想法;您需要考虑并结合以上一对一和多一到一的功能。您将需要静音的消费者和生产者的静音,试图将同一静音的两者用于两者都会给您带来不可预见的僵局的问题。您必须确保检查所有案例,并在适当的时间 - 地方锁定它们。也许这几个视频将帮助您更多地了解。

  • 过程,同步&amp;僵局
  • LockFreequeue

伪代码:可能看起来像这样:

condition_variable cvAdd;
condition_variable cvTake;
mutex consumerMutex;
mutex producerMutex;
ProducerThread(){
    while( has something to produce ) {    
         unique_lock<mutex> lock(producerMutex);
         while(buffer is full){
            cvAdd.wait(lock);
         }
         AddStuffToBuffer();
         cvTake.notify_one();
    }
}
ConsumerThread() {    
     while( should be taking data ) {    
        unique_lock<mutex> lock(consumerMutex);
        while( buffer is empty ){
            cvTake.wait(lock);
        }   
        TakeStuffFromBuffer();
        if(BufferIsEmpty)
        cvAdd.notify_one();
     }    
}

这里唯一的区别是使用了2个独家静音者,而不是生产者和消费者试图使用相同的静音者。这是共享的记忆;但是您不想将计数器或指针分享到两者之间的内存池中。多个生产商可以使用同一互惠码,可以使用同一互联克,但是让消费者和生产商都使用同一静音可能是您的基本问题。