用原子索引从静音阵列中锁定互斥品

Lock a mutex from mutex array with atomic index

本文关键字:锁定 阵列 索引      更新时间:2023-10-16

我正在尝试编写一个缓冲区,该缓冲区可以将数据推到缓冲区,检查是否已满,如有必要,请交换缓冲区。另一个线程可以获取文件输出的缓冲区。

我已经成功实现了缓冲区,但是我想添加一种迫使wapbuffer方法,该方法将迫使不完整的缓冲区交换并从不完整的缓冲区返回数据。为了做到这一点,我检查读取和写入缓冲区是否相同(试图强制施加缓冲区以写入文件,而还有其他可以写入的完整缓冲区)。我希望这种方法能够与GetBuffer方法并肩运行(不是真正的必要方法,但我想尝试并偶然发现了这个问题)。

getBuffer会阻塞,当porcestwapbuffer完成后,它仍将阻止,直到新的缓冲区完全填满,因为在porcestwapbuffer中,我更改了原子_Read_buffer_index。我想知道这是否总是有效吗?GetBuffer的阻塞锁是否会检测到Atomic Read_buffer_index的更改并更改静音的误差,或者在锁定开始时检查一下它必须锁定的mutex并继续尝试锁定相同的utex即使索引,即使是索引更改?

/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
    if (_index + length <= _size) {
        memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
        _index += length;
    } else {
        memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
        unsigned int t_index = _index;
        SwapBuffer();
        Push(&data[_size - t_index], length - (_size - t_index));
    }
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
    std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
    std::unique_ptr<T[]> result(new T[_size]);
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
    _read_buffer_index = (_read_buffer_index + 1) % _count;
    return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
    if (_write_buffer_index != _read_buffer_index)
        return nullptr;
    std::unique_ptr<T[]> result(new T[_index]);
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
    unsigned int next = (_write_buffer_index + 1) % _count;
    _mutexes[next].lock();
    _read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
    _mutexes[_write_buffer_index].unlock();
    _write_buffer_index = next;
    _index = 0;
    return result;
}

您的代码存在一些问题。首先,在修改原子变量时要小心。只有一小部分操作是原子能的(请参阅http://en.cppreference.com/w/cpp/atomic/atomic),原子操作的组合不是原子。考虑:

_read_buffer_index = (_read_buffer_index + 1) % _count;

这里发生的事情是您对变量,增量,Modulo操作和原子商店的原子读取。但是,整个语句本身不是不是原子!如果_count是2的功率,则只需使用++操纵器即可。如果不是这样,则必须将_read_buffer_index读取为临时变量,执行上述计算,然后使用compare_exchange函数存储新值如果变量在平均时间中没有更改。显然,后者必须在循环中完成,直到成功为止。您还必须担心一个线程会在第二个线程的读取和compare_exchange之间递增变量_count倍,在这种情况下,第二个线程错误地认为该变量不会更改。

第二个问题是缓存线弹跳。如果您在同一缓存线上有多个静音线,那么如果两个或多个线程尝试同时访问它们,则性能将非常糟糕。缓存线的大小取决于您的平台。

主要问题是,虽然ForceSwapBuffer()Push()都锁定_force_swap_buffer MUTEX,但GetBuffer()却没有。GetBuffer()确实更改_read_buffer_index。因此,在ForceSwapBuffer()中:

std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_write_buffer_index != _read_buffer_index)
    return nullptr;
// another thread can call GetBuffer() here and change _read_buffer_index
// rest of the code here

if统计后_write_buffer_index == _read_buffer_index的假设实际上是无效的。