How can I tell when my ThreadPool is finished with its tasks?

Last updated: 2023-10-16

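In C++11, I have a ThreadPool object that manages a number of threads, which are fed work through a single lambda function. I know how many rows of data I have to process, so I know in advance that I need to queue N jobs. What I am not sure about is how to tell when all of those jobs are finished, so that I can move on to the next step.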

This is the code that manages the thread pool:

#include <cstdlib>
#include <vector>
#include <deque>
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>   // std::function
#include <chrono>       // std::chrono::milliseconds
class ThreadPool;
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
    void joinAll();
    int taskSize();
private:
    friend class Worker;
    // the task queue
    std::deque< std::function<void()> > tasks;
    // keep track of threads
    std::vector< std::thread > workers;
    // sync
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> lock(pool.queue_mutex);
            // look for a work item
            while ( !pool.stop && pool.tasks.empty() ) {
                // if there are none wait for notification
                pool.condition.wait(lock);
            }
            if ( pool.stop )  {// exit if the pool is stopped
                return;
            }
            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop_front();
        }   // release lock
        // execute the task
        task();
    }
}

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for (size_t i = 0;i<threads;++i) {
        workers.push_back(std::thread(Worker(*this)));
    }
    //workers.
    //tasks.
}
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
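    // stop all threads; set the flag under the lock so that a worker
    // cannot miss the notification between its check and its wait
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    // join them (skip any thread that was already joined)
    for ( size_t i = 0;i<workers.size();++i) {
        if ( workers[i].joinable() ) {
            workers[i].join();
        }
    }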
}
void ThreadPool::joinAll() {
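    // join them; note that workers only return once stop is set, so this
    // blocks until the pool is being shut down
    for ( size_t i = 0;i<workers.size();++i) {
        if ( workers[i].joinable() ) {
            workers[i].join();
        }
    }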
}
int ThreadPool::taskSize() {
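    // lock so the size is not read while a worker modifies the queue
    std::unique_lock<std::mutex> lock(queue_mutex);
    return static_cast<int>(tasks.size());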
}
// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);
        // add the task
        tasks.push_back(std::function<void()>(f));
    } // release lock
    // wake up one thread
    condition.notify_one();
}

Then I hand my work out to the threads like this:

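// held by pointer, matching the pool-> calls below (needs <memory>)
std::unique_ptr<ThreadPool> pool(new ThreadPool(4));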
/* ... */
for (int y=0;y<N;y++) {
    pool->enqueue([this,y] {
        this->ProcessRow(y);
    });
}
// wait until all threads are finished
std::this_thread::sleep_for( std::chrono::milliseconds(100) );

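Waiting 100 ms works because I know these jobs finish in less than 100 ms, but obviously that is not the best approach. Once the N rows have been processed, the same processing has to be repeated for another 1000 or so generations, and I want to start each new generation as soon as possible.

I know there must be a way to add code to my thread pool so that I can do something like this: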

while ( pool->isBusy() ) {
    std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}

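I have been working on this for several evenings, and I am finding it hard to locate good examples of how to do it. So what is the right way to implement my isBusy() method?

I got it!

First, I introduced a few extra members to the ThreadPool class: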

class ThreadPool {
    /* ... existing code ... */
    /* plus the following */
    std::atomic<int> njobs_pending{0}; // must start at zero
    std::mutex main_mutex;
    std::condition_variable main_condition;
};

Now I can do better than checking the status every so often. I can block the main loop until no more jobs are pending:

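void ThreadPool::waitUntilCompleted() {
    std::unique_lock<std::mutex> lock(main_mutex);
    // the predicate guards against spurious wakeups and against the
    // jobs having already finished before this call
    main_condition.wait(lock, [this] { return njobs_pending == 0; });
}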

This works as long as I keep track of the pending jobs with the following bookkeeping code at the top of the ThreadPool::enqueue() function:

njobs_pending++;

And this, right after the task has run in the Worker::operator()() function:

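if ( --pool.njobs_pending == 0 ) {
    // take the lock so the notification cannot slip in between the
    // waiter's check of njobs_pending and its call to wait()
    std::lock_guard<std::mutex> lock(pool.main_mutex);
    pool.main_condition.notify_one();
}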

The main thread can then enqueue whatever tasks are needed and simply sit and wait until all of the computation is finished:

for (int y=0;y<N;y++) {
    pool->enqueue([this,y] {
        this->ProcessRow(y);
    });
}
pool->waitUntilCompleted();
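
For reference, the pieces above can be combined into one minimal, self-contained sketch. The names CountingPool, pending, work_available, all_done and runGenerations are hypothetical and not part of the code above. As a deliberate simplification, the pending count here is a plain integer guarded by the same mutex as the queue rather than a std::atomic with its own mutex, and the destructor drains any queued tasks before the workers exit.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool; names are illustrative only.
class CountingPool {
public:
    explicit CountingPool(std::size_t threads) : pending(0), stop(false) {
        for (std::size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] { run(); });
        }
    }

    ~CountingPool() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            stop = true;
        }
        work_available.notify_all();
        for (std::thread &t : workers) {
            t.join();
        }
    }

    void enqueue(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            tasks.push_back(std::move(f));
            ++pending;  // counted from enqueue until the task has returned
        }
        work_available.notify_one();
    }

    // Blocks until every task enqueued so far has finished running.
    void waitUntilCompleted() {
        std::unique_lock<std::mutex> lock(mutex);
        all_done.wait(lock, [this] { return pending == 0; });
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex);
                work_available.wait(
                    lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) {
                    return;  // shut down once the queue has been drained
                }
                task = std::move(tasks.front());
                tasks.pop_front();
            }
            task();  // run outside the lock
            {
                std::lock_guard<std::mutex> lock(mutex);
                assert(pending > 0);
                if (--pending == 0) {
                    all_done.notify_all();  // wake waitUntilCompleted()
                }
            }
        }
    }

    std::mutex mutex;
    std::condition_variable work_available;
    std::condition_variable all_done;
    std::deque<std::function<void()>> tasks;
    std::size_t pending;
    bool stop;
    std::vector<std::thread> workers;
};

// Runs `generations` rounds of `rows` jobs and returns how many jobs ran.
int runGenerations(int generations, int rows) {
    std::atomic<int> processed(0);
    CountingPool pool(4);
    for (int g = 0; g < generations; ++g) {
        for (int y = 0; y < rows; ++y) {
            pool.enqueue([&processed] { ++processed; });
        }
        pool.waitUntilCompleted();  // every row of this generation is done
        assert(processed.load() == (g + 1) * rows);
    }
    return processed.load();
}
```

Calling runGenerations(1000, N) would mirror the loop in the question, with each generation starting as soon as the previous one has drained instead of after a fixed sleep.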

You may want to create an internal structure that pairs each thread with a bool flag.

class ThreadPool {
private:
    // This Structure Will Keep Track Of Each Thread's Progress
    struct ThreadInfo {
        std::thread thread;
        bool        isDone;
        // std::thread is move-only, so take it by rvalue and move it in
        ThreadInfo( std::thread&& threadIn ) :
            thread( std::move(threadIn) ), isDone(false)
        {}
    }; // ThreadInfo
    // This Vector Should Be Populated In The Constructor Initially And
    // Updated Anytime You Would Add A New Task.
    // This Should Also Replace // std::vector<std::thread> workers
    std::vector<ThreadInfo> workers;
public:
    // The rest of your class would appear to be the same, but you need a
    // way to test if a particular thread is currently active. When the
    // thread is done its isDone flag would be true, so isBusy() reports
    // the opposite. You would have to set the flag for a particular
    // thread to true when it completes its task, otherwise it will always
    // be false from the moment of creation. The flag is read and written
    // from different threads, so access to it needs synchronization. I did
    // not add any bounds checking to keep it simple, which should be taken
    // into consideration.
    bool isBusy( unsigned idx ) const {
        return !workers[idx].isDone;
    }
};
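
One way the per-thread flag idea could look in isolation is sketched below. The names BusyFlags, setBusy, anyBusy and demoBusyFlags are hypothetical. The sketch swaps the plain bool for std::atomic<bool> so that the flags can be read from another thread without a data race, and it adds the bounds checks the snippet above leaves out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>

// Hypothetical helper: one atomic "busy" flag per worker slot.
class BusyFlags {
public:
    explicit BusyFlags(std::size_t n)
        : count(n), flags(new std::atomic<bool>[n]) {
        for (std::size_t i = 0; i < count; ++i) {
            flags[i].store(false);  // every slot starts idle
        }
    }

    // A worker calls this with true before a task and false after it.
    void setBusy(std::size_t idx, bool busy) {
        assert(idx < count);
        flags[idx].store(busy);
    }

    bool isBusy(std::size_t idx) const {
        assert(idx < count);
        return flags[idx].load();
    }

    // True if at least one worker is currently inside a task.
    bool anyBusy() const {
        for (std::size_t i = 0; i < count; ++i) {
            if (flags[i].load()) {
                return true;
            }
        }
        return false;
    }

private:
    std::size_t count;
    std::unique_ptr<std::atomic<bool>[]> flags;
};

// Marks slot 0 busy on another thread, observes it, then clears it.
bool demoBusyFlags() {
    BusyFlags flags(2);
    bool sawBusy = false;
    std::thread worker([&] {
        flags.setBusy(0, true);
        sawBusy = flags.isBusy(0) && !flags.isBusy(1);
        flags.setBusy(0, false);
    });
    worker.join();  // join makes sawBusy safely visible here
    return sawBusy && !flags.anyBusy();
}
```

Note that all flags being clear does not by itself show that the task queue is empty, since a worker may be between two tasks, so a check like this would still need to be combined with the queue state.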

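If you have N jobs and must wait for them by putting the calling thread to sleep, the most efficient approach is to create a variable somewhere that is set to N by an atomic operation before the jobs are dispatched, and that each job decrements atomically once its computation is done. You can then use an atomic instruction to test whether the variable is zero.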

Alternatively, block on a wait handle that is signaled at the moment the variable is decremented to zero.
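
A minimal sketch of the atomic countdown described above, assuming one thread per job purely to keep the example short (the question uses a pool instead). The names runJobsAndCount, remaining and rowsProcessed are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical demo: `remaining` starts at n and each job decrements it once.
int runJobsAndCount(int n) {
    assert(n >= 0);
    std::atomic<int> remaining(n);      // set to N before dispatching
    std::atomic<int> rowsProcessed(0);
    std::vector<std::thread> jobs;
    for (int y = 0; y < n; ++y) {
        jobs.emplace_back([&remaining, &rowsProcessed] {
            ++rowsProcessed;            // stands in for the real work
            --remaining;                // atomic decrement when the job ends
        });
    }
    // Atomic test for zero; yield between checks instead of spinning hard.
    while (remaining.load() != 0) {
        std::this_thread::yield();
    }
    const int done = rowsProcessed.load();  // all increments are visible here
    for (std::thread &t : jobs) {
        t.join();
    }
    return done;
}
```

The loop still polls, so this only covers the counting half of the suggestion; a blocking wait avoids the polling entirely.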

I just want to say that I do not like the idea you are asking for:

while ( pool->isBusy() ) {
    std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}

It is simply not a good fit: the sleep will almost never be exactly 1 ms, it uses resources unnecessarily, and so on.

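The best way is to decrement a variable atomically and test it atomically to see whether everything is done; the last job, based on that atomic test alone, then signals the object that the waiter is blocked on in WaitForSingleObject. If a wait is needed at all, it happens in WaitForSingleObject, and the waiter wakes up once, after completion, rather than many times.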
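
WaitForSingleObject is a Win32 call, so as a portable stand-in the same pattern can be sketched with std::condition_variable instead of a Win32 event. This is an assumption-laden illustration, not the Win32 approach itself; the names CountdownLatch, countDown and runWithLatch are hypothetical. (C++20 also provides std::latch for this purpose, which is not available in C++11.)

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical one-shot latch: the last job to finish wakes the waiter.
class CountdownLatch {
public:
    explicit CountdownLatch(int count) : remaining(count) {
        assert(count >= 0);
    }

    // Called once by each job when it is done.
    void countDown() {
        // fetch_sub returns the previous value, so 1 means "I was last".
        if (remaining.fetch_sub(1) == 1) {
            // Lock before notifying so the waiter cannot miss the signal
            // between checking the count and going to sleep.
            std::lock_guard<std::mutex> lock(mutex);
            done.notify_all();
        }
    }

    // Blocks without polling until the count has reached zero.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        done.wait(lock, [this] { return remaining.load() == 0; });
    }

private:
    std::atomic<int> remaining;
    std::mutex mutex;
    std::condition_variable done;
};

// Starts n jobs, blocks on the latch, and returns how many jobs ran.
int runWithLatch(int n) {
    CountdownLatch latch(n);
    std::atomic<int> rowsProcessed(0);
    std::vector<std::thread> jobs;
    for (int y = 0; y < n; ++y) {
        jobs.emplace_back([&latch, &rowsProcessed] {
            ++rowsProcessed;   // stands in for the real work
            latch.countDown();
        });
    }
    latch.wait();              // sleeps until the last job signals
    const int done = rowsProcessed.load();
    for (std::thread &t : jobs) {
        t.join();              // join before the latch goes out of scope
    }
    return done;
}
```

Only the final decrement touches the mutex, so the jobs themselves stay lock-free apart from the one that finishes last.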
