标准C++中的共享递归互斥

A shared recursive mutex in standard C++

本文关键字:递归 共享 C++ 标准      更新时间:2023-10-16

为C++17计划了一个shared_mutex类。并且CCD_ 2已经在C++14中了。(谁知道他们为什么按这个顺序来,但不管怎样。)然后从C++11开始就有了recursive_mutexrecursive_timed_mutex。我需要的是shared_recursive_mutex。我是错过了标准中的某些内容,还是必须再等三年才能得到标准版本?

如果目前还没有这样的功能,那么仅使用标准C++实现这样一个功能的简单(第一优先级)和高效(第二优先级)是什么?

互斥的递归属性与术语"所有者";,在shared_mutex的情况下,这是不明确定义的:几个线程可能同时调用.lock_shared()

假设"所有者";作为调用.lock()(而不是.lock_shared()!)的线程,递归共享互斥的实现可以简单地从shared_mutex:派生

class shared_recursive_mutex: public shared_mutex
{
public:
    void lock(void) {
        std::thread::id this_id = std::this_thread::get_id();
        if(owner == this_id) {
            // recursive locking
            count++;
        }
        else {
            // normal locking
            shared_mutex::lock();
            owner = this_id;
            count = 1;
        }
    }
    void unlock(void) {
        if(count > 1) {
            // recursive unlocking
            count--;
        }
        else {
            // normal unlocking
            owner = std::thread::id();
            count = 0;
            shared_mutex::unlock();
        }
    }
private:
    std::atomic<std::thread::id> owner;
    int count;
};

字段.owner需要声明为原子字段,因为在.lock()方法中,检查该字段时不受并发访问的保护。

如果要递归调用.lock_shared()方法,则需要维护所有者的映射,并且对该映射的访问应该使用一些额外的互斥对象来保护。

允许具有活动.lock()的线程调用.lock_shared()会使实现更加复杂。

最后,允许线程将锁定从.lock_shared()推进.lock(),因为当两个线程尝试执行该推进时,可能会导致死锁。


同样,递归共享互斥的语义非常脆弱,所以最好不要使用它。

如果您在Linux/POSIX平台上,您很幸运,因为C++互斥对象是以POSIX为模型的。POSIX提供了更多的特性,包括递归、进程共享等等。将POSIX基元封装到C++类中是直接的。

POSIX线程文档的良好切入点。

下面是一个围绕类型T的快速线程安全包装器:

template<class T, class Lock>
struct lock_guarded {
  Lock l;
  T* t;
  T* operator->()&&{ return t; }
  template<class Arg>
  auto operator[](Arg&&arg)&&
  -> decltype(std::declval<T&>()[std::declval<Arg>()])
  {
    return (*t)[std::forward<Arg>(arg)];
  }
  T& operator*()&&{ return *t; }
};
constexpr struct emplace_t {} emplace {};
template<class T>
struct mutex_guarded {
  lock_guarded<T, std::unique_lock<std::mutex>>
  get_locked() {
    return {{m},&t};
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  get_locked() const {
    return {{m},&t};
  }
  lock_guarded<T, std::unique_lock<std::mutex>>
  operator->() {
    return get_locked();
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  operator->() const {
    return get_locked();
  }
  template<class F>
  std::result_of_t<F(T&)>
  operator->*(F&& f) {
    return std::forward<F>(f)(*get_locked());
  }
  template<class F>
  std::result_of_t<F(T const&)>
  operator->*(F&& f) const {
    return std::forward<F>(f)(*get_locked());
  }
  template<class...Args>
  mutex_guarded(emplace_t, Args&&...args):
    t(std::forward<Args>(args)...)
  {}
  mutex_guarded(mutex_guarded&& o):
    t( std::move(*o.get_locked()) )
  {}
  mutex_guarded(mutex_guarded const& o):
    t( *o.get_locked() )
  {}
  mutex_guarded() = default;
  ~mutex_guarded() = default;
  mutex_guarded& operator=(mutex_guarded&& o)
  {
    T tmp = std::move(o.get_locked());
    *get_locked() = std::move(tmp);
    return *this;
  }
  mutex_guarded& operator=(mutex_guarded const& o):
  {
    T tmp = o.get_locked();
    *get_locked() = std::move(tmp);
    return *this;
  }
private:
  std::mutex m;
  T t;
};

您可以使用以下任一项:

mutex_guarded<std::vector<int>> guarded;
auto s0 = guarded->size();
auto s1 = guarded->*[](auto&&e){return e.size();};

两者做的事情大致相同,只有当互斥锁被锁定时,受保护的对象才会被访问。

窃取@tsyvarev的答案(有一些小改动)我们得到:

class shared_recursive_mutex
{
  std::shared_mutex m
public:
  void lock(void) {
    std::thread::id this_id = std::this_thread::get_id();
    if(owner == this_id) {
      // recursive locking
      ++count;
    } else {
      // normal locking
      m.lock();
      owner = this_id;
      count = 1;
    }
  }
  void unlock(void) {
    if(count > 1) {
      // recursive unlocking
      count--;
    } else {
      // normal unlocking
      owner = std::thread::id();
      count = 0;
      m.unlock();
    }
  }
  void lock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    if (shared_counts->count(this_id)) {
      ++(shared_count.get_locked()[this_id]);
    } else {
      m.lock_shared();
      shared_count.get_locked()[this_id] = 1;
    }
  }
  void unlock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    auto it = shared_count->find(this_id);
    if (it->second > 1) {
      --(it->second);
    } else {
      shared_count->erase(it);
      m.unlock_shared();
    }
  }
private:
  std::atomic<std::thread::id> owner;
  std::atomic<std::size_t> count;
  mutex_guarded<std::map<std::thread::id, std::size_t>> shared_counts;
};

