c++线程池:std::函数的替代品,用于将函数/lambdas传递给线程

c++ thread pool: alternative to std::function for passing functions/lambdas to threads?

本文关键字:函数 线程 lambdas 替代品 std c++ 用于      更新时间:2023-10-16

我有一个线程池,用来执行许多微小的作业(数百万个作业,每个作业几十/几百毫秒)。作业以以下任一形式传递:

std::bind(&fn, arg1, arg2, arg3...)

[&](){fn(arg1, arg2, arg3...);}

线程池像这样对待它们:

std::queue<std::function<void(void)>> queue;
void addJob(std::function<void(void)> fn)
{
queue.emplace_back(std::move(fn));
}

相当标准的东西。。。。除了我注意到一个瓶颈,即如果作业以足够快的时间(不到一毫秒)执行,那么addJob函数中从lambda/binder到std::函数的转换实际上比作业本身的执行时间更长。在阅读了一些内容之后,std::函数的速度是出了名的慢,所以我的瓶颈并不一定出乎意料。

有没有更快的方法来做这类事情?我已经研究了std::函数替换的下降,但它们要么与我的编译器不兼容,要么速度不够快。我也研究过Don Clugston的"快速委托",但它们似乎不允许在传递函数的同时传递参数(也许我没有正确理解它们?)。

我使用VS2015u3进行编译,传递给作业的函数都是静态的,它们的参数要么是int/foats,要么是指向其他对象的指针。

为每种任务类型都有一个单独的队列-您可能没有数以万计的任务类型。例如,每一个都可以是任务的静态成员。那么addJob()实际上是Task的ctor,它是完全类型安全的。

然后定义一个任务类型的编译时列表,并通过模板元编程(for_each)访问它。它会更快,因为您不需要任何虚拟调用fnptr/std::function<>来实现这一点。

只有当您的元组代码看到所有Task类时,这才会起作用(因此您不能通过从光盘加载映像来向已经运行的可执行文件添加Task的新子代,希望这不是问题)。

template<typename D> // CRTP on D
class Task {
public:
// you might want to static_assert at some point that D is in TaskTypeList
Task() : it_(tasks_.end()) {} // call enqueue() in descendant
~Task() {
// add your favorite lock here
if (queued()) {
tasks_.erase(it_);
}
}
bool queued() const { return it_ != tasks_.end(); }
static size_t ExecNext() {
if (!tasks_.empty()) {
// add your favorite lock here
auto&& itTask = tasks_.begin();
tasks_.pop_front();
// release lock
(*itTask)();
itTask->it_ = tasks_.end();
}
return tasks_.size();
}
protected:
void enqueue() const
{
// add your favorite lock here
tasks_.push_back(static_cast<D*>(this));
it_ = tasks_.rbegin();
}
private:
std::list<D*>::iterator it_;
static std::list<D*> tasks_; // you can have one per thread, too - then you don't need locking, but tasks are assigned to threads statically
};
struct MyTask : Task<MyTask> {
MyTask() { enqueue(); } // call enqueue only when the class is ready
void operator()() { /* add task here */ }
// ...
};
struct MyTask2; // etc.
template<typename...>
struct list_ {};
using TaskTypeList = list_<MyTask, MyTask2>;
void thread_pocess(list_<>) {}
template<typename TaskType, typename... TaskTypes>
void thread_pocess(list_<TaskType, TaskTypes...>)
{
TaskType::ExecNext();
thread_process(list_<TaskTypes...>());
}
void thread_process(void*)
{
for (;;) {
thread_process(TaskTypeList());
}
}

这个代码有很多需要调整的地方:不同的线程应该从队列的不同部分开始(或者使用一个环,或者几个队列,并对线程进行静态/动态分配),当绝对没有任务时,你会将其发送到睡眠状态,可以有任务的枚举,等等。

请注意,这不能与任意Lambda一起使用:您需要列出任务类型。您需要从声明lambda类型的函数中"通信"lambda类型(例如,通过返回"std::make_pair(retval,list_)",有时这并不容易。然而,您总是可以将lambda转换为函子,这很简单——只是很难看。