这是实施过程间生产者消费者对流程崩溃的正确和安全的实施
Is this implementation of inter-process Producer Consumer correct and safe against process crash?
我正在Windows上两个进程之间开发一个消息队列。我想支持多个生产商和一个消费者。队列不得因其中一个进程的崩溃而破坏,即,其他过程不受崩溃的影响,当崩溃的过程重新启动时,它可以继续通信(具有新的,更新的状态)。p>假设这些片段中的事件对象是命名Windows自动重置事件的包装器,而Mutex对象是名为Windows Mutex的包装器(我使用C 非Interprocess Mutex类型作为占位符)。
)。这是生产者方面:
void producer()
{
for (;;)
{
// Multiple producers modify _writeOffset so must be given exclusive access
unique_lock<mutex> excludeProducers(_producerMutex);
// A snapshot of the readOffset is sufficient because we use _notFullEvent.
long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
// while is required because _notFullEvent.Wait might return because it was abandoned
while (IsFull(readOffset, _writeOffset))
{
_notFullEvent.Wait(INFINITE);
readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
}
// use a mutex to protect the resource from the consumer
{
unique_lock<mutex> lockResource(_resourceMutex);
produce(_writeOffset);
}
// update the state
InterlockedExchange(&_writeOffset, IncrementOffset(_writeOffset));
_notEmptyEvent.Set();
}
}
同样,这是消费者:
void consumer()
{
for (;;)
{
long writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
while (IsEmpty(_readOffset, writeOffset))
{
_notEmptyEvent.Wait(INFINITE);
writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
}
{
unique_lock<mutex> lockResource(_resourceMutex);
consume(_readOffset);
}
InterlockedExchange(&_readOffset, IncrementOffset(_readOffset));
_notFullEvent.Set();
}
}
此实施中有任何种族条件吗?确实可以根据需要保护崩溃吗?
P.S。如果队列的状态受到保护,队列将符合要求。如果崩溃发生在过程中(i)或消耗(i)这些插槽的内容可能会被损坏,而其他手段将用于检测甚至可以纠正这些损坏的损坏。这些手段不在这个问题的范围内。
在此实现中确实存在种族条件。谢谢@vtt指出。
@vtt写道,如果生产者在_notemptyevent.set()之前就死了。那么消费者可能会永远卡住。
好吧,也许不是永远,因为当恢复生产者时,它将添加一个项目并再次唤醒消费者。但是国家确实被腐败了。例如,例如,这发生了queue_size时间,生产者将看到队列已满(isfull()将返回true),并且会等待。这是一个僵局。
我正在考虑以下解决方案,并在生产者方面添加注释的代码。应该在消费者方面增加类似的补充:
void producer()
{
for (;;)
{
// Multiple producers modify _writeOffset so must be given exclusive access
unique_lock<mutex> excludeProducers(_producerMutex);
// A snapshot of the readOffset is sufficient because we use _notFullEvent.
long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
// ====================== Added begin
if (!IsEmpty(readOffset, _writeOffset))
{
_notEmptyEvent.Set();
}
// ======================= end Added
// while is required because _notFullEvent.Wait might return because it was abandoned
while (IsFull(readOffset, _writeOffset))
这将导致生产者在有机会运行的机会时唤醒消费者,如果确实没有排空的话。这看起来更像是基于条件变量的解决方案,这本来是我的首选模式,这不是因为不幸的是,在Windows上,条件变量未命名,因此在过程之间无法共享。
如果此解决方案的投票正确,我将使用完整的代码编辑原始帖子。
,因此在问题中发布的代码存在一些问题:
-
正如已经指出的那样,有边缘的比赛条件;如果队列要变得完整,并且所有活跃的生产商在设置
_notFullEvent
之前都坠毁,则您的代码将陷入僵局。您的答案通过在循环开始时而不是结束时正确解决该问题。 -
你是锁定的;如果只有一个生产者一次生产,那么拥有多个生产商通常几乎没有意义。这禁止直接写入共享内存,您需要本地缓存。(并非不可能让多个生产者直接写入共享内存中的不同插槽,但是这会使稳健性更难实现。)
-
同样,您通常需要能够同时生产和消费,并且您的代码不允许这样做。
这是我如何做的,使用单个静音(由消费者和生产者线程共享)和两个自动重置事件对象。
void consumer(void)
{
claim_mutex();
for (;;)
{
if (!IsFull(*read_offset, *write_offset))
{
// Queue is not full, make sure at least one producer is awake
SetEvent(notFullEvent);
}
while (IsEmpty(*read_offset, *write_offset))
{
// Queue is empty, wait for producer to add a message
release_mutex();
WaitForSingleObject(notEmptyEvent, INFINITE);
claim_mutex();
}
release_mutex();
consume(*read_offset);
claim_mutex();
*read_offset = IncrementOffset(*read_offset);
}
}
void producer(void)
{
claim_mutex();
for (;;)
{
if (!IsEmpty(*read_offset, *write_offset))
{
// Queue is not empty, make sure consumer is awake
SetEvent(notEmptyEvent);
}
if (!IsFull(*read_offset, *write_offset))
{
// Queue is not full, make sure at least one other producer is awake
SetEvent(notFullEvent);
}
release_mutex();
produce_in_local_cache();
claim_mutex();
while (IsFull(*read_offset, *write_offset))
{
// Queue is full, wait for consumer to remove a message
release_mutex();
WaitForSingleObject(notFullEvent, INFINITE);
claim_mutex();
}
copy_from_local_cache_to_shared_memory(*write_offset);
*write_offset = IncrementOffset(*write_offset);
}
}
- 当回溯以零开始时,如何调试崩溃
- 从不同线程使用int64的不同字节安全吗
- 将数组作为参数传递给函数安全吗?作为第三方职能部门,可以探索他们想要的之外的其他元素
- 虚拟决赛作为安全
- 获取日期异步信号安全吗?如果在信号处理程序中使用,它会导致死锁吗
- 内联映射初始化的动态atexit析构函数崩溃
- 执行函数时导致崩溃的变量
- 如何将元素添加到数组的线程安全函数?
- 程序崩溃并显示"std::out_of_range"错误
- C++中的线程安全删除
- CoInitialize()在单独的线程上崩溃而不返回
- 通过网络、跨平台传递std::变体是否安全
- 使用调试/崩溃报告将应用程序部署到客户端
- 在std::thread中,joinable()然后join()线程安全吗
- 这段代码安全吗(为什么它没有崩溃)?
- 这是实施过程间生产者消费者对流程崩溃的正确和安全的实施
- 为什么使用static_cast运算符的不安全强制转换不会崩溃
- c++析构函数调用数组索引?在非线程安全的refcount对象上崩溃
- 测试答案是错的,对吗?它不会崩溃,因为我们删除NULL指针,这是安全的
- jpeg_write_scanlines和glTexImage2D线程安全。为什么不崩溃?