24 #ifndef __TPIE_PIPELINING_VIRTUAL_H__
25 #define __TPIE_PIPELINING_VIRTUAL_H__
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/pipe_base.h>
29 #include <tpie/pipelining/pipeline.h>
30 #include <tpie/pipelining/factory_helpers.h>
31 #include <tpie/pipelining/helpers.h>
35 namespace pipelining {
55 typedef const T & type;
60 typedef const T & type;
65 typedef const T * type;
83 template <
typename Input>
85 typedef typename maybe_add_const_ref<Input>::type input_type;
89 virtual void push(input_type v) = 0;
95 template <
typename dest_t,
typename T>
101 typedef typename maybe_add_const_ref<item_type>::type input_type;
106 : dest(std::move(dest))
109 this->
set_name(
"Virtual source", PRIORITY_INSIGNIFICANT);
117 void push(input_type v) {
127 template <
typename Output>
133 typedef Output item_type;
140 this->
set_name(
"Virtual destination", PRIORITY_INSIGNIFICANT);
147 , m_virtdest(std::move(o.m_virtdest)) {
153 if (m_virtdest == 0) {
158 void push(
typename maybe_add_const_ref<Output>::type v) {
162 void set_destination(virtsrc<Output> * dest) {
163 if (m_virtdest != 0) {
184 typedef std::shared_ptr<virt_node> ptr;
187 std::unique_ptr<node> m_node;
188 std::unique_ptr<virtual_container> m_container;
198 n->m_node.reset(pipe);
219 m_container.reset(ctr);
228 template <
typename T,
typename U,
typename Result>
230 static Result go(...) {
240 template <
typename T,
typename Result>
242 static Result go(Result r) {
253 virt_node::ptr m_node;
/// Returns the virt_node::ptr stored in m_node. The pointer is returned by
/// value and the member function is const, so this object is not modified.
257 virt_node::ptr get_node()
const {
return m_node; }
261 this->m_nodeMap = nodeMap;
265 this->m_nodeMap = nodeMap;
269 m_node->set_container(ctr);
/// Reports whether m_node currently points at nothing.
/// \return true when the raw pointer obtained from m_node.get() compares
/// equal to 0, false otherwise.
272 bool empty()
const {
return m_node.get() == 0; }
278 template <
typename Input>
282 template <
typename Input,
typename Output>
286 template <
typename Output>
294 template <
typename,
typename>
303 template <
typename Input>
305 template <
typename Input,
typename Output>
307 template <
typename Input,
typename Output>
309 template <
typename Output>
318 template <
typename Input>
/// Returns the src_type pointer stored in m_src, unchanged.
/// Const accessor: this object is not modified.
325 src_type * get_source()
const {
return m_src; }
339 template <
typename fact_t>
341 *
this = std::forward<pipe_end<fact_t>>(pipe);
350 template <
typename M
id>
357 template <
typename fact_t>
360 log_error() <<
"Virtual chunk assigned twice" << std::endl;
364 typedef typename fact_t::constructed_type constructed_type;
376 template <
typename Input,
typename Output=Input>
377 class virtual_chunk :
public bits::virtual_chunk_base {
378 friend class bits::access;
379 typedef bits::access acc;
380 typedef bits::virtsrc<Input> src_type;
381 typedef bits::virtrecv<Output> recv_type;
/// Returns m_src, the stored pointer to this chunk's source end
/// (src_type is bits::virtsrc<Input>).
/// bits::access::get_source() forwards to this accessor.
384 src_type * get_source()
const {
return m_src; }
/// Returns m_recv, the stored pointer to this chunk's receiving end
/// (recv_type is bits::virtrecv<Output>).
/// bits::access::get_destination() forwards to this accessor.
385 recv_type * get_destination()
const {
return m_recv; }
400 template <
typename fact_t>
402 *
this = std::forward<pipe_middle<fact_t>>(pipe);
411 template <
typename M
id>
414 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
416 m_src = acc::get_source(left);
417 m_recv = acc::get_destination(right);
422 : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
424 m_src = acc::get_source(left);
425 m_recv = acc::get_destination(right);
432 template <
typename fact_t>
435 log_error() <<
"Virtual chunk assigned twice" << std::endl;
438 typedef typename fact_t::template constructed<recv_type>::type constructed_type;
441 fact_t f = std::move(pipe.factory);
442 f.set_destination_kind_push();
452 template <
typename NextOutput>
459 if (dest.empty() || !dst) {
463 m_recv->set_destination(dst);
475 m_recv->set_destination(acc::get_source(dest));
480 template <
typename Input>
481 template <
typename M
id>
484 : virtual_chunk_base(left.get_node_map(),
485 bits::virt_node::combine(left.get_node(), right.get_node()))
487 m_src = acc::get_source(left);
493 template <
typename Output>
/// Returns the recv_type pointer stored in m_recv, unchanged.
/// Const accessor: this object is not modified.
499 recv_type * get_destination()
const {
return m_recv; }
513 template <
typename fact_t>
515 *
this = std::forward<pipe_begin<fact_t>>(pipe);
524 template <
typename M
id>
527 : virtual_chunk_base(left.get_node_map(),
528 bits::virt_node::combine(left.get_node(), right.get_node()))
530 m_recv = acc::get_destination(right);
536 template <
typename fact_t>
539 log_error() <<
"Virtual chunk assigned twice" << std::endl;
542 typedef typename fact_t::template constructed<recv_type>::type constructed_type;
545 fact_t f = std::move(pipe.factory);
546 f.set_destination_kind_push();
554 template <
typename NextOutput>
561 m_recv->set_destination(acc::get_source(dest));
571 m_recv->set_destination(acc::get_source(dest));
572 return virtual_chunk_base(this->m_nodeMap,
579 template <
typename Input>
580 virtsrc<Input> * access::get_source(
const virtual_chunk_end<Input> & chunk) {
581 return chunk.get_source();
584 template <
typename Input,
typename Output>
585 virtsrc<Input> * access::get_source(
const virtual_chunk<Input, Output> & chunk) {
586 return chunk.get_source();
589 template <
typename Input,
typename Output>
590 virtrecv<Output> * access::get_destination(
const virtual_chunk<Input, Output> & chunk) {
591 return chunk.get_destination();
594 template <
typename Output>
595 virtrecv<Output> * access::get_destination(
const virtual_chunk_begin<Output> & chunk) {
596 return chunk.get_destination();
599 template <
typename T>
602 template <
typename dest_t>
608 : vnode(out.get_node())
609 , dest2(bits::access::get_source(out))
610 , dest(std::move(dest))
618 if (dest2) dest2->push(v);
623 virt_node::ptr vnode;
632 template <
typename T>
638 : vnode(out.get_node())
639 , dest(bits::access::get_source(out))
645 if (dest) dest->push(v);
650 virt_node::ptr vnode;
656 template <
typename T>
661 template <
typename T>
662 pipe_end<termfactory<bits::vpush_node<T>, virtual_chunk_end<T> > > push_to_virtual(
const virtual_chunk_end<T> & out) {
666 template <
typename T>
667 virtual_chunk<T> vfork(
const virtual_chunk_end<T> & out) {
668 if (out.empty())
return virtual_chunk<T>();
669 return fork_to_virtual(out);
672 template <
typename T>
673 inline virtual_chunk<T> chunk_if(
bool b, virtual_chunk<T> t) {
677 return virtual_chunk<T>();
680 template <
typename T>
681 virtual_chunk_end<T> chunk_end_if(
bool b, virtual_chunk_end<T> t) {
685 return virtual_chunk_end<T>(null_sink<T>());
692 #endif // __TPIE_PIPELINING_VIRTUAL_H__
virtual void begin()
Begin pipeline processing phase.
virtual_chunk< Input, NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
virtual_chunk()
Constructor that leaves the virtual chunk unassigned.
virtual_chunk_begin< NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
void begin()
Begin pipeline processing phase.
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Virtual node that is injected into the end of a virtual chunk.
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
virtual_chunk_begin(pipe_begin< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.
Base class of virtual chunks. Owns a virt_node.
static ptr combine(ptr left, ptr right)
Aggregate ownership of virt_nodes.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Virtual chunk that has no output (that is, virtual consumer).
Virtual superclass for pipelines and subpipelines.
virtual_chunk_begin & operator=(pipe_begin< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
virtual_chunk_begin()
Constructor that leaves the virtual chunk unassigned.
Virtual base class for extra data to go with virtual chunks.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Virtual chunk that has no input (that is, virtual producer).
The maybe_add_const_ref helper struct adds const & to a type unless the type is already const...
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
static ptr take_own(node *pipe)
Take ownership of the given node, which must have been allocated with new.
Helper class that throws an exception on behalf of virtual_chunks that have not been assigned a pipe_...
virtual_chunk_end(pipe_end< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.
virtual_chunk_end & operator=(pipe_end< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
void set_container(virtual_container *ctr)
Set and/or reset the virtual_container assigned to this virtual node.
virtual_chunk_end()
Constructor that leaves the virtual chunk unassigned.
A pipe_middle class pushes input down the pipeline.
Virtual base node that is injected into the beginning of a virtual chunk.
virtual_chunk_begin(const virtual_chunk_begin< Mid > &left, const virtual_chunk< Mid, Output > &right)
Constructor that combines two virtual chunks.
virtual_chunk & operator=(pipe_middle< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
node()
Default constructor, using a new node_token.
virtual_chunk_end< Input > operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
Concrete implementation of virtsrc.
logstream & log_error()
Return logstream for writing error log messages.
virtual_chunk(const virtual_chunk< Input, Mid > &left, const virtual_chunk< Mid, Output > &right)
Constructor that combines two virtual chunks.
Virtual chunk that has input and output.
virtual_chunk_base operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
virtual_chunk(pipe_middle< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.