标准::抛线"resource dead lock would occur"
std::thread throwing "resource dead lock would occur"
我有一个对象列表,每个对象都有由"update"函数计算的成员变量。我想并行地更新对象,也就是说,我想为每个对象创建一个线程来执行它的更新函数。
这样做合理吗?为什么这可能不是一个好主意?
下面是一个尝试执行我所描述的操作的程序,这是一个完整的程序,所以你应该能够运行它(我使用的是VS2015)。目标是并行更新每个对象。问题是,一旦更新函数完成,线程就会抛出"将发生资源死锁"异常并中止。
我哪里错了?
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <chrono>
class Object
{
public:
Object(int sleepTime, unsigned int id)
: m_pSleepTime(sleepTime), m_pId(id), m_pValue(0) {}
void update()
{
if (!isLocked()) // if an object is not locked
{
// create a thread to perform it's update
m_pThread.reset(new std::thread(&Object::_update, this));
}
}
unsigned int getId()
{
return m_pId;
}
unsigned int getValue()
{
return m_pValue;
}
bool isLocked()
{
bool mutexStatus = m_pMutex.try_lock();
if (mutexStatus) // if mutex is locked successfully (meaning it was unlocked)
{
m_pMutex.unlock();
return false;
}
else // if mutex is locked
{
return true;
}
}
private:
// private update function which actually does work
void _update()
{
m_pMutex.lock();
{
std::cout << "thread " << m_pId << " sleeping for " << m_pSleepTime << std::endl;
std::chrono::milliseconds duration(m_pSleepTime);
std::this_thread::sleep_for(duration);
m_pValue = m_pId * 10;
}
m_pMutex.unlock();
try
{
m_pThread->join();
}
catch (const std::exception& e)
{
std::cout << e.what() << std::endl; // throws "resource dead lock would occur"
}
}
unsigned int m_pSleepTime;
unsigned int m_pId;
unsigned int m_pValue;
std::mutex m_pMutex;
std::shared_ptr<std::thread> m_pThread; // store reference to thread so it doesn't go out of scope when update() returns
};
typedef std::shared_ptr<Object> ObjectPtr;
class ObjectManager
{
public:
ObjectManager()
: m_pNumObjects(0){}
void updateObjects()
{
for (int i = 0; i < m_pNumObjects; ++i)
{
m_pObjects[i]->update();
}
}
void removeObjectByIndex(int index)
{
m_pObjects.erase(m_pObjects.begin() + index);
}
void addObject(ObjectPtr objPtr)
{
m_pObjects.push_back(objPtr);
m_pNumObjects++;
}
ObjectPtr getObjectByIndex(unsigned int index)
{
return m_pObjects[index];
}
private:
std::vector<ObjectPtr> m_pObjects;
int m_pNumObjects;
};
void main()
{
int numObjects = 2;
// Generate sleep time for each object
std::vector<int> objectSleepTimes;
objectSleepTimes.reserve(numObjects);
for (int i = 0; i < numObjects; ++i)
objectSleepTimes.push_back(rand());
ObjectManager mgr;
// Create some objects
for (int i = 0; i < numObjects; ++i)
mgr.addObject(std::make_shared<Object>(objectSleepTimes[i], i));
// Print expected object completion order
// Sort from smallest to largest
std::sort(objectSleepTimes.begin(), objectSleepTimes.end());
for (int i = 0; i < numObjects; ++i)
std::cout << objectSleepTimes[i] << ", ";
std::cout << std::endl;
// Update objects
mgr.updateObjects();
int numCompleted = 0; // number of objects which finished updating
while (numCompleted != numObjects)
{
for (int i = 0; i < numObjects; ++i)
{
auto objectRef = mgr.getObjectByIndex(i);
if (!objectRef->isLocked()) // if object is not locked, it is finished updating
{
std::cout << "Object " << objectRef->getId() << " completed. Value = " << objectRef->getValue() << std::endl;
mgr.removeObjectByIndex(i);
numCompleted++;
}
}
}
system("pause");
}
看起来您有一个线程正在尝试加入自己。
当我试图理解您的解决方案时,我对它进行了大量简化。我指出,您以错误的方式使用std::thread::join()方法。std::thread提供了等待线程完成的功能(非旋转等待)——在您的示例中,您在无限循环中等待线程完成(snip等待),这将严重消耗CPU时间。
您应该从其他线程调用std::thread::join()来等待线程完成。在您的示例中,对象中的互斥是不必要的。此外,您还错过了一个用于同步访问std::cout的互斥对象,这不是线程安全的。我希望下面的例子能有所帮助。
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <chrono>
#include <cassert>
// cout is not thread-safe
std::recursive_mutex cout_mutex;
class Object {
public:
Object(int sleepTime, unsigned int id)
: _sleepTime(sleepTime), _id(id), _value(0) {}
void runUpdate() {
if (!_thread.joinable())
_thread = std::thread(&Object::_update, this);
}
void waitForResult() {
_thread.join();
}
unsigned int getId() const { return _id; }
unsigned int getValue() const { return _value; }
private:
void _update() {
{
{
std::lock_guard<std::recursive_mutex> lock(cout_mutex);
std::cout << "thread " << _id << " sleeping for " << _sleepTime << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(_sleepTime));
_value = _id * 10;
}
std::lock_guard<std::recursive_mutex> lock(cout_mutex);
std::cout << "Object " << getId() << " completed. Value = " << getValue() << std::endl;
}
unsigned int _sleepTime;
unsigned int _id;
unsigned int _value;
std::thread _thread;
};
class ObjectManager : public std::vector<std::shared_ptr<Object>> {
public:
void runUpdate() {
for (auto it = this->begin(); it != this->end(); ++it)
(*it)->runUpdate();
}
void waitForAll() {
auto it = this->begin();
while (it != this->end()) {
(*it)->waitForResult();
it = this->erase(it);
}
}
};
int main(int argc, char* argv[]) {
enum {
TEST_OBJECTS_NUM = 2,
};
srand(static_cast<unsigned int>(time(nullptr)));
ObjectManager mgr;
// Generate sleep time for each object
std::vector<int> objectSleepTimes;
objectSleepTimes.reserve(TEST_OBJECTS_NUM);
for (int i = 0; i < TEST_OBJECTS_NUM; ++i)
objectSleepTimes.push_back(rand() * 9 / RAND_MAX + 1); // 1..10 seconds
// Create some objects
for (int i = 0; i < TEST_OBJECTS_NUM; ++i)
mgr.push_back(std::make_shared<Object>(objectSleepTimes[i], i));
assert(mgr.size() == TEST_OBJECTS_NUM);
// Print expected object completion order
// Sort from smallest to largest
std::sort(objectSleepTimes.begin(), objectSleepTimes.end());
for (size_t i = 0; i < mgr.size(); ++i)
std::cout << objectSleepTimes[i] << ", ";
std::cout << std::endl;
// Update objects
mgr.runUpdate();
mgr.waitForAll();
//system("pause"); // use Ctrl+F5 to run the app instead. That's more reliable in case of sudden app exit.
}
关于这是一件合理的事情吗…
更好的方法是创建一个对象更新队列。需要更新的对象被添加到此队列中,这可以由一组线程而不是每个对象一个线程来完成。
好处是:
- 线程和对象之间没有1对1的对应关系。创建线程是一项繁重的操作,可能比单个对象的大多数更新代码更昂贵
- 支持数千个对象:使用您的解决方案,您需要创建数千个线程,您会发现这些线程超出了您的操作系统容量
- 可以支持其他功能,如声明对象之间的依赖关系或将一组相关对象更新为一个操作