互斥条件信号环路是如何工作的

How does mutex condition signaling loop works?

本文关键字:工作 何工作 条件 信号 环路      更新时间:2023-10-16

我将做一个假设的场景,只是为了清楚我需要知道什么。

假设我有一个经常更新的文件。

我需要通过几个不同的线程读取和解析这个文件。

每次重写这个文件时,我都会唤醒一个条件互斥锁,这样其他线程就可以做任何他们想做的事情了。

我的问题是:

如果我有10000个线程,第一个线程的执行将阻塞其他9999个线程的执行?

它是并行工作还是同步工作?

这篇文章自第一次发布以来已经被编辑,以解决下面Jonathan Wakely的评论,并更好地区分condition_variable,条件(在第一个版本中都称为条件)以及等待函数的操作方式。然而,同样重要的是,从现代c++,using std::future,std::threadstd::packaged_task中探索更好的方法,并讨论了有关缓冲和合理的线程数。

首先,10,000个线程是很多线程。除了性能最高的计算机之外,线程调度器对所有计算机来说都是负担沉重的。Windows操作系统下典型的四核工作站会遇到困难。这是某种任务队列调度有序的标志,典型的服务器使用可能10个线程接受数千个连接,每个线程服务1,000个连接。对于这个问题来说,线程的数量并不重要,但是在这样大量的任务中,10,000个线程是不切实际的。

要处理同步,互斥锁本身实际上并不做你所建议的事情。你所描述的概念是一种事件对象,可能是一个自动重置事件,它本身是一个更高层次的概念。Windows将它们作为其API的一部分,但它们是在Linux(通常用于可移植软件)上由两个基本组件组成的,一个互斥锁和一个条件变量。它们一起创建了自动重置事件,以及其他类型的"可等待事件"(Windows称之为"可等待事件")。在c++中,这些由std::mutexstd::condition_variable提供。

互斥本身仅仅提供对公共资源的锁定控制。在这种情况下,我们不是考虑客户端和服务器(或工作人员和执行人员),而是考虑对等体之间对单个资源的竞争,该资源一次只能由一个参与者(线程)访问。互斥锁可以阻塞执行,但它不会根据外部信号释放。如果另一个线程锁定了互斥锁,互斥锁就会阻塞,并无限期地等待,直到锁的所有者释放它。这不是你在问题中呈现的场景。

在您的场景中,有许多"客户端"和一个"服务器"线程。服务器负责发出信号,表明某事已准备好进行处理。在这种设计中,所有其他线程都是客户端(线程本身并没有使它们成为客户端,我们只是通过它们执行的函数将它们视为客户端)。在某些讨论中,客户端被称为工作线程。

客户端使用互斥锁/条件变量对来等待信号。这种构造通常采用锁定互斥锁的形式,然后使用该互斥锁等待条件变量。当线程在条件变量上进入wait时,互斥锁被解锁。等待工作完成的所有客户端线程都要重复此操作。一个典型的客户端等待示例是:

std::mutex m;
std::condition_variable cv;
void client_thread()
{
// Wait until server signals data is ready
std::unique_lock<std::mutex> lk(m);  // lock the mutex
cv.wait(lk);                         // wait on cv
// do the work
}

这是显示互斥锁/条件变量一起使用的伪代码。std::condition_variable有两个等待函数的重载,这是最简单的一个。这样做的目的是使线程阻塞,进入空闲状态,直到发出condition_variable信号。这并不是一个完整的例子,只是想指出这两个对象可以一起使用。

Johnathan Wakely下面的评论是基于wait不是无限期的这一事实;不能保证呼叫解除阻塞的原因是由于信号。文档将此称为"虚假唤醒",偶尔会因为操作系统调度的复杂原因而发生。Johnathan提出的观点是,使用这对的代码必须是安全的,即使唤醒不是因为condition_variable被发出信号。

