TPIE

2362a60
virtual.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet cino+=(0 :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_VIRTUAL_H__
25 #define __TPIE_PIPELINING_VIRTUAL_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/pipe_base.h>
29 #include <tpie/pipelining/pipeline.h>
30 #include <tpie/pipelining/factory_helpers.h>
31 #include <tpie/pipelining/helpers.h>
32 
33 namespace tpie {
34 
35 namespace pipelining {
36 
43 public:
44  virtual ~virtual_container() {}
45 };
46 
47 namespace bits {
48 
53 template <typename T>
55  typedef const T & type;
56 };
57 
58 template <typename T>
59 struct maybe_add_const_ref<const T &> {
60  typedef const T & type;
61 };
62 
63 template <typename T>
64 struct maybe_add_const_ref<const T *> {
65  typedef const T * type;
66 };
67 
68 template <typename T>
69 struct maybe_add_const_ref<T &> {
70  typedef T & type;
71 };
72 
73 template <typename T>
74 struct maybe_add_const_ref<T *> {
75  typedef T * type;
76 };
77 
83 template <typename Input>
84 class virtsrc : public node {
85  typedef typename maybe_add_const_ref<Input>::type input_type;
86 
87 public:
88  virtual const node_token & get_token() = 0;
89  virtual void push(input_type v) = 0;
90 };
91 
95 template <typename dest_t, typename T>
96 class virtsrc_impl : public virtsrc<T> {
97 public:
98  typedef T item_type;
99 
100 private:
101  typedef typename maybe_add_const_ref<item_type>::type input_type;
102  dest_t dest;
103 
104 public:
105  virtsrc_impl(dest_t dest)
106  : dest(std::move(dest))
107  {
108  node::add_push_destination(this->dest);
109  this->set_name("Virtual source", PRIORITY_INSIGNIFICANT);
110  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
111  }
112 
113  const node_token & get_token() {
114  return node::get_token();
115  }
116 
117  void push(input_type v) {
118  dest.push(v);
119  }
120 };
121 
127 template <typename Output>
128 class virtrecv : public node {
129  virtrecv *& m_self;
130  virtsrc<Output> * m_virtdest;
131 
132 public:
133  typedef Output item_type;
134 
135  virtrecv(virtrecv *& self)
136  : m_self(self)
137  , m_virtdest(0)
138  {
139  m_self = this;
140  this->set_name("Virtual destination", PRIORITY_INSIGNIFICANT);
141  this->set_plot_options(node::PLOT_BUFFERED | node::PLOT_SIMPLIFIED_HIDE);
142  }
143 
144  virtrecv(virtrecv && o)
145  : node(std::move(o))
146  , m_self(o.m_self)
147  , m_virtdest(std::move(o.m_virtdest)) {
148  m_self = this;
149  }
150 
151  void begin() {
152  node::begin();
153  if (m_virtdest == 0) {
154  throw tpie::exception("No virtual destination");
155  }
156  }
157 
158  void push(typename maybe_add_const_ref<Output>::type v) {
159  m_virtdest->push(v);
160  }
161 
162  void set_destination(virtsrc<Output> * dest) {
163  if (m_virtdest != 0) {
164  throw tpie::exception("Virtual destination set twice");
165  }
166 
167  m_virtdest = dest;
168  add_push_destination(dest->get_token());
169  }
170 };
171 
182 class virt_node {
183 public:
184  typedef std::shared_ptr<virt_node> ptr;
185 
186 private:
187  std::unique_ptr<node> m_node;
188  std::unique_ptr<virtual_container> m_container;
189  ptr m_left;
190  ptr m_right;
191 
192 public:
196  static ptr take_own(node * pipe) {
197  virt_node * n = new virt_node();
198  n->m_node.reset(pipe);
199  ptr res(n);
200  return res;
201  }
202 
206  static ptr combine(ptr left, ptr right) {
207  virt_node * n = new virt_node();
208  n->m_left = left;
209  n->m_right = right;
210  ptr res(n);
211  return res;
212  }
213 
219  m_container.reset(ctr);
220  }
221 };
222 
228 template <typename T, typename U, typename Result>
230  static Result go(...) {
232  }
233 };
234 
240 template <typename T, typename Result>
241 struct assert_types_equal_and_return<T, T, Result> {
242  static Result go(Result r) {
243  return r;
244  }
245 };
246 
251  // pipeline_base has virtual dtor and shared_ptr to m_nodeMap
252 protected:
253  virt_node::ptr m_node;
254 public:
255  virtual_chunk_base() {}
256 
257  virt_node::ptr get_node() const { return m_node; }
258  virtual_chunk_base(node_map::ptr nodeMap, virt_node::ptr ptr)
259  : m_node(ptr)
260  {
261  this->m_nodeMap = nodeMap;
262  }
263 
264  virtual_chunk_base(node_map::ptr nodeMap) {
265  this->m_nodeMap = nodeMap;
266  }
267 
268  void set_container(virtual_container * ctr) {
269  m_node->set_container(ctr);
270  }
271 
272  bool empty() const { return m_node.get() == 0; }
273 };
274 
275 } // namespace bits
276 
277 // Predeclare
278 template <typename Input>
280 
281 // Predeclare
282 template <typename Input, typename Output>
284 
285 // Predeclare
286 template <typename Output>
288 
289 namespace bits {
290 
291 class access {
292  template <typename>
293  friend class pipelining::virtual_chunk_end;
294  template <typename, typename>
295  friend class pipelining::virtual_chunk;
296  template <typename>
297  friend class pipelining::virtual_chunk_begin;
298  template <typename>
299  friend class vfork_node;
300  template <typename>
301  friend class vpush_node;
302 
303  template <typename Input>
304  static virtsrc<Input> * get_source(const virtual_chunk_end<Input> &);
305  template <typename Input, typename Output>
306  static virtsrc<Input> * get_source(const virtual_chunk<Input, Output> &);
307  template <typename Input, typename Output>
308  static virtrecv<Output> * get_destination(const virtual_chunk<Input, Output> &);
309  template <typename Output>
310  static virtrecv<Output> * get_destination(const virtual_chunk_begin<Output> &);
311 };
312 
313 } // namespace bits
314 
318 template <typename Input>
320  friend class bits::access;
321  typedef bits::access acc;
322  typedef bits::virtsrc<Input> src_type;
323  src_type * m_src;
324 
325  src_type * get_source() const { return m_src; }
326 
327 public:
332  : m_src(0)
333  {}
334 
339  template <typename fact_t>
341  *this = std::forward<pipe_end<fact_t>>(pipe);
342  set_container(ctr);
343  }
344 
350  template <typename Mid>
352  const virtual_chunk_end<Mid> & right);
353 
357  template <typename fact_t>
359  if (this->m_node) {
360  log_error() << "Virtual chunk assigned twice" << std::endl;
361  throw tpie::exception("Virtual chunk assigned twice");
362  }
363 
364  typedef typename fact_t::constructed_type constructed_type;
365  m_src = new bits::virtsrc_impl<constructed_type, Input>(pipe.factory.construct());
366  this->m_node = bits::virt_node::take_own(m_src);
367  this->m_nodeMap = m_src->get_node_map();
368 
369  return *this;
370  }
371 };
372 
376 template <typename Input, typename Output=Input>
377 class virtual_chunk : public bits::virtual_chunk_base {
378  friend class bits::access;
379  typedef bits::access acc;
380  typedef bits::virtsrc<Input> src_type;
381  typedef bits::virtrecv<Output> recv_type;
382  src_type * m_src;
383  recv_type * m_recv;
384  src_type * get_source() const { return m_src; }
385  recv_type * get_destination() const { return m_recv; }
386 
387 public:
392  : m_src(0)
393  , m_recv(0)
394  {}
395 
400  template <typename fact_t>
402  *this = std::forward<pipe_middle<fact_t>>(pipe);
403  set_container(ctr);
404  }
405 
411  template <typename Mid>
413  const virtual_chunk<Mid, Output> & right)
414  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
415  {
416  m_src = acc::get_source(left);
417  m_recv = acc::get_destination(right);
418  }
419 
421  const virtual_chunk_begin<Output> & right)
422  : virtual_chunk_base(left.get_node_map(), bits::virt_node::combine(left.get_node(), right.get_node()))
423  {
424  m_src = acc::get_source(left);
425  m_recv = acc::get_destination(right);
426  }
427 
428 
432  template <typename fact_t>
434  if (this->m_node) {
435  log_error() << "Virtual chunk assigned twice" << std::endl;
436  throw tpie::exception("Virtual chunk assigned twice");
437  }
438  typedef typename fact_t::template constructed<recv_type>::type constructed_type;
439  recv_type temp(m_recv);
440  this->m_nodeMap = temp.get_node_map();
441  fact_t f = std::move(pipe.factory);
442  f.set_destination_kind_push();
443  m_src = new bits::virtsrc_impl<constructed_type, Input>(f.construct(std::move(temp)));
444  this->m_node = bits::virt_node::take_own(m_src);
445 
446  return *this;
447  }
448 
452  template <typename NextOutput>
454  if (empty()) {
456  ::go(&dest);
457  }
458  bits::virtsrc<Output> * dst=acc::get_source(dest);
459  if (dest.empty() || !dst) {
461  ::go(this);
462  }
463  m_recv->set_destination(dst);
464  return virtual_chunk<Input, NextOutput>(*this, dest);
465  }
466 
471  if (empty()) {
473  ::go(&dest);
474  }
475  m_recv->set_destination(acc::get_source(dest));
476  return virtual_chunk_end<Input>(*this, dest);
477  }
478 };
479 
480 template <typename Input>
481 template <typename Mid>
483  const virtual_chunk_end<Mid> & right)
484  : virtual_chunk_base(left.get_node_map(),
485  bits::virt_node::combine(left.get_node(), right.get_node()))
486 {
487  m_src = acc::get_source(left);
488 }
489 
493 template <typename Output>
495  friend class bits::access;
496  typedef bits::access acc;
497  typedef bits::virtrecv<Output> recv_type;
498  recv_type * m_recv;
499  recv_type * get_destination() const { return m_recv; }
500 
501 public:
506  : m_recv(0)
507  {}
508 
513  template <typename fact_t>
515  *this = std::forward<pipe_begin<fact_t>>(pipe);
516  set_container(ctr);
517  }
518 
524  template <typename Mid>
526  const virtual_chunk<Mid, Output> & right)
527  : virtual_chunk_base(left.get_node_map(),
528  bits::virt_node::combine(left.get_node(), right.get_node()))
529  {
530  m_recv = acc::get_destination(right);
531  }
532 
536  template <typename fact_t>
538  if (this->m_node) {
539  log_error() << "Virtual chunk assigned twice" << std::endl;
540  throw tpie::exception("Virtual chunk assigned twice");
541  }
542  typedef typename fact_t::template constructed<recv_type>::type constructed_type;
543  recv_type temp(m_recv);
544  this->m_nodeMap = m_recv->get_node_map();
545  fact_t f = std::move(pipe.factory);
546  f.set_destination_kind_push();
547  this->m_node = bits::virt_node::take_own(new constructed_type(f.construct(std::move(temp))));
548  return *this;
549  }
550 
554  template <typename NextOutput>
556  if (empty()) throw virtual_chunk_missing_begin();
557  if (dest.empty()) {
559  ::go(this);
560  }
561  m_recv->set_destination(acc::get_source(dest));
562  return virtual_chunk_begin<NextOutput>(*this, dest);
563  }
564 
568  virtual_chunk_base operator|(virtual_chunk_end<Output> dest) {
569  if (empty()) throw virtual_chunk_missing_begin();
570  if (dest.empty()) throw virtual_chunk_missing_end();
571  m_recv->set_destination(acc::get_source(dest));
572  return virtual_chunk_base(this->m_nodeMap,
573  bits::virt_node::combine(get_node(), dest.get_node()));
574  }
575 };
576 
577 namespace bits {
578 
579 template <typename Input>
580 virtsrc<Input> * access::get_source(const virtual_chunk_end<Input> & chunk) {
581  return chunk.get_source();
582 }
583 
584 template <typename Input, typename Output>
585 virtsrc<Input> * access::get_source(const virtual_chunk<Input, Output> & chunk) {
586  return chunk.get_source();
587 }
588 
589 template <typename Input, typename Output>
590 virtrecv<Output> * access::get_destination(const virtual_chunk<Input, Output> & chunk) {
591  return chunk.get_destination();
592 }
593 
594 template <typename Output>
595 virtrecv<Output> * access::get_destination(const virtual_chunk_begin<Output> & chunk) {
596  return chunk.get_destination();
597 }
598 
599 template <typename T>
600 class vfork_node {
601 public:
602  template <typename dest_t>
603  class type : public node {
604  public:
605  typedef T item_type;
606 
607  type(dest_t && dest, virtual_chunk_end<T> out)
608  : vnode(out.get_node())
609  , dest2(bits::access::get_source(out))
610  , dest(std::move(dest))
611  {
612  add_push_destination(this->dest);
613  if (dest2) add_push_destination(*dest2);
614  }
615 
616  void push(T v) {
617  dest.push(v);
618  if (dest2) dest2->push(v);
619  }
620 
621  private:
622  // This counted reference ensures dest2 is not deleted prematurely.
623  virt_node::ptr vnode;
624 
625  virtsrc<T> * dest2;
626 
627  dest_t dest;
628  };
629 };
630 
631 
632 template <typename T>
633 class vpush_node : public node {
634 public:
635  typedef T item_type;
636 
638  : vnode(out.get_node())
639  , dest(bits::access::get_source(out))
640  {
641  if (dest) add_push_destination(*dest);
642  }
643 
644  void push(T v) {
645  if (dest) dest->push(v);
646  }
647 
648 private:
649  // This counted reference ensures dest is not deleted prematurely.
650  virt_node::ptr vnode;
651  virtsrc<T> * dest;
652 };
653 
654 } // namespace bits
655 
656 template <typename T>
658  return out;
659 }
660 
661 template <typename T>
662 pipe_end<termfactory<bits::vpush_node<T>, virtual_chunk_end<T> > > push_to_virtual(const virtual_chunk_end<T> & out) {
663  return out;
664 }
665 
666 template <typename T>
667 virtual_chunk<T> vfork(const virtual_chunk_end<T> & out) {
668  if (out.empty()) return virtual_chunk<T>();
669  return fork_to_virtual(out);
670 }
671 
672 template <typename T>
673 inline virtual_chunk<T> chunk_if(bool b, virtual_chunk<T> t) {
674  if (b)
675  return t;
676  else
677  return virtual_chunk<T>();
678 }
679 
680 template <typename T>
681 virtual_chunk_end<T> chunk_end_if(bool b, virtual_chunk_end<T> t) {
682  if (b)
683  return t;
684  else
685  return virtual_chunk_end<T>(null_sink<T>());
686 }
687 
688 } // namespace pipelining
689 
690 } // namespace tpie
691 
692 #endif // __TPIE_PIPELINING_VIRTUAL_H__
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:299
virtual_chunk< Input, NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:453
virtual_chunk()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:391
virtual_chunk_begin< NextOutput > operator|(virtual_chunk< Output, NextOutput > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:555
void begin()
Begin pipeline processing phase.
Definition: virtual.h:151
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Definition: node.h:251
Virtual node that is injected into the end of a virtual chunk.
Definition: virtual.h:128
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:632
virtual_chunk_begin(pipe_begin< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:514
Base class of virtual chunks. Owns a virt_node.
Definition: virtual.h:250
static ptr combine(ptr left, ptr right)
Aggregate ownership of virt_nodes.
Definition: virtual.h:206
Base class of all nodes.
Definition: node.h:78
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Virtual chunk that has no output (that is, virtual consumer).
Definition: virtual.h:279
Virtual superclass for pipelines and subpipelines.
Definition: pipeline.h:110
virtual_chunk_begin & operator=(pipe_begin< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:537
virtual_chunk_begin()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:505
Virtual base class for extra data to go with virtual chunks.
Definition: virtual.h:42
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Virtual chunk that has no input (that is, virtual producer).
Definition: virtual.h:287
The maybe_add_const_ref helper struct adds const & to a type unless the type is already const...
Definition: virtual.h:54
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
static ptr take_own(node *pipe)
Take std::new-ownership of given node.
Definition: virtual.h:196
Ownership of nodes.
Definition: virtual.h:182
Helper class that throws an exception on behalf of virtual_chunks that have not been assigned a pipe_...
Definition: virtual.h:229
virtual_chunk_end(pipe_end< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:340
virtual_chunk_end & operator=(pipe_end< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:358
void set_container(virtual_container *ctr)
Set and/or reset the virtual_container assigned to this virtual node.
Definition: virtual.h:218
virtual_chunk_end()
Constructor that leaves the virtual chunk unassigned.
Definition: virtual.h:331
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
Virtual base node that is injected into the beginning of a virtual chunk.
Definition: virtual.h:84
virtual_chunk_begin(const virtual_chunk_begin< Mid > &left, const virtual_chunk< Mid, Output > &right)
Constructor that combines two virtual chunks.
Definition: virtual.h:525
virtual_chunk & operator=(pipe_middle< fact_t > &&pipe)
Construct a node and assign it to this virtual chunk.
Definition: virtual.h:433
node()
Default constructor, using a new node_token.
virtual_chunk_end< Input > operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:470
Concrete implementation of virtsrc.
Definition: virtual.h:96
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:147
virtual_chunk(const virtual_chunk< Input, Mid > &left, const virtual_chunk< Mid, Output > &right)
Constructor that combines two virtual chunks.
Definition: virtual.h:412
Virtual chunk that has input and output.
Definition: virtual.h:283
virtual_chunk_base operator|(virtual_chunk_end< Output > dest)
Connect this virtual chunk to another chunk.
Definition: virtual.h:568
virtual_chunk(pipe_middle< fact_t > &&pipe, virtual_container *ctr=0)
Constructor that recursively constructs a node and takes ownership of it.
Definition: virtual.h:401