TPIE

2362a60
pipeline.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PIPELINE_H__
21 #define __TPIE_PIPELINING_PIPELINE_H__
22 
23 #include <tpie/types.h>
24 #include <iostream>
25 #include <tpie/pipelining/tokens.h>
27 #include <tpie/file_manager.h>
28 #include <unordered_set>
29 #include <mutex>
30 
31 namespace tpie {
32 
33 namespace pipelining {
34 
35 namespace bits {
36 
37 
43 public:
45  pipeline_base_base(const pipeline_base_base &) = default;
46  pipeline_base_base & operator=(const pipeline_base_base &) = default;
48  pipeline_base_base & operator=(pipeline_base_base &&) = default;
49 
65  void plot(std::ostream & out) {plot_impl(out, false);}
66 
77  void plot_full(std::ostream & out) {plot_impl(out, true);}
78 
82  virtual ~pipeline_base_base() {}
83 
84  void forward_any(std::string key, any_noncopyable value);
85 
86  bool can_fetch(std::string key);
87 
88  any_noncopyable & fetch_any(std::string key);
89 
90  node_map::ptr get_node_map() const {
91  return m_nodeMap;
92  }
93 
94  void output_memory(std::ostream & o) const;
95 
96  size_t uid() const {return m_uid;};
97 protected:
98 
99  node_map::ptr m_nodeMap;
100  size_t m_uid;
101 private:
102  void plot_impl(std::ostream & out, bool full);
103 };
104 
105 
111 public:
115  void operator()(stream_size_type items, progress_indicator_base & pi,
116  memory_size_type filesAvailable, memory_size_type mem,
117  const char * file, const char * function);
118 
123  void operator()(stream_size_type items, progress_indicator_base & pi,
124  memory_size_type mem,
125  const char * file, const char * function) {
126  operator()(items, pi, get_file_manager().available(), mem, file, function);
127  }
128 
129  double memory() const {
130  return m_memory;
131  }
132 
133  void order_before(pipeline_base & other);
134 protected:
135  double m_memory;
136 };
137 
143 template <typename fact_t>
144 class pipeline_impl : public pipeline_base {
145 public:
146  typedef typename fact_t::constructed_type gen_t;
147 
148  pipeline_impl(fact_t & factory) {
149  this->m_memory = factory.memory();
150 
151  auto n = std::unique_ptr<gen_t>(new gen_t(factory.construct()));
152  this->m_nodeMap = n->get_node_map()->find_authority();
153  this->m_nodeMap->increment_pipeline_ref();
154  this->m_nodeMap->add_owned_node(std::move(n));
155  }
156 
157  pipeline_impl(const pipeline_impl & o) = delete;
158  pipeline_impl & operator=(const pipeline_impl & o) = delete;
159 
160  pipeline_impl(pipeline_impl && o) = default;
161  pipeline_impl & operator=(pipeline_impl && o) = default;
162 
163  ~pipeline_impl() override {
164  if (this->m_nodeMap) {
165  this->m_nodeMap->find_authority()->decrement_pipeline_ref();
166  }
167  }
168 };
169 
170 } // namespace bits
171 
172 
173 extern std::unordered_set<bits::pipeline_base_base *> current_pipelines;
174 extern std::mutex current_pipelines_mutex;
175 
182 class pipeline {
183 public:
184  pipeline() {}
185  pipeline(pipeline &&) = default;
186  pipeline(const pipeline &) = default;
187  pipeline & operator=(pipeline &&) = default;
188  pipeline & operator=(const pipeline &) = default;
189 
190  template <typename T>
191  pipeline(T from) {
192  *this = std::move(from);
193  }
194 
195  template <typename T>
196  pipeline & operator=(T from) {
197  p.reset(new T(std::move(from)));
198  return *this;
199  }
200 
201  pipeline(const std::shared_ptr<bits::pipeline_base> & p): p(p) {}
202 
203  void operator()() {
205  (*p)(1, pi, get_file_manager().available(), get_memory_manager().available(), nullptr, nullptr);
206  }
207 
208  void operator()(stream_size_type items, progress_indicator_base & pi,
209  const char * file, const char * function) {
210  (*p)(items, pi, get_file_manager().available(), get_memory_manager().available(), file, function);
211  }
212 
213  void operator()(stream_size_type items, progress_indicator_base & pi,
214  memory_size_type mem,
215  const char * file, const char * function) {
216  (*p)(items, pi, get_file_manager().available(), mem, file, function);
217  }
218 
219  void operator()(stream_size_type items, progress_indicator_base & pi,
220  memory_size_type filesAvailable, memory_size_type mem,
221  const char * file, const char * function) {
222  (*p)(items, pi, filesAvailable, mem, file, function);
223  }
224 
225  void plot(std::ostream & os = std::cout) {
226  p->plot(os);
227  }
228 
229  void plot_full(std::ostream & os = std::cout) {
230  p->plot_full(os);
231  }
232 
233  inline double memory() const {
234  return p->memory();
235  }
236 
237  bits::node_map::ptr get_node_map() const {
238  return p->get_node_map();
239  }
240 
241  bool can_fetch(std::string key) {
242  return p->can_fetch(key);
243  }
244 
245  any_noncopyable & fetch_any(std::string key) {
246  return p->fetch_any(key);
247  }
248 
249  template <typename T>
250  T & fetch(std::string key) {
251  any_noncopyable &a = fetch_any(key);
252  return any_cast<T>(a);
253  }
254 
255  void forward_any(std::string key, any_noncopyable value) {
256  p->forward_any(key, std::move(value));
257  }
258 
259  template <typename T>
260  void forward(std::string key, T value) {
261  forward_any(key, any_noncopyable(std::move(value)));
262  }
263 
264  pipeline & then(pipeline & other) {
265  p->order_before(*other.p);
266  return other;
267  }
268 
269  void output_memory(std::ostream & o) const {p->output_memory(o);}
270 private:
271  std::shared_ptr<bits::pipeline_base> p;
272 };
273 
274 } // namespace pipelining
275 
276 } // namespace tpie
277 
278 #endif // __TPIE_PIPELINING_PIPELINE_H__
Typesafe bitflags.
The base class for indicating the progress of some task.
Null-object progress indicator.
Central file abstraction.
Definition: file.h:44
void operator()(stream_size_type items, progress_indicator_base &pi, memory_size_type filesAvailable, memory_size_type mem, const char *file, const char *function)
Invoke the pipeline.
void plot(std::ostream &out)
Generate a GraphViz plot of the pipeline.
Definition: pipeline.h:65
a dummy progress indicator that produces no output
Virtual superclass for pipelines and subpipelines.
Definition: pipeline.h:110
Pipeline tokens.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
file_manager & get_file_manager()
Return a reference to the file manager.
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
void plot_full(std::ostream &out)
Generate a GraphViz plot of the actor graph.
Definition: pipeline.h:77
Node factory for variadic argument generators.
This class is used to avoid writing the template argument in the pipeline_impl type.
Definition: pipeline.h:182
void operator()(stream_size_type items, progress_indicator_base &pi, memory_size_type mem, const char *file, const char *function)
Invoke the pipeline with amount of available files automatically configured.
Definition: pipeline.h:123
virtual ~pipeline_base_base()
Virtual dtor.
Definition: pipeline.h:82