在使用条件变量的术语中,这被称为条件(而不是condition_variable)。条件是应用程序定义的概念,通常在文献中表示为布尔值,并且通常是检查bool值、整数(有时是原子类型)或调用返回bool值的函数的结果。有时,应用程序定义的构成真实条件的概念更为复杂,但条件的总体效果是确定线程一旦被唤醒,是应该继续处理,还是应该简单地重复等待。

满足这个要求的一种方法是std::condition_variable::wait的第二个版本。它们被声明为:

void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

Johnathan的观点是坚持使用第二个版本。但是,文档描述(有两个重载的事实表明)谓词是可选的。Predicate是某种函子,通常是lambda表达式,如果等待应该解除阻塞,则解析为true,如果等待应该继续等待,则解析为false,并且在锁定下求值。Predicate与condition同义,因为Predicate是指示wait是否应该解除阻塞的真或假的一种方式。

虽然谓词实际上是可选的,但是在阻塞直到接收到信号时,"等待"的概念并不完美,如果使用第一个版本,这是因为应用程序的构造使得虚假唤醒没有后果(实际上,这是设计的一部分)。

Jonathan的引用表明谓词是在锁下求值的,但在范式的广义形式中,这通常是不可行的。Std::condition_variable必须等待锁定的Std::互斥锁,这可能是在保护定义条件的变量,但有时这是不可能的。有时,条件非常复杂、外部或微不足道,以至于std::互斥锁与条件没有关联。

要了解在建议的解决方案上下文中如何工作,假设有10个客户端线程等待服务器发出要完成工作的信号,并且该工作作为虚函子的容器在队列中被调度。虚函子可能是这样的:

struct VFunc
{
virtual void operator()(){}
};
template <typename T>
struct VFunctor
{
// Something referring to T, possible std::function
virtual void operator()(){...call the std::function...}
};
typedef std::deque< VFunc > Queue;

上面的伪代码暗示了一个带有虚拟操作符()的典型函子,返回void且不接受参数,有时被称为"盲调用"。建议它的关键是Queue可以拥有这些集合而不知道正在调用什么,并且Queue中的任何VFunctors都可以引用std::函数可能调用的任何对象,包括其他对象的成员函数,lambda,简单函数等。然而,如果只有一个函数签名需要调用,可能是:

typedef std::deque< std::function<void(void)>> Queue

是充分的。

对于这两种情况,只有在Queue中有条目时才会执行工作。

要等待,可以使用这样的类:

class AutoResetEvent
{
private:
std::mutex  m;
std::condition_variable cv;
bool signalled;
bool signalled_all;
unsigned int wcount;
public:
AutoResetEvent() : wcount( 0 ), signalled(false), signalled_all(false) {}
void SignalAll() { std::unique_lock<std::mutex> l(m);
signalled = true;
signalled_all = true;
cv.notify_all();
}
void SignalOne() { std::unique_lock<std::mutex> l(m);
signalled = true;
cv.notify_one();
}
void Wait()      { std::unique_lock<std::mutex> l(m);
++wcount;
while( !signalled )
{
cv.wait(l);
}
--wcount;
if ( signalled_all )
{ if ( wcount == 0 ) 
{ signalled = false; 
signalled_all = false; 
}
}
else { signalled = false; 
}
}
};

这是一个可等待对象的标准重置事件类型的伪代码,与WindowsCreateEventWaitForSingleObjectAPI兼容,功能基本相同。

所有客户端线程结束于cv。wait(这在Windows中可以有一个超时,使用Windows API,但没有std::condition_variable)。在某个时刻,服务器通过调用Signalxxx向事件发出信号。您的场景建议使用SignalAll()

如果调用notify_one,则释放一个等待线程,其他线程保持休眠状态。的notify_all被调用,那么所有等待该条件的线程都被释放去做工作。

下面可能是使用AutoResetEvent的一个例子:

