取消 tbb::concurrent_bounded_queue 上所有等待推送或弹出的正确方法

Proper way to cancel all waiting pushes or pops on tbb::concurrent_bounded_queue?

本文关键字:方法 bounded concurrent tbb queue 取消 等待      更新时间:2023-10-16

我正在使用 tbb::concurrent_bounded_queue 在 N 个生产者/消费者线程之间以缓冲方式进行通信。

这样:

//which has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue; 
ThreadA()
{ 
  while(running_A)
  { 
    MyItemType outItem;
    //do some stuff, resulting in filling outItem ...
    g_queue.push(outItem);
  }
}
ThreadB()
{
  while(running_B)
  { 
    MyItemType inItem;
    g_queue.pop(inItem);
    //do some stuff with the contents of inItem
  }
}

我刚才展示的例子只有一个生产者和一个消费者,实际上两者可以有很多。应用户要求,所有处理应尽快停止。

因此,如果线程 A 推送了一些 B 未消耗的项目,我希望对 B 的 pop 的下一次调用立即以某种方式返回,表明队列已关闭,以便我可以脱离循环。队列中尚未处理的任何项目都可以丢弃。

如果线程 A 正在等待推送,我希望它立即返回,可能表明队列已关闭

我读到了一些可能做到这一点的方法是在队列中推送一个特殊项目(停止项目、停止和停止指示器),然后当弹出该项目时,我可以知道事情正在停止。这样做的问题是它是一个 FIFO 队列,在停止项目之前的任何项目都需要弹出,然后才能到达我的停止项目。我想避免。

我看到有一个 q.abort 方法,它会导致等待推送或弹出返回抛出异常(仅当我设置 tbb 预处理器定义TBB_USE_EXCEPTIONS时才有效)。不认为必须为所有 tbb 打开例外是执行此操作的正确方法吗?

我还认为我可以在添加停止项之前清除队列,但理论上另一个生产者可以进入我当前的消费者 q.clear() 和 q.push(stopItem) 之间,并推送一个非停止项,这不是我想要的。因为我必须等待该非停止项被处理,然后才能到达推送的停止项。

处理这个问题的正确方法是什么?我以前有自己的队列实现,它有一个关闭信号,会导致所有弹出和推送返回(不仅填写引用项参数,而是从函数 all 返回代码),并带有指示队列已关闭的特定值代码。不确定如何对 tbb 的并发有界队列具有类似的功能。

有什么建议吗?

谢谢-莱恩

如果我理解正确,你会问如何在不处理剩余元素的情况下随时终止生产者和消费者。如果是这样,您需要一个单独的通知机制,例如标志:

tbb::atomic<bool> is_finished = false;

然后,您可以读取循环中的标志,以便在未被阻止时终止它们:

ThreadA()
{ 
  while(running_A && !is_finished)
  { 
    MyItemType outItem;
    //do some stuff, resulting in filling outItem ...
    try { g_queue.push(outItem); } catch(...) {}
  }
}
ThreadB()
{
  while(running_B && !is_finished)
  { 
    MyItemType inItem;
    try { g_queue.pop(inItem); } catch(...) {}
    if(is_finished) // check before processing
        break;
    //do some stuff with the contents of inItem
  }
}
Terminate()
{
    is_finished = true;
    g_queue.abort();
}

如果线程被阻塞,abort()看起来是一个很好的方法,除非它需要快速和重复地完成。 TBB_USE_EXCEPTIONS在 TBB 二进制文件中默认处于打开状态,因此,您实际上无需在 TBB 调度程序和conurrent_bounded_queue内部中为其付费。默认情况下,如果使用启用的异常(通常为 true)编译应用程序,则在 TBB 标头中打开TBB_USE_EXCEPTIONS。因此,您可能只为Push&pop周围的尝试/捕获块付费。

除了例外之外,还有其他选择。最直接的方法是通过弹出一些项目来解锁生产者并推送虚拟项目来解锁消费者来解锁线程。例如:

Terminate()
{
    is_finished = true;
    MyItemType dummyItem;
    while(g_queue.size() < 0) // consumers are blocked
        g_queue.try_push(dummyItem);
    while(g_queue.size() >= g_queue.capacity() ) // producers are blocked
        g_queue.try_pop(dummyItem);
}