线程安全矢量实现

Thread-safe vector implementation

本文关键字:实现 安全 线程      更新时间:2023-10-16

我正在研究线程安全的std::vector实现,以下是已完成的初步尝试:

    #ifndef THREADSAFEVECTOR_H
    #define THREADSAFEVECTOR_H
    #include <iostream>
    #include <vector>
    #include <mutex>
    #include <cstdlib>
    #include <memory>
    #include <iterator>
    #include <algorithm>
    #include <initializer_list>
    #include <functional>
    template <class T, class Alloc=std::allocator<T>>
    class ThreadSafeVector
    {
        private:
            std::vector<T> threadSafeVector;
            std::mutex vectorMutex;
        public:
            /*need to use typename here because std::allocator<T>::size_type, std::allocator<T>::value_type, std::vector<T>::iterator, 
            and std::vector<T>::const_reverse_iterator are 'dependent names' in that because we are working with a templated class, 
            these expressions may depend on types of type template parameters and values of non-template parameters*/
            typedef typename std::vector<T>::size_type size_type;
            typedef typename std::vector<T>::value_type value_type;
            typedef typename std::vector<T>::iterator iterator;
            typedef typename std::vector<T>::const_iterator const_iterator;
            typedef typename std::vector<T>::reverse_iterator reverse_iterator;
            typedef typename std::vector<T>::const_reverse_iterator const_reverse_iterator;
            typedef typename std::vector<T>::reference reference;
            typedef typename std::vector<T>::const_reference const_reference;
            /*wrappers for three different at() functions*/
            template <class InputIterator>
            void assign(InputIterator first, InputIterator last)
            {
                //using a local lock_guard to lock mutex guarantees that the mutex will be unlocked on destruction and in the case of an exception being thrown
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.assign(first, last);
            }
            void assign(size_type n, const value_type& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.assign(n, val);
            }
            void assign(std::initializer_list<value_type> il)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.assign(il.begin(), il.end());
            }
            /*wrappers for at() functions*/
            reference at(size_type n)
            {
                return threadSafeVector.at(n);
            }
            const_reference at(size_type n) const
            {
                return threadSafeVector.at(n);
            }   
            /*wrappers for back() functions*/
            reference back()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.back();
            }
            const reference back() const
            {
                return threadSafeVector.back();
            }
            /*wrappers for begin() functions*/
            iterator begin()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.begin();
            }
            const iterator begin() const noexcept
            {
                return threadSafeVector.begin();
            }
            /*wrapper for capacity() fucntion*/
            size_type capacity() const noexcept
            {
                return threadSafeVector.capacity();
            }
            /*wrapper for cbegin() function*/
            const iterator cbegin()
            {
                return threadSafeVector.cbegin();
            }
            /*wrapper for cend() function*/
            const iterator cend()
            {
                return threadSafeVector.cend();
            }
            /*wrapper for clear() function*/
            void clear()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.clear();
            }
            /*wrapper for crbegin() function*/
            const_reverse_iterator crbegin() const noexcept
            {
                return threadSafeVector.crbegin();
            }
            /*wrapper for crend() function*/
            const_reverse_iterator crend() const noexcept
            {
                return threadSafeVector.crend();
            }
            /*wrappers for data() functions*/
            value_type* data()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.data();
            }
            const value_type* data() const noexcept
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.data();
            }
            /*wrapper for emplace() function*/
            template <class... Args>
            void emplace(const iterator position, Args&&... args)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.emplace(position, args...);
            }
            /*wrapper for emplace_back() function*/
            template <class... Args>
            void emplace_back(Args&&... args)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.emplace_back(args...);
            }
            /*wrapper for empty() function*/
            bool empty() const noexcept
            {
                return threadSafeVector.empty();
            }
            /*wrappers for end() functions*/
            iterator end()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.end();
            }
            const iterator end() const noexcept
            {
                return threadSafeVector.end();
            }
            /*wrapper functions for erase()*/
            iterator erase(const_iterator position)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.erase(position);
            }
            iterator erase(const_iterator first, const_iterator last)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.erase(first, last);
            }
            /*wrapper functions for front()*/
            reference front()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.front();
            }
            const reference front() const
            {
                return threadSafeVector.front();
            }
            /*wrapper function for get_allocator()*/
            value_type get_allocator() const noexcept
            {
                return threadSafeVector.get_allocator();
            }
            /*wrapper functions for insert*/
            iterator insert(const_iterator position, const value_type& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.insert(position, val); 
            }
            iterator insert(const_iterator position, size_type n, const value_type& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.insert(position, n, val);
            }
            template <class InputIterator>
            iterator insert(const_iterator position, InputIterator first, InputIterator last)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.insert(position, first, last);
            }
            iterator insert(const_iterator position, value_type&& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.insert(position, val);
            }
            iterator insert(const_iterator position, std::initializer_list<value_type> il)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.insert(position, il.begin(), il.end());
            }
            /*wrapper function for max_size*/
            size_type max_size() const noexcept
            {
                return threadSafeVector.max_size();
            }
            /*wrapper functions for operator =*/
            std::vector<T>& operator= (const std::vector<T>& x)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.swap(x);
            }
            std::vector<T>& operator= (std::vector<T>&& x)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector=std::move(x);
            }
            std::vector<T>& operator= (std::initializer_list<value_type> il)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.assign(il.begin(), il.end());
                return *this; //is this safe to do?
            }
            /*wrapper functions for operator []*/
            reference operator[] (size_type n)
            {
                return std::ref(n);
            }
            const_reference operator[] (size_type n) const
            {
                return std::cref(n);
            }
            /*wrapper function for pop_back()*/
            void pop_back()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.pop_back();
            }
            /*wrapper functions for push_back*/
            void push_back(const value_type& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.push_back(val);
            }
            void push_back(value_type&& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.push_back(val);
            }
            /*wrapper functions for rbegin()*/
            reverse_iterator rbegin() noexcept
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.rbegin();
            }
            const_reverse_iterator rbegin() const noexcept
            {
                return threadSafeVector.rbegin();
            }
            /*wrapper functions for rend()*/
            reverse_iterator rend() noexcept
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                return threadSafeVector.rend();
            }
            const_reverse_iterator rend() const noexcept
            {
                return threadSafeVector.rend();
            }
            /*wrapper function for reserve()*/
            void reserve(size_type n)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.reserve(n);
            }
            /*wrapper functions for resize()*/      
            void resize(size_type n)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.resize(n);
            }
            void resize(size_type n, const value_type& val)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.resize(n, val);
            }
            void shrink_to_fit()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.shrink_to_fit();
            }
            //add function for size
            size_type size() const noexcept
            {
                return threadSafeVector.size();
            }
            /*wrapper function for swap()*/
            void swap(std::vector<T>& x)
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                threadSafeVector.swap(x);
            }
            void print()
            {
                std::lock_guard<std::mutex> vectorLockGuard(vectorMutex);
                for(const auto & element : threadSafeVector)
                {
                    std::cout << element << std::endl;
                }
                std::cout << std::endl;
            }
    };
    #endif

