使用 boost::thread(英语:Actor) 计算模型
Actor calculation model using boost::thread
我正在尝试使用 boost::thread 在C++上的线程上实现 Actor 计算模型。但是程序在执行过程中会抛出奇怪的异常。异常不稳定,有时程序以正确的方式工作。
那里我的代码:
演员.hpp
class Actor {
public:
typedef boost::function<int()> Job;
private:
std::queue<Job> d_jobQueue;
boost::mutex d_jobQueueMutex;
boost::condition_variable d_hasJob;
boost::atomic<bool> d_keepWorkerRunning;
boost::thread d_worker;
void workerThread();
public:
Actor();
virtual ~Actor();
void execJobAsync(const Job& job);
int execJobSync(const Job& job);
};
演员.cpp
namespace {
int executeJobSync(std::string *error,
boost::promise<int> *promise,
const Actor::Job *job)
{
int rc = (*job)();
promise->set_value(rc);
return 0;
}
}
void Actor::workerThread()
{
while (d_keepWorkerRunning) try {
Job job;
{
boost::unique_lock<boost::mutex> g(d_jobQueueMutex);
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
job = d_jobQueue.front();
d_jobQueue.pop();
}
job();
}
catch (...) {
// Log error
}
}
void Actor::execJobAsync(const Job& job)
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(job);
d_hasJob.notify_one();
}
int Actor::execJobSync(const Job& job)
{
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
d_hasJob.notify_one();
}
int rc = future.get();
if (rc) {
ErrorUtil::setLastError(rc, error.c_str());
}
return rc;
}
Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}
Actor::~Actor()
{
d_keepWorkerRunning = false;
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_hasJob.notify_one();
}
d_worker.join();
}
实际上,抛出的异常是 boost::thread_interrupted 在int rc = future.get();
行中。但是表单提升文档我无法推理此异常。文档 说
抛出: - boost::thread_interrupted 如果与 *this 关联的结果在调用时尚未准备就绪,并且当前线程被中断。
但是我的工作线程不能处于中断状态。
当我使用 gdb 并设置"捕获抛出"时,我看到回跟踪看起来像
扔thread_interrupted
提升::d尾::interruption_checker::check_for_interruption
提升::d尾::interruption_checker::interruption_checker
提升::condition_variable::等待
提升::d尾::future_object_base::wait_internal
提升::d尾部::future_object_base::等待
提升::d尾部::future_object::获取
提升::unique_future::获取
我查看了提升源,但不明白为什么interruption_checker决定工作线程中断。
所以有人C++上师,请帮助我。我需要做什么才能获得正确的代码?我正在使用:
助推器 1_53
Linux 版本 2.6.18-194.32.1.el5 Red Hat 4.1.2-48
海湾合作委员会 4.7
编辑
修好了!感谢叶夫根尼·帕纳修克和拉津。问题出在 TLS 中 管理。boost::线程和 boost::thread_specific_ptr正在使用 用于其目的的相同 TLS 存储。就我而言,当 他们都试图在创建时更改此存储(不幸的是,我 不明白为什么会详细发生)。所以TLS被破坏了。
我用__thread替换了代码中的 boost::thread_specific_ptr 指定的变量。
Offtop:在调试期间,我发现外部库中的内存损坏 并修复了它=)
.
编辑 2 我遇到了确切的问题...这是 GCC 中的一个错误 =) _GLIBCXX_DEBUG编译标志会中断 ABI。 您可以看到有关boost错误跟踪器的讨论: https://svn.boost.org/trac/boost/ticket/7666
我发现了几个错误:
Actor::workerThread
功能在d_jobQueueMutex
上双重解锁。第一次解锁是手动d_jobQueueMutex.unlock();
,第二次是在boost::unique_lock<boost::mutex>
的析构函数中。
您应该防止解锁之一,例如unique_lock
和mutex
之间的发布关联:
g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();
或者添加额外的代码块+默认构造的Job
。
workerThread
可能永远不会离开以下循环:
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
想象一下以下情况:d_jobQueue
为空,Actor::~Actor()
被调用,它会设置标志并通知工作线程:
d_keepWorkerRunning = false;
d_hasJob.notify_one();
workerThread
while 循环中醒来,看到队列为空并再次休眠。
通常的做法是发送特殊的最终作业来停止工作线程:
~Actor()
{
execJobSync([this]()->int
{
d_keepWorkerRunning = false;
return 0;
});
d_worker.join();
}
在这种情况下,d_keepWorkerRunning
不需要是原子的。
科里鲁现场演示
编辑:
我已将事件队列代码添加到您的示例中。
您在 EventQueueImpl
和 Actor
中都有并发队列,但类型不同。可以将公共部分提取到适用于任何类型的单独的实体concurrent_queue<T>
。在一个地方调试和测试队列比捕获分散在不同类中的错误要容易得多。
所以,你可以尝试使用这个concurrent_queue<T>
(在科利鲁上)
一个猜测。我认为一些代码实际上可以调用boost::tread::interrupt()。您可以为此函数设置断点,并查看负责此的代码。您可以在execJobSync
中测试中断:
int Actor::execJobSync(const Job& job)
{
if (boost::this_thread::interruption_requested())
std::cout << "Interruption requested!" << std::endl;
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
在这种情况下,最可疑的代码是引用线程对象的代码。
无论如何,让您的 boost::线程代码中断意识是一种很好的做法。也可以禁用某些范围的中断。
如果不是这种情况 - 您需要检查适用于线程本地存储的代码,因为线程中断标志存储在 TLS 中。也许你的代码重写了它。您可以在此类代码片段之前和之后检查中断。
另一种可能性是您的内存已损坏。如果没有代码调用 boost::thread::interrupt(),并且您不使用 TLS。这是最困难的情况,尝试使用一些动态分析器 - valgrind 或 clang 内存清理器。
题外话:您可能需要使用一些并发队列。std::queue 由于内存争用过多,速度会非常慢,最终缓存性能会很差。良好的并发队列允许代码并行排队和取消元素排队。
此外,actor不是应该执行任意代码的东西。Actor队列必须接收简单的消息,而不是函数!您正在编写作业队列:)你需要看看一些演员系统,比如Akka或libcpa。
- 为什么"do while"循环不断退出,即使条件计算结果为 false?
- 递归函数计算序列中的平方和(并输出过程)
- (C++)分析树以计算返回错误值的简单算术表达式
- 我的字符计数代码计算错误.为什么
- 在计算中使用二的幂有多有利可图
- 如何计算文件中的"columns"数?
- 计算排序向量的向量中唯一值的计数
- 如何使用 std::累积在 C++ 中计算总和立方体
- 使用Qt C++计算类似Git的SHA1哈希
- OpenCV C++.快速计算混淆矩阵
- cpp二进制搜索问题,计算给定数组中输入元素的出现次数
- C++如何计算用户输入的数字中的偶数位数
- 如何计算数据类型的范围,例如int
- 类似枚举的计算常量
- 计算每个节点的树高,帮助我解释这个代码解决方案
- 多个If语句与使用逻辑运算符计算条件的单个语句的比较
- 计算缩放多边形的比例,得到给定的多边形面积
- 在C++中如何在没有pow的情况下进行基础计算
- 计算平均值,不包括上次得分
- 使用 boost::thread(英语:Actor) 计算模型