标准::抛线"resource dead lock would occur"

std::thread throwing "resource dead lock would occur"

本文关键字:would occur lock resource 抛线 标准 dead      更新时间:2023-10-16

我有一个对象列表,每个对象都有由"update"函数计算的成员变量。我想并行地更新对象,也就是说,我想为每个对象创建一个线程来执行它的更新函数。

这样做合理吗?为什么这可能不是一个好主意?

下面是一个尝试执行我所描述的操作的程序,这是一个完整的程序,所以你应该能够运行它(我使用的是VS2015)。目标是并行更新每个对象。问题是,一旦更新函数完成,线程就会抛出"将发生资源死锁"异常并中止。

我哪里错了?

#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <chrono>
class Object 
{
public:
    Object(int sleepTime, unsigned int id)
        : m_pSleepTime(sleepTime), m_pId(id), m_pValue(0) {}
    void update()
    {
        if (!isLocked()) // if an object is not locked
        {
            // create a thread to perform it's update
            m_pThread.reset(new std::thread(&Object::_update, this));
        }
    }
    unsigned int getId()
    {
        return m_pId;
    }
    unsigned int getValue()
    {
        return m_pValue;
    }
    bool isLocked()
    {
        bool mutexStatus = m_pMutex.try_lock();
        if (mutexStatus) // if mutex is locked successfully (meaning it was unlocked)
        {
            m_pMutex.unlock();
            return false;
        }
        else // if mutex is locked
        {
            return true; 
        }
    }
private:
    // private update function which actually does work
    void _update()
    {
        m_pMutex.lock();
        {
            std::cout << "thread " << m_pId << " sleeping for " << m_pSleepTime << std::endl;
            std::chrono::milliseconds duration(m_pSleepTime);
            std::this_thread::sleep_for(duration);
            m_pValue = m_pId * 10;
        }
        m_pMutex.unlock();
        try
        {
            m_pThread->join();
        }
        catch (const std::exception& e)
        {
            std::cout << e.what() << std::endl; // throws "resource dead lock would occur"
        }
    }
    unsigned int m_pSleepTime;
    unsigned int m_pId;
    unsigned int m_pValue;
    std::mutex m_pMutex;
    std::shared_ptr<std::thread> m_pThread; // store reference to thread so it doesn't go out of scope when update() returns
};
typedef std::shared_ptr<Object> ObjectPtr;
class ObjectManager
{
public:
    ObjectManager()
        : m_pNumObjects(0){}
    void updateObjects()
    {
        for (int i = 0; i < m_pNumObjects; ++i)
        {
            m_pObjects[i]->update();
        }
    }
    void removeObjectByIndex(int index)
    {
        m_pObjects.erase(m_pObjects.begin() + index);
    }
    void addObject(ObjectPtr objPtr)
    {
        m_pObjects.push_back(objPtr);
        m_pNumObjects++;
    }
    ObjectPtr getObjectByIndex(unsigned int index)
    {
        return m_pObjects[index];
    }
private:
    std::vector<ObjectPtr> m_pObjects;
    int m_pNumObjects;
};
void main()
{
    int numObjects = 2;
    // Generate sleep time for each object
    std::vector<int> objectSleepTimes;
    objectSleepTimes.reserve(numObjects);
    for (int i = 0; i < numObjects; ++i)
        objectSleepTimes.push_back(rand());
    ObjectManager mgr;
    // Create some objects
    for (int i = 0; i < numObjects; ++i)
        mgr.addObject(std::make_shared<Object>(objectSleepTimes[i], i));
    // Print expected object completion order
    // Sort from smallest to largest
    std::sort(objectSleepTimes.begin(), objectSleepTimes.end());
    for (int i = 0; i < numObjects; ++i)
        std::cout << objectSleepTimes[i] << ", ";
    std::cout << std::endl;
    // Update objects
    mgr.updateObjects();
    int numCompleted = 0;  // number of objects which finished updating
    while (numCompleted != numObjects)
    {
        for (int i = 0; i < numObjects; ++i)
        {
            auto objectRef = mgr.getObjectByIndex(i);
            if (!objectRef->isLocked()) // if object is not locked, it is finished updating
            {
                std::cout << "Object " << objectRef->getId() << " completed. Value = " << objectRef->getValue() << std::endl;
                mgr.removeObjectByIndex(i);
                numCompleted++;
            }
        }
    }
    system("pause");
}

