24 #ifndef TPIE_PIPELINING_SERIALIZATION_H
25 #define TPIE_PIPELINING_SERIALIZATION_H
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pair_factory.h>
30 #include <tpie/pipelining/pipe_base.h>
35 namespace pipelining {
37 namespace serialization_bits {
39 template <
typename dest_t>
48 : dest(std::move(dest))
61 virtual void go()
override {
63 stream_size_type bytesRead = 0;
64 while (rd->can_read()) {
68 stream_size_type bytesRead2 = rd->
offset();
69 step(bytesRead2 - bytesRead);
70 bytesRead = bytesRead2;
93 void push(
const T & x) {
118 template <
typename T>
119 pipe_end<typename serialization_bits::output_factory<T>::type>
124 namespace serialization_bits {
126 template <
typename T>
132 std::shared_ptr<node> output=std::shared_ptr<node>())
133 :
node(token), output(output)
137 this->
set_name(
"Serialization reverse writer");
145 forward<tpie::maybe<tpie::temp_file>*>(
"__srev_file", &
file, 1);
152 void push(
const item_type & x) {
159 forward<stream_size_type>(
"items", items);
164 std::shared_ptr<node> output;
166 stream_size_type items;
169 template <
typename dest_t>
175 : dest(std::move(dest))
177 set_name(
"Serialization reverse reader");
186 file = fetch<tpie::maybe<tpie::temp_file> *>(
"__srev_file");
187 if (!
file->is_constructed())
189 rd.open((*file)->path());
195 stream_size_type bytesRead = 0;
196 while (rd.can_read()) {
200 stream_size_type bytesRead2 = rd.
offset();
201 step(bytesRead2 - bytesRead);
202 bytesRead = bytesRead2;
216 template <
typename T>
223 set_name(
"Serialization reverse reader");
231 file = fetch<tpie::maybe<tpie::temp_file> *>(
"__srev_file");
232 if (!
file->is_constructed())
234 rd.open((*file)->path());
239 return rd.can_read();
244 stream_size_type bytesRead = rd.
offset();
246 stream_size_type bytesRead2 = rd.
offset();
247 step(bytesRead2 - bytesRead);
261 template <
typename T>
267 std::shared_ptr<node> output = std::shared_ptr<node>())
272 set_name(
"Serialization buffer writer");
280 forward<tpie::maybe<tpie::temp_file>*>(
"__sbuf_file", &
file, 1);
287 void push(
const item_type & x) {
294 this->forward<stream_size_type>(
"items", items);
297 std::shared_ptr<node> output;
300 stream_size_type items;
304 template <
typename dest_t>
310 : dest(std::move(dest))
316 set_name(
"Serialization buffer reader");
321 file = fetch<tpie::maybe<tpie::temp_file> *>(
"__sbuf_file");
322 if (!
file->is_constructed())
325 rd.open((*file)->path());
331 stream_size_type bytesRead = 0;
332 while (rd.can_read()) {
336 stream_size_type bytesRead2 = rd.
offset();
337 step(bytesRead2 - bytesRead);
338 bytesRead = bytesRead2;
352 template <
typename T>
359 set_name(
"Fetching items", PRIORITY_SIGNIFICANT);
366 file = fetch<tpie::maybe<tpie::temp_file> *>(
"__sbuf_file");
367 if (!
file->is_constructed())
369 rd.open((*file)->path());
374 return rd.can_read();
379 stream_size_type bytesRead = rd.
offset();
381 stream_size_type bytesRead2 = rd.
offset();
382 step(bytesRead2 - bytesRead);
403 template <
typename T>
450 template <
typename T>
505 #endif // TPIE_PIPELINING_SERIALIZATION_H
virtual void propagate() override
Propagate stream metadata.
Serialization stream buffer.
void propagate() override
Propagate stream metadata.
void end() override
End pipeline processing phase.
void end() override
End pipeline processing phase.
inputpipe_t input()
Returns a termfactory for the input nodes.
const std::string & path() const
The path of the file opened or the empty string.
stream_size_type offset()
Number of bytes read, not including the header.
Central file abstraction.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
A passive serialization reverser stored in external memory.
Stream of serializable items.
void propagate() override
Propagate stream metadata.
void end() override
End pipeline processing phase.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
stream_size_type size()
Size of file in bytes, not including the header.
outputpipe_t output()
Returns a termfactory for the output nodes.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
Class to deduce the item_type of a node of type T.
stream_size_type offset()
Number of bytes read, not including the header.
pipe_end< typename serialization_bits::output_factory< T >::type > serialization_output(serialization_writer &wr)
A pipelining node that writes item to a serialization_writer.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void unserialize(T &v)
Unserialize an unserializable item from the stream.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Node factory for variadic argument terminators.
void end() override
End pipeline processing phase.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void step(stream_size_type steps=1)
Step the progress indicator.
void propagate() override
Propagate stream metadata.
Node factory for variadic argument generators.
A pipe_middle class pushes input down the pipeline.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
pipe_middle< split_factory< serialization_bits::reverser_input_t, node, serialization_bits::reverser_output_t > > serialization_reverser
A pipelining node that reverses serializable items and creates a phase boundary.
pipe_begin< serialization_bits::input_factory > serialization_input(serialization_reader &rd)
A pipelining node that reads items from a serialization_reader.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
node()
Default constructor, using a new node_token.
pipe_middle< split_factory< serialization_bits::buffer_input_t, node, serialization_bits::buffer_output_t > > serialization_buffer
A pipelining node that acts as a buffer for serializable items and creates a phase boundary...