在具有可替换阶段的C++中建模管道

Modeling a pipeline in C++ with replaceable stages

本文关键字:C++ 建模 管道 可替换      更新时间:2023-10-16

我正试图构建一个C++数据结构,用于建模一个简单的N阶段过程,其中每个阶段都可以用不同的函数替换。一种方法是使用OO方法,并为每个阶段提供一个带有虚拟方法的抽象基类;例如:

class Pipeline {
protected:
virtual void processA(const In& in, BType& B) = 0;
virtual void processB(const BType& B, BType& C) = 0;
virtual void processC(const CType& C, BType& D) = 0;
virtual void processD(const DType& D, Out& out) = 0;
public:
void process(const In& in, Out& out) {
Btype B;
processA(in, B);
Ctype C;
processB(B, C);
Btype D;
processC(C, D);
processD(D,out);
}
};

这种方法的问题是,如果N个阶段中的每一个都可以与M个进程交换,那么就有N*M个可能的子类。

另一个想法是存储函数对象:

class Pipeline {
public:
std::function<void(const In& in, BType& B)> processA;
std::function<void(const In& B, CType& C)> processB;
std::function<void(const In& C, DType& D)> processC;
std::function<void(const In& D, Out& out)> processD;
void process(const In& in, Out& out) {
Btype B;
processA(in, B);
Ctype C;
processB(B, C);
Btype D;
processC(C, D);
processD(D,out);
}
};

我在这种方法中遇到的问题是,阶段并不是真正独立的,在某些情况下,我希望用一个对象来存储有关多个阶段的信息。

有人为具有可更换部件的管道找到了良好的数据结构吗?奖金将允许每个阶段同时运行。

指向std函数对象是个坏主意。如果需要的话,它们已经可以存储指针了。

我建议使用图表。

sink是消费者:

template<class...Ts>
struct sink : std::function<void(Ts...)> {
using std::function<void(Ts...)>::function;
};

来源是吸引消费者并使其满意的东西:

template<class...Ts>
using source = sink<sink<Ts...>>;

过程是将生产者与消费者联系起来的东西,可能会改变类型:

template<class In, class Out>
using process = sink< source<In>, sink<Out> >;

然后我们可以定义一个管道操作:

template<class In, class Out>
sink<In> operator|( process< In, Out > a, sink< Out > b ){
return [a,b]( In in ){
a( [&in]( sink<In> s )mutable{ s(std::forward<In>(in)); }, b );
};
}
template<class In, class Out>
source<Out> operator|( source< In > a, process< In, Out > b ){
return [a,b]( sink<Out> out ){
b( a, out );
};
}
template<class In, class Mid, class Out>
process<In, Out> operator|( process<In, Mid> a, process<Mid, Out> b ){
return [a,b]( source<In> in, sink<Out> out ){
a( in, b|out ); // or b( in|a, out )
};
}
template<class...Ts>
sink<> operator|( source<Ts...> a, sink<Ts...> b ){
return[a,b]{ a(b); };
}

我认为组件管道元素的状态复制起来很便宜,所以共享ptr或原始指针之类的。

如果您想要并发性,只需启动提供值队列并通过管道传递未来的进程。但我认为通常最好将元素连接在一起,使管道异步,而不是阶段。

让管道元素像gsl跨度一样也很有用,允许阶段具有固定的缓冲区,并在不分配的情况下以块的形式传递计算结果。

让你开始的玩具流程:

process<char, char> to_upper = []( source<char> in, sink<char> out ){
in( [&out]( char c ) { out( std::toupper(c) ); } );
};

和一个来源:

source<char> hello_world = [ptr="hello world"]( sink<char> s ){
for (auto it = ptr; *it; ++it ){ s(*it); }
};
sink<char> print = [](char c){std::cout<<c;};
int main(){
auto prog = hello_world|to_upper|print;
prog();
}

输出CCD_ 2。

现场演示:https://ideone.com/MC4fDV

请注意,这是一个基于推送的管道。基于拉动的管道是一种替代方案。推送管道可以更容易地进行作业批处理;pull pipeline可以避免生成没人想要的数据。推送使数据传播变得自然;pull使数据收集变得自然。

推论也可以使其更加自然。从某种意义上说,源程序是一个协程,当它调用推送管道中的接收器时会挂起。反过来拉。推论可能使推/拉在同一处理代码中同时工作。

为了使第一种方法更具互换性,可以将抽象基类拆分为多个基类,每个进程一个。然后基类可以由一个或多个对象来实现。管道将保存指向每个基类的引用、指针或智能指针:

struct ProcessA {
virtual void processA(const In& in, BType& B) = 0;
virtual ~ProcessA() = default;
};
struct ProcessB {
virtual void processB(const BType& B, CType& C) = 0;
virtual ~ProcessB() = default;
};
// ...
struct Pipeline {
ProcessA* processA;
ProcessB* processB;
ProcessC* processC;
ProcessD* processD;
void process(const In& in, Out& out) {
BType B;
processA->processA(in, B);
CType C;
processB->processB(B, C);
DType D;
processC->processC(C, D);
processD->processD(D,out);
}
};
struct SimpleProcessor : ProcessA, ProcessB, ProcessC, ProcessD {
void processA(const In& in, BType& B) override;
void processB(const BType& B, CType& C) override;
void processC(const CType& C, DType& D) override;
void processD(const DType& D, Out& out) override;
};
int main() {
SimpleProcessor processor;
Pipeline pipeline;
pipeline.processA = &processor;
pipeline.processB = &processor; 
pipeline.processC = &processor; 
pipeline.processD = &processor; 
In in;
Out out;
pipeline.process(in, out);
}

现场演示。

你的第二种方法也可以。您可以使用类似lambda的东西来调整单个对象以适应每个std::function:

struct Pipeline {
std::function<void(const In& in, BType& B)>   processA;
std::function<void(const BType& B, CType& C)> processB;
std::function<void(const CType& C, DType& D)> processC;
std::function<void(const DType& D, Out& out)> processD;
void process(const In& in, Out& out) {
BType B;
processA(in, B);
CType C;
processB(B, C);
DType D;
processC(C, D);
processD(D,out);
}
};
int main() {
SimpleProcessor proc;
Pipeline pipeline;
pipeline.processA = [&proc](const In& in, BType& B){ return proc.processA(in, B); };
pipeline.processB = [&proc](const BType& B, CType& C){ return proc.processB(B, C); }; 
pipeline.processC = [&proc](const CType& C, DType& D){ return proc.processC(C, D); }; 
pipeline.processD = [&proc](const DType& D, Out& out){ return proc.processD(D, out); }; 
In in;
Out out;
pipeline.process(in, out);
}

现场演示。

是的,这两种方法都允许您同时运行每个进程,但您的BTypeCTypeDType必须支持并发访问,这样才能同时写入和读取它们。例如,并发队列。