TPIE

2362a60
split.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2016, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_SPLIT_H
21 #define TPIE_PIPELINING_SPLIT_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
26 
27 namespace tpie {
28 namespace pipelining {
29 
39 template <typename T>
40 class split {
41 public:
42  class source_base : public node {
43  public:
44  source_base() = default;
45  source_base(source_base &&) = default;
46 
47  virtual void push(const T & v) = 0;
48 
49  protected:
50  ~source_base() {}
51  };
52 
53  template <typename dest_t>
54  class source_impl : public source_base {
55  public:
56  source_impl(dest_t dest, node_token sink_token, std::vector<source_base *> & the_sources)
57  : the_sources(the_sources)
58  , dest(std::move(dest))
59  {
60  this->set_name("Split source", PRIORITY_INSIGNIFICANT);
61  this->add_push_destination(this->dest);
62 
63  this->get_node_map()->union_set(sink_token.get_map());
64  bits::node_map::ptr m = this->get_node_map()->find_authority();
65  m->add_relation(sink_token.id(), this->get_token().id(), bits::pushes);
66  }
67 
68  source_impl(source_impl &&) = default;
69 
70  virtual void prepare() override {
71  the_sources.push_back(this);
72  };
73 
74  virtual void push(const T & v) override {
75  dest.push(v);
76  }
77 
78  private:
79  std::vector<source_base *> & the_sources;
80  dest_t dest;
81  };
82 
83  pipe_begin<factory<source_impl, node_token, std::vector<source_base *> &> > source() {
84  return {sink_token, the_sources};
85  }
86 
87  class sink_impl : public node {
88  public:
89  typedef T item_type;
90 
91  sink_impl(node_token sink_token, std::vector<source_base *> & the_sources)
92  : node(sink_token), the_sources(the_sources)
93  {
94  set_name("Join sink", PRIORITY_INSIGNIFICANT);
95  }
96 
97  void push(const T & v) {
98  for (auto & source : the_sources)
99  source->push(v);
100  }
101 
102  private:
103  std::vector<source_base *> & the_sources;
104  };
105 
107  return {sink_token, the_sources};
108  }
109 
110 private:
111  std::vector<source_base *> the_sources;
112  node_token sink_token;
113 };
114 
115 } // namespace pipelining
116 } // namespace tpie
117 
118 #endif // TPIE_PIPELINING_SPLIT_H
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Definition: node.h:251
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:632
Base class of all nodes.
Definition: node.h:78
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Split one push streams into multiple.
Definition: split.h:40
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: split.h:70
node()
Default constructor, using a new node_token.