异构作业处理器的并行化
Parallelization of heterogeous job processors
假设我们有一些函数,比如:
R1* process_input1(I1*);
R2* process_input2(I1*);
R1* process_input3(I2*);
//... etc
这些函数是占用大量时间的cpu密集型操作。然而,这些都可以相互独立运行,因此是并行运行的好候选者。它们负责为R(esult)类型分配内存。
我们还有一些其他功能,如:
void process_result1(R1*);
void process_result2(R1*);
void process_result3(R2*);
//... etc
它们消耗R(esult),并负责释放它们所占用的内存。
主循环如下所示:
void event_loop(Queue& some_queue)
{
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
R1* pResult = process_input1(job.getI1());
process_result1(pResult);
break;
case CMD2:
R2* pResult = process_input2(job.getI1());
process_result3(pResult);
break;
case CMD3:
R1* pResult = process_input3(job.getI2());
process_result2(pResult);
break;
//... etc
}
}
}
正如您所看到的,process_input
方法是序列化的。目标是并行化process_input
方法以提高吞吐量,同时仍然保持调用process_result
方法的顺序。
因此,我们设计了一个接口如下的类:
class ParallelSequencer
{
public:
ParallelSequencer(size_t nThreads);
template <typename I, typename R>
enqueue(I* input, R* (*process_input)(I*), void (*process_result)(R*));
};
主循环现在变成:
void event_loop(Queue& some_queue)
{
ParallelSequencer sequencer(NUM_SEQUENCER_THREADS);
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
sequencer.enqueue(job.getI1(), process_input1, process_result1);
break;
case CMD2:
sequencer.enqueue(job.getI1(), process_input2, process_result3);
break;
case CMD3:
sequencer.enqueue(job.getI2(), process_input3, process_result2);
break;
//... etc
}
}
}
为了实现这一点,我们需要对两种类型的元组进行排队:
template <typename I, typename R>
struct Input
{
I* data;
R* (*process)(I*);
}
Queued up as enqueue()在线程获取作业时被调用和删除。
template <typename R>
struct Output
{
R* data;
void (*process)(R*);
}
当线程调用完process_input()
后排队,并在调用process_result()
之前/之后删除。
如何以类型安全的方式声明包含这两个数据结构序列的队列?
这是一个完全错误的方法来解决这个问题吗?
我知道在任何地方使用void*
都可以做到这一点,但其中的乐趣在哪里?
我不确定我是否正确理解你,但如果我是正确的,你想做的事情可以用一个简单的接口来完成:
struct IInput {
// I actually didnt understand where and how you want to process the outputs...
// It seemed to me that the output is processed sequentially so I'm pushing it to a SerialSequencer
void process_input(SerialSequencer& sequence) = 0;
}
struct IOutput {
void process_output() = 0;
}
template<typename R>
struct Output : public IOutput {
R* data;
void (*process)(R*);
void process_output() override {
process(data);
}
}
template<typename I, typename R>
struct Input : public IInput {
I* data;
R* (*process_I)(I*);
void (*process_O)(R*);
void process_input(SerialSequencer& output_sequencer) override {
R* result = process_i(data);
Output<R> output = {result, process_O};
output_sequencer.enqueue(output);
}
}
ParallelSequencer
只需要接受一个指向接口结构的指针:
class ParallelSequencer
{
public:
ParallelSequencer(size_t nThreads);
void enqueue(std::shared_ptr<IInput> input_ptr);
};
然后你可以这样使用它:
void event_loop(Queue& some_queue)
{
ParallelSequencer sequencer(NUM_SEQUENCER_THREADS);
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
std::shared_ptr<Input<I1, R1> input_ptr =
std::make_shared<Input<I1, R1>>(ob.getI1(), process_input1, process_result1));
sequencer.enqueue(input_ptr);
break;
//... etc
}
}
}
调用线程弹出一个IInput并调用纯虚拟函数:
void thread_func() {
while(something) {
std::shared_ptr<IInput> input_ptr = sequencer.pop();
input_ptr->process_input(serial_sequencer);
}
}
相关文章:
- 如何使用OpenMP并行化此矩阵时间矢量运算
- 如何使用 MPI 的远程内存访问 (RMA) 功能并行化数据聚合?
- 在C++中使用并行化的预期速度是多少(不是 OpenMp,而是 <thread>)
- 如何使用 OpenMP 并行化最近邻搜索
- Malloc 在使用线程并行化 SSH 调用时存在问题
- 如何使用 OpenMP 正确并行化 for 循环?
- 如何将矩阵的行随机复制到内存中的另一个矩阵的过程并行化?
- 如何使用 Pthreads 并行化图像翻转?
- MPI:反复并行化缓冲区
- 是否可以使用OpenMP并行化一个列表,该列表可以在每次迭代中添加新元素
- 如何在Visual Studio中并行化armadillo
- 嵌套循环 OpenMP 并行化、私有索引还是公共索引?
- 如何并行化增加循环的大小
- 在 C++ 中使用 OpenMP 并行化两个 for 循环不会提供更好的性能
- OpenMP C++:并行化 for 循环的负载不平衡
- OpenMP 条件并行化 - 并行部分中 if 子句的语法
- C++ 犰狳和OpenMp:外积求和的并行化 - 定义犰狳矩阵的约简
- 将 for 循环与嵌套的 while 循环并行化时出现 OpenMP 分段错误
- 迭代卡拉苏巴算法在C++中使用OpenACC并行化和矢量化
- 异构作业处理器的并行化