CCD_ 17和CCD_。

锁定和解锁共享锁定互斥锁两次(这是安全的,因为分支实际上是关于"这个线程是否控制互斥锁",而另一个线程不能将答案从"否"更改为"是",反之亦然)。您可以用->*而不是shared_timed_mutex0的一个锁来实现这一点,这将使它更快(以逻辑的复杂性为代价)。


以上内容不支持先使用独占锁,然后使用共享锁。这很棘手。它不支持拥有共享锁,然后升级到唯一锁,因为当两个线程尝试时,基本上不可能阻止它死锁。

最后一个问题可能是为什么递归共享互斥是个坏主意。

使用现有原语构建共享递归互斥是可能的。不过我不建议你这么做。

这并不简单,封装现有的POSIX实现(或任何您平台的原生实现)可能会更高效。

如果您确实决定编写自己的实现,那么使其高效仍然取决于特定于平台的细节,因此您要么为每个平台编写一个具有不同实现的接口,要么选择一个平台,并可以同样轻松地使用本机(POSIX或其他)设施。

我当然不会提供一个示例递归读/写锁实现,因为对于Stack Overflow的答案来说,这是一个完全不合理的工作量。

共享我的实现,没有承诺

recursive_shared_mutex.h

#ifndef _RECURSIVE_SHARED_MUTEX_H
#define _RECURSIVE_SHARED_MUTEX_H
#include <thread>
#include <mutex>
#include <map>
struct recursive_shared_mutex
{
public:
    recursive_shared_mutex() :
        m_mtx{}, m_exclusive_thread_id{}, m_exclusive_count{ 0 }, m_shared_locks{}
    {}
    void lock();
    bool try_lock();
    void unlock();
    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();
    recursive_shared_mutex(const recursive_shared_mutex&) = delete;
    recursive_shared_mutex& operator=(const recursive_shared_mutex&) = delete;
private:
    inline bool is_exclusive_locked()
    {
        return m_exclusive_count > 0;
    }
    inline bool is_shared_locked()
    {
        return m_shared_locks.size() > 0;
    }
    inline bool can_exclusively_lock()
    {
        return can_start_exclusive_lock() || can_increment_exclusive_lock();
    }
    inline bool can_start_exclusive_lock()
    {
        return !is_exclusive_locked() && (!is_shared_locked() || is_shared_locked_only_on_this_thread());
    }
    inline bool can_increment_exclusive_lock()
    {
        return is_exclusive_locked_on_this_thread();
    }
    inline bool can_lock_shared()
    {
        return !is_exclusive_locked() || is_exclusive_locked_on_this_thread();
    }
    inline bool is_shared_locked_only_on_this_thread()
    {
        return is_shared_locked_only_on_thread(std::this_thread::get_id());
    }
    inline bool is_shared_locked_only_on_thread(std::thread::id id)
    {
        return m_shared_locks.size() == 1 && m_shared_locks.find(id) != m_shared_locks.end();
    }
    inline bool is_exclusive_locked_on_this_thread()
    {
        return is_exclusive_locked_on_thread(std::this_thread::get_id());
    }
    inline bool is_exclusive_locked_on_thread(std::thread::id id)
    {
        return m_exclusive_count > 0 && m_exclusive_thread_id == id;
    }
    inline void start_exclusive_lock()
    {
        m_exclusive_thread_id = std::this_thread::get_id();
        m_exclusive_count++;
    }
    inline void increment_exclusive_lock()
    {
        m_exclusive_count++;
    }
    inline void decrement_exclusive_lock()
    {
        if (m_exclusive_count == 0)
        {
            throw std::logic_error("Not exclusively locked, cannot exclusively unlock");
        }
        if (m_exclusive_thread_id == std::this_thread::get_id())
        {
            m_exclusive_count--;
        }
        else
        {
            throw std::logic_error("Calling exclusively unlock from the wrong thread");
        }
    }
    inline void increment_shared_lock()
    {
        increment_shared_lock(std::this_thread::get_id());
    }
    inline void increment_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            m_shared_locks[id] = 1;
        }
        else
        {
            m_shared_locks[id] += 1;
        }
    }
    inline void decrement_shared_lock()
    {
        decrement_shared_lock(std::this_thread::get_id());
    }
    inline void decrement_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.size() == 0)
        {
            throw std::logic_error("Not shared locked, cannot shared unlock");
        }
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            throw std::logic_error("Calling shared unlock from the wrong thread");
        }
        else
        {
            if (m_shared_locks[id] == 1)
            {
                m_shared_locks.erase(id);
            }
            else
            {
                m_shared_locks[id] -= 1;
            }
        }
    }
    std::mutex m_mtx;
    std::thread::id m_exclusive_thread_id;
    size_t m_exclusive_count;
    std::map<std::thread::id, size_t> m_shared_locks;
    std::condition_variable m_cond_var;
};
#endif

