通用线程池类工作不正常

Generic thread pool class is not working properly

本文关键字:工作 不正常 线程      更新时间:2023-10-16

我正在尝试创建一个线程池类,它接收几个函数并将它们放入队列中,直到它们完成,然后我可以添加另一个函数来利用创建的线程,而不是在我想运行其他函数时创建它们。这就是为什么我包含了一个条件变量来同步所有线程。

然而,代码无法正常工作,因为在调用函数时,对象会以某种方式进行复制。经过几次尝试,我想不出我错过了什么!

我所期望的是hw对象的成员函数greetings与他的索引并行执行。但是,当执行行(o.*f)(std::forward<Args>(args)...);时,对象被复制,尽管复制构造函数被删除。因此,当它进入greetings成员时,它产生一个SEGMENTATION FAULT

CMakeLists.txt

cmake_minimum_required(VERSION 3.5)
project(boost_asyo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(boost_asyo main.cpp)
target_link_libraries(${PROJECT_NAME} boost_thread boost_system)

main.cpp

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};

class ThreadPool
{
private:
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
Semaphore m_sem;
public:
ThreadPool(size_t n)
{
this->m_work = std::make_unique<boost::asio::io_service::work>(m_io_service);
for (size_t ii = 0; ii < n; ii++)
{
m_threads.create_thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service));
}
}
ThreadPool(const ThreadPool & v) = delete;
ThreadPool(ThreadPool && v) = delete;
~ThreadPool()
{
m_io_service.stop();
}
template<class type, class T, class T1, class... Args>
auto post(type T::*f, T1 &obj, Args... args)
{
this->m_sem.take();
this->m_io_service.post([&] ()
{
T o = static_cast<T&&>(obj);
(o.*f)(std::forward<Args>(args)...);
this->m_sem.give();
});
}
void wait()
{
this->m_sem.wait();
}
};
class HelloWorld
{
private:
public:
std::string m_str;
HelloWorld(std::string str) : m_str(str) {};
HelloWorld(const HelloWorld& v) = delete;
HelloWorld(HelloWorld&& v) = default;
~HelloWorld() = default;
void greetings(int ii)
{
for (int jj = 0; jj < 5; jj++)
{
std::cout << this->m_str << " " << ii <<  std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
}
};

int main()
{
ThreadPool tp(8);
HelloWorld hw("Hola mundo");
for (int ii = 0; ii < 5; ii++)
{
tp.post(&HelloWorld::greetings, hw, ii);
}
tp.wait();
return 0;
}

这段代码是基于这段代码的,它可以正常工作,这与我想对类和成员所做的类似。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};

int main()
{    
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);
boost::thread_group threads;
for (size_t ii = 0; ii < 2; ii++)
{
std::cout << "id: " << ii << std::endl;
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
Semaphore sem;
for (size_t ii = 0; ii < 3; ii++)
{
//Take
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("hello world %in", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
//Give
sem.give();
});
}

sem.wait();

for (size_t ii = 0; ii < 3; ii++)
{
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("bye world %in", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
sem.give();
});
}
sem.wait();
io_service.stop();
return 0;
}

我真的很好奇信号量是关于什么的。

io_service已经是一个任务队列。它是线程安全的,您不需要信号量。

为了进行比较,这里有基于io_service的线程池:

  • 像您这样的线程池,没有Asio,以几乎相同的方式使用条件变量,但没有调用它";信号量";boost线程抛出异常";thread_resource_error:资源暂时不可用">

  • 同样的事情,但围绕io_service重写,就像您的ThreadPool一样,这表明您不再需要信号量中的[解决方案1]带有阻塞的c++工作队列(解决方案2使用与以前相同的线程池)。

(更好的是,最近的Asio版本有一个内置的线程池)。

错误在哪里

这是不安全的:

template <class type, class T, class T1, class... Args>
auto post(type T::*f, T1& obj, Args... args) {
this->m_sem.take();
this->m_io_service.post([&]() {
T o = static_cast<T&&>(obj);
(o.*f)(std::forward<Args>(args)...);
this->m_sem.give();
});
}

具体而言:

  1. 线

    T o = static_cast<T&&>(obj);
    

    不复制t(即HelloWorld)。你知道,因为那是不可能的。发生的情况更糟:对象从obj移动。

    顺便说一句,这假设T是从T1移动可构造的。

    您通过显式将右手边强制转换为右值引用来明确要求它。

    这就是std::move被指定要做的事情,实际上:"特别是,std::move生成一个xvalue表达式,用于标识其参数t。它完全等效于右值引用类型的static_cast">

    效果是main中的HelloWorld实例不再有效,但您仍在为后续任务移动它。

  2. 通过引用捕获的其他参数。这意味着它们在任务实际执行(包括f)之前就超出了范围。

为了确保安全,您必须在本地副本中捕获参数:

