How to make Intel TBB multifunction_node with dynamic number of ports?
I'm new to the Intel TBB library. As you can see, my question is related to tbb::flow::graph. I need to implement logic like this:

The user draws a graph with some logic blocks. Every block (node) can have an unlimited number of connections (edges), so every block (node) can choose where to send its data next. My program will then build such a graph with the help of the TBB library and perform the calculations.

So I don't know whether it is possible to construct a node (I guess it has to be a multifunction_node) with a dynamic number of output ports. Could you show me how to do it, please?

Sneaking answered 27/10, 2015 at 12:42 Comment(0)

Unfortunately there is no way (without dynamic compilation) to change the number of output ports of a multifunction_node. You can create the maximum number of ports (which is controlled by a macro switch and depends on the compiler) and just attach successors to the ports dynamically. If you do a try_put to a port and there is no successor attached, the try_put fails and you can react to this at runtime.
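
For reference, here is a minimal sketch (not from the original answer) of that first approach: a multifunction_node declared with four output ports (an arbitrary illustrative maximum), whose body picks a port at runtime and checks the return value of try_put to see whether a successor is attached:

#include "tbb/tbb.h"
#include <tuple>
#include <iostream>

using namespace tbb::flow;

// Four ports is an arbitrary choice for illustration; the real compile-time
// limit depends on your compiler and TBB build configuration.
typedef multifunction_node<int, std::tuple<int,int,int,int> > router_t;

struct router_body {
    void operator()(const int &in, router_t::output_ports_type &ports) {
        // Pick a port at runtime; here we simply use the low two bits of the input.
        bool accepted = false;
        switch(in & 3) {
            case 0: accepted = std::get<0>(ports).try_put(in); break;
            case 1: accepted = std::get<1>(ports).try_put(in); break;
            case 2: accepted = std::get<2>(ports).try_put(in); break;
            case 3: accepted = std::get<3>(ports).try_put(in); break;
        }
        if(!accepted) {
            // No successor is attached to that port; react here at runtime.
            std::cout << "no successor attached for input " << in << std::endl;
        }
    }
};

int main() {
    graph g;
    router_t router(g, serial, router_body());
    function_node<int> sink(g, serial, [](const int v) {
        std::cout << "port 0 successor received " << v << std::endl;
        return continue_msg();
    });
    make_edge(output_port<0>(router), sink);  // only port 0 has a successor attached
    for(int i = 0; i < 8; ++i) router.try_put(i);
    g.wait_for_all();
    return 0;
}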

Another way to do it (albeit with some frustration, I think) is to build a binary tree of two-port multifunction_nodes. If you use a class with an output destination as a field, you can construct each node to react to one bit of the destination and output to port 0 or port 1, depending on the result of the mask. The scheduler's short-circuiting would steer the output relatively quickly through the tree, but you'd pay a bit of a penalty for the multiple dynamic calls.

Or you could use some other base besides 2 (like, say, 10).
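
A rough sketch of the tree idea, assuming a hypothetical routed_msg type that carries the destination index alongside the payload (the type and field names are mine, not part of any TBB API):

#include "tbb/tbb.h"
#include <tuple>
#include <iostream>

using namespace tbb::flow;

// Hypothetical message type: a destination index plus a payload.
struct routed_msg {
    int destination;
    int payload;
};

// One level of the tree: examines a single bit of the destination and forwards
// the message unchanged to port 0 or port 1.
typedef multifunction_node<routed_msg, std::tuple<routed_msg, routed_msg> > splitter_t;

struct bit_splitter {
    int my_bit;  // which bit of the destination this level examines
    bit_splitter(int bit) : my_bit(bit) {}
    void operator()(const routed_msg &m, splitter_t::output_ports_type &ports) {
        if(m.destination & (1 << my_bit))
            std::get<1>(ports).try_put(m);
        else
            std::get<0>(ports).try_put(m);
    }
};

int main() {
    graph g;
    // A two-level tree routes to 4 leaves; deeper trees follow the same pattern.
    splitter_t root(g, unlimited, bit_splitter(1));
    splitter_t left(g, unlimited, bit_splitter(0));
    splitter_t right(g, unlimited, bit_splitter(0));
    make_edge(output_port<0>(root), left);
    make_edge(output_port<1>(root), right);

    function_node<routed_msg> leaf0(g, serial, [](const routed_msg &m) {
        std::cout << "leaf 0 received payload " << m.payload << std::endl;
        return continue_msg();
    });
    make_edge(output_port<0>(left), leaf0);
    // leaves 1-3 would attach to output_port<1>(left) and the two ports of right

    routed_msg m = { 0, 42 };  // destination 0 -> root port 0 -> left port 0 -> leaf0
    root.try_put(m);
    g.wait_for_all();
    return 0;
}

Each message traverses one node per tree level, which is where the extra dynamic calls mentioned above come from.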

Addendum: After talking with Mike (the designer of flow::graph), we realized there is another way to handle this, which would allow a dynamic number of ports. You would have to do a little low-level stuff, but it goes like this:

#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
// Function body that routes each input to one of a dynamic set of broadcast_nodes.
// It holds a reference to the shared vector of output ports; all copies of the body
// made by the function_node refer to the same vector.
struct multioutput_function_body {
    output_port_vector_t &my_ports;
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = (int)my_ports.size();
        if(in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation, then forward the result to the selected broadcast_node
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

// Body for the output (sink) nodes: prints which output node received which value.
struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }

    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }

    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}

Notes on the above:

  • We are creating a concurrent_vector of pointers to broadcast_nodes. The successors to the function_node are attached to these broadcast_nodes. The output of the function_node is ignored.
  • The concurrent_vector is passed in to the constructor of the multioutput_function_body. We don't need a multifunction_node at all in this case. The multioutput_function_body decides which broadcast_node to try_put to at runtime. Note we are doing explicit try_puts to the broadcast_nodes. These result in a task being spawned for each try_put. Spawned tasks are faster than enqueued tasks, but there is more scheduling overhead than just returning a value from a node.
  • I didn't add the cleanup of the heap-allocated broadcast_nodes and the output function_nodes. The "obvious" place to do the deletion of the broadcast_nodes would be in the destructor of multioutput_function_body. You should not do this, as the creation of the function_node results in the copy-construction of the passed-in function bodies, and multiple copies of the function_body will have the reference to the concurrent_vector of broadcast_node pointers. Do the deletion after the g.wait_for_all() (see the sketch after this list).
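
As an illustration of that last point, here is one way main() above could be extended to do the cleanup. This is a sketch only; it assumes the same typedefs and bodies as the example (output_port_vector_t, bnode_element_t, multioutput_function_body, output_function_body) and additionally keeps the output function_nodes in a std::vector so they can be deleted too:

#include "tbb/tbb.h"
#include <vector>

using namespace tbb::flow;

int main() {
    graph g;
    output_port_vector_t output_ports;
    std::vector<function_node<int,int> *> output_nodes;  // kept so we can delete them later

    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports));
    for(int i = 0; i < 20; ++i)
        output_ports.push_back(new bnode_element_t(g));
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]), *fp);
        output_nodes.push_back(fp);
    }

    for(int i = 0; i < 21; ++i) my_node.try_put(i);
    g.wait_for_all();  // all copies of the node bodies are idle from here on

    // Safe to delete now: no body can still be referencing output_ports.
    for(size_t i = 0; i < output_ports.size(); ++i) delete output_ports[i];
    for(size_t i = 0; i < output_nodes.size(); ++i) delete output_nodes[i];
    return 0;
}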

I used concurrent_vector because it allows access to the pointers while the concurrent_vector is being modified. The question of whether additional broadcast_node pointers can be added during the execution of the graph is open. I hope you are only creating the nodes and using them as-is, not modifying them on the fly. concurrent_vectors do not reallocate and move already-initialized elements when growing the structure; that is why I used one, but I don't think this is a complete answer if you are hoping to add additional nodes while the graph is running.

Sudan answered 28/10, 2015 at 12:57 Comment(5)
Anton's blog discusses the issues with concurrent growth during access for concurrent_vector. The simple way to add items is to use the zero_allocator for the concurrent_vector and push_back the pointers to the broadcast_nodes after they are constructed. – Sudan
Many thanks for your reply! Unfortunately I do want to modify the graph on the fly, so I will implement some sort of tree with multifunction_nodes (maybe a binary tree like you suggest). Anyway, it will be a bit of a headache. – Sneaking
@Max, after thinking about it, I believe you can concurrently grow the concurrent_vector while the graph is running. You will have non-determinism while executing, but if you can live with that, and you are not destroying nodes during graph execution, you should be okay. – Sudan
One more difference between the multifunction_node solution and this one is that broadcast_nodes always accept inputs, so you cannot use the return value of try_put to detect whether another node is connected to the broadcast_node. – Sudan
OK, I see. The multifunction_node solution seems to have more flexibility at the cost of more complexity. – Sneaking