我的实现基于 cplusplus.com 上找到的向量类及其成员函数的描述,并借助 STL 的向量类实现。现在,我对到目前为止编写的代码有几个问题:

    返回迭代器时,我不确定是否应该锁定互斥锁然后返回迭代器,
  1. 因为迭代器的有效性可能会因多个线程尝试访问它而发生变化,所以我继续锁定所有非常量迭代器的互斥锁。这是正确的方法吗?

  2. 我的理解是,在处理多线程代码时,不应从函数返回指针,因为这为用户代码提供了一个"后门"(因为缺乏更好的术语)来执行一些潜在的可疑活动。那么,对于赋值运算符的实现,有没有另一种方法可以编写这些函数,使其不返回 *this?

  3. 我选择使用lock_guard的所有本地实例,而不是将一个实例作为私有数据成员。如果我有一个作为私人数据成员会更好吗?

提前非常感谢:-)

线程之间的同步是一个全球性问题;它不能在本地解决。所以正确的答案是取消提问。

这种方法只是错误的粒度级别。防止对成员函数的冲突同时调用不会使容器线程在任何有用的意义上都是安全的;用户仍然必须确保操作序列是线程安全的,这意味着在操作序列进行时保持锁定。

举个简单的例子,考虑

void swap(vector<int>& v, int idx0, int idx1) {
    int temp = v[idx0];
    v[idx0] = v[idx1];
    v[idx1] = temp;
}

现在,如果在将v[idx1]复制到v[idx0]后,出现其他线程并擦除向量中的所有数据,会发生什么?v[idx1]赋值写入随机内存。这不是一件好事。为了防止这种情况,用户代码必须确保在整个执行过程中没有其他线程swap与向量混淆。vector的实现无法做到这一点。

Vector 似乎本质上不是线程安全的。这看起来很残酷,但对我来说同步向量的最简单方法是将其封装在另一个线程安全的对象中......

struct guardedvector {
   std::mutex guard;
   std::vector myvector;
}
guardedvector v;
v.guard.lock();
... use v.myvector
v.guard.unlock();

在 win32 中,您还可以使用超薄读/写器锁 (SRW)。它们是轻巧快速的互斥体,可以与多个读取器/一个写入器一起使用。在这种情况下,您将用 srwlock 替换防护装置。调用方代码负责调用正确的锁定方法。最后,您可以模板化结构并内联它。

如果你想要一个没有太多复杂性的一致实现,你至少需要两个新的成员函数,例如disable_write()enable_write()

使用向量的代码可以选择它是否需要在某些读取代码块期间保持一致的状态。它将在读取块开始时调用disable_write(),并在完成需要一致向量状态的块时调用enable_write()

添加另一个"write_lock"互斥锁,并在每个进行更改的成员函数中使用此互斥锁。

当写锁定处于活动状态时,应自由允许读取操作,因此仅读取数据的成员函数不需要"write_lock"互斥锁。你已经拥有的互斥体就足够了。

此外,您可以添加另一个类来写锁定您的向量,例如一些等效的lock_guard,甚至可以使disable_write()enable_write()与该类成为私有和朋友,以防止意外的写锁定永远不会释放。

在与自己讨论之后,线程保护向量的最佳方法是从向量继承并添加保护。

enter code here
template <class t> class guardedvector : public std::vector<t> {
    std::mutex guard;
};
guardedvector v;
v.lock();
//...iterate, add, remove...
v.unlock();

您可以使用 swrlock 代替互斥锁。但是,如果有很多线程读取矢量,则可能会给编写器线程带来延迟。在某些情况下,您可以仅使用具有编写器优先级模型的原子操作来重新创建自定义线程模型,但这需要深入了解处理器的工作原理。为简单起见,您可以更改编写器线程的优先级。