20 #ifndef __TPIE_PIPELINING_SORT_H__
21 #define __TPIE_PIPELINING_SORT_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/pipelining/merge_sorter.h>
28 #include <tpie/file_stream.h>
36 namespace pipelining {
40 template <
typename T,
typename pred_t,
typename store_t>
43 template <
typename T,
typename pred_t,
typename store_t>
46 template <
typename T,
typename pred_t,
typename store_t>
64 forward(
"items", static_cast<stream_size_type>(m_sorter->item_count()));
65 memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
69 m_propagate_called =
true;
79 if (m_propagate_called)
83 m_sorter->set_phase_3_memory(available);
84 else if (type == FILES) {
85 m_sorter->set_phase_3_files(available);
91 , m_propagate_called(false)
96 bool m_propagate_called;
104 template <
typename T,
typename pred_t,
typename store_t>
122 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
128 this->m_sorter->set_owner(
this);
131 bool can_pull()
const {
132 return this->m_sorter->can_pull();
137 return this->m_sorter->pull();
141 this->m_sorter.reset();
149 virtual void go()
override {
150 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
151 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
161 template <
typename pred_t,
typename dest_t,
typename store_t>
176 , dest(std::move(dest))
183 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
189 this->m_sorter->set_owner(
this);
192 virtual void go()
override {
193 while (this->m_sorter->can_pull()) {
195 dest.push(std::move(y));
201 this->m_sorter.reset();
213 template <
typename T,
typename pred_t,
typename store_t>
214 class sort_calc_t :
public node {
228 template <
typename dest_t>
230 : dest(new dest_t(std::move(dest)))
232 m_sorter = this->dest->get_sorter();
233 this->dest->add_calc_dependency(this->
get_token());
238 :
node(tkn), m_sorter(sorter)
247 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
250 m_propagate_called =
false;
255 m_propagate_called =
true;
259 this->m_sorter->set_owner(
this);
263 m_weakSorter = m_sorter;
/// Answers is_go_free() for this node by forwarding the sorter's
/// is_calc_free() result unchanged; the node keeps no state of its own for it.
/// NOTE(review): the meaning of "free" is defined by the sorter's
/// is_calc_free(), which is not visible here -- confirm against merge_sorter.
267 virtual bool is_go_free()
const override {
return m_sorter->is_calc_free();}
269 virtual void go()
override {
280 if (sorter) sorter->evacuate_before_reporting();
294 if (m_propagate_called)
298 m_sorter->set_phase_2_memory(available);
299 else if (type == FILES) {
300 m_sorter->set_phase_2_files(available);
306 std::weak_ptr<typename sorterptr::element_type> m_weakSorter;
307 bool m_propagate_called;
308 std::shared_ptr<Output> dest;
316 template <
typename T,
typename pred_t,
typename store_t>
317 class sort_input_t :
public node {
328 : m_sorter(dest.get_sorter())
329 , m_propagate_called(false)
330 , dest(std::move(dest))
332 this->dest.set_input_node(*
this);
333 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
343 m_sorter->set_items(this->fetch<stream_size_type>(
"items"));
344 m_propagate_called =
true;
348 m_sorter->push(std::move(item));
352 m_sorter->push(item);
357 m_sorter->set_owner(
this);
361 virtual void end()
override {
364 m_weakSorter = m_sorter;
374 if (sorter) sorter->evacuate_before_merging();
380 if (m_propagate_called)
384 m_sorter->set_phase_1_memory(available);
385 else if (type == FILES) {
386 m_sorter->set_phase_1_files(available);
391 std::weak_ptr<typename sorterptr::element_type> m_weakSorter;
392 bool m_propagate_called;
396 template <
typename child_t,
typename store_t>
/// Returns this object viewed as a const reference to child_t.
/// The conversion is an unchecked static_cast of `this`, so it is only valid
/// when the dynamic type really is child_t (presumably the derived factory
/// passed as the child_t template argument -- confirm at the class header).
398 const child_t &
self()
const {
return *
static_cast<const child_t *
>(
this); }
400 template <
typename dest_t>
405 typedef typename store_t::template element_type<item_type>::type element_type;
407 typedef typename child_t::template predicate<element_type>::type pred_type;
411 template <
typename dest_t>
414 typedef typename store_t::template element_type<item_type>::type element_type;
415 typedef typename constructed<dest_t>::pred_type pred_type;
420 self().
template get_pred<element_type>(),
428 return std::move(
input);
440 template <
typename store_t>
443 template <
typename item_type>
446 typedef std::less<item_type> type;
449 template <
typename T>
450 std::less<T> get_pred()
const {
451 return std::less<T>();
463 template <
typename pred_t,
typename store_t>
466 template <
typename Dummy>
478 template <
typename T>
479 pred_t get_pred()
const {
491 inline pipe_middle<bits::default_pred_sort_factory<default_store> >
501 template <
typename store_t>
502 inline pipe_middle<bits::default_pred_sort_factory<store_t> >
511 template <
typename pred_t>
512 inline pipe_middle<bits::sort_factory<pred_t, default_store> >
522 template <
typename pred_t,
typename store_t>
523 inline pipe_middle<bits::sort_factory<pred_t, store_t> >
524 sort(
const pred_t & p, store_t store) {
529 template <
typename T,
typename pred_t=std::less<T>,
typename store_t=default_store>
537 template <
typename T,
typename pred_t,
typename store_t>
544 typedef typename sorter_t::ptr sorterptr;
548 , m_calc_token(calc_token) {}
551 calc_t calc(std::move(m_sorter), m_calc_token);
566 template <
typename T,
typename pred_t,
typename store_t>
570 typedef typename sorter_t::ptr sorterptr;
575 , m_calc_token(calc_token)
580 res.add_calc_dependency(m_calc_token);
598 template <
typename T,
typename pred_t,
typename store_t>
611 store_t store = store_t())
612 : m_sorterInput(std::make_shared<
sorter_t>(pred, store))
613 , m_sorterOutput(m_sorterInput)
628 tp_assert(m_sorterInput,
"input() called more than once");
630 std::move(m_sorterInput), m_calc_token);
631 return {std::move(ret)};
638 tp_assert(m_sorterOutput,
"output() called more than once");
640 std::move(m_sorterOutput), m_calc_token);
641 return {std::move(ret)};
T item_type
Type of items sorted.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
push_type< dest_t >::type item_type
Type of items sorted.
output_pipe_t output()
Get the output pull node.
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining node that pushes the contents of the given file stream to the next node in the pipeline...
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Memory management subsystem.
Sort factory using the given predicate as comparator.
The base class for indicating the progress of some task.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
void begin() override
Begin pipeline processing phase.
Base class of all pipelining factories.
Merge sorting consists of three phases.
void init_node(node &r)
Initialize node constructed in a subclass.
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Pipelined sorter with push input and pull output.
merge_sorter< T, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
T item_type
Type of items sorted.
T item_type
Type of items sorted.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Simple parallel quick sort implementation with progress tracking.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
virtual void propagate() override
Propagate stream metadata.
Factory for the passive sorter output node.
Sort factory using std::less as comparator.
virtual void propagate() override
Propagate stream metadata.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
virtual void resource_available_changed(resource_type type, memory_size_type available) override
Called by the resource manager to notify the node that its available amount of a resource has changed...
bits::sort_pull_output_t< item_type, pred_t, store_t > output_t
Type of pipe sorter output.
input_pipe_t input()
Get the input push node.
Pipe sorter pull output node.
void init_sub_node(node &r)
Initialize node constructed in a subclass.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Pipe sorter push output node.
T item_type
Type of items sorted.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
virtual void end()
End pipeline processing phase.
void end() override
End pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
void begin() override
Begin pipeline processing phase.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void set_memory_fraction(double f)
Set the memory priority of this node.
pipe_middle< bits::default_pred_sort_factory< default_store > > sort()
Pipelining sorter using std::less.
virtual void resource_available_changed(resource_type type, memory_size_type available) override
Called by the resource manager to notify the node that its available amount of a resource has changed...
A pipe_middle class pushes input down the pipeline.
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
void set_resource_fraction(resource_type type, double f)
Set the resource priority of this node.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Fantastic store strategy.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
#define tp_assert(condition, message)
pipe_middle< bits::default_pred_sort_factory< store_t > > store_sort(store_t store=store_t())
A pipelining node that sorts large elements indirectly by using a store and std::less.
void end() override
End pipeline processing phase.
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
sort_output_base< item_type, pred_t, store_t > p_t
Base class.
merge_sorter< item_type, true, pred_t, store_t > sorter_t
Type of the merge sort implementation used.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
logstream & log_warning()
Return logstream for writing warning log messages.
void begin() override
Begin pipeline processing phase.
void end() override
End pipeline processing phase.