std::unordered_map 上的线程安全包装器

threadsafe wrapper on std::unordered_map

本文关键字:线程 安全 包装 unordered map std      更新时间:2023-10-16

我正在尝试在 std::unordered_map 之上实现一个线程安全包装类具有如下开始和结束功能是否安全?

        std::unordered_map<Key, T, Hash, Pred, Alloc> umap;
        iterator begin() {
            return umap.begin();
        }   
        iterator end() {
            return umap.end();
        }

如果复制/移动运算符=实现中有任何明显的错误,也请发表评论

    concurrent_unordered_map& operator=(const concurrent_unordered_map& other) ;
    {
        if (this!=&other) {
          std::lock(entry_mutex, other.entry_mutex);
          std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
          std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
          umap = other.umap;
        }
        return *this;           
    }
    concurrent_unordered_map& operator=(concurrent_unordered_map&& other) 
    {
        if (this!=&other) {
          std::lock(entry_mutex, other.entry_mutex);
          std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
          std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
          umap = std::move(other.umap)
        }
        return *this;       
    }

谢谢马吉夫

不能创建提供与基础标准容器相同的接口的线程安全容器,即使同步每个方法调用也是如此。这是因为接口规范本身不适合在多线程环境中使用。

下面是一个示例:假设您有多个线程同时插入到同一个容器对象:

c->insert(new_value);

因为您同步了每个方法调用,所以这工作得很好,这里没有问题。

但与此同时,另一个线程试图遍历容器中的所有元素:

auto itr = c->begin();
while (itr != c->end())
{
    // do something with itr
    ++itr;
}

我这样写是为了弄清楚问题:即使对开始和结束的调用在内部同步,您也不能以原子方式执行"遍历所有元素"的操作,因为您需要多个方法调用才能完成此任务。一旦任何其他线程在循环运行时向容器插入内容,此方案就会中断。

因此,如果您希望拥有一个无需外部同步即可使用的容器,则需要一个线程安全接口。例如,"遍历所有元素"任务可以通过提供for_each方法以原子方式完成:

c.for_each([](const value_type& value)
{
    // do something with value
}); 

不能简单地同步每个方法并获取线程安全对象,因为某些操作需要多个方法调用,并且如果容器在方法调用之间发生突变,则会中断。

一个典型的例子是迭代。

线程安全的一种简单方法是滥用 C++14 功能,如下所示:

template<class T>
struct synchronized {
  // one could argue that rvalue ref qualified version should not be
  // synchronized...  but I think that is wrong
  template<class F>
  std::result_of_t< F(T const&) > read( F&& f ) const {
    auto&& lock = read_lock();
    return std::forward<F>(f)(t);
  }
  template<class F>
  std::result_of_t< F(T&) > write( F&& f ) {
    auto&& lock = write_lock();
    return std::forward<F>(f)(t);
  }
  // common operations, useful rvalue/lvalue overloads:
  // get a copy of the internal guts:
  T copy() const& { return read([&](auto&&t)->T{return t;}); }
  T copy() && { return move(); }
  T move() { return std::move(*this).write([&](auto&&t)->T{return std::move(t);}); }
private:
  mutable std::shared_timed_mutex mutex;
  std::shared_lock<std::shared_timed_mutex> read_lock() const {
    return std::shared_lock<std::shared_timed_mutex>(mutex);
  }
  std::unique_lock<std::shared_timed_mutex> write_lock() {
    return std::unique_lock<std::shared_timed_mutex>(mutex);
  }
  T t;
public:
  // relatively uninteresting boilerplate
  // ctor:
  template<class...Args>
  explicit synchronized( Args&&... args ):
    t(std::forward<Args>(args)...)
  {}
  // copy ctors: (forwarding constructor above means need all 4 overloads)
  synchronized( synchronized const& o ) :t(std::forward<decltype(o)>(o).copy()) {}
  synchronized( synchronized const&& o ):t(std::forward<decltype(o)>(o).copy()) {}
  synchronized( synchronized & o )      :t(std::forward<decltype(o)>(o).copy()) {}
  synchronized( synchronized && o )     :t(std::forward<decltype(o)>(o).copy()) {}
  // copy-from-T ctors: (forwarding constructor above means need all 4 overloads)
  synchronized( T const& o ) :t(std::forward<decltype(o)>(o)) {}
  synchronized( T const&& o ):t(std::forward<decltype(o)>(o)) {}
  synchronized( T & o )      :t(std::forward<decltype(o)>(o)) {}
  synchronized( T && o )     :t(std::forward<decltype(o)>(o)) {}
};

这看起来很晦涩,但效果很好:

int main() {
  synchronized< std::unordered_map<int, int> > m;
  m.write( [&](auto&&m) {
    m[1] = 2;
    m[42] = 13;
  });
  m.read( [&](auto&&m) {
    for( auto&& x:m ) {
      std::cout << x.first << "->" << x.second << "n";
    }
  });
  bool empty = m.read( [&](auto&&m) {
    return m.empty();
  });
  std::cout << empty << "n";
  auto copy = m.copy();
  std::cout << copy.empty() << "n";
  synchronized< std::unordered_map<int, int> > m2 = m;
  m2.read( [&](auto&&m) {
    for( auto&& x:m ) {
      std::cout << x.first << "->" << x.second << "n";
    }
  });
}

这个想法是将操作粘贴到Lambda中,这些Lambda在同步上下文中执行。

编码风格有点晦涩难懂,但并非难以管理(至少具有 C++14 个功能)。

C++11 的一个很好的功能是,即使来自两个不同的线程,同一容器上的两个const操作也是合法的。 因此,read只是简单地传递了对容器的const引用,并且您可以在其中执行的几乎任何操作都可以与另一个线程并行执行。

现场示例

有一个线程安全的std::unordered_map实现是可能的(但通常没有用处) - 问题是每个迭代器对象都需要锁定递归互斥锁,直到其析构函数运行。 这不仅会有点慢,并且迭代器在内存使用量中膨胀,还存在功能问题:即使迭代器"当前"不用于读取或写入容器(例如,对于某些二级索引,或作为"游标",或者因为在使用它们之后,它们的销毁被懒惰地保留,直到封闭范围退出或拥有的对象被销毁): 这意味着其他线程可能会被阻塞很长时间,并且在实践中,围绕容器操作的程序逻辑可能构成一种死锁。