异构作业处理器的并行化

Parallelization of heterogeous job processors

本文关键字:并行化 处理器 作业处理 作业 异构      更新时间:2023-10-16

假设我们有一些函数,比如:

R1* process_input1(I1*);
R2* process_input2(I1*);
R1* process_input3(I2*);
//... etc

这些函数是占用大量时间的cpu密集型操作。然而,这些都可以相互独立运行,因此是并行运行的好候选者。它们负责为R(esult)类型分配内存。

我们还有一些其他功能,如:

void process_result1(R1*);
void process_result2(R1*);
void process_result3(R2*);
//... etc

它们消耗R(esult),并负责释放它们所占用的内存。

主循环如下所示:

void event_loop(Queue& some_queue)
{
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
R1* pResult = process_input1(job.getI1());
process_result1(pResult);
break;
case CMD2:
R2* pResult = process_input2(job.getI1());
process_result3(pResult);
break;
case CMD3:
R1* pResult = process_input3(job.getI2());
process_result2(pResult);
break;
//... etc
}
}
}

正如您所看到的,process_input方法是序列化的。目标是并行化process_input方法以提高吞吐量,同时仍然保持调用process_result方法的顺序。

因此,我们设计了一个接口如下的类:

class ParallelSequencer
{
public:
ParallelSequencer(size_t nThreads);
template <typename I, typename R>
enqueue(I* input, R* (*process_input)(I*), void (*process_result)(R*));
};

主循环现在变成:

void event_loop(Queue& some_queue)
{
ParallelSequencer sequencer(NUM_SEQUENCER_THREADS);
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
sequencer.enqueue(job.getI1(), process_input1, process_result1);
break;
case CMD2:
sequencer.enqueue(job.getI1(), process_input2, process_result3);
break;
case CMD3:
sequencer.enqueue(job.getI2(), process_input3, process_result2);
break;
//... etc
}
}
}

为了实现这一点,我们需要对两种类型的元组进行排队:

template <typename I, typename R>
struct Input 
{
I* data;
R* (*process)(I*);
}

Queued up as enqueue()在线程获取作业时被调用和删除。

template <typename R>
struct Output
{
R* data;
void (*process)(R*);
}

当线程调用完process_input()后排队,并在调用process_result()之前/之后删除。

如何以类型安全的方式声明包含这两个数据结构序列的队列?

这是一个完全错误的方法来解决这个问题吗?

我知道在任何地方使用void*都可以做到这一点,但其中的乐趣在哪里?

我不确定我是否正确理解你,但如果我是正确的,你想做的事情可以用一个简单的接口来完成:

struct IInput {
// I actually didnt understand where and how you want to process the outputs...
// It seemed to me that the output is processed sequentially so I'm pushing it to a SerialSequencer
void process_input(SerialSequencer& sequence) = 0;
}
struct IOutput {
void process_output() = 0;
}
template<typename R>
struct Output : public IOutput {
R* data;
void (*process)(R*);
void process_output() override {
process(data);
}
}
template<typename I, typename R>
struct Input : public IInput {
I* data;
R* (*process_I)(I*);
void (*process_O)(R*);
void process_input(SerialSequencer& output_sequencer) override {
R* result = process_i(data);
Output<R> output = {result, process_O};
output_sequencer.enqueue(output); 
}
}

ParallelSequencer只需要接受一个指向接口结构的指针:

class ParallelSequencer
{
public:
ParallelSequencer(size_t nThreads);
void enqueue(std::shared_ptr<IInput> input_ptr);
};

然后你可以这样使用它:

void event_loop(Queue& some_queue)
{
ParallelSequencer sequencer(NUM_SEQUENCER_THREADS);
while (job = some_queue.get_front())
{
switch(job.getCmdCode())
{
case CMD1:
std::shared_ptr<Input<I1, R1> input_ptr = 
std::make_shared<Input<I1, R1>>(ob.getI1(), process_input1, process_result1));
sequencer.enqueue(input_ptr);
break;
//... etc
}
}
}

调用线程弹出一个IInput并调用纯虚拟函数:

void thread_func() {
while(something) {
std::shared_ptr<IInput> input_ptr = sequencer.pop();
input_ptr->process_input(serial_sequencer);
}
}