recursive_shared_mutex.cpp

#include "recursive_shared_mutex.h"
#include <condition_variable>
void recursive_shared_mutex::lock()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_exclusively_lock(); });
    if (is_exclusive_locked_on_this_thread())
    {
        increment_exclusive_lock();
    }
    else
    {
        start_exclusive_lock();
    }
}
bool recursive_shared_mutex::try_lock()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_increment_exclusive_lock())
    {
        increment_exclusive_lock();
        return true;
    }
    if (can_start_exclusive_lock())
    {
        start_exclusive_lock();
        return true;
    }
    return false;
}
void recursive_shared_mutex::unlock()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_exclusive_lock();
    }
    m_cond_var.notify_all();
}
void recursive_shared_mutex::lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_lock_shared(); });
    increment_shared_lock();
}
bool recursive_shared_mutex::try_lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_lock_shared())
    {
        increment_shared_lock();
        return true;
    }
    return false;
}
void recursive_shared_mutex::unlock_shared()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_shared_lock();
    }
    m_cond_var.notify_all();
}

如果线程拥有共享锁,它也可以在不放弃共享锁的情况下获得独占锁。(这当然不需要其他线程当前具有共享或独占锁)

反之亦然,拥有独占锁的线程可以获得共享锁。

有趣的是,这些属性还允许锁升级/降级。

临时升级锁:

recusrive_shared_mutex mtx;
foo bar;
mtx.lock_shared();
if (bar.read() == x)
{
    mtx.lock();
    bar.write(y);
    mtx.unlock();
}
mtx.unlock_shared();

从独占锁降级为共享锁

recusrive_shared_mutex mtx;
foo bar;
mtx.lock();
bar.write(x);
mtx.lock_shared();
mtx.unlock();
while (bar.read() != y)
{
     // Something
}
mtx.unlock_shared();

我搜索了一个C++读写锁,发现了这个相关的问题。我们确实需要这样一个shared_recursive_mutex来控制对我们的";数据库";类。因此,为了完整性:如果你正在寻找另一个实现示例(就像我以前一样),你可能也需要考虑这个链接:使用C++17(在github上)的shared_recursive_mutex实现。

特点

  • C++17
  • 单个页眉
  • 无依赖项

不过它有一个缺点:static thread_local成员通过模板专门用于PhantomType类。因此,您不能在同一(PhantomType)类的多个独立实例中真正使用shared_recursive_mutex。如果对你没有限制的话,试试看。

以下实现支持首先拥有一个unique_lock,然后在同一线程中获取一个额外的shared_lock

#include <shared_mutex>
#include <thread>
class recursive_shared_mutex: public std::shared_mutex {
public:
    void lock() {
        if (owner_ != std::this_thread::get_id()) {
            std::shared_mutex::lock();
            owner_ = std::this_thread::get_id();
        }
        ++count_;
    }
    void unlock() {
        --count_;
        if (count_ == 0) {
            owner_ = std::thread::id();
            std::shared_mutex::unlock();
        }
    }
    void lock_shared() {
        if (owner_ != std::this_thread::get_id()) {
            std::shared_mutex::lock_shared();
        }
    }
    void unlock_shared() {
        if (owner_ != std::this_thread::get_id()) {
            std::shared_mutex::unlock_shared();
        }
    }
private:
    std::atomic<std::thread::id> owner_;
    std::atomic_uint32_t count_ = 0;
};

与POSIX版本(pthread_rwlock_t)匹配的最小C++17版本,其中共享锁可以递归,而独占锁不能递归,并且没有升级/降级。

#include <shared_mutex>
#include <unordered_map>
class recursive_shared_mutex : std::shared_mutex {
  using base = std::shared_mutex;
  using locks_map =
      std::unordered_map<const recursive_shared_mutex *, std::size_t>;
  locks_map &thread_shared_locks() {
    thread_local locks_map shared_locks;
    return shared_locks;
  }
public:
  void lock() { base::lock(); }
  bool try_lock() { return base::try_lock(); }
  void unlock() { base::unlock(); }
  void lock_shared() {
    if (const auto [it, inserted] = thread_shared_locks().emplace(this, 1);
        inserted)
      base::lock_shared();
    else
      ++(it->second);
  }
  bool try_lock_shared() {
    auto &locks = thread_shared_locks();
    if (const auto [it, inserted] = locks.emplace(this, 1); inserted) {
      if (base::try_lock_shared())
        return true;
      else {
        locks.erase(it);
        return false;
      }
    } else {
      ++(it->second);
      return true;
    }
  }
  void unlock_shared() {
    auto &locks = thread_shared_locks();
    const auto it = locks.find(this);
    if (0 == --(it->second)) {
      base::unlock_shared();
      locks.erase(it);
    }
  }
};