Windows 上的 Boost 线程(boost::thread)

Boost threads on Windows

本文关键字:线程 Windows      更新时间:2023-10-16

我们正在做一个项目,以生成一个通用线程类,允许我们处理一组互连的数据。基本思想是在不同的线程中仅评估未连接且可以同时处理的数据集。我们开发了一个基于 boost::thread 的 ThreadClass 和一个基于 boost::mutex 的 OF_bmutex 类,以便执行日志记录操作。

代码的设计示意图见链接中的 PDF(http://cdm.unimore.it/dep/test.pdf),主要类的骨架代码如下……

// Thin wrapper around boost::mutex that traces every operation on std::cout.
// Each trace line has the form
//   "Mutex " <mutex_type><m_id> <action> <id of the calling thread>
// and is written once before the operation and once after it succeeded.
class OF_bmutex{
public:
    //! Free-form tag printed in front of the numeric id in every trace line.
    std::string  mutex_type;
    //! Numeric id printed right after mutex_type.
    int m_id;
    //! The wrapped mutex.
    boost::mutex  m;

    //! Acquire the mutex (blocking), tracing the attempt and the success.
    void  lock(){
        trace( " locking from " );
        m.lock();
        trace( " locked from " );
    }

    //! Release the mutex, tracing before and after the release.
    void  unlock(){
        trace( " unlocking from " );
        m.unlock();
        trace( " unlocked from " );
    }

    //! Try to acquire the mutex without blocking.
    //! The attempt is always traced; the success line only when it was acquired.
    //! \return true when the mutex was acquired, false otherwise.
    bool  try_lock(){
        trace( " try locking from " );
        const bool acquired = m.try_lock();
        if( acquired ){
            trace( " try locked from " );
        }
        return acquired;
    }

private:
    //! Write one trace line for the given action text, followed by the
    //! calling thread's id, and flush the stream.
    void  trace( const char *action ) const {
        std::cout << "Mutex " << mutex_type << m_id << action << boost::this_thread::get_id() << std::endl;
    }
 };


// Worker object: holds the thread that executes it, the dataset it has to
// process and its numeric id. operator() is the function run by the thread.
class OF_ThreadClass {
    private:
        //! Loop flag: operator() keeps iterating while this is true.
        //! NOTE(review): plain bool read by the worker loop; if another thread
        //! writes it, the access needs synchronization - confirm.
        bool running;
        //! The thread executing this process...
        boost::thread *m_thread;
        //! The data to process (may be NULL when nothing is assigned).
        LinkedDataSet *my_data;
        //! The id of this thread; also used as index into the per-thread mutex pools.
        int thread_id;
        //! Process the data; virtual so that derived workers can supply their own step.
        virtual int processData();
    public:
        //! The boost thread id
        boost::thread::id   boost_id;
        //! Thread function
        void operator () ();
        //! Default constructor
        OF_ThreadClass();
        //! Destructor. Virtual because the class declares a virtual member
        //! (processData) and instances are handled through OF_ThreadClass
        //! pointers, so deleting a derived worker through such a pointer
        //! must run the derived destructor.
        virtual ~OF_ThreadClass();
        //! Connect this thread with the process data to evaluate
        //! NOTE(review): takes DataToProcess* while my_data is LinkedDataSet* -
        //! confirm how the two types are related.
        void setProcessData( DataToProcess *pd );
        //! return the thread id
        int getId() const { return this->thread_id; }
        //! post process the thread...
        void post_process();
};


// Worker loop executed by the thread that runs this object.
//
// Each iteration:
//   1. releases this worker's "available" mutex,
//   2. blocks on this worker's "running" mutex,
//   3. once it got it, processes the assigned dataset (if any) and flags it done,
//   4. releases the "running" mutex and blocks on the "available" mutex again.
// The loop ends when `running` is false; the "available" mutex is released
// one last time on the way out.
//
// NOTE(review): boost::mutex::unlock() requires that the calling thread owns
// the mutex. On the first iteration (and after the loop when it never ran)
// the "available" mutex is unlocked here without having been locked by this
// thread. The hand-off protocol is left unchanged here, but it should be
// replaced by one in which every unlock() is issued by the owner.
void  OF_ThreadClass::operator () (){
    while( this->running ){
        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->lock();
        // PUT HERE OUR CODE...
        // `running` is checked again because it may have changed while this
        // thread was blocked on the running mutex. The dataset is only touched
        // when one is actually assigned: flagging it done outside the NULL
        // check would dereference a null pointer.
        if( running == true && my_data != NULL ){
            this->processData();
            this->my_data->done = true;
        }
        std::cout << ">>>>>> Thread " << thread_id << " notified that evaluation terminated\n";
        OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->unlock();
        OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->lock();
    }
    OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
}


// Scheduler for multithreaded calculation: owns a pool of OF_ThreadClass
// workers and keeps track of which workers are available/running and which
// datasets are locked, unlocked or currently being processed.
class OF_SmartThreads{
    private:
    //! The number of threads to activate
    int     _n_threads;
    //! The polling time
    int     _polling_time;
    //! The thread pool...
    std::vector< OF_ThreadClass *> threadPool;
    //! Pointer to the set of the available threads
    std::set< OF_ThreadClass *> *OF_AVAILABLE_THREADS;
    //! The set of the running threads
    std::set< OF_ThreadClass *> OF_RUNNING_THREADS;
    //! The set of the locked datasets
    std::set< LinkedDataSet* >  locked_data;
    //! The set of the available datasets
    std::set< LinkedDataSet* >  unlocked_data;
    //! The set of the datasets under processing
    std::set< LinkedDataSet* >  processing_data;
    //! The size of the progress bar
    int progBarDim;
    public:
    //! Constructor
    OF_SmartThreads();
    //! Destructor
    ~OF_SmartThreads();
    //! Initialize the SmartThreads from a list of datasets,
    //! with n_max_threads as the requested upper bound on the thread count
    int init_function( std::list< LinkedDataSet * > *dList, int n_max_threads);
    //! Initialize the SmartThreads from a set of datasets,
    //! with n_max_threads as the requested upper bound on the thread count
    int init_function( std::set< LinkedDataSet * > *dSet, int n_max_threads);
    //! Process all the concurrent threads..
    int process_data();
    //! Process all the concurrent threads, polling their status
    //! with the given polling time
    int process_data_polling( int polling_time );
    //! stop the process..
    int post_process();
};


// Initialization function (runs in the main thread).
//
// Creates, for each of the _n_threads workers, one "available" mutex
// (tag "A") and one "running" mutex (tag "R"), then creates the workers
// themselves, stores them in threadPool at the index given by their id and
// registers them as available.
//
// Returns 0 once the pool is set up.
// NOTE(review): 0 is assumed to mean success; confirm the convention used by
// the callers (the skeleton did not return a value at all, which is undefined
// behaviour for a function returning int).
int OF_SmartThreads::init_function( ... ){
    // in the main thread...
    // Fill the pool of thread mutex...
    for(int i = 0; i< _n_threads; i++ ){
        _tm = new OF_BMUTEX;
        _tm->mutex_type.assign( "A" );
        _tm->m_id = i;
        OF_AVAILABLE_THREADS_MUTEX.push_back( _tm );
        _tm = new OF_BMUTEX;
        _tm->mutex_type.assign( "R" );
        _tm->m_id = i;
        OF_RUNNING_THREADS_MUTEX.push_back( _tm );
    }
    // Create the threads...
    threadPool.resize( _n_threads );
    for(int i = 0; i< _n_threads; i++ ){
        // ...preventively lock the running mutex so that the new worker
        // blocks on it until a dataset is handed over.
        OF_RUNNING_THREADS_MUTEX[i]->lock();
        // The "available" mutex has just been created and is therefore
        // already unlocked. It must NOT be unlocked here: unlock() on a
        // mutex the calling thread does not own violates the precondition
        // of boost::mutex::unlock() and corrupts the mutex state on some
        // platforms.
        // ..create the new thread...
        pc = new OF_ThreadClass;
        // insert the new thread in the list...
        threadPool.at( pc->getId() ) = pc;
        // set it as available...
        OF_AVAILABLE_THREADS->insert( pc );
    }
    return 0;
}

// Execution function: polling loop run by the master thread.
//
// _polling_time is the pause between two polls, in milliseconds.
//
// Every pass of the loop:
//   1. prints the status when something changed during the previous pass,
//   2. sleeps for _polling_time milliseconds,
//   3. if a dataset is unlocked and a worker is available, locks the dataset,
//      assigns it to the first available worker and releases that worker by
//      locking its "available" mutex and unlocking its "running" mutex,
//   4. releases every dataset whose worker reported it as done and moves that
//      worker back to the available set,
//   5. re-arms the available workers (tries to re-take their "running" mutex
//      and releases their "available" mutex),
//   6. keeps running while any dataset is still locked, unlocked or active.
void process_data_polling( int _polling_time ){
    while ( running ){
        if ( something_changed ){   
            //Print the status on the screen...
            ...
        }
        something_changed = false;
        // Poll the status of the processing data periodically
        boost::this_thread::sleep(boost::posix_time::millisec( _polling_time ));
        // Are there some data ready to process?
        if( OF_UNLOCKED_DATASETS->size() > 0 ){
            // Take the first
            pd = *OF_UNLOCKED_DATASETS->begin();
            // are there some threads available?
            if( OF_AVAILABLE_THREADS->size() != 0 ){
                //...lock and move the datasets linked to pd...
                ret = lock_data( pd, LOCK );
                std::cout << "\tNumber of available threads: " << OF_AVAILABLE_THREADS->size() << std::endl;
                // Take the available thread...
                pc = *OF_AVAILABLE_THREADS->begin();
                // ...link it the dataset to process...
                pc->setProcess( pd );
                OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->lock();
                OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->unlock();
                something_changed = true;
            } // available threads
        } // unlock datasets

        // Find, unlock and remove finished datasets...
        pIter2  = OF_RUNNING_THREADS->begin();
        pEnd2   = OF_RUNNING_THREADS->end();
        while( pIter2 != pEnd2 ){
            // Advance before the worker is possibly moved out of the running
            // set, so that the iterator used by the loop stays valid.
            pc = *pIter2++;
            pd = pc->getDataSet();
            if( pd->isDone() ){
                //...unlock and move the datasets linked to the current dataset...
                ret_move = lock_data( pd, RELEASE_LOCK );
                //...remove the data from the active set
                ret_remove = OF_ACTIVE_DATASETS->erase( pd );
                // make the threads available
                moveThreads( pc, _RUNNING_, _AVAILABLE_ );
                something_changed = true;
            }
        }

        // Re-arm the available workers.
        pIter2  = OF_AVAILABLE_THREADS->begin();
        pEnd2   = OF_AVAILABLE_THREADS->end();
        while( pIter2 != pEnd2 ){
            pc = *pIter2++;
            bool obtained = OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->try_lock();
            if( obtained ){
                std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " obtained running mutex..." << std::endl;
            }
            else{
                std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " failed to obtain running mutex..." << std::endl;
            }
            // NOTE(review): this unlock() is issued for every available
            // worker on every pass, whether or not this thread currently
            // holds that mutex. boost::mutex::unlock() requires ownership,
            // so ownership should be tracked and the call made only when the
            // mutex is actually held by this thread.
            OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->unlock();
            std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " released available mutex..." << std::endl;
        }
        // Keep going while any dataset is still pending in one of the sets.
        running = ( OF_LOCKED_DATASETS->size() + OF_UNLOCKED_DATASETS->size() + OF_ACTIVE_DATASETS->size() ) > 0;
    } // end running...
}

// The main function: sets up the thread pool, runs the polling loop until it
// returns, then calls post_process() and exits with status 0.
// The command-line arguments are not used.
int main( int argc, char* argv[]) {
    // INT_MAX is passed as the thread limit.
    // NOTE(review): presumably this means "no explicit limit" - confirm.
    init_function( &data, INT_MAX );
    // Poll every 100 milliseconds.
    process_data_polling( 100 );
    lc.post_process();
    return 0;
}

在 Linux 和 OSX 上使用 Boost 1.53 编译时,整个系统都能完美运行。使用的线程数为 2。日志摘录如下。请注意,每条互斥锁日志都是由正确的线程输出的……

---> LOG FROM OSX ...
 ---------------------------------
 Number of data: 2
Data: 0, links: 
Data: 1, links: 
---> OF_SmartThreads::init_function --
 ------------------------------------
 --> 8 processors/cores detected.
 --> n_max_threads = 2
 ------------------------------------
                Mutex R0 locking from thread master 
                Mutex R0 locked from thread master 
                Mutex R0 try locking from thread master 
            OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                Mutex A0 unlocking from thread master 
                Mutex A0 unlocked from thread master 
 New thread 0 created 
                Mutex R1 locking from thread master 
                Mutex R1 locked from thread master 
                Mutex R1 try locking from thread master 
            OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
 New thread 1 created 
 ---------------------------------
Available threads: 2
Unlocked datasets: 2

---> OF_SmartThreads::process_data_function
                Mutex A1 unlocking from thread1
                Mutex A1 unlocked from thread1
>>>>>> Thread 1 released available mutex...
                Mutex R1 locking from thread1
                Mutex A0 unlocking from thread0
                Mutex A0 unlocked from thread0
>>>>>> Thread 0 released available mutex...
                Mutex R0 locking from thread0
 UNLOCKED DATASETS : 0 1 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 
 RUNNING THREADS   : 
    OF_SMART_THREADS: THREADS AVAILABLE
    Number of available threads: 2
    OF_SMART_THREADS: take the thread 0
    OF_SMART_THREADS: Thread master try to lock available mutex... 0
                Mutex A0 locking from thread master 
                Mutex A0 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 0
    OF_SMART_THREADS: Thread try to unlock running mutex... 0
                Mutex R0 unlocking from thread master 
                Mutex R0 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 0
            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R1 try locking from thread master 
            OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
            OF_SMART_THREADS: Thread 1 released available mutex...
 UNLOCKED DATASETS : 1 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 0 
 RUNNING THREADS   : 0->0 
                Mutex R0 locked from thread0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
                Mutex R0 unlocking from thread0
                Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
                Mutex A0 locking from thread0
    OF_SMART_THREADS: THREADS AVAILABLE
    Number of available threads: 1
    OF_SMART_THREADS: take the thread 1
    OF_SMART_THREADS: Thread master try to lock available mutex... 1
                Mutex A1 locking from thread master 
                Mutex A1 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 1
    OF_SMART_THREADS: Thread try to unlock running mutex... 1
                Mutex R1 unlocking from thread master 
                Mutex R1 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 1
        OF_SMART_THREADS: CHECK THREADS DONE
        ------------> DATASETS 0 done...
        ------------> DATASETS 0 removed from the active set. 
            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R0 try locking from thread master 
                Mutex R0 try locked from thread master 
            OF_SMART_THREADS: Thread 0 obtained running mutex...
                Mutex R1 locked from thread1
                Mutex A0 unlocking from thread master 
>>>>>> Thread 1 obtained running mutex...
                Mutex A0 unlocked from thread master 
>>>>>> Thread 1 is going to process the dataset 1
                Mutex A0 locked from thread0
            OF_SMART_THREADS: Thread 0 released available mutex...
>>>>>> Thread 0 obtained available mutex...
 UNLOCKED DATASETS : 
 LOCKED DATASETS   : 
 ACTIVE DATASETS   : 1 
 RUNNING THREADS   : 1->1 
>>>>>> Thread 1 terminated to process the dataset 1
                Mutex A0 unlocking from thread0
>>>>>> Thread 1 notified that evaluation terminated
                Mutex A0 unlocked from thread0
                Mutex R1 unlocking from thread1
>>>>>> Thread 0 released available mutex...
                Mutex R1 unlocked from thread1
                Mutex R0 locking from thread0
>>>>>> Thread 1 released running mutex...
                Mutex A1 locking from thread1
        OF_SMART_THREADS: CHECK THREADS DONE
        ------------> DATASETS 1 done...
        ------------> DATASETS 1 removed from the active set. 
            OF_SMART_THREADS: PREPARE AVAILABLE THREADS
                Mutex R0 try locking from thread master 
            OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                Mutex A0 unlocking from thread master 
                Mutex A0 unlocked from thread master 
            OF_SMART_THREADS: Thread 0 released available mutex...
                Mutex R1 try locking from thread master 
                Mutex R1 try locked from thread master 
            OF_SMART_THREADS: Thread 1 obtained running mutex...
                Mutex A1 unlocking from thread master 
                Mutex A1 unlocked from thread master 
            OF_SMART_THREADS: Thread 1 released available mutex...
OF_SMART_THREADS: ALL THE DATASETS HAS BEEN SUCCESFULLY PROCESSED...

                Mutex A1 locked from thread1
                Mutex R0 unlocking from thread master 
>>>>>> Thread 1 obtained available mutex...
                Mutex R0 unlocked from thread master 
                Mutex R0 locked from thread0
                Mutex A1 unlocking from thread1
>>>>>> Thread 0 obtained running mutex...
                Mutex A1 unlocked from thread1
>>>>>> Thread 0 notified that evaluation terminated
>>>>>> Thread 1 released available mutex...
                Mutex R0 unlocking from thread0
                Mutex R1 locking from thread1
                Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
                Mutex A0 locking from thread0
                Mutex A0 locked from thread0
>>>>>> Thread 0 obtained available mutex...
                Mutex A0 unlocking from thread0
                Mutex A0 unlocked from thread0
>>>>>> Thread 0 is terminating...
                Mutex R1 unlocking from thread master 
                Mutex R1 unlocked from thread master 
                Mutex R1 locked from thread1
>>>>>> Thread 1 obtained running mutex...
>>>>>> Thread 1 notified that evaluation terminated
                Mutex R1 unlocking from thread1
                Mutex R1 unlocked from thread1
>>>>>> Thread 1 released running mutex...
                Mutex A1 locking from thread1
                Mutex A1 locked from thread1
>>>>>> Thread 1 obtained available mutex...
                Mutex A1 unlocking from thread1
                Mutex A1 unlocked from thread1
>>>>>> Thread 1 is terminating...

在 Windows 7 上编译该系统时会出现问题,Visual Studio 64 位和 MinGW 32 位都是如此。从下面的日志中可以看到,程序一开始就发生了死锁。这在我们看来非常奇怪,无法用各个线程输出的互斥锁日志来解释。对于如何调试这个问题,有什么建议吗?

---> LOG FROM WINDOWS 7...

 ---------------------------------
 Number of data: 2
Data: 0, links:
Data: 1, links:
-———> OF_SmartThreads::init_function --
 ------------------------------------
 --> 4 processors/cores detected.
 --> n_max_threads = 2
 ------------------------------------
                                Mutex R0 locking from thread master
                                Mutex R0 locked from thread master
                                Mutex R0 try locking from thread master
                        OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
                                Mutex A0 unlocking from thread master
                                Mutex A0 unlocked from thread master
 New thread 0 created
                                Mutex A0 unlocking from thread0
                                Mutex A0 unlocked from thread0
                                Mutex R1 locking from thread master
                                Mutex R1 locked from thread master
>>>>>> Thread 0 released available mutex...
                                Mutex R0 locking from thread0
                                Mutex R1 try locking from thread master
                        OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                                Mutex A1 unlocking from thread master
                                Mutex A1 unlocked from thread master
 New thread 1 created
                                Mutex A1 unlocking from thread1
                                Mutex A1 unlocked from thread1
 ---------------------------------
Available threads: 2
>>>>>> Thread 1 released available mutex...
                                Mutex R1 locking from thread1
Unlocked datasets: 2
---> OF_SmartThreads::process_data_function

 UNLOCKED DATASETS : 0 1
 LOCKED DATASETS   :
 ACTIVE DATASETS   :
 RUNNING THREADS   :
        OF_SMART_THREADS: THREADS AVAILABLE
        Number of available threads: 2
        OF_SMART_THREADS: take the thread 0
        OF_SMART_THREADS: Thread master try to lock available mutex... 0
                                Mutex A0 locking from thread master
                                Mutex A0 locked from thread master
        OF_SMART_THREADS: Thread obtained available mutex... 0
        OF_SMART_THREADS: Thread try to unlock running mutex... 0
                                Mutex R0 unlocking from thread master
                                Mutex R0 unlocked from thread master
                                Mutex R0 locked from thread0
        OF_SMART_THREADS: Thread released running mutex... 0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
 Process Data: delay 41
                        OF_SMART_THREADS: PREPARE AVAILABLE THREADS
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
                                Mutex R1 try locking from thread master
                        OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
                                Mutex A1 unlocking from thread master
                                Mutex A1 unlocked from thread master
                                Mutex R0 unlocking from thread0
                                Mutex R0 unlocked from thread0
                        OF_SMART_THREADS: Thread 1 released available mutex...
 UNLOCKED DATASETS : 1
 LOCKED DATASETS   :
 ACTIVE DATASETS   : 0*
 RUNNING THREADS   : 0->0
>>>>>> Thread 0 released running mutex...
                                Mutex A0 locking from thread0
        OF_SMART_THREADS: THREADS AVAILABLE
        Number of available threads: 1
        OF_SMART_THREADS: take the thread 1
        OF_SMART_THREADS: Thread master try to lock available mutex... 1
                                Mutex A1 locking from thread master
此处发生了死锁:主线程(thread master)无法锁定互斥锁 A1,但从日志中可以看出,之前并没有其他线程锁定过该互斥锁。对于如何调试这个问题,有什么建议吗?

问候

在您的 OF_bmutex 中加入锁定状态监控,例如一个 bool locked 成员。您不应该解锁一个未锁定的互斥锁,也不应该锁定一个已锁定的互斥锁——因此请加上 assert。看起来您的 init_function 在没有事先锁定的情况下就调用了 OF_AVAILABLE_THREADS_MUTEX[i]->unlock();

Boost 的 BasicLockable(基本可锁定)概念:

m.unlock();

要求:当前线程拥有 m

所以看起来你违反了unlock()先决条件。这可以在您的日志中看到:

Mutex A0 unlocking from thread master 
Mutex A0 unlocked from thread master