20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
28 namespace pipelining {
/// Pure virtual hand-over point: accepts one item of type T by const reference.
/// NOTE(review): presumably declared in source_base, since the sink calls
/// push() through a source_base pointer -- confirm against the full header.
50 virtual void push(
const T & v) = 0;
56 template <
typename dest_t>
61 , the_source(the_source)
62 , dest(std::move(dest))
64 this->
set_name(
"Join source", PRIORITY_INSIGNIFICANT);
71 if (*the_source != NULL && *the_source !=
this) {
75 throw exception(
"Attempted to set join source a second time");
80 virtual void push(
const T & v)
override {
/// Address of a source_base pointer slot, initialised from the constructor
/// argument of the same name. The slot is dereferenced when checking whether
/// a source has already been registered.
85 source_base ** the_source;
/// Builds the source end of the join as a pipe_begin.
/// The returned factory constructs source_impl and is given this join's
/// source_token together with the address of the_source.
89 pipe_begin<factory<source_impl, node_token, source_base **> > source() {
90 return factory<source_impl, node_token, source_base **>(source_token, &the_source);
98 : the_source(the_source)
100 set_name(
"Join sink", PRIORITY_INSIGNIFICANT);
105 the_source_cache = *the_source;
/// Forwards a single item to the source by calling push() on the cached
/// source_base pointer.
108 void push(
const T & v) {
// the_source_cache holds the value read from *the_source (assigned at listing line 105).
109 the_source_cache->push(v);
/// Copy of *the_source; push() calls through this pointer.
113 source_base * the_source_cache;
/// Address of a source_base pointer slot, initialised from the constructor
/// argument of the same name.
114 source_base ** the_source;
/// Builds a sink end of the join as a pipe_end.
/// The returned termfactory constructs sink_impl and is given this join's
/// source_token together with the address of the_source.
117 pipe_end<termfactory<sink_impl, node_token, source_base **> > sink() {
118 return termfactory<sink_impl, node_token, source_base **>(source_token, &the_source);
/// Creates a join with no source registered yet: the_source starts out null.
121 join() : the_source(nullptr) {}
/// Source slot; its address is handed to both the source() and sink() factories.
123 source_base * the_source;
/// Token passed to both the source() and sink() factories.
124 node_token source_token;
130 #endif // TPIE_PIPELINING_JOIN_H
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void begin() override
Begin pipeline processing phase.
Joins multiple push streams into one.
virtual void prepare() override
Called before memory assignment but after depending phases have executed and ended.
node()
Default constructor, using a new node_token.