71 #ifndef __TPIE_PIPELINING_TOKENS_H__
72 #define __TPIE_PIPELINING_TOKENS_H__
75 #include <tpie/pipelining/exception.h>
77 #include <tpie/pipelining/container.h>
83 #include <boost/intrusive_ptr.hpp>
84 #include <boost/optional.hpp>
85 #include <unordered_map>
86 #include <unordered_set>
90 namespace pipelining {
104 typedef uint64_t id_t;
107 typedef std::map<id_t, val_t> map_t;
108 typedef map_t::const_iterator mapit;
110 typedef std::multimap<id_t, std::pair<id_t, node_relation> > relmap_t;
111 typedef relmap_t::const_iterator relmapit;
113 typedef std::unordered_map<std::string, std::pair<memory_size_type, any_noncopyable> > datastructuremap_t;
115 typedef boost::optional<any_noncopyable &> maybeany_t;
116 typedef std::unordered_map<std::string, any_noncopyable> forwardmap_t;
118 typedef boost::intrusive_ptr<node_map> ptr;
119 typedef std::unique_ptr<node> owned_ptr;
127 static ptr create() {
131 id_t add_token(val_t token) {
133 set_token(
id, token);
137 void set_token(id_t
id, val_t token) {
138 assert_authoritative();
139 m_tokens[id] = token;
143 void link(ptr target);
145 void union_set(ptr target) {
146 find_authority()->link(target->find_authority());
149 val_t
get(id_t id)
const {
150 mapit i = m_tokens.find(
id);
151 if (i == m_tokens.end())
return 0;
155 mapit begin()
const {
156 return m_tokens.begin();
160 return m_tokens.end();
163 size_t size()
const {
164 return m_tokens.size();
167 void no_forward_through(id_t
id) {
168 m_noForwardThrough.insert(
id);
172 ptr find_authority();
174 void add_relation(id_t from, id_t to, node_relation rel);
176 const relmap_t & get_relations()
const {
180 const datastructuremap_t & get_datastructures()
const {
181 return m_datastructures;
184 datastructuremap_t & get_datastructures() {
185 return m_datastructures;
188 size_t in_degree(id_t from, node_relation rel)
const {
189 return out_degree(m_relationsInv, from, rel);
192 size_t out_degree(id_t from, node_relation rel)
const {
193 return out_degree(m_relations, from, rel);
196 size_t out_degree(id_t from)
const {
197 return out_degree(m_relations, from);
200 void assert_authoritative()
const {
201 if (m_authority)
throw non_authoritative_node_map();
204 void dump(std::ostream & os = std::cout)
const;
210 void get_successors(id_t from, std::vector<id_t> & successors, memory_size_type k,
bool forward_only=
false);
212 void forward(std::string key, any_noncopyable value) {
213 m_pipelineForwards[key] = std::move(value);
216 maybeany_t fetch_maybe(std::string key) {
217 auto it = m_pipelineForwards.find(key);
218 if (it == m_pipelineForwards.end()) {
221 return maybeany_t(it->second);
224 void forward_from_pipe_base(id_t from, std::string key, any_noncopyable value) {
225 m_pipeBaseForwards.push_back({from, key, std::move(value)});
228 void forward_pipe_base_forwards();
230 friend void intrusive_ptr_add_ref(node_map * m) {
234 friend void intrusive_ptr_release(node_map * m) {
236 if (m->m_refCnt == 0)
delete m;
239 void increment_pipeline_ref() {
240 assert_authoritative();
244 void decrement_pipeline_ref() {
245 assert_authoritative();
247 if (m_pipelineRefCnt == 0) {
252 m_ownedNodes.clear();
256 void add_owned_node(owned_ptr p) {
257 m_ownedNodes.push_back(std::move(p));
263 size_t m_pipelineRefCnt;
264 relmap_t m_relations;
265 relmap_t m_relationsInv;
266 datastructuremap_t m_datastructures;
267 forwardmap_t m_pipelineForwards;
268 std::unordered_set<id_t> m_noForwardThrough;
269 std::vector<pipe_base_forward_t> m_pipeBaseForwards;
270 std::vector<owned_ptr> m_ownedNodes;
272 size_t out_degree(
const relmap_t &
map, id_t from, node_relation rel)
const;
273 size_t out_degree(
const relmap_t &
map, id_t from)
const;
281 , m_pipelineRefCnt(0)
286 inline node_map(
const node_map &);
287 inline node_map & operator=(
const node_map &);
296 typedef bits::node_map::id_t id_t;
301 : m_tokens(bits::node_map::create())
302 , m_id(m_tokens->add_token(owner))
311 : m_tokens(other.m_tokens->find_authority())
317 throw exception(
"Trying to take ownership of a non-free token");
318 if (m_tokens->get(m_id) != 0)
319 throw exception(
"A token already has an owner, but m_free is true - contradiction");
322 throw exception(
"Trying to copy a free token");
324 m_tokens->set_token(m_id, newOwner);
327 void assign(
const node_token & other,
val_t newOwner,
bool freshToken =
false) {
328 m_tokens = other.m_tokens->find_authority();
333 throw exception(
"Trying to take ownership of a non-free token");
334 if (m_tokens->get(m_id) != 0)
335 throw exception(
"A token already has an owner, but m_free is true - contradiction");
338 throw exception(
"Trying to copy a free token");
340 m_tokens->set_token(m_id, newOwner);
345 : m_tokens(bits::node_map::create())
346 , m_id(m_tokens->add_token(0))
351 inline id_t id()
const {
return m_id; }
353 inline bits::node_map::ptr map_union(
const node_token & with) {
354 if (m_tokens != with.m_tokens)
355 m_tokens->union_set(with.m_tokens);
356 return m_tokens = m_tokens->find_authority();
359 inline bits::node_map::ptr get_map()
const {
363 inline val_t get()
const {
364 return m_tokens->get(m_id);
368 bits::node_map::ptr m_tokens;
377 #endif // __TPIE_PIPELINING_TOKENS_H__
Defines the tp_assert macro.
Predeclarations of some pipelining classes.
pipe_middle< tempfactory< bits::map_t< F >, F > > map(const F &functor)
Pipelining nodes that applies to given functor to items in the stream.
void get_successors(id_t from, std::vector< id_t > &successors, memory_size_type k, bool forward_only=false)
Compute the set of nodes within k distance of the given node in the item flow graph.