template <class type, class T, class... Args>
auto post(type T::*f, T&& obj, Args... args) {
this->m_sem.take();
this->m_io_service.post([=, o = std::move(obj)]() mutable {
try {
(o.*f)(args...);
} catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}

注:

  1. 现在obj右值引用获取。这意味着除非obj是右值,否则post不会编译。

    注意,这不是通用引用,因为T是作为f的一部分推导出来的。

  2. lambda现在是可变的(因为否则只有const成员函数可以在捕获的o上运行)

  3. 所有其他参数都被复制——这大致是std::bind的操作方式,但您可以针对可移动参数进行优化)。

  4. 我们处理异常-在您的代码中,如果f抛出,您将永远不会give()信号量

当然,main需要进行调整,因此实际创建了多个HelloWorld实例,并通过右值传递:

for (int ii = 0; ii < 5; ii++) {
HelloWorld hw("Hola mundo");
tp.post(&HelloWorld::greetings, std::move(hw), ii);
}

但是,它不会起作用

至少,对我来说,它不会编译。Asio要求处理程序是可复制的(为什么Boost.Asio处理程序必须是可复制构造的?,如何欺骗Boost::Asio允许只移动处理程序)。

此外,我们几乎没有触及表面。通过对type T::*f进行硬编码,您可以使其在许多方面都需要新的post重载:静态方法、const成员函数。。。

相反,为什么不用C++的方式:

template <class F, class... Args>
auto post(F&& f, Args&&... args) {
this->m_sem.take();
this->m_io_service.post(
[this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
{
try { f(); }
catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}

事实上,在更现代的C++中,你会写(假设这里是C++17):

//...
[this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
{
try { std::apply(f, args); }
//...

哦,我们还需要

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1

因为是只移动的处理程序类型

完整的固定版本演示

注意:还添加了一个输出互斥(s_outputmx),以避免混合控制台输出。

在Coliru上直播

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore {
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore() { count = 0; }
void wait() {
std::unique_lock<std::mutex> m(lock);
while (count > 0)
cond.wait(m, [this] { return count == 0; });
}
void take() {
std::unique_lock m(lock);
count++;
}
void give() {
std::unique_lock m(lock);
count--;
if (count == 0) {
cond.notify_one();
}
}
};

class ThreadPool {
private:
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
Semaphore m_sem;
public:
ThreadPool(size_t n) {
this->m_work =
std::make_unique<boost::asio::io_service::work>(m_io_service);
for (size_t ii = 0; ii < n; ii++) {
m_threads.create_thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
}
}
ThreadPool(const ThreadPool& v) = delete;
ThreadPool(ThreadPool&& v) = delete;
~ThreadPool() { m_io_service.stop(); }
template <class F, class... Args>
auto post(F&& f, Args&&... args) {
this->m_sem.take();
this->m_io_service.post(
#if 1 // pre-c++17
[this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
{
try { f(); }
#else // https://en.cppreference.com/w/cpp/utility/apply
[this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
{
try { std::apply(f, args); }
#endif
catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}

void wait() { this->m_sem.wait(); }
};
struct HelloWorld {
std::string m_str;
HelloWorld(std::string str) : m_str(str){};
HelloWorld(const HelloWorld& v) = delete;
HelloWorld(HelloWorld&& v) = default;
~HelloWorld() = default;
void greetings(int ii) const {
for (int jj = 0; jj < 5; jj++) {
{
static std::mutex s_outputmx;
std::lock_guard<std::mutex> lk(s_outputmx);
std::cout << this->m_str << " " << ii << std::endl;
}
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
}
};
int main()
{
ThreadPool tp(8);
for (int ii = 0; ii < 5; ii++) {
HelloWorld hw("Hola mundo");
tp.post(&HelloWorld::greetings, std::move(hw), ii);
}
tp.wait();
}

打印

Hola mundo 0
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 4
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 1
Hola mundo 3

奖金:放下信号灯

丢弃信号量并实际使用work:

class ThreadPool {
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
public:
ThreadPool(size_t n)
: m_work(std::make_unique<boost::asio::io_service::work>(m_io_service))
{
while (n--) {
m_threads.create_thread([this] { m_io_service.run(); });
}
}
~ThreadPool() { wait(); }
void wait() {
m_work.reset();
m_threads.join_all();
}
template <class F, class... Args> void post(F&& f, Args&&... args) {
m_io_service.post(
[f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] {
std::apply(f, args); 
});
}
};

这是28行代码,而你原来的代码是90行。它实际上做了更多的事情

也可以在Coliru上观看。

剩下什么

我们没有正确处理来自io_service::run的异常(请参阅是否应该捕获boost::asio::io_service::run()引发的异常?)

此外,如果你有";最近的";Boost,您可以享受到work(make_work_guard.reset(),因此您不需要unique_ptr)的改进接口,以及现成的thread_pool(因此您不再需要……基本上任何东西):

在Coliru上直播

#include <boost/asio.hpp>
#include <mutex>
#include <iostream>
static std::mutex s_outputmx;
using namespace std::chrono_literals;
struct HelloWorld {
std::string const m_str;
void greetings(int ii) const;
};
int main() {
boost::asio::thread_pool tp(8);
for (int ii = 0; ii < 5; ii++)
//post(tp, [hw=HelloWorld{"Hola mundo"}, ii] { hw.greetings(ii); });
post(tp, std::bind(&HelloWorld::greetings, HelloWorld{"Hola mundo"}, ii));
tp.join();
}
void HelloWorld::greetings(int ii) const {
for (int jj = 0; jj < 5; jj++) {
std::this_thread::sleep_for(1s);
std::lock_guard<std::mutex> lk(s_outputmx);
std::cout << m_str << " " << ii << std::endl;
}
}