一个pthread_rwlock可以同时有多少个读者

How many simultaneous readers can a pthread_rwlock have?

本文关键字:多少 pthread rwlock 一个      更新时间:2023-10-16

我有一个多线程应用程序,它创建了48个线程,这些线程都需要访问一个公共属性(stl::map)。映射只会在线程启动时写入,其余时间将从中读取映射。这似乎是pthread_rw_lock的完美用例,而且一切似乎都运行良好。

我遇到了一个完全不相关的赛格故障,并开始分析核心。使用 gdb,我info threads执行了命令,并对结果感到非常惊讶。我观察到几个线程实际上按预期从地图读取,但奇怪的是,在等待rw_lock的 pthread_rwlock_rdlock() 中有几个线程被阻塞。

下面是正在等待锁的线程的堆栈跟踪:

#0  0xffffe430 in __kernel_vsyscall ()
#1  0xf76fe159 in __lll_lock_wait () from /lib/libpthread.so.0
#2  0xf76fab5d in pthread_rwlock_rdlock () from /lib/libpthread.so.0
#3  0x0804a81a in DiameterServiceSingleton::getDiameterService(void*) ()

有这么多线程,很难说有多少线程正在读取,有多少线程被阻止,但我不明白为什么任何线程会被阻止等待读取,考虑到其他线程已经在读取。

所以我的问题来了:为什么有些线程被阻塞等待读取rw_lock,而其他线程已经在从中读取?似乎可以同时读取的线程数是有限制的。

我查看了pthread_rwlock_attr_t函数,没有看到任何相关内容。

操作系统是Linux,SUSE 11。

以下是相关代码:

{
  pthread_rwlock_init(&serviceMapRwLock_, NULL);
}
// This method is called for each request processed by the threads
Service *ServiceSingleton::getService(void *serviceId)
{
  pthread_rwlock_rdlock(&serviceMapRwLock_);
    ServiceMapType::const_iterator iter = serviceMap_.find(serviceId);
    bool notFound(iter == serviceMap_.end());
  pthread_rwlock_unlock(&serviceMapRwLock_);
  if(notFound)
  {
    return NULL;
  }
  return iter->second;
}
// This method is only called when the app is starting
void ServiceSingleton::addService(void *serviceId, Service *service)
{
  pthread_rwlock_wrlock(&serviceMapRwLock_);
    serviceMap_[serviceId] = service;
  pthread_rwlock_unlock(&serviceMapRwLock_);
}

更新:

正如 MarkB 在评论中提到的,如果我将 pthread_rwlockattr_getkind_np() 设置为优先考虑作者,并且有一个作家被阻止等待,那么观察到的行为将是有意义的。但是,我使用默认值,我相信这是为了优先考虑读者。我刚刚验证没有线程被阻止等待写入。我还按照@Shahbaz在注释中的建议更新代码并获得相同的结果。

您只是观察到了获取锁所涉及的固有性能问题。这需要一些时间,而你只是碰巧在中间抓住了这些线程。当受锁保护的操作持续时间非常短时尤其如此。

编辑:读取源代码,glibc使用lll_lock来保护其自己的pthread库数据结构中的关键部分。pthread_rwlock_rdlock检查多个标志并递增计数器,因此它在持有锁的同时执行这些操作。完成这些操作后,锁将释放 lll_unlock .

为了演示,我实施了一个简短的例程,在获得rwlock后睡觉。主线程等待它们完成。但在等待之前,它会打印线程实现的并发性。

enum { CONC = 50 };
pthread_rwlock_t rwlock;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
unsigned count;
void *routine(void *arg)
{
    int *fds = static_cast<int *>(arg);
    pthread_rwlock_rdlock(&rwlock);
    pthread_mutex_lock(&mutex);
    ++count;
    if (count == CONC) pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    sleep(5);
    pthread_rwlock_unlock(&rwlock);
    pthread_t self = pthread_self();
    write(fds[1], &self, sizeof(self));
    return 0;
}

主线程等待计数器达到 50:

int main()
{
    int fds[2];
    pipe(fds);
    pthread_rwlock_init(&rwlock, 0);
    pthread_mutex_lock(&mutex);
    for (int i = 0; i < CONC; i++) {
        pthread_t tid;
        pthread_create(&tid, NULL, routine, fds);
    }
    while (count < CONC) pthread_cond_wait(&cond, &mutex);
    pthread_mutex_unlock(&mutex);
    std::cout << "count: " << count << std::endl;
    for (int i = 0; i < CONC; i++) {
        pthread_t tid;
        read(fds[0], &tid, sizeof(tid));
        pthread_join(tid, 0);
    }
    pthread_rwlock_destroy(&rwlock);
    pthread_exit(0);
}

编辑:使用 C++11 线程支持简化了示例:

enum { CONC = 1000 };
std::vector<std::thread> threads;
pthread_rwlock_t rwlock;
std::mutex mutex;
std::condition_variable cond;
unsigned count;
void *routine(int self)
{
    pthread_rwlock_rdlock(&rwlock);
    { std::unique_lock<std::mutex> lk(mutex);
      if (++count == CONC) cond.notify_one(); }
    sleep(5);
    pthread_rwlock_unlock(&rwlock);
    return 0;
}
int main()
{
    pthread_rwlock_init(&rwlock, 0);
    { std::unique_lock<std::mutex> lk(mutex);
      for (int i = 0; i < CONC; i++) {
          threads.push_back(std::thread(routine, i));
      }
      cond.wait(lk, [](){return count == CONC;}); }
    std::cout << "count: " << count << std::endl;
    for (int i = 0; i < CONC; i++) {
        threads[i].join();
    }
    pthread_rwlock_destroy(&rwlock);
    pthread_exit(0);
}

作为旁注,上面发布的代码已损坏。您无法从 rw_lock 部分访问 iter->second,因为一旦您解锁rw_lock,编写器就可以删除映射中的任何元素,从而使它上的任何迭代器无效。

我知道你没有这样做,因为你在程序执行开始之后没有写任何东西,但仍然值得一提。

另外,作为旁注,由于您描述的行为看起来像是序列化的(作者在开始时写入地图,然后读者从现在开始阅读"只读"地图),您可能应该这样写:

int writerDoneWithMap = 0;
// pthread_cond & mutex init here
// The writer write to the map here 
// Then they signal the reader that they are done with it
while (!__sync_bool_compare_and_swap(&writerDoneWithMap, 1, writerDoneWithMap));
pthread_cond_broadcast here

// The readers will then simply do this:
while (!writerDoneWithMap) 
{
     // pthread_cond_wait here
}
// Read the data without locks.

如果编写器已完成地图填充,上面的代码可以避免对读取器进行任何锁定,如果他们还没有,那么您将诉诸典型的pthread_cond/互斥技术。上面的代码是正确的,当且仅当你有作家 THEN 阅读器(但这就是你所说的),否则它会失败。