不同的互斥锁用于push和pop

different mutex for push and pop

本文关键字:用于 push pop      更新时间:2023-10-16

我有一个名为'subscribedQueue'的类。该类通过其订阅的发布者(复数)调用其推送方法接收数据。

在另一个线程中,调用该类的pop方法来接收该数据。因此,在某种意义上,该类是多个发布者与其订阅者之间的一种缓冲区。对于实现,我基于自己在这里找到的关于线程安全队列的信息。

现在我的问题是双重的:

  • 如果我使用相同的互斥锁来暂停和弹出值(目前我使用两个不同的互斥锁),是否有可能让我的程序卡住,等待阻塞的推送?
  • 如果不是,怎么可能push和pop方法都可以通过lock(the_same_mutex)。

我的假设是,如果我使用相同的互斥锁,并且程序进入pop方法,它将在pop中获取锁,检查队列是否为空,并等待在push方法中永远无法设置的条件变量(因为锁已经被pop获取)。

当前代码(使用两个不同的互斥体):

#include <boost/thread.hpp>
#include <queue>
#include "subscriber.h"
#include "pubdata.h"
#ifdef DEBUG
#include <iostream>
#include <boost/lexical_cast.hpp>
#endif
namespace PUBLISHSUBSCRIBE
{
  template<class T>
  class SubscribedQueue: public PUBLISHSUBSCRIBE::Subscriber<T>, private std::queue< PubData<T> >
  {
  public:
    PubData<T>  pop();   //removes the next item from the queue, blocks until the queue is not empty
    void push(const PubData<T> data); //method used by the publisher to push data onto the queue
  private:
    mutable boost::mutex writeMutex_; //only needed for publishing/pushing data
    mutable boost::mutex readMutex_;  //only needed for reading/popping data
    boost::condition_variable notify_;
  };
  template<class T>
  PubData<T> SubscribedQueue<T>::pop() { //Blocks until the queue is not empty
    boost::mutex::scoped_lock lock(readMutex_);
    while(std::queue< PubData<T> >::empty())
      notify_.wait(lock); //block until recieving a notification AND the queue is not empty
    PubData<T> head = std::queue< PubData<T> >::front();
    std::queue< PubData<T> >::pop();
#ifdef DEBUG
    std::string debugOut("pop: " + boost::lexical_cast<std::string>(head) + " - timestamp: " + boost::lexical_cast<std::string>(head.timestamp()) + " - from: " + boost::lexical_cast<std::string>(this) + "n" );
    std::cout <<debugOut;
#endif
    lock.unlock();
    return head;
  }
  template<class T>
  void SubscribedQueue<T>::push(const PubData<T> data){
    boost::mutex::scoped_lock lock(writeMutex_);
#ifdef DEBUG
    std::cout << "published: " << data << std::endl;
#endif
    std::queue< PubData<T> >::push(data);
    lock.unlock();
    notify_.notify_one();
  }
}
#endif //SUBSCRIBEDQUEUE_H

[edit]最让我担心的是:我有一个boost::condition_variable notify_在pop中执行'等待直到通知'。但是pop必须首先锁定互斥锁,这个互斥锁也必须在push中锁定,以便通知条件变量。

不会导致死锁,为什么不呢?

标准库容器不是线程安全的;如果你试图同时从多个线程修改一个容器,那么就会发生不好的事情

如果你有一个单独的互斥锁用于push和pop操作,那么你就不能防止两个线程同时进行push和pop操作,所以你根本没有真正保护集合。

我的假设是,如果我使用相同的互斥锁,并且程序进入pop方法,它将在pop中获取锁,检查队列是否为空,并等待在push方法中永远无法设置的条件变量(因为锁已经被pop获取)。

当您等待pop中的条件变量时,wait() 将解锁互斥锁,因此在等待时push()将能够锁定它。Push()调用notify_one()并通过在函数结束时scoped_lock超出作用域来解锁互斥锁。然后,当pop()线程下一次调度时,它将立即重新锁定互斥锁并继续执行。

是的,你必须使用单个互斥锁,否则说push已经为一个值腾出了空间并增加了大小,但没有完成将该值复制到适当的位置…读取器可以读取任何形式的垃圾。

您不必担心读写器死锁——条件变量被设计为仲裁这种情况,允许推送线程操作,而弹出线程等待通知。

根据定义,"pop"是从列表中删除项的行为。因此,如果您希望从多个线程的同一个列表中进行push和pop操作,则需要使用相同的互斥锁来保护该列表。

如果你使用不同的互斥锁,那么这意味着这些不同的线程可以同时添加/删除项,从而破坏列表。

当线程A向互斥锁保护的列表中添加项时,线程B试图从列表中弹出项,必须等到线程A完成添加项并离开锁。

首先,你不需要像现在的代码那样有互斥锁mutable——它们似乎不会在任何const函数中使用。

第二,不应该有两个互斥锁,一个用于读,一个用于写,应该只有一个互斥锁用于访问队列,并将访问限制在尽可能小的范围内,保持锁的时间不超过所需的指令。

您必须只使用一个互斥锁来创建线程安全队列,因为您必须确保一次只有一个线程访问队列(无论它是push还是pop)。

回答你的问题:1)使用当前的push和pop实现,没有死锁的可能性(除非您在其他地方使用互斥锁),因为锁被限制在push和pop的范围内,并且scoped_lock在出现异常的情况下也会释放互斥锁。2) Push和pop可以通过锁当且仅当互斥当前未锁定。否则线程将被挂起,直到锁线程释放互斥锁。