TPIE

2362a60
tokens.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
70 
71 #ifndef __TPIE_PIPELINING_TOKENS_H__
72 #define __TPIE_PIPELINING_TOKENS_H__
73 
74 #include <tpie/exception.h>
75 #include <tpie/pipelining/exception.h>
77 #include <tpie/pipelining/container.h>
78 #include <tpie/types.h>
79 #include <tpie/tpie_assert.h>
80 #include <map>
81 #include <vector>
82 #include <iostream>
83 #include <boost/intrusive_ptr.hpp>
84 #include <boost/optional.hpp>
85 #include <unordered_map>
86 #include <unordered_set>
87 
88 namespace tpie {
89 
90 namespace pipelining {
91 
92 namespace bits {
93 
/// Kind of relation recorded between two node ids; stored as the second
/// half of each entry in node_map's relation multimaps (see
/// node_map::add_relation).
/// NOTE(review): the precise meaning of each enumerator is not visible in
/// this header -- confirm against the pipelining graph code before relying
/// on it.
enum node_relation {
    pushes,
    pulls,
    depends,
    no_forward_depends,
    memory_share_depends
};
101 
102 class node_map {
103 public:
104  typedef uint64_t id_t;
105  typedef node * val_t;
106 
107  typedef std::map<id_t, val_t> map_t;
108  typedef map_t::const_iterator mapit;
109 
110  typedef std::multimap<id_t, std::pair<id_t, node_relation> > relmap_t;
111  typedef relmap_t::const_iterator relmapit;
112 
113  typedef std::unordered_map<std::string, std::pair<memory_size_type, any_noncopyable> > datastructuremap_t;
114 
115  typedef boost::optional<any_noncopyable &> maybeany_t;
116  typedef std::unordered_map<std::string, any_noncopyable> forwardmap_t;
117 
118  typedef boost::intrusive_ptr<node_map> ptr;
119  typedef std::unique_ptr<node> owned_ptr;
120 
122  id_t from;
123  std::string key;
124  any_noncopyable value;
125  };
126 
127  static ptr create() {
128  return ptr(new node_map);
129  }
130 
131  id_t add_token(val_t token) {
132  id_t id = nextId++;
133  set_token(id, token);
134  return id;
135  }
136 
137  void set_token(id_t id, val_t token) {
138  assert_authoritative();
139  m_tokens[id] = token;
140  }
141 
142  // union-find link
143  void link(ptr target);
144 
145  void union_set(ptr target) {
146  find_authority()->link(target->find_authority());
147  }
148 
149  val_t get(id_t id) const {
150  mapit i = m_tokens.find(id);
151  if (i == m_tokens.end()) return 0;
152  return i->second;
153  }
154 
155  mapit begin() const {
156  return m_tokens.begin();
157  }
158 
159  mapit end() const {
160  return m_tokens.end();
161  }
162 
163  size_t size() const {
164  return m_tokens.size();
165  }
166 
167  void no_forward_through(id_t id) {
168  m_noForwardThrough.insert(id);
169  }
170 
171  // union-find
172  ptr find_authority();
173 
174  void add_relation(id_t from, id_t to, node_relation rel);
175 
176  const relmap_t & get_relations() const {
177  return m_relations;
178  }
179 
180  const datastructuremap_t & get_datastructures() const {
181  return m_datastructures;
182  }
183 
184  datastructuremap_t & get_datastructures() {
185  return m_datastructures;
186  }
187 
188  size_t in_degree(id_t from, node_relation rel) const {
189  return out_degree(m_relationsInv, from, rel);
190  }
191 
192  size_t out_degree(id_t from, node_relation rel) const {
193  return out_degree(m_relations, from, rel);
194  }
195 
196  size_t out_degree(id_t from) const {
197  return out_degree(m_relations, from);
198  }
199 
200  void assert_authoritative() const {
201  if (m_authority) throw non_authoritative_node_map();
202  }
203 
204  void dump(std::ostream & os = std::cout) const;
205 
210  void get_successors(id_t from, std::vector<id_t> & successors, memory_size_type k, bool forward_only=false);
211 
212  void forward(std::string key, any_noncopyable value) {
213  m_pipelineForwards[key] = std::move(value);
214  }
215 
216  maybeany_t fetch_maybe(std::string key) {
217  auto it = m_pipelineForwards.find(key);
218  if (it == m_pipelineForwards.end()) {
219  return maybeany_t();
220  }
221  return maybeany_t(it->second);
222  }
223 
224  void forward_from_pipe_base(id_t from, std::string key, any_noncopyable value) {
225  m_pipeBaseForwards.push_back({from, key, std::move(value)});
226  }
227 
228  void forward_pipe_base_forwards();
229 
230  friend void intrusive_ptr_add_ref(node_map * m) {
231  m->m_refCnt++;
232  }
233 
234  friend void intrusive_ptr_release(node_map * m) {
235  m->m_refCnt--;
236  if (m->m_refCnt == 0) delete m;
237  }
238 
239  void increment_pipeline_ref() {
240  assert_authoritative();
241  m_pipelineRefCnt++;
242  }
243 
244  void decrement_pipeline_ref() {
245  assert_authoritative();
246  m_pipelineRefCnt--;
247  if (m_pipelineRefCnt == 0) {
248  // Make sure we are not dealloc while cleaning up owned nodes
249  ptr thisPtr(this);
250 
251  // Dealloc owned nodes as no pipeline references this node map anymore
252  m_ownedNodes.clear();
253  }
254  }
255 
256  void add_owned_node(owned_ptr p) {
257  m_ownedNodes.push_back(std::move(p));
258  }
259 
260 private:
261  map_t m_tokens;
262  size_t m_refCnt;
263  size_t m_pipelineRefCnt;
264  relmap_t m_relations;
265  relmap_t m_relationsInv;
266  datastructuremap_t m_datastructures;
267  forwardmap_t m_pipelineForwards;
268  std::unordered_set<id_t> m_noForwardThrough;
269  std::vector<pipe_base_forward_t> m_pipeBaseForwards;
270  std::vector<owned_ptr> m_ownedNodes;
271 
272  size_t out_degree(const relmap_t & map, id_t from, node_relation rel) const;
273  size_t out_degree(const relmap_t & map, id_t from) const;
274 
275  // union rank structure
276  ptr m_authority;
277  size_t m_rank;
278 
279  node_map()
280  : m_refCnt(0)
281  , m_pipelineRefCnt(0)
282  , m_rank(0)
283  {
284  }
285 
286  inline node_map(const node_map &);
287  inline node_map & operator=(const node_map &);
288 
289  static id_t nextId;
290 };
291 
292 } // namespace bits
293 
294 class node_token {
295 public:
296  typedef bits::node_map::id_t id_t;
298 
299  // Use for the simple case in which a node owns its own token
300  explicit node_token(val_t owner)
301  : m_tokens(bits::node_map::create())
302  , m_id(m_tokens->add_token(owner))
303  , m_free(false)
304  {
305  }
306 
307  // This copy constructor has two uses:
308  // 1. Simple case when a node is copied (freshToken = false)
309  // 2. Advanced case when a node is being constructed with a specific token (freshToken = true)
310  inline node_token(const node_token & other, val_t newOwner, bool freshToken = false)
311  : m_tokens(other.m_tokens->find_authority())
312  , m_id(other.id())
313  , m_free(false)
314  {
315  if (freshToken) {
316  if (!other.m_free)
317  throw exception("Trying to take ownership of a non-free token");
318  if (m_tokens->get(m_id) != 0)
319  throw exception("A token already has an owner, but m_free is true - contradiction");
320  } else {
321  if (other.m_free)
322  throw exception("Trying to copy a free token");
323  }
324  m_tokens->set_token(m_id, newOwner);
325  }
326 
327  void assign(const node_token & other, val_t newOwner, bool freshToken = false) {
328  m_tokens = other.m_tokens->find_authority();
329  m_id = other.id();
330  m_free = false;
331  if (freshToken) {
332  if (!other.m_free)
333  throw exception("Trying to take ownership of a non-free token");
334  if (m_tokens->get(m_id) != 0)
335  throw exception("A token already has an owner, but m_free is true - contradiction");
336  } else {
337  if (other.m_free)
338  throw exception("Trying to copy a free token");
339  }
340  m_tokens->set_token(m_id, newOwner);
341  }
342 
343  // Use for the advanced case when a node_token is allocated before the node
344  inline node_token()
345  : m_tokens(bits::node_map::create())
346  , m_id(m_tokens->add_token(0))
347  , m_free(true)
348  {
349  }
350 
351  inline id_t id() const { return m_id; }
352 
353  inline bits::node_map::ptr map_union(const node_token & with) {
354  if (m_tokens != with.m_tokens)
355  m_tokens->union_set(with.m_tokens);
356  return m_tokens = m_tokens->find_authority();
357  }
358 
359  inline bits::node_map::ptr get_map() const {
360  return m_tokens;
361  }
362 
363  inline val_t get() const {
364  return m_tokens->get(m_id);
365  }
366 
367 private:
368  bits::node_map::ptr m_tokens;
369  id_t m_id;
370  bool m_free;
371 };
372 
373 } // namespace pipelining
374 
375 } // namespace tpie
376 
377 #endif // __TPIE_PIPELINING_TOKENS_H__
Defines the tp_assert macro.
Typesafe bitflags.
Predeclarations of some pipelining classes.
Exception classes.
Base class of all nodes.
Definition: node.h:78
pipe_middle< tempfactory< bits::map_t< F >, F > > map(const F &functor)
Pipelining node that applies the given functor to items in the stream.
Definition: map.h:143
void get_successors(id_t from, std::vector< id_t > &successors, memory_size_type k, bool forward_only=false)
Compute the set of nodes within k distance of the given node in the item flow graph.