线程池如何结束程序
thread pooling in how to end the program
根据Kerrek SB在这个问题中的回答,我已经实现了线程池。
我已经为函数实现了MPMC队列,为线程实现了向量线程。
一切都很顺利,只是我不知道如何终止程序,最后如果我只执行thread.join
,因为线程仍在等待更多任务,它将不会加入,主线程也不会继续。
知道如何正确结束程序吗?
为了完整起见,这是我的代码:
函数_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
std::function<void()> pop();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition()
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to
get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
std::function<void()> Function_pool::pop()
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty();
});
auto func = m_function_queue.front();
m_function_queue.pop();
return func;
// Lock will be released
}
main.cpp
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
void example_function()
{
std::cout << "bla" << std::endl;
}
void infinite_loop_func()
{
while (true)
{
std::function<void()> func = func_pool.pop();
func();
}
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(infinite_loop_func));
}
//here we should send our functions
func_pool.push(example_function);
for (int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
int i;
std::cin >> i;
}
您的问题位于infinite_loop_func
中,它是一个无限循环,结果不会终止。我已经阅读了前面的答案,其中建议抛出异常,但是,我不喜欢它,因为异常不应该用于常规控制流。
解决此问题的最佳方法是显式处理停止条件。例如:
std::atomic<bool> acceptsFunctions;
将其添加到函数池中可以清楚地拥有状态,并断言在析构函数时不会添加新函数。
std::optional<std::function<void()>> Function_pool::pop()
返回一个空的可选(或C++14及以前版本中的函数(,可以处理一个空队列。你必须这样做,因为condition_variable
可能会造成虚假的唤醒。
这样,m_data_condition.notify_all()
就可以用来唤醒所有线程。
最后,我们必须修复无限循环,因为它不包括过度投入,同时允许您执行仍在队列中的所有函数:
while (func_pool.acceptsFunctions || func_pool.containsFunctions())
{
auto f = func_pool.pop();
If (!f)
{
func_pool.m_data_condition.wait_for(1s);
continue;
}
auto &function = *f;
function ();
}
我将由您来实现containsFunctions()
并清理代码(infinite_loop_func作为成员函数?(注意,使用计数器,您甚至可以处理生成的后台任务。
您可以始终使用特定的异常类型向infinite_loop_func
发出信号,表明它应该返回。。。
class quit_worker_exception: public std::exception {};
然后将infinite_loop_func
更改为…
void infinite_loop_func ()
{
while (true) {
std::function<void()> func = func_pool.pop();
try {
func();
}
catch (quit_worker_exception &ex) {
return;
}
}
}
通过以上更改,您可以使用(在main
中(。。。
/*
* Enqueue `thread_pool.size()' function objects whose sole job is
* to throw an instance of `quit_worker_exception' when invoked.
*/
for (int i = 0; i < thread_pool.size(); i++)
func_pool.push([](){ throw quit_worker_exception(); });
/*
* Now just wait for each worker to terminate having received its
* quit_worker_exception.
*/
for (int i = 0; i < thread_pool.size(); i++)
thread_pool.at(i).join();
infinite_loop_func
的每个实例将使一个函数对象出列,该函数对象在被调用时抛出一个quit_worker_exception
,使其返回。
Follwoing [JVApen](https://stackoverflow.com/posts/51382714/revisions) suggestion, I copy my code in case anyone will want a working code:
函数_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
main.cpp
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}
- 试图对缓存进行跨步测试,但程序并没有结束
- 为什么我的两个 cin 语句没有在程序结束时运行?
- C ++尝试并捕获未结束的程序
- 为什么我的程序在输入某个形状的面积的测量值后没有结束?
- 全局向量导致 C++ 程序结束时出现段错误
- 程序显示以退出代码 0; 结束
- C++程序从主程序开始执行并在主程序结束?
- 如何立即停止/结束程序?
- 在不释放所有动态分配的资源的情况下结束程序是否有风险
- 线程池如何结束程序
- 客户端发送消息,但服务器在客户端结束程序之前不接收消息
- 如果用户想要输入 x 数量的数字,我如何设法跟踪它然后结束程序?
- 如何从另一个函数中调用的函数结束程序 (C++)
- 如果用户输入 -1 作为动态数组中的数字,如何结束程序
- 引发异常并直接结束程序的自定义异常处理
- MPI在某个进程找到解决方案时使用广播结束程序
- 退出(0)未结束程序
- 通过函数结束程序
- 错误后如何结束C++程序
- 如何在不结束c++程序的情况下结束ncurses ?