TPIE

2362a60
internal_buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_INTERNAL_BUFFER_H__
25 #define __TPIE_PIPELINING_INTERNAL_BUFFER_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/internal_queue.h>
30 #include <tpie/tpie_assert.h>
31 
32 namespace tpie {
33 
34 namespace pipelining {
35 
36 namespace bits {
37 
38 template <typename T>
40  internal_queue<T> * m_queue;
41 public:
42  typedef T item_type;
43 
44  internal_buffer_pull_output_t(const node_token & input_token) {
45  add_pull_source(input_token);
46  set_name("Fetching items", PRIORITY_SIGNIFICANT);
47  // Memory is accouted for by the input node
48  }
49 
50  virtual void propagate() override {
51  m_queue = fetch<internal_queue<T> *>("queue");
52  }
53 
54  bool can_pull() const {
55  return !m_queue->empty();
56  }
57 
58  T pull() {
59  T item=m_queue->front();
60  m_queue->pop();
61  return item;
62  }
63 
64  virtual void end() override {
65  tpie_delete(m_queue);
66  m_queue=NULL;
67  }
68 };
69 
70 template <typename T>
72 public:
73  typedef T item_type;
74 
75  internal_buffer_input_t(const node_token & token, size_t size, size_t additional_item_size=0)
76  : node(token), size(size) {
77  set_name("Storing items", PRIORITY_INSIGNIFICANT);
78  set_minimum_memory(internal_queue<T>::memory_usage(size) + size*additional_item_size);
79  set_plot_options(PLOT_SIMPLIFIED_HIDE);
80  }
81 
82  virtual void propagate() override {
83  m_queue = tpie::tpie_new<internal_queue<item_type> >(size);
84  forward("queue", m_queue, 1);
85  }
86 
87  void push(const T & item) {
88  tp_assert(!m_queue->full(), "Tried to push into full queue");
89  m_queue->push(item);
90  }
91 
92 private:
93  internal_queue<item_type> * m_queue;
94  size_t size;
95 };
96 
97 } //namespace bits
98 
104 template <typename T>
106 public:
107  typedef T item_type;
110 private:
115 
116 public:
128 
129  internal_passive_buffer(size_t size, size_t additional_item_size=0)
130  : size(size)
131  , additional_item_size(additional_item_size) {}
132 
133  input_t raw_input() {
134  return input_t(input_token, size);
135  }
136 
137  output_t raw_output() {
138  return output_t(input_token, size);
139  }
140 
146  return inputfact_t(input_token, size, additional_item_size);
147  }
148 
154  return outputfact_t(input_token);
155  }
156 
157 private:
158  node_token input_token;
159  size_t size, additional_item_size;
160 
163 };
164 
165 } // namespace pipelining
166 
167 } // namespace tpie
168 
169 #endif // __TPIE_PIPELINING_INTERNAL_BUFFER_H__
Defines the tp_assert macro.
bool full() const
Check if the queue is full.
outputpipe_t output()
Return pipe node where items in the buffer can be pulled from. Pulling from the empty buffer causes un...
virtual void propagate() override
Propagate stream metadata.
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
Generic internal queue with known memory requirements.
A generic internal circular queue.
Base class of all nodes.
Definition: node.h:78
void push(T val)
Add an element to the front of the queue.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
virtual void propagate() override
Propagate stream metadata.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:207
Node factory for variadic argument terminators.
inputpipe_t input()
Return pipe node where items are pushed into the buffer. Pushing items into a full buffer causes undef...
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
virtual void end() override
End pipeline processing phase.
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Definition: memory.h:301
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
node()
Default constructor, using a new node_token.
internal_passive_buffer(size_t size, size_t additional_item_size=0)
Construct a factory for the buffer.