取消 tbb::concurrent_bounded_queue 上所有等待推送或弹出的正确方法
Proper way to cancel all waiting pushes or pops on tbb::concurrent_bounded_queue?
我正在使用 tbb::concurrent_bounded_queue 在 N 个生产者/消费者线程之间以缓冲方式进行通信。
这样:
//which has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue;
ThreadA()
{
while(running_A)
{
MyItemType outItem;
//do some stuff, resulting in filling outItem ...
g_queue.push(outItem);
}
}
ThreadB()
{
while(running_B)
{
MyItemType inItem;
g_queue.pop(inItem);
//do some stuff with the contents of inItem
}
}
我刚才展示的例子只有一个生产者和一个消费者,实际上两者可以有很多。应用户要求,所有处理应尽快停止。
因此,如果线程 A 推送了一些 B 未消耗的项目,我希望对 B 的 pop 的下一次调用立即以某种方式返回,表明队列已关闭,以便我可以脱离循环。队列中尚未处理的任何项目都可以丢弃。
如果线程 A 正在等待推送,我希望它立即返回,可能表明队列已关闭
。我读到了一些可能做到这一点的方法是在队列中推送一个特殊项目(停止项目、停止和停止指示器),然后当弹出该项目时,我可以知道事情正在停止。这样做的问题是它是一个 FIFO 队列,在停止项目之前的任何项目都需要弹出,然后才能到达我的停止项目。我想避免。
我看到有一个 q.abort 方法,它会导致等待推送或弹出返回抛出异常(仅当我设置 tbb 预处理器定义TBB_USE_EXCEPTIONS时才有效)。不认为必须为所有 tbb 打开例外是执行此操作的正确方法吗?
我还认为我可以在添加停止项之前清除队列,但理论上另一个生产者可以进入我当前的消费者 q.clear() 和 q.push(stopItem) 之间,并推送一个非停止项,这不是我想要的。因为我必须等待该非停止项被处理,然后才能到达推送的停止项。
处理这个问题的正确方法是什么?我以前有自己的队列实现,它有一个关闭信号,会导致所有弹出和推送返回(不仅填写引用项参数,而是从函数 all 返回代码),并带有指示队列已关闭的特定值代码。不确定如何对 tbb 的并发有界队列具有类似的功能。
有什么建议吗?
谢谢-莱恩
如果我理解正确,你会问如何在不处理剩余元素的情况下随时终止生产者和消费者。如果是这样,您需要一个单独的通知机制,例如标志:
tbb::atomic<bool> is_finished = false;
然后,您可以读取循环中的标志,以便在未被阻止时终止它们:
ThreadA()
{
while(running_A && !is_finished)
{
MyItemType outItem;
//do some stuff, resulting in filling outItem ...
try { g_queue.push(outItem); } catch(...) {}
}
}
ThreadB()
{
while(running_B && !is_finished)
{
MyItemType inItem;
try { g_queue.pop(inItem); } catch(...) {}
if(is_finished) // check before processing
break;
//do some stuff with the contents of inItem
}
}
Terminate()
{
is_finished = true;
g_queue.abort();
}
如果线程被阻塞,abort()
看起来是一个很好的方法,除非它需要快速和重复地完成。 TBB_USE_EXCEPTIONS
在 TBB 二进制文件中默认处于打开状态,因此,您实际上无需在 TBB 调度程序和conurrent_bounded_queue内部中为其付费。默认情况下,如果使用启用的异常(通常为 true)编译应用程序,则在 TBB 标头中打开TBB_USE_EXCEPTIONS
。因此,您可能只为Push&pop周围的尝试/捕获块付费。
除了例外之外,还有其他选择。最直接的方法是通过弹出一些项目来解锁生产者并推送虚拟项目来解锁消费者来解锁线程。例如:
Terminate()
{
is_finished = true;
MyItemType dummyItem;
while(g_queue.size() < 0) // consumers are blocked
g_queue.try_push(dummyItem);
while(g_queue.size() >= g_queue.capacity() ) // producers are blocked
g_queue.try_pop(dummyItem);
}
- 为不同配置设置MSVC_RUNTIME_LIBRARY的正确方法是什么
- 通过方法访问结构
- 最小硬币更换问题(自上而下方法)
- C++为构建时间获取QDateTime的可靠方法
- 在C#中处理C++指针而不使用unsafe的最佳方法
- 处理多个异常集合的C++方法
- 如果C++类在类方法中具有动态分配,但没有构造函数/析构函数或任何非静态成员,那么它仍然是POD类型吗
- 有什么方法可以遍历结构吗
- 当类在C++中定义时,有什么方法可以"register"类吗?
- 在C++中,将大的无符号浮点数四舍五入为整数的最佳方法是什么
- 实现无开销push_back的最佳方法是什么
- 使用std::函数映射对象方法
- 有符号的int和int-有没有一种方法可以在C++中区分它们
- C++从另一个类访问公共静态向量的正确方法是什么
- C++优先级队列,按对象的唯一指针的特定方法升序排列
- 没有为自己的结构调用列表推回方法
- 有没有什么方法可以使用一个函数中定义的常量变量,也可以由c++中同一程序中的其他函数使用
- 在类定义之后定义一个私有方法
- 枚举环境变量的惯用C++14/C++17方法
- 初始化具有非默认构造函数的std::数组项的更好方法