这是实施过程间生产者消费者对流程崩溃的正确和安全的实施

Is this implementation of inter-process Producer Consumer correct and safe against process crash?

本文关键字:崩溃 安全 程崩溃 生产者 施过程 过程 消费者 对流      更新时间:2023-10-16

我正在Windows上两个进程之间开发一个消息队列。我想支持多个生产商和一个消费者。队列不得因其中一个进程的崩溃而破坏,即,其他过程不受崩溃的影响,当崩溃的过程重新启动时,它可以继续通信(具有新的,更新的状态)。p>假设这些片段中的事件对象是命名Windows自动重置事件的包装器,而Mutex对象是名为Windows Mutex的包装器(我使用C 非Interprocess Mutex类型作为占位符)。

)。

这是生产者方面:

void producer()
{
    for (;;)
    {
        // Multiple producers modify _writeOffset so must be given exclusive access
        unique_lock<mutex> excludeProducers(_producerMutex);
        // A snapshot of the readOffset is sufficient because we use _notFullEvent.
        long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
        // while is required because _notFullEvent.Wait might return because it was abandoned
        while (IsFull(readOffset, _writeOffset))
        {
            _notFullEvent.Wait(INFINITE);
            readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
        }
        // use a mutex to protect the resource from the consumer
        {
            unique_lock<mutex> lockResource(_resourceMutex);
            produce(_writeOffset);
        }
        // update the state
        InterlockedExchange(&_writeOffset, IncrementOffset(_writeOffset));
        _notEmptyEvent.Set();
    }
}

同样,这是消费者:

void consumer()
{
    for (;;)
    {
        long writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
        while (IsEmpty(_readOffset, writeOffset))
        {
            _notEmptyEvent.Wait(INFINITE);
            writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
        }
        {
            unique_lock<mutex> lockResource(_resourceMutex);
            consume(_readOffset);
        }
        InterlockedExchange(&_readOffset, IncrementOffset(_readOffset));
        _notFullEvent.Set();
    }
}

此实施中有任何种族条件吗?确实可以根据需要保护崩溃吗?

P.S。如果队列的状态受到保护,队列将符合要求。如果崩溃发生在过程中(i)或消耗(i)这些插槽的内容可能会被损坏,而其他手段将用于检测甚至可以纠正这些损坏的损坏。这些手段不在这个问题的范围内。

在此实现中确实存在种族条件。谢谢@vtt指出。

@vtt写道,如果生产者在_notemptyevent.set()之前就死了。那么消费者可能会永远卡住。

好吧,也许不是永远,因为当恢复生产者时,它将添加一个项目并再次唤醒消费者。但是国家确实被腐败了。例如,例如,这发生了queue_size时间,生产者将看到队列已满(isfull()将返回true),并且会等待。这是一个僵局。

我正在考虑以下解决方案,并在生产者方面添加注释的代码。应该在消费者方面增加类似的补充:

void producer()
{
    for (;;)
    {
        // Multiple producers modify _writeOffset so must be given exclusive access
        unique_lock<mutex> excludeProducers(_producerMutex);
        // A snapshot of the readOffset is sufficient because we use _notFullEvent.
        long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
        // ====================== Added begin
        if (!IsEmpty(readOffset, _writeOffset))
        {
            _notEmptyEvent.Set();
        }
        // ======================= end Added
        // while is required because _notFullEvent.Wait might return because it was abandoned
        while (IsFull(readOffset, _writeOffset))

这将导致生产者在有机会运行的机会时唤醒消费者,如果确实没有排空的话。这看起来更像是基于条件变量的解决方案,这本来是我的首选模式,这不是因为不幸的是,在Windows上,条件变量未命名,因此在过程之间无法共享。

如果此解决方案的投票正确,我将使用完整的代码编辑原始帖子。

,因此在问题中发布的代码存在一些问题:

  • 正如已经指出的那样,有边缘的比赛条件;如果队列要变得完整,并且所有活跃的生产商在设置_notFullEvent之前都坠毁,则您的代码将陷入僵局。您的答案通过在循环开始时而不是结束时正确解决该问题。

  • 你是锁定的;如果只有一个生产者一次生产,那么拥有多个生产商通常几乎没有意义。这禁止直接写入共享内存,您需要本地缓存。(并非不可能让多个生产者直接写入共享内存中的不同插槽,但是这会使稳健性更难实现。)

  • 同样,您通常需要能够同时生产和消费,并且您的代码不允许这样做。

这是我如何做的,使用单个静音(由消费者和生产者线程共享)和两个自动重置事件对象。

void consumer(void)
{
    claim_mutex();
    for (;;)
    {
        if (!IsFull(*read_offset, *write_offset))
        {
            // Queue is not full, make sure at least one producer is awake
            SetEvent(notFullEvent);
        }
        while (IsEmpty(*read_offset, *write_offset))
        {
            // Queue is empty, wait for producer to add a message
            release_mutex();
            WaitForSingleObject(notEmptyEvent, INFINITE);
            claim_mutex();
        }
        release_mutex();
        consume(*read_offset);
        claim_mutex();
        *read_offset = IncrementOffset(*read_offset);
    }
}
void producer(void)
{
    claim_mutex();
    for (;;)
    {
        if (!IsEmpty(*read_offset, *write_offset))
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(notEmptyEvent);
        }
        if (!IsFull(*read_offset, *write_offset))
        {
            // Queue is not full, make sure at least one other producer is awake
            SetEvent(notFullEvent);
        }
        release_mutex();
        produce_in_local_cache();
        claim_mutex();
        while (IsFull(*read_offset, *write_offset))
        {
            // Queue is full, wait for consumer to remove a message
            release_mutex();
            WaitForSingleObject(notFullEvent, INFINITE);
            claim_mutex();
        }
        copy_from_local_cache_to_shared_memory(*write_offset);
        *write_offset = IncrementOffset(*write_offset);
    }
}