在生产者完成时通知消费者的优雅方式

Elegant ways to notify consumer when producer is done?

本文关键字:方式 消费者 生产者 完成时 通知      更新时间:2023-10-16

我正在实现一个具有最少函数的concurrent_blocking_queue

//a thin wrapper over std::queue
template<typename T>
class concurrent_blocking_queue
{
    std::queue<T> m_internal_queue;
     //...
 public:
     void add(T const & item);
     T&   remove();
     bool empty();
};

我打算将其用于生产者 - 消费者问题(我猜,这是使用此类数据结构的地方? 但我被困在一个问题上,那就是:

生产者完成后如何优雅地通知消费者?完成后,生产者将如何通知队列?通过调用特定成员函数,例如done()?从队列(即从remove函数(抛出异常是个好主意吗?

我遇到过很多例子,但都有无限循环,就好像生产者将永远生产项目一样。没有人讨论停止条件的问题,甚至没有讨论维基文章。

我过去只是简单地介绍了一个虚拟的"完成"产品。因此,如果生产者可以创造A型和B型的"产品",我发明了"完成"型。当消费者遇到"完成"类型的产品时,它知道不再需要进一步处理。

确实,排队一条特殊的"我们完成了"消息是很常见的; 但是我认为 OP 最初对带外指标的渴望是合理的。看看人们正在考虑设置带内完成消息的复杂性!代理类型,模板;好悲伤。我会说done()方法更简单,更容易,它使常见情况(我们还没有完成(更快,更干净。

我同意kids_fox的观点,即最好使用在队列完成时返回错误代码的try_remove,但这是风格和 YMMV。

编辑:实施队列的奖励积分,该队列跟踪有多少生产者处于多生产者的情况,并在所有生产者都认输的情况下提出已完成的例外;-(不会对带内消息这样做!

我的队列通常使用指针(std::auto_ptr在接口,以明确指示发件人可能不再访问指针(;在大多数情况下,排队的对象是多态的,因此无论如何,动态分配和引用语义都是必需的。否则,添加"结束文件"标志到队列。 您需要在生产者端(close?(来设置它(使用完全相同的锁定写入队列时的基元(,以及删除中的循环函数必须等待某些东西在那里,或者队列在那里闭。 当然,您需要返回一个Fallible值,以便读者可以知道读取是否成功。 另外,不要忘记在这种情况下,您需要一个notify_all来确保所有等待条件的进程被唤醒。

顺便说一句:我不太明白你的界面是如何实现的。 什么作用remove返回的T&所指。 基本上,remove必须是像这样:

Fallible<T>
MessageQueue<T>::receive()
{
    ScopedLock l( myMutex );
    while ( myQueue.empty() && ! myIsDone )
        myCondition.wait( myMutex );
    Fallible<T> results;
    if ( !myQueue.empty() ) {
        results.validate( myQueue.top() );
        myQueue.pop();
    }
    return results;
}

即使没有myIsDone条件,您也必须将值读入局部变量在将其从队列中删除之前,并且无法返回对局部变量的引用。

其余的:

void
MessageQueue<T>::send( T const& newValue )
{
    ScopedLock l( myMutex );
    myQueue.push( newValue );
    myCondition.notify_all();
}
void
MessageQueue<T>::close()
{
    ScopedLock l( myMutex );
    myIsDone = true;
    myCondition.notify_all();
}
">

停止"不经常被讨论,因为它通常永远不会被讨论过。在需要的情况下,使用更高级别的 P-C 协议本身排队毒丸通常与在队列本身中构建额外功能一样容易和灵活。

如果你真的想这样做,你确实可以设置一个标志,使每个消费者"立即"或每当它回到队列时引发异常,但存在问题。 您是否需要"完成"方法是同步的,即。您是否希望所有消费者在"完成"返回时消失,还是异步返回,即。当所有其他使用者都消失时,最后一个使用者线程调用事件参数?

您打算如何安排那些正在等待醒来的消费者? 有多少人在等待,有多少人在忙,但当他们完成工作后会回到队列中? 如果一个或多个消费者卡在阻塞调用上怎么办(也许他们可以被解锁,但这需要来自另一个线程的调用 - 你将如何做到这一点(?

消费者将如何通知他们已经处理了他们的异常并且即将死亡? "即将死亡"就足够了,还是需要等待线程句柄? 如果必须等待线程句柄,那么等待将执行什么操作 - 请求队列关闭的线程或最后一个要通知的使用者线程?

哦,是的 - 为了安全起见,您应该安排在处于"关闭"状态时与对象一起出现的生产者线程排队以引发异常。

我提出这些问题是因为我已经做过一次,很久以前。 最终,一切都奏效了。排队的对象都必须在其继承链中插入"QueuedItem"(以便可以向队列公开作业取消方法(,并且队列必须保留由线程,但尚未处理。

一段时间后,我停止使用该类,转而使用没有特殊关闭机制的简单 P-C 队列。