线程池如何结束程序

thread pooling in how to end the program

本文关键字:结束 程序 何结束 线程      更新时间:2023-10-16

根据Kerrek SB在这个问题中的回答,我已经实现了线程池。

我已经为函数实现了MPMC队列,为线程实现了向量线程。

一切都很顺利,只是我不知道如何终止程序,最后如果我只执行thread.join,因为线程仍在等待更多任务,它将不会加入,主线程也不会继续。

知道如何正确结束程序吗?

为了完整起见,这是我的代码:

函数_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
public: 
Function_pool();
~Function_pool();
void push(std::function<void()> func);
std::function<void()> pop();
};

function_pool.cpp

#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition()
{
}
Function_pool::~Function_pool()
{
} 
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to 
get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
std::function<void()> Function_pool::pop()
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty(); 
});
auto func = m_function_queue.front();
m_function_queue.pop();
return func;
// Lock will be released
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
void example_function()
{
std::cout << "bla" << std::endl;
}
void infinite_loop_func()
{
while (true)
{
std::function<void()> func = func_pool.pop();
func();
}
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(infinite_loop_func));
}
//here we should send our functions
func_pool.push(example_function);
for (int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
int i;
std::cin >> i;
}

您的问题位于infinite_loop_func中,它是一个无限循环,结果不会终止。我已经阅读了前面的答案,其中建议抛出异常,但是,我不喜欢它,因为异常不应该用于常规控制流。

解决此问题的最佳方法是显式处理停止条件。例如:

std::atomic<bool> acceptsFunctions;

将其添加到函数池中可以清楚地拥有状态,并断言在析构函数时不会添加新函数。

std::optional<std::function<void()>> Function_pool::pop()

返回一个空的可选(或C++14及以前版本中的函数(,可以处理一个空队列。你必须这样做,因为condition_variable可能会造成虚假的唤醒。

这样,m_data_condition.notify_all()就可以用来唤醒所有线程。

最后,我们必须修复无限循环,因为它不包括过度投入,同时允许您执行仍在队列中的所有函数:

while (func_pool.acceptsFunctions || func_pool.containsFunctions())
{
auto f = func_pool.pop();
If (!f)
{
func_pool.m_data_condition.wait_for(1s);
continue;
}
auto &function = *f;
function ();
}

我将由您来实现containsFunctions()并清理代码(infinite_loop_func作为成员函数?(注意,使用计数器,您甚至可以处理生成的后台任务。

您可以始终使用特定的异常类型向infinite_loop_func发出信号,表明它应该返回。。。

class quit_worker_exception: public std::exception {};

然后将infinite_loop_func更改为…

void infinite_loop_func ()
{
while (true) {
std::function<void()> func = func_pool.pop();
try {
func();
}
catch (quit_worker_exception &ex) {
return;
}
}
}

通过以上更改,您可以使用(在main中(。。。

/*
* Enqueue `thread_pool.size()' function objects whose sole job is
* to throw an instance of `quit_worker_exception' when invoked.
*/
for (int i = 0; i < thread_pool.size(); i++)
func_pool.push([](){ throw quit_worker_exception(); });
/*
* Now just wait for each worker to terminate having received its
* quit_worker_exception.
*/
for (int i = 0; i < thread_pool.size(); i++)
thread_pool.at(i).join();

infinite_loop_func的每个实例将使一个函数对象出列,该函数对象在被调用时抛出一个quit_worker_exception,使其返回。

Follwoing [JVApen](https://stackoverflow.com/posts/51382714/revisions) suggestion, I copy my code in case anyone will want a working code:

函数_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}