我如何判断ThreadPool的任务何时完成
How can I tell when my ThreadPool is finished with its tasks?
在c++11中,我有一个ThreadPool对象,它管理通过单个lambda函数排队的多个线程。我知道我必须处理多少行数据,所以我提前知道我需要对N个作业进行排队。我不确定的是如何判断所有这些工作何时完成,这样我就可以继续下一步了。
这是管理线程池的代码:
#include <cstdlib>
#include <vector>
#include <deque>
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
class ThreadPool;
class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
};
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
~ThreadPool();
void joinAll();
int taskSize();
private:
friend class Worker;
// the task queue
std::deque< std::function<void()> > tasks;
// keep track of threads
std::vector< std::thread > workers;
// sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
void Worker::operator()()
{
std::function<void()> task;
while(true)
{
{ // acquire lock
std::unique_lock<std::mutex>
lock(pool.queue_mutex);
// look for a work item
while ( !pool.stop && pool.tasks.empty() ) {
// if there are none wait for notification
pool.condition.wait(lock);
}
if ( pool.stop ) {// exit if the pool is stopped
return;
}
// get the task from the queue
task = pool.tasks.front();
pool.tasks.pop_front();
} // release lock
// execute the task
task();
}
}
// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0;i<threads;++i) {
workers.push_back(std::thread(Worker(*this)));
}
//workers.
//tasks.
}
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
// stop all threads
stop = true;
condition.notify_all();
// join them
for ( size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
void ThreadPool::joinAll() {
// join them
for ( size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
int ThreadPool::taskSize() {
return tasks.size();
}
// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
{ // acquire lock
std::unique_lock<std::mutex> lock(queue_mutex);
// add the task
tasks.push_back(std::function<void()>(f));
} // release lock
// wake up one thread
condition.notify_one();
}
然后我把我的工作分配到这样的线程中:
ThreadPool pool(4);
/* ... */
for (int y=0;y<N;y++) {
pool->enqueue([this,y] {
this->ProcessRow(y);
});
}
// wait until all threads are finished
std::this_thread::sleep_for( std::chrono::milliseconds(100) );
等待100毫秒是有效的,因为我知道这些工作可以在不到100毫秒的时间内完成,但显然这不是最好的方法。一旦它完成了N行处理,就需要再经历1000代左右的相同处理。显然,我想尽快开始下一代。
我知道一定有办法把代码添加到我的线程池中,这样我就可以做这样的事情:
while ( pool->isBusy() ) {
std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}
我已经为此工作了几个晚上,我发现很难找到如何做到这一点的好例子那么,实现我的isBusy()方法的正确方法是什么
我搞定了!
首先,我向ThreadPool类引入了一些额外的成员:
class ThreadPool {
/* ... exisitng code ... */
/* plus the following */
std::atomic<int> njobs_pending;
std::mutex main_mutex;
std::condition_variable main_condition;
}
现在,我可以做得比每X个时间检查一次状态更好。现在,我可以阻止主循环,直到没有更多的作业挂起:
void ThreadPool::waitUntilCompleted(unsigned n) {
std::unique_lock<std::mutex> lock(main_mutex);
main_condition.wait(lock);
}
只要我在ThreadPool.enqueue()函数的头部使用以下记账代码管理挂起的内容:
njobs_pending++;
在我运行Worker::operator()函数中的任务之后:
if ( --pool.njobs_pending == 0 ) {
pool.main_condition.notify_one();
}
然后,主线程可以将任何必要的任务排队,然后坐着等待,直到所有计算都完成:
for (int y=0;y<N;y++) {
pool->enqueue([this,y] {
this->ProcessRow(y);
});
}
pool->waitUntilCompleted();
您可能需要创建一个与bool变量标志关联的线程内部结构。
class ThreadPool {
private:
// This Structure Will Keep Track Of Each Thread's Progress
struct ThreadInfo {
std::thread thread;
bool isDone;
ThreadInfo( std::thread& threadIn ) :
thread( threadIn ), isDone(false)
{}
}; // ThredInfo
// This Vector Should Be Populated In The Constructor Initially And
// Updated Anytime You Would Add A New Task.
// This Should Also Replace // std::vector<std::thread> workers
std::vector<ThreadInfo> workers;
public:
// The rest of your class would appear to be the same, but you need a
// way to test if a particular thread is currently active. When the
// thread is done this bool flag would report as being true;
// This will only return or report if a particular thread is done or not
// You would have to set this variable's flag for a particular thread to
// true when it completes its task, otherwise it will always be false
// from moment of creation. I did not add in any bounds checking to keep
// it simple which should be taken into consideration.
bool isBusy( unsigned idx ) const {
return workers[idx].isDone;
}
};
如果您有N个作业,并且必须通过调用线程睡眠来等待它们,那么最有效的方法是在某个地方创建一个变量,该变量将在调度作业之前由原子操作设置为N,并且在每个作业内进行计算时,变量将原子递减。然后可以使用原子指令来测试变量是否为零。
或者使用等待句柄锁定递减,此时变量将递减到零。
我只想说,我不喜欢你要求的这个想法:
while ( pool->isBusy() ) {
std::this_thread::sleep_for( std::chrono::milliseconds(1) );
}
它只是不太适合,它不会是1ms,几乎永远不会,它在不必要地使用资源等…
最好的方法是原子地减少一些变量,如果全部完成,则原子地测试变量,最后一项工作将仅基于原子测试集WaitForSingleObject。如果必须的话,等待将在WaitForSingleObject上,并且在完成后会醒来,次数不多。
WaitForSingleObject
- 有没有任务栏API可以立即应用注册表更改
- 如何创建线程序列以按照启动顺序执行任务?
- C++一个线程如何正确通信其任务已完成?
- 在线程之间拆分任务总是值得的吗?
- 递归求和任务的错误答案
- "main"函数堆栈中的对象在第一个任务运行时被覆盖 (FreeRTOS)
- C++ 任务流库入门
- Pisarze - 来自波兰奥林匹克信息学的数据分析任务
- 在程序运行时监视 VxWorks 中的任务 CPU 利用率
- 编写一个读取五个整数并执行一些任务的C++程序
- 任务内部的 OpenMP 任务循环
- 不知道如何在家庭作业任务中实现一件事
- 如何安全地停止 IOCP WSARecv() 任务,并释放 WSAOVERLAPED 结构?
- 为什么我的 FreeRTOS 任务看不到类成员?
- 当前步骤:构建任务.json 文件
- 命令模式 - 使用"weight"执行任务的命令
- 无法删除EXE崩溃后,即使进程未显示在任务管理器中
- 需要帮助来理解我的任务的 QRegularExpression
- xSemaphoreTake 在调用 xSemaphoreGive 后不会恢复任务
- 我如何判断ThreadPool的任务何时完成