通用线程池类工作不正常
Generic thread pool class is not working properly
我正在尝试创建一个线程池类,它接收几个函数并将它们放入队列中,直到它们完成,然后我可以添加另一个函数来利用创建的线程,而不是在我想运行其他函数时创建它们。这就是为什么我包含了一个条件变量来同步所有线程。
然而,代码无法正常工作,因为在调用函数时,对象会以某种方式进行复制。经过几次尝试,我想不出我错过了什么!
我所期望的是hw
对象的成员函数greetings
与他的索引并行执行。但是,当执行行(o.*f)(std::forward<Args>(args)...);
时,对象被复制,尽管复制构造函数被删除。因此,当它进入greetings
成员时,它产生一个SEGMENTATION FAULT
。
CMakeLists.txt
cmake_minimum_required(VERSION 3.5)
project(boost_asyo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(boost_asyo main.cpp)
target_link_libraries(${PROJECT_NAME} boost_thread boost_system)
main.cpp
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};
class ThreadPool
{
private:
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
Semaphore m_sem;
public:
ThreadPool(size_t n)
{
this->m_work = std::make_unique<boost::asio::io_service::work>(m_io_service);
for (size_t ii = 0; ii < n; ii++)
{
m_threads.create_thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service));
}
}
ThreadPool(const ThreadPool & v) = delete;
ThreadPool(ThreadPool && v) = delete;
~ThreadPool()
{
m_io_service.stop();
}
template<class type, class T, class T1, class... Args>
auto post(type T::*f, T1 &obj, Args... args)
{
this->m_sem.take();
this->m_io_service.post([&] ()
{
T o = static_cast<T&&>(obj);
(o.*f)(std::forward<Args>(args)...);
this->m_sem.give();
});
}
void wait()
{
this->m_sem.wait();
}
};
class HelloWorld
{
private:
public:
std::string m_str;
HelloWorld(std::string str) : m_str(str) {};
HelloWorld(const HelloWorld& v) = delete;
HelloWorld(HelloWorld&& v) = default;
~HelloWorld() = default;
void greetings(int ii)
{
for (int jj = 0; jj < 5; jj++)
{
std::cout << this->m_str << " " << ii << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
}
};
int main()
{
ThreadPool tp(8);
HelloWorld hw("Hola mundo");
for (int ii = 0; ii < 5; ii++)
{
tp.post(&HelloWorld::greetings, hw, ii);
}
tp.wait();
return 0;
}
这段代码是基于这段代码的,它可以正常工作,这与我想对类和成员所做的类似。
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};
int main()
{
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);
boost::thread_group threads;
for (size_t ii = 0; ii < 2; ii++)
{
std::cout << "id: " << ii << std::endl;
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
Semaphore sem;
for (size_t ii = 0; ii < 3; ii++)
{
//Take
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("hello world %in", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
//Give
sem.give();
});
}
sem.wait();
for (size_t ii = 0; ii < 3; ii++)
{
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("bye world %in", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
sem.give();
});
}
sem.wait();
io_service.stop();
return 0;
}
我真的很好奇信号量是关于什么的。
io_service
已经是一个任务队列。它是线程安全的,您不需要信号量。
为了进行比较,这里有基于io_service的线程池:
像您这样的线程池,没有Asio,以几乎相同的方式使用条件变量,但没有调用它";信号量";boost线程抛出异常";thread_resource_error:资源暂时不可用">
同样的事情,但围绕
io_service
重写,就像您的ThreadPool一样,这表明您不再需要信号量中的[解决方案1]带有阻塞的c++工作队列(解决方案2使用与以前相同的线程池)。(更好的是,最近的Asio版本有一个内置的线程池)。
错误在哪里
这是不安全的:
template <class type, class T, class T1, class... Args>
auto post(type T::*f, T1& obj, Args... args) {
this->m_sem.take();
this->m_io_service.post([&]() {
T o = static_cast<T&&>(obj);
(o.*f)(std::forward<Args>(args)...);
this->m_sem.give();
});
}
具体而言:
线
T o = static_cast<T&&>(obj);
不复制t(即
HelloWorld
)。你知道,因为那是不可能的。发生的情况更糟:对象从obj
移动。顺便说一句,这假设T是从T1移动可构造的。
您通过显式将右手边强制转换为右值引用来明确要求它。
这就是
std::move
被指定要做的事情,实际上:"特别是,std::move生成一个xvalue表达式,用于标识其参数t。它完全等效于右值引用类型的static_cast">效果是
main
中的HelloWorld实例不再有效,但您仍在为后续任务移动它。通过引用捕获的其他参数。这意味着它们在任务实际执行(包括
f
)之前就超出了范围。
为了确保安全,您必须在本地副本中捕获参数:
template <class type, class T, class... Args>
auto post(type T::*f, T&& obj, Args... args) {
this->m_sem.take();
this->m_io_service.post([=, o = std::move(obj)]() mutable {
try {
(o.*f)(args...);
} catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}
注:
现在
obj
由右值引用获取。这意味着除非obj
是右值,否则post
不会编译。注意,这不是通用引用,因为
T
是作为f
的一部分推导出来的。lambda现在是可变的(因为否则只有
const
成员函数可以在捕获的o
上运行)所有其他参数都被复制——这大致是
std::bind
的操作方式,但您可以针对可移动参数进行优化)。我们处理异常-在您的代码中,如果
f
抛出,您将永远不会give()
信号量
当然,main
需要进行调整,因此实际创建了多个HelloWorld
实例,并通过右值传递:
for (int ii = 0; ii < 5; ii++) {
HelloWorld hw("Hola mundo");
tp.post(&HelloWorld::greetings, std::move(hw), ii);
}
但是,它不会起作用
至少,对我来说,它不会编译。Asio要求处理程序是可复制的(为什么Boost.Asio处理程序必须是可复制构造的?,如何欺骗Boost::Asio允许只移动处理程序)。
此外,我们几乎没有触及表面。通过对type T::*f
进行硬编码,您可以使其在许多方面都需要新的post
重载:静态方法、const
成员函数。。。
相反,为什么不用C++的方式:
template <class F, class... Args>
auto post(F&& f, Args&&... args) {
this->m_sem.take();
this->m_io_service.post(
[this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)]
{
try { f(); }
catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}
事实上,在更现代的C++中,你会写(假设这里是C++17):
//...
[this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)]
{
try { std::apply(f, args); }
//...
哦,我们还需要
#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
因为是只移动的处理程序类型
完整的固定版本演示
注意:还添加了一个输出互斥(
s_outputmx
),以避免混合控制台输出。
在Coliru上直播
#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore {
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore() { count = 0; }
void wait() {
std::unique_lock<std::mutex> m(lock);
while (count > 0)
cond.wait(m, [this] { return count == 0; });
}
void take() {
std::unique_lock m(lock);
count++;
}
void give() {
std::unique_lock m(lock);
count--;
if (count == 0) {
cond.notify_one();
}
}
};
class ThreadPool {
private:
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
Semaphore m_sem;
public:
ThreadPool(size_t n) {
this->m_work =
std::make_unique<boost::asio::io_service::work>(m_io_service);
for (size_t ii = 0; ii < n; ii++) {
m_threads.create_thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
}
}
ThreadPool(const ThreadPool& v) = delete;
ThreadPool(ThreadPool&& v) = delete;
~ThreadPool() { m_io_service.stop(); }
template <class F, class... Args>
auto post(F&& f, Args&&... args) {
this->m_sem.take();
this->m_io_service.post(
#if 1 // pre-c++17
[this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)]
{
try { f(); }
#else // https://en.cppreference.com/w/cpp/utility/apply
[this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)]
{
try { std::apply(f, args); }
#endif
catch (...) {
this->m_sem.give();
throw;
}
this->m_sem.give();
});
}
void wait() { this->m_sem.wait(); }
};
struct HelloWorld {
std::string m_str;
HelloWorld(std::string str) : m_str(str){};
HelloWorld(const HelloWorld& v) = delete;
HelloWorld(HelloWorld&& v) = default;
~HelloWorld() = default;
void greetings(int ii) const {
for (int jj = 0; jj < 5; jj++) {
{
static std::mutex s_outputmx;
std::lock_guard<std::mutex> lk(s_outputmx);
std::cout << this->m_str << " " << ii << std::endl;
}
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
}
};
int main()
{
ThreadPool tp(8);
for (int ii = 0; ii < 5; ii++) {
HelloWorld hw("Hola mundo");
tp.post(&HelloWorld::greetings, std::move(hw), ii);
}
tp.wait();
}
打印
Hola mundo 0
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 4
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 1
Hola mundo 3
奖金:放下信号灯
丢弃信号量并实际使用work
:
class ThreadPool {
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
public:
ThreadPool(size_t n)
: m_work(std::make_unique<boost::asio::io_service::work>(m_io_service))
{
while (n--) {
m_threads.create_thread([this] { m_io_service.run(); });
}
}
~ThreadPool() { wait(); }
void wait() {
m_work.reset();
m_threads.join_all();
}
template <class F, class... Args> void post(F&& f, Args&&... args) {
m_io_service.post(
[f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] {
std::apply(f, args);
});
}
};
这是28行代码,而你原来的代码是90行。它实际上做了更多的事情。
也可以在Coliru上观看。
剩下什么
我们没有正确处理来自io_service::run
的异常(请参阅是否应该捕获boost::asio::io_service::run()引发的异常?)
此外,如果你有";最近的";Boost,您可以享受到work
(make_work_guard
和.reset()
,因此您不需要unique_ptr
)的改进接口,以及现成的thread_pool
(因此您不再需要……基本上任何东西):
在Coliru上直播
#include <boost/asio.hpp>
#include <mutex>
#include <iostream>
static std::mutex s_outputmx;
using namespace std::chrono_literals;
struct HelloWorld {
std::string const m_str;
void greetings(int ii) const;
};
int main() {
boost::asio::thread_pool tp(8);
for (int ii = 0; ii < 5; ii++)
//post(tp, [hw=HelloWorld{"Hola mundo"}, ii] { hw.greetings(ii); });
post(tp, std::bind(&HelloWorld::greetings, HelloWorld{"Hola mundo"}, ii));
tp.join();
}
void HelloWorld::greetings(int ii) const {
for (int jj = 0; jj < 5; jj++) {
std::this_thread::sleep_for(1s);
std::lock_guard<std::mutex> lk(s_outputmx);
std::cout << m_str << " " << ii << std::endl;
}
}
- C++中的memset函数工作不正常
- 通用线程池类工作不正常
- 名为DLL的C++windows服务程序工作不正常
- C++-循环中的If语句工作不正常
- While循环和if/else语句工作不正常
- C++:最大数组值函数工作不正常
- 工会工作不正常
- 为什么这个循环运行不可见的代码?工作不正常
- C++STL映射键和值工作不正常
- c++文件指针工作不正常
- do while循环工作不正常
- qdbusxml2cpp工作不正常
- 缓存未命中似乎工作不正常
- 线程工作不正常
- 循环工作不正常
- 构造函数工作不正常
- IOCTL_DISK_GET_DRIVE_LAYOUT_EX工作不正常
- C++if/else-if语句工作不正常
- 面部和眼睛检测工作不正常
- Qt-RegExp工作不正常