AutoResetEvent evt;   // probably not a global
void client()
{
while( !Shutdown ) // assuming some bool to indicate shutdown
{
if ( IsWorkPending() ) DoWork();
evt.Wait();
}
}
void server()
{
// gather data
evt.SignalAll();
}

使用IsWorkPending()满足条件的概念,正如Jonathan Wakely指出的那样。在指示关闭之前,如果工作处于挂起状态,则该循环将处理工作,否则将等待信号。虚假的唤醒没有负面影响。IsWorkPending()将检查Queue.size(),可能通过使用std::互斥锁或其他同步机制保护Queue的对象。如果工作正在挂起,DoWork()将依次从Queue中弹出条目,直到Queue为空。返回后,循环将再次等待信号。

通过以上讨论,互斥锁和condition_variable的组合与一种旧的思维方式有关,在c++ 11/c++ 14时代已经过时了。除非您在使用兼容的编译器时遇到麻烦,否则最好研究std::promise、std::future以及std::async或std::thread与std::packaged_task的使用。例如,使用future、promise、packaged_task和thread可以完全取代上面的讨论。例如:

// a function for threads to execute
int func()
{
// do some work, return status as result
return result;
}

假设func完成了您对文件的工作,那么这些类型定义适用:

typedef std::packaged_task< int() >  func_task;
typedef std::future< int >           f_int;
typedef std::shared_ptr< f_int >     f_int_ptr;
typedef std::vector< f_int_ptr >     f_int_vec;

std::future不能被复制,所以为了便于在vector容器中使用,它使用shared_ptr存储,但有多种解决方案。

下一个例子是对10个工作线程使用这些

void executive_function()
{
// a vector of future pointers
f_int_vec future_list;
// start some threads
for( int n=0; n < 10; ++n )
{
// a packaged_task calling func
func_task  ft( &func );
// get a future from the task as a shared_ptr
f_int_ptr future_ptr( new f_int( ft.get_future() ) );
// store the task for later use
future_list.push_back( future_ptr );
// launch a thread to call task
std::thread( std::move( ft )).detach();
}
// at this point, 10 threads are running
for( auto &d : future_list )
{ 
// for each future pointer, wait (block if required)
// for each thread's func to return
d->wait(); 
// get the result of the func return value
int res = d->get();
}
}

这里的点实际上是在最后一个range-for循环中。vector存储packaged_tasks提供的未来。这些任务用于启动线程,未来是同步执行的关键。一旦所有线程都在运行,每个线程都通过调用future的wait函数来"等待",之后可以获得该函数的返回值。没有涉及互斥锁或条件变量(我们知道的)。

这就引出了并行处理文件的主题,无论您如何启动多个线程。如果有一台机器可以处理10,000个线程,那么如果每个线程都是一个微不足道的面向文件的操作,那么将有相当多的RAM资源用于文件处理,所有这些都是相互复制的。根据所选择的API,每个读取操作都有相应的缓冲区。

假设文件大小为10mb,有10,000个线程开始对其进行操作,其中每个线程使用4kbyte缓冲区进行处理。综合起来,这表明需要40mb的缓冲区来处理10mb的文件。简单地将文件读取到RAM中,并提供对RAM中所有线程的只读访问,这样会减少浪费。

如果磁盘缓存不能跟上,那么多个任务在不同时间从文件的不同部分读取可能会导致标准硬盘的严重抖动(对于闪存源不是这样),这一事实使这个概念进一步复杂化。但更重要的是,10,000个线程都调用系统API来读取文件,每个线程都有相当大的开销。

如果源材料是完全读入RAM的候选材料,则线程可以专注于RAM而不是文件,从而减轻了开销,提高了性能。线程可以在没有锁的情况下共享对内容的读访问。

如果源文件太大而不能完全读取到RAM中,那么最好还是以源文件的块为单位读取,让线程处理共享内存资源中的那部分,然后依次移动到下一个块。