C++上的手动重置事件(来自 C#)实现:如何避免争用条件

ManualResetEvent (from C#) implementation on C++: how to avoid race condition

本文关键字:实现 争用条件 何避免 来自 事件 C++      更新时间:2023-10-16

我想了解有关多线程编程的更多信息,我认为尝试在C++中实现一些 C# 同步原语是一个很好的练习。
我从ManualResetEvent开始,这就是我到目前为止所拥有的:

class manual_reset_event
{
public:
void wait_one()
{
if (cv_flag_.load() == false)
{
thread_local std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
cond_var_.wait(lock, [this]() { return cv_flag_.load(); });
}
}
void set()
{
cv_flag_.store(true);
cond_var_.notify_all();
}
void reset()
{
cv_flag_.store(false);
}
private:
std::condition_variable cond_var_;
std::atomic<bool> cv_flag_;
};

但是,这里有一个竞争条件:你可以在一个线程上调用 wait_one((,通过 if (cv_flag( 检查,然后从另一个线程调用 set。这将导致 wait_one(( 等待,即使现在cv_flag_为 true。
我可以通过在wait_one上使用锁定、设置和重置来解决此问题。
我想我也可以通过在 wait_one(( 上立即在 cond_var_.wait(( 之后调用 cond_var_.notify_all(( 来解决这个问题,但我认为这不是一个好主意(尽管也许我错了(。
我想知道是否可以在这里做其他事情(甚至可能是不使用conditional_variables的完全不同的方法(来避免这种竞争条件。

在大多数情况下,最简单的方法是使用匯点体对对象内部使用顺序,而忽略原子学。只需确保对数据的所有访问都受到锁的保护。

如果您只存储一个位,那么在执行set和之后快速reset时可能会导致唤醒丢失,因为等待的线程将仅在reset完成后安排。为了解决这个问题,我将使用计数器。计数器的最低位是其"打开"状态。此状态的每次更改都以增量实现。我使用64位计数器以防万一。32 位极不可能不够,即使它可能在长时间运行的程序中环绕。

class manual_reset_event
{
public:
void wait_one()
{
std::unique_lock<std::mutex> lock(mutex_);
uint64_t initial_value = value_;
if(initial_value & 1)
{
return;
}
while (value_ == initial_value)
{
signalled_.wait(lock);
}
}
void set()
{
std::unique_lock<std::mutex> lock(mutex);
if((value_ & 1) == 0)
{
value_++;
lock.release(); // optimization
signalled_.notify_all();
}
}
void reset()
{
std::unique_lock<std::mutex> lock(mutex);
if(value_ & 1)
{
value_++;
}
}
private:
std::mutex mutex_;
std::condition_variable signalled_;
uint64_t value_;
};

如果您坚持避免不必要的锁使用,则可以使用原子学,但解决方案有些棘手,因为需要考虑更多的排序。

class manual_reset_event
{
public:
void wait_one()
{
uint64_t initial_value = value_;
if(initial_value & 1)
{
return;
}
std::unique_lock<std::mutex> lock(mutex_);
while (value_ == initial_value)
{ // !
signalled_.wait(lock);
}
}
void set()
{
uint64_t initial_value = value_;
if(initial_value & 1)
{
return;
}
std::unique_lock<std::mutex> lock(mutex_);
// Still need lock to prevent lost wakeup if atomic change happens when
// other thread is on "// !" line.
if(value.compare_exchange_strong(initial_value, initial_value + 1)) {
// One strong attempt is enough. If it fails than someone else must have
// succeeded. It's as if these two set() operations happened at the same time.
lock.release();
signalled_.notify_all();
}
}
void reset()
{
uint64_t initial_value = value_;
if((initial_value & 1) == 0)
{
return;
}
std::unique_lock<std::mutex> lock(mutex_);
value.compare_exchange_strong(initial_value, initial_value + 1);
}
private:
std::mutex mutex_;
std::condition_variable signalled_;
std::atomic<uint64_t> value_;
};

一种可能的实现 - 保留等待事件线程的列表。 对于保护状态manual_reset_event可以使用std::mutex。 当线程开始等待时 - 他检查事件状态,如果没有信号 - 在列表中插入自我"等待块"。 这是在受公共对象互斥锁保护的"关键部分"内完成的。 然后,如果我们需要等待事件(当且仅当我们将 self 插入等待列表时( - 开始等待等待块。在此之前,从"关键部分"退出非常重要,甚至在等待结束后甚至不会临时获取它。从另一端,设置事件的线程 - 获取等待线程的列表,然后通知所有线程(从关键部分退出后(或可能只通知单个,首先开始等待。因此,有了这个,我们可以实现手动重新发送事件逻辑(当所有等待的线程同时唤醒时(或自动重置事件逻辑 - 当只有单个线程被唤醒时,事件将再次重置(实际上根本没有设置为信号状态。 仅当不再等待线程时 - 事件发出信号。

class manual_reset_event : std::mutex
{
struct WaitBlock : public std::condition_variable, std::mutex  
{
WaitBlock(WaitBlock* next) : next(next), signaled(false) {}
WaitBlock* next;
volatile bool signaled;
void Wait()
{
// synchronization point with Wake()
std::unique_lock<std::mutex> lock(*this);
while (!signaled)
{
// notify_one() yet not called
wait(lock);
}
}
void Wake()
{
{
// synchronization point with Wait()
std::lock_guard<std::mutex> lock(*this);
signaled = true;
}
notify_one();
}
};
WaitBlock* _head;
volatile bool _signaled;
public:
manual_reset_event(bool signaled = false) : _signaled(signaled), _head(0) { }
void wait()
{
lock();//++ protect object state
WaitBlock wb(_head);
bool inserted = false;
if (!_signaled)
{
_head = &wb;
inserted = true;
}
unlock();//-- protect object state
if (inserted)
{
wb.Wait();
}
}
// manual reset logic
void set_all() 
{
WaitBlock* last, *head = 0;
lock();//++ protect object state
head = _head, _signaled = true;
unlock();//-- protect object state
while (last = head)
{
head = head->next;
last->Wake();
}
}
// auto reset logic - only one thread will be signaled, event auto reset
void set_single()  
{
WaitBlock* last = 0;
lock();//++ protect object state
if (!_signaled)
{
if (last = _head)
{
// wake first waiting thread
WaitBlock* prev = 0, *pwb;
while (pwb = last->next)
{
prev = last, last = pwb;
}
(prev ? prev->next : _head) = 0;
}
else
{
// nobody wait
_signaled = true;
}
}
unlock();//-- protect object state
if (last)
{
last->Wake();
}
}
void reset()
{
_signaled = false;
}
};