std::condition_variable – 通知一次,但等待线程唤醒两次
std::condition_variable – notify once but wait thread wakened twice
这是一个简单的C++线程池实现。这是从 https://github.com/progschj/ThreadPool 开始的修改版本。
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
namespace ThreadPool {
class FixedThreadPool {
public:
FixedThreadPool(size_t);
template<class F, class... Args>
auto Submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
template<class F, class... Args>
void Execute(F&& f, Args&&... args);
~FixedThreadPool();
void AwaitTermination();
void Stop();
private:
void ThreadWorker();
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex worker_mutex;
std::mutex queue_mutex;
std::condition_variable condition;
// stop flag
bool stop_;
// thread size
int thread_size_;
};
// Constructor does nothing. Threads are created when new task submitted.
FixedThreadPool::FixedThreadPool(size_t num_threads):
stop_(false),
thread_size_(num_threads) {}
// Destructor joins all threads
FixedThreadPool::~FixedThreadPool() {
//std::this_thread::sleep_for(std::chrono::seconds(5));
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
}
// Thread worker
void FixedThreadPool::ThreadWorker() {
std::function<void()> task;
while (1) {
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]() { return this->stop_ || !this->tasks.empty(); });
printf("wakeeeeeenedn");
if (this->stop_ && this->tasks.empty()) {
printf("returning ...n");
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
// Add new work item to the pool
template<class F, class... Args>
auto FixedThreadPool::Submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type >
{
{
std::unique_lock<std::mutex> lock(this->worker_mutex);
if (workers.size() < thread_size_) {
workers.emplace_back(std::thread(&FixedThreadPool::ThreadWorker, this));
}
}
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop_) {
throw std::runtime_error("ThreadPool has been shutdown.");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
// Execute new task without returning std::future object.
template<class F, class... Args>
void FixedThreadPool::Execute(F&& f, Args&&... args) {
Submit(std::forward<F>(f), std::forward<Args>(args)...);
}
// Blocks and wait for all previously submitted tasks to be completed.
void FixedThreadPool::AwaitTermination() {
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
}
// Shut down the threadpool. This method does not wait for previously submitted
// tasks to be completed.
void FixedThreadPool::Stop() {
printf("Stopping ...n");
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop_ = true;
}
}
} // namespace ThreadPool
#endif /* __THREAD_POOL_H__ */
和测试主.cpp:
#include <iostream>
#include <vector>
#include <chrono>
#include <exception>
#include "ThreadPool.h"
int main(int argc, char** argv) {
ThreadPool::FixedThreadPool pool(3);
pool.Execute([]() {
std::cout << "hello world" << std::endl;
}
);
pool.Stop();
pool.AwaitTermination();
std::cout << "All tasks complted." << std::endl;
return 0;
}
我在这个测试程序中有一个错误。只有一个任务提交到线程池,但我的工作线程被唤醒了两次:
>>./test
Stopping ...
wakeeeeeened
hello world
wakeeeeeened
returning ...
All tasks complted.
我认为问题出在FixedThreadPool::ThreadWorker((本身。工作线程不断等待条件变量以获取新任务。函数 FixedThreadPool::Submit(( 将一个新任务添加到队列中,并调用 condition.nofity_one(( 来唤醒工作线程。
但是我不知道如何唤醒工作线程两次。在此测试中,我只提交了一个任务和一个工作线程。
将注释转换为答案:
condition_variable::wait(lock, pred)
相当于while(!pred()) wait(lock);
。如果pred()
返回true
则实际上不会发生等待,并且呼叫会立即返回。
您的第一次唤醒来自notify_one()
调用;第二次"唤醒"是因为第二个wait()
调用恰好在Stop()
调用之后执行,因此您的谓词返回true
并且wait()
立即返回而无需等待。
很明显,你在这里(不(幸运:如果第二次wait()
调用发生在Stop()
调用之前,那么你的工作线程将永远等待(在没有虚假唤醒的情况下(,你的主线程也会如此。
另外,摆脱__THREAD_POOL_H__
.把这些双下划线烧到地上。
相关文章:
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 虚假唤醒是否会解锁所有等待线程,甚至是不相关的线程?
- 等待线程的最佳方式是什么
- std::thread从函数启动,无需等待线程完成
- 带有共享缓冲区的两个等待线程(生产者/消费者)
- std::condition_variable::notify_one() 不会唤醒等待线程
- 在退出进程时等待线程完成
- 如何正确等待线程中的信号
- 如何等待线程直到它开始运行
- 在纯虚拟类的析构函数中等待线程死亡会导致运行时错误
- 更改可报警/可等待线程的上下文
- C++11 std::线程并等待线程完成
- 我如何在Qt中等待线程完成而不阻塞其执行
- Notify_all不唤醒等待线程
- 如何使用boost条件变量来等待线程完成处理
- 等待线程结束(c++)
- 等待线程超时:冻结
- 等待线程
- 等待线程/s直到另一个线程/s结束
- 有效地等待线程池中的所有任务完成