您将如何在 C++11 中实现自己的读取器/写入器锁

How would you implement your own reader/writer lock in C++11?

本文关键字:读取 自己的 实现 C++11      更新时间:2023-10-16

我有一组数据结构需要用读取器/写入器锁来保护。我知道 boost::shared_lock,但我想使用 std::mutex、std::condition_variable 和/或 std::atomic 进行自定义实现,以便我可以更好地了解它的工作原理(稍后进行调整(。

每个数据结构(可移动,但不可复制(都将继承自一个名为 Commons 的类,该类封装了锁定。我希望公共接口看起来像这样:

class Commons {
public:
    void read_lock();
    bool try_read_lock();
    void read_unlock();
    void write_lock();
    bool try_write_lock();
    void write_unlock();
};

。以便它可以被某些人公开继承:

class DataStructure : public Commons {};

正在编写科学代码,通常可以避免数据竞争;这种锁主要是为了防止我以后可能会犯的错误。因此,我的首要任务是低读取开销,因此我不会过多地妨碍正常运行的程序。每个线程可能会在自己的 CPU 内核上运行。

你能给我看一下(伪代码没问题(一个读器/写卡器锁吗?我现在所拥有的应该是防止作家饥饿的变体。到目前为止,我的主要问题是检查读取是否安全与实际增加读者数量之间的read_lock差距,之后write_lock知道等待。

void Commons::write_lock() {
    write_mutex.lock();
    reading_mode.store(false);
    while(readers.load() > 0) {}
}
void Commons::try_read_lock() {
    if(reading_mode.load()) {
        //if another thread calls write_lock here, bad things can happen
        ++readers; 
        return true;
    } else return false;
}

我对多线程有点陌生,我真的很想理解它。提前感谢您的帮助!

这是使用互斥锁和条件变量的简单读取器/写入器锁的伪代码。互斥 API 应该是不言自明的。条件变量被假定有一个成员wait(Mutex&),该成员(原子地!(丢弃互斥锁并等待条件发出信号。该条件通过唤醒一名服务员的signal()或唤醒所有服务员signal_all()发出信号。

read_lock() {
  mutex.lock();
  while (writer)
    unlocked.wait(mutex);
  readers++;
  mutex.unlock();
}
read_unlock() {
  mutex.lock();
  readers--;
  if (readers == 0)
    unlocked.signal_all();
  mutex.unlock();
}
write_lock() {
  mutex.lock();
  while (writer || (readers > 0))
    unlocked.wait(mutex);
  writer = true;
  mutex.unlock();
}
write_unlock() {
  mutex.lock();
  writer = false;
  unlocked.signal_all();
  mutex.unlock();
}

但是,这种实现有很多缺点。

每当锁可用时唤醒所有服务员

如果大多数服务员

都在等待写锁,这是浪费——毕竟大多数服务员将无法获得锁并继续等待。简单地使用signal()是行不通的,因为您确实想唤醒等待读锁定解锁的每个人。因此,要解决此问题,您需要单独的条件变量来实现可读性和可写性。

不公平。读者饿死作家

您可以通过跟踪挂起的读取和写入锁定的数量来解决此问题,并在出现挂起的写入锁定时停止获取读锁定(尽管您将饿死读取器!(,或者随机唤醒所有读取器或一个写入器(假设您使用单独的条件变量,请参阅上一节(。

锁不会按照请求的顺序处理

为了保证这一点,你需要一个真正的等待队列。例如,您可以为每个服务员创建一个条件变量,并在释放锁后向所有读取器或单个写入器发出信号,两者都在队列的头部。

即使是纯读取工作负载也会导致互斥锁引起的争用

这个很难修复。一种方法是使用原子指令来获取读或写锁(通常是比较和交换(。如果由于锁定而获取失败,则必须回退到互斥锁。但是,正确做到这一点非常困难。另外,仍然存在争用 - 原子指令远非免费,尤其是在具有大量内核的机器上。

结论

正确实现同步基元很困难。实现高效和公平的同步基元更加困难。例如,Linux 上的 pthreads 包含一个读取器/写入器锁,它使用二进制和原子指令的组合,因此可能优于您在几天的工作中能想到的任何东西。

检查这个类:

//
// Multi-reader Single-writer concurrency base class for Win32
//
// (c) 1999-2003 by Glenn Slayden (glenn@glennslayden.com)
//
//

#include "windows.h"
class MultiReaderSingleWriter
{
private:
    CRITICAL_SECTION m_csWrite;
    CRITICAL_SECTION m_csReaderCount;
    long m_cReaders;
    HANDLE m_hevReadersCleared;
public:
    MultiReaderSingleWriter()
    {
        m_cReaders = 0;
        InitializeCriticalSection(&m_csWrite);
        InitializeCriticalSection(&m_csReaderCount);
        m_hevReadersCleared = CreateEvent(NULL,TRUE,TRUE,NULL);
    }
    ~MultiReaderSingleWriter()
    {
        WaitForSingleObject(m_hevReadersCleared,INFINITE);
        CloseHandle(m_hevReadersCleared);
        DeleteCriticalSection(&m_csWrite);
        DeleteCriticalSection(&m_csReaderCount);
    }

    void EnterReader(void)
    {
        EnterCriticalSection(&m_csWrite);
        EnterCriticalSection(&m_csReaderCount);
        if (++m_cReaders == 1)
            ResetEvent(m_hevReadersCleared);
        LeaveCriticalSection(&m_csReaderCount);
        LeaveCriticalSection(&m_csWrite);
    }
    void LeaveReader(void)
    {
        EnterCriticalSection(&m_csReaderCount);
        if (--m_cReaders == 0)
            SetEvent(m_hevReadersCleared);
        LeaveCriticalSection(&m_csReaderCount);
    }
    void EnterWriter(void)
    {
        EnterCriticalSection(&m_csWrite);
        WaitForSingleObject(m_hevReadersCleared,INFINITE);
    }
    void LeaveWriter(void)
    {
        LeaveCriticalSection(&m_csWrite);
    }
};

我没有机会尝试它,但代码看起来不错。

你可以按照

这里的确切维基百科算法实现一个读者-作家锁(我写的(:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
int g_sharedData = 0;
int g_readersWaiting = 0;
std::mutex mu;
bool g_writerWaiting = false;
std::condition_variable cond;
void reader(int i)
{
    std::unique_lock<std::mutex> lg{mu};
    while(g_writerWaiting)
        cond.wait(lg);
    ++g_readersWaiting;
    // reading
    std::cout << "n reader #" << i << " is reading data = " << g_sharedData << 'n';
    // end reading
    --g_readersWaiting;
    while(g_readersWaiting > 0)
        cond.wait(lg);
    cond.notify_one();
}
void writer(int i)
{
    std::unique_lock<std::mutex> lg{mu};
    while(g_writerWaiting)
        cond.wait(lg);
    // writing
    std::cout << "n writer #" << i << " is writingn";
    g_sharedData += i * 10;
    // end writing
    g_writerWaiting = true;
    while(g_readersWaiting > 0)
        cond.wait(lg);
    g_writerWaiting = false;
    cond.notify_all();
}//lg.unlock()

int main()
{
    std::thread reader1{reader, 1};
    std::thread reader2{reader, 2};
    std::thread reader3{reader, 3};
    std::thread reader4{reader, 4};
    std::thread writer1{writer, 1};
    std::thread writer2{writer, 2};
    std::thread writer3{writer, 3};
    std::thread writer4{reader, 4};
    reader1.join();
    reader2.join(); 
    reader3.join();
    reader4.join();
    writer1.join();
    writer2.join();
    writer3.join();
    writer4.join();
    return(0);
}
我相信

这就是你要找的:

class Commons {
    std::mutex write_m_;
    std::atomic<unsigned int> readers_;
public:
    Commons() : readers_(0) {
    }
    void read_lock() {
        write_m_.lock();
        ++readers_;
        write_m_.unlock();
    }
    bool try_read_lock() {
        if (write_m_.try_lock()) {
            ++readers_;
            write_m_.unlock();
            return true;
        }
        return false;
    }
    // Note: unlock without holding a lock is Undefined Behavior!
    void read_unlock() {
        --readers_;
    }
    // Note: This implementation uses a busy wait to make other functions more efficient.
    //       Consider using try_write_lock instead! and note that the number of readers can be accessed using readers()
    void write_lock() {
        while (readers_) {}
        if (!write_m_.try_lock())
            write_lock();
    }
    bool try_write_lock() {
        if (!readers_)
            return write_m_.try_lock();
        return false;
    }
    // Note: unlock without holding a lock is Undefined Behavior!
    void write_unlock() {
        write_m_.unlock(); 
    }
    int readers() { 
        return readers_; 
    }
};

有关自 C++17 以来的记录,我们有 std::shared_mutex,请参阅:https://en.cppreference.com/w/cpp/thread/shared_mutex