看起来您有一个线程正在尝试加入自己。

当我试图理解您的解决方案时,我对它进行了大量简化。我指出,您以错误的方式使用std::thread::join()方法。std::thread提供了等待线程完成的功能(非旋转等待)——在您的示例中,您在无限循环中等待线程完成(snip等待),这将严重消耗CPU时间。

您应该从其他线程调用std::thread::join()来等待线程完成。在您的示例中,对象中的互斥是不必要的。此外,您还错过了一个用于同步访问std::cout的互斥对象,这不是线程安全的。我希望下面的例子能有所帮助。

#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <chrono>
#include <cassert>
// cout is not thread-safe
std::recursive_mutex cout_mutex;
class Object {
public:
    Object(int sleepTime, unsigned int id)
        : _sleepTime(sleepTime), _id(id), _value(0) {}
    void runUpdate() {
        if (!_thread.joinable())
            _thread = std::thread(&Object::_update, this);
    }
    void waitForResult() {
        _thread.join();
    }
    unsigned int getId() const { return _id; }
    unsigned int getValue() const { return _value; }
private:
    void _update() {
        {
            {
                std::lock_guard<std::recursive_mutex> lock(cout_mutex);
                std::cout << "thread " << _id << " sleeping for " << _sleepTime << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::seconds(_sleepTime));
            _value = _id * 10;
        }
        std::lock_guard<std::recursive_mutex> lock(cout_mutex);
        std::cout << "Object " << getId() << " completed. Value = " << getValue() << std::endl;
    }
    unsigned int _sleepTime;
    unsigned int _id;
    unsigned int _value;
    std::thread _thread;
};
class ObjectManager : public std::vector<std::shared_ptr<Object>> {
public:
    void runUpdate() {
        for (auto it = this->begin(); it != this->end(); ++it)
            (*it)->runUpdate();
    }
    void waitForAll() {
        auto it = this->begin();
        while (it != this->end()) {
            (*it)->waitForResult();
            it = this->erase(it);
        }
    }
};
int main(int argc, char* argv[]) {
    enum {
        TEST_OBJECTS_NUM = 2,
    };
    srand(static_cast<unsigned int>(time(nullptr)));
    ObjectManager mgr;
    // Generate sleep time for each object
    std::vector<int> objectSleepTimes;
    objectSleepTimes.reserve(TEST_OBJECTS_NUM);
    for (int i = 0; i < TEST_OBJECTS_NUM; ++i)
        objectSleepTimes.push_back(rand() * 9 / RAND_MAX + 1);  // 1..10 seconds
    // Create some objects
    for (int i = 0; i < TEST_OBJECTS_NUM; ++i)
        mgr.push_back(std::make_shared<Object>(objectSleepTimes[i], i));
    assert(mgr.size() == TEST_OBJECTS_NUM);
    // Print expected object completion order
    // Sort from smallest to largest
    std::sort(objectSleepTimes.begin(), objectSleepTimes.end());
    for (size_t i = 0; i < mgr.size(); ++i)
        std::cout << objectSleepTimes[i] << ", ";
    std::cout << std::endl;
    // Update objects
    mgr.runUpdate();
    mgr.waitForAll();
    //system("pause");  // use Ctrl+F5 to run the app instead. That's more reliable in case of sudden app exit.
}

关于这是一件合理的事情吗…

更好的方法是创建一个对象更新队列。需要更新的对象被添加到此队列中,这可以由一组线程而不是每个对象一个线程来完成。

好处是:

  • 线程和对象之间没有1对1的对应关系。创建线程是一项繁重的操作,可能比单个对象的大多数更新代码更昂贵
  • 支持数千个对象:使用您的解决方案,您需要创建数千个线程,您会发现这些线程超出了您的操作系统容量
  • 可以支持其他功能,如声明对象之间的依赖关系或将一组相关对象更新为一个操作