如何使Intel TBB multifunction_node具有动态的端口数

How to make Intel TBB multifunction_node with dynamic number of ports?

本文关键字:动态 node Intel 何使 TBB multifunction      更新时间:2023-10-16

我是新的英特尔TBB库。正如你所看到的,我的问题与tbb::flow::图表有关。我需要实现这样的逻辑:

用户用一些逻辑块绘制图形。每个块(节点)可以有无限的连接(边),因此每个块(节点)可以选择下一步将数据放在哪里。然后我的程序将在TBB库的帮助下构建这样的图形并进行计算。

所以我不知道是否有可能构建节点(我猜它必须是multifunction_node)与输出端口的动态数量。你能告诉我怎么做吗?

不幸的是,没有办法(没有动态编译)改变multifunction_node中的输出端口数量。您可以创建端口的最大数量(由宏开关控制并取决于编译器),并动态地附加到端口。如果您对端口执行try_put操作,并且没有附加后续操作,则try_put操作失败,您可以在运行时对此作出反应。

另一种方法(尽管有些沮丧,我认为)是构建一个双端口multifunction_nodes的二叉树。如果使用带有输出目的地作为字段的类,则根据掩码的结果构造每个节点以响应目的地的一个比特并输出到端口0或端口1。调度器短路将引导输出相对较快地通过树,但是您将为多个动态调用付出一些代价。

或者你可以使用除2以外的其他基数(比如,10)

附录:在与Mike (flow::graph的设计者)讨论后,我们意识到有另一种方法可以处理这个问题,它将允许动态端口数量。你需要做一些低级别的事情,但是它是这样的:

#include "tbb/tbb.h"
#include <iostream>
using namespace tbb::flow;
tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
    output_port_vector_t &my_ports;
    public:
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = my_ports.size();
        if(in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};
struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};
int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }
    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }
    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}

以上说明:

  • 我们正在创建指向broadcast_nodes的指针concurrent_vectorfunction_node的继承者附着在这些broadcast_nodes上。function_node的输出被忽略。
  • concurrent_vector传递给multioutput_function_body的构造函数。在这种情况下,我们根本不需要multifunction_node。multioutput_function_body在运行时决定broadcast_nodetry_put的归属。注意我们正在显式地对broadcast_nodes执行try_puts。这将导致为每个try_put生成一个任务。生成的任务比排队的任务要快,但是比从节点返回值有更多的调度开销。我没有添加清理堆分配的broadcast_nodes和输出function_nodes。删除broadcast_nodes的"明显"位置是在multioutput_function_body的析构函数中。您不应该这样做,因为function_node的创建导致传入函数体的复制构造,并且function_body的多个副本将具有对broadcast_node指针的concurrent_vector的引用。在g.wait_for_all()之后进行删除。

我使用concurrent_vector是因为它允许在修改concurrent_vector时访问指针。在执行图期间是否可以添加额外的broadcast_node指针的问题是开放的。我希望您只是创建节点并按原样使用它们,而不是动态地修改它们。concurrent_vectors在增长结构时不重新分配和移动已经初始化的元素;这就是为什么我使用它,但不要认为这是一个完整的答案,如果你希望在图运行时添加额外的节点。