Unfortunately there is no way (without dynamic compilation) to change the number of output ports of a multifunction_node at runtime. You can create the maximum number of ports (which is controlled by a macro switch and depends on the compiler) and attach successors to the ports dynamically. If you try_put to a port that has no successor attached, the try_put fails, and you can react to that at runtime.
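For illustration, here is a minimal sketch of that approach. The four-port tuple and the v % 4 routing rule are assumptions chosen for the example; the real constraint is that the port index must be a compile-time constant, hence the switch:

#include "tbb/tbb.h"
#include <iostream>
#include <tuple>

using namespace tbb::flow;

// Four ports is just an illustrative maximum; unused ports get no successor.
typedef multifunction_node<int, std::tuple<int, int, int, int> > router_t;

struct router_body {
    void operator()(const int &v, router_t::output_ports_type &ports) {
        bool accepted = false;
        // The port index must be a compile-time constant, hence the switch.
        // Routing on v % 4 is an assumed example policy.
        switch (v % 4) {
            case 0: accepted = std::get<0>(ports).try_put(v); break;
            case 1: accepted = std::get<1>(ports).try_put(v); break;
            case 2: accepted = std::get<2>(ports).try_put(v); break;
            case 3: accepted = std::get<3>(ports).try_put(v); break;
        }
        if (!accepted) {
            // no successor is attached to that port; react here at runtime
            std::cout << "no successor for " << v << "\n";
        }
    }
};

Edges are attached per port with make_edge(output_port<0>(router), successor), and any port left unwired simply reports a failed try_put.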
Another way to do it (albeit with some frustration, I think) is to build a binary tree of two-port multifunction_nodes. Use a message class with the output destination as a field, and construct each node to test one bit of the destination and forward to port 0 or port 1, depending on the result of the mask. The scheduler's short-circuiting would steer the output through the tree relatively quickly, but you'd pay a small penalty for the multiple dynamic calls.
Or you could use some other base besides 2 (say, 10).
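A sketch of one level of such a tree, assuming a hypothetical msg_t type and the one-bit-per-level steering rule described above:

#include "tbb/tbb.h"
#include <tuple>

using namespace tbb::flow;

// msg_t and the bit-steering rule are assumptions for illustration.
struct msg_t {
    int destination;   // index of the intended leaf node
    int payload;
};

typedef multifunction_node<msg_t, std::tuple<msg_t, msg_t> > steer_t;

// One level of the tree: test a single bit of the destination and
// forward to port 0 or port 1; chaining levels builds the full tree.
struct steer_body {
    int my_bit;
    steer_body(int bit) : my_bit(bit) {}
    void operator()(const msg_t &m, steer_t::output_ports_type &p) {
        if (m.destination & (1 << my_bit))
            std::get<1>(p).try_put(m);
        else
            std::get<0>(p).try_put(m);
    }
};

With 2^k leaves you need k levels, each level's two output ports feeding the next level's nodes (or the leaves themselves).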
Addendum: After talking with Mike (the designer of flow::graph), we realized there is another way to handle this, which would allow a dynamic number of ports. You would have to do a little low-level stuff, but it goes like this:
#include "tbb/tbb.h"
#include <iostream>
using namespace tbb::flow;
tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
output_port_vector_t &my_ports;
public:
multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
continue_msg operator()(const int in) {
int current_size = my_ports.size();
if(in >= current_size) {
// error condition? grow concurrent_vector?
tbb::spin_mutex::scoped_lock gl(io_lock);
std::cout << "Received input out of range(" << in << ")" << std::endl;
}
else {
// do computation
my_ports[in]->try_put(in*2);
}
return continue_msg();
}
};
struct output_function_body {
int my_prefix;
output_function_body(int i) : my_prefix(i) { }
int operator()(const int i) {
tbb::spin_mutex::scoped_lock gl(io_lock);
std::cout << " output node "<< my_prefix << " received " << i << std::endl;
return i;
}
};
int main() {
graph g;
output_port_vector_t output_ports;
function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
// create broadcast_nodes
for( int i = 0; i < 20; ++i) {
bnode_element_t *bp = new bnode_element_t(g);
output_ports.push_back(bp);
}
// attach the output nodes to the broadcast_nodes
for(int i = 0; i < 20; ++i) {
function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
make_edge(*(output_ports[i]),*fp);
}
for( int i = 0; i < 21; ++i) {
my_node.try_put(i);
}
g.wait_for_all();
return 0;
}
Notes on the above:
- We are creating a concurrent_vector of pointers to broadcast_nodes. The successors of the function_node are attached to these broadcast_nodes; the output of the function_node itself is ignored.
- The concurrent_vector is passed in to the constructor of the multioutput_function_body; we don't need a multifunction_node at all in this case. The multioutput_function_body decides at runtime which broadcast_node to try_put to. Note that we are doing explicit try_puts to the broadcast_nodes, and each try_put spawns a task. Spawned tasks are faster than enqueued tasks, but this is still more scheduling overhead than just returning a value from a node.
- I didn't add the cleanup of the heap-allocated broadcast_nodes and output function_nodes. The "obvious" place to delete the broadcast_nodes would be the destructor of multioutput_function_body. Don't do that: constructing the function_node copy-constructs the body passed in, so multiple copies of the function body hold a reference to the concurrent_vector of broadcast_node pointers. Do the deletion after g.wait_for_all(), as sketched below.
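A minimal sketch of that cleanup, assuming main() also kept the output function_node pointers in a container (the example above drops them; output_nodes below is a hypothetical std::vector<function_node<int,int>*> populated in the second loop):

g.wait_for_all();
// safe now: no tasks are in flight, so no body can touch the nodes
for (size_t i = 0; i < output_ports.size(); ++i)
    delete output_ports[i];
for (size_t i = 0; i < output_nodes.size(); ++i)   // hypothetical container
    delete output_nodes[i];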
I used concurrent_vector because it allows access to the pointers while the concurrent_vector is being modified. Whether additional broadcast_node pointers can safely be added during the execution of the graph is an open question; I hope you are only creating the nodes up front and using them as-is, not modifying the set on the fly. concurrent_vector does not reallocate and move already-initialized elements when it grows, which is why I used it, but don't take this as a complete answer if you are hoping to add nodes while the graph is running.