20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sorter.h>
31 namespace pipelining {
33 namespace serialization_bits {
35 template <
typename T,
typename pred_t>
39 typedef pred_t pred_type;
41 typedef std::shared_ptr<sorter_t> sorterptr;
44 template <
typename Traits>
47 template <
typename Traits>
50 template <
typename Traits>
52 typedef typename Traits::pred_type pred_type;
65 void set_calc_node(
node & calc) {
71 forward(
"items", static_cast<stream_size_type>(m_sorter->item_count()));
72 memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
76 m_propagate_called =
true;
80 m_sorter->set_owner(
this);
84 m_sorter->set_owner(
nullptr);
94 if (!m_propagate_called)
95 m_sorter->set_phase_3_memory(availableMemory);
100 , m_propagate_called(false)
106 , m_propagate_called(false)
111 bool m_propagate_called;
117 template <
typename Traits>
120 typedef typename Traits::item_type
item_type;
121 typedef typename Traits::pred_type pred_type;
122 typedef typename Traits::sorter_t
sorter_t;
123 typedef typename Traits::sorterptr
sorterptr;
129 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
133 inline bool can_pull()
const {
134 return this->m_sorter->can_pull();
139 return this->m_sorter->pull();
147 virtual void go()
override {
148 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
149 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
157 template <
typename Traits,
typename dest_t>
159 typedef typename Traits::pred_type pred_type;
161 typedef typename Traits::item_type
item_type;
163 typedef typename Traits::sorter_t
sorter_t;
164 typedef typename Traits::sorterptr
sorterptr;
168 , dest(std::move(dest))
172 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
176 virtual void go()
override {
177 while (this->m_sorter->can_pull()) {
178 dest.push(this->m_sorter->pull());
191 template <
typename Traits>
192 class sort_calc_t :
public node {
194 typedef typename Traits::item_type item_type;
195 typedef typename Traits::sorter_t sorter_t;
196 typedef typename Traits::sorterptr sorterptr;
198 typedef sort_output_base<Traits> Output;
200 sort_calc_t(sort_calc_t && other) =
default;
202 template <
typename dest_t>
203 sort_calc_t(dest_t dest)
204 : dest(new dest_t(std::move(dest)))
206 m_sorter = this->dest->get_sorter();
207 this->dest->add_calc_dependency(this->
get_token());
211 sort_calc_t(sorterptr sorter, node_token tkn)
212 :
node(tkn), m_sorter(sorter)
219 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
221 m_propagate_called =
false;
226 m_propagate_called =
true;
230 m_sorter->set_owner(
this);
// Returns the result of m_sorter->is_merge_runs_free() unchanged; the decision
// is delegated entirely to the sorter and no state of this node is modified.
233 virtual bool is_go_free()
const override {
return m_sorter->is_merge_runs_free();}
235 virtual void go()
override {
237 log_debug() <<
"TODO: Progress information during merging." << std::endl;
238 m_sorter->merge_runs();
245 m_weak_sorter = m_sorter;
254 auto p = m_weak_sorter.lock();
255 if (p) p->evacuate();
258 sorterptr get_sorter()
const {
268 if (!m_propagate_called)
269 m_sorter->set_phase_2_memory(availableMemory);
274 std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
275 bool m_propagate_called;
276 std::shared_ptr<Output> dest;
284 template <
typename Traits>
285 class sort_input_t :
public node {
286 typedef typename Traits::pred_type pred_type;
288 typedef typename Traits::item_type item_type;
289 typedef typename Traits::sorter_t sorter_t;
290 typedef typename Traits::sorterptr sorterptr;
292 sort_input_t(sort_calc_t<Traits> dest)
293 : m_sorter(dest.get_sorter())
294 , dest(std::move(dest))
296 this->dest.set_input_node(*
this);
298 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
300 m_propagate_called =
false;
304 m_propagate_called =
true;
308 m_sorter->set_owner(
this);
312 void push(
const item_type & item) {
313 m_sorter->push(item);
316 virtual void end()
override {
318 m_weak_sorter = m_sorter;
327 auto p = m_weak_sorter.lock();
328 if (p) p->evacuate();
333 if (!m_propagate_called)
334 m_sorter->set_phase_1_memory(availableMemory);
339 std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
341 bool m_propagate_called;
344 template <
typename child_t>
// Returns *this viewed as a const child_t. The conversion is a static_cast,
// so no runtime type check is performed.
346 const child_t &
self()
const {
return *
static_cast<const child_t *
>(
this); }
348 template <
typename dest_t>
354 typedef typename child_t::template predicate<item_type>::type pred_type;
359 template <
typename dest_t>
380 template <
typename item_type>
383 typedef std::less<item_type> type;
386 template <
typename T>
387 std::less<T> get_pred()
const {
388 return std::less<T>();
395 template <
typename pred_t>
398 template <
typename Dummy>
409 template <
typename T>
410 pred_t get_pred()
const {
423 inline pipe_middle<serialization_bits::default_pred_sort_factory>
432 template <
typename pred_t>
433 pipe_middle<serialization_bits::sort_factory<pred_t> >
439 template <
typename T,
typename pred_t=std::less<T> >
442 namespace serialization_bits {
447 template <
typename Traits>
454 typedef typename Traits::sorter_t sorter_t;
455 typedef typename Traits::sorterptr sorterptr;
459 , m_calc_token(calc_token) {}
462 calc_t calc(std::move(m_sorter), m_calc_token);
466 return std::move(input);
477 template <
typename Traits>
481 typedef typename Traits::sorterptr sorterptr;
486 , m_calc_token(calc_token)
491 res.add_calc_dependency(m_calc_token);
493 return std::move(res);
511 template <
typename T,
typename pred_t>
528 : m_sorter_input(new
sorter_t(sizeof(T), pred))
529 , m_sorter_output(m_sorter_input)
542 assert(m_sorter_input);
543 return input_pipe_t(std::move(m_sorter_input), m_calc_token);
550 assert(m_sorter_output);
551 return output_pipe_t(std::move(m_sorter_output), m_calc_token);
564 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining node that pushes the contents of the given file stream to the next node in the pipeline...
void end() override
End pipeline processing phase.
output_pipe_t output()
Get the output pull node.
The base class for indicating the progress of some task.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
input_pipe_t input()
Get the input push node.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
Base class of all pipelining factories.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void init_node(node &r)
Initialize node constructed in a subclass.
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Pipe sorter push output node.
virtual void done()
Advance the indicator to the end.
Factory for the passive sorter output node.
Pipe sorter pull output node.
T item_type
Type of items sorted.
serialization_bits::sort_pull_output_t< Traits > output_t
Type of pipe sorter output.
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Pipelined sorter with push input and pull output.
Binary serialization and unserialization.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
virtual void propagate() override
Propagate stream metadata.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void propagate() override
Propagate stream metadata.
void begin() override
Begin pipeline processing phase.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
pipe_middle< serialization_bits::default_pred_sort_factory > serialization_sort()
Pipelining sorter using std::less.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
void init_sub_node(node &r)
Initialize node constructed in a subclass.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
logstream & log_debug()
Return logstream for writing debug log messages.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create an identity pipe node that defines the item type.
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void begin() override
Begin pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void set_memory_fraction(double f)
Set the memory priority of this node.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
A pipe_middle class pushes input down the pipeline.
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Sort factory using std::less as comparator.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
void end() override
End pipeline processing phase.
node()
Default constructor, using a new node_token.
logstream & log_warning()
Return logstream for writing warning log messages.
Sort factory using the given predicate as comparator.
Traits::item_type item_type
Type of items sorted.
virtual void init(stream_size_type range=0)
Initialize progress indicator.