20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
24 #include <tpie/pipelining/sort_parameters.h>
25 #include <tpie/pipelining/merger.h>
26 #include <tpie/pipelining/node.h>
27 #include <tpie/pipelining/exception.h>
120 memory_size_type m_levels;
122 memory_size_type m_runs[2];
130 bool m_finalExtraSet;
149 template <
typename T,
bool UseProgress,
typename pred_t = std::less<T>,
typename store_t=default_store>
152 typedef typename store_t::template element_type<T>::type TT;
153 typedef typename store_t::template specific<TT> specific_store_t;
154 typedef typename specific_store_t::outer_type outer_type;
155 typedef typename specific_store_t::store_type store_type;
156 typedef typename specific_store_t::element_type element_type;
157 typedef outer_type item_type;
158 static const size_t item_size = specific_store_t::item_size;
161 typedef std::shared_ptr<merge_sorter> ptr;
164 static const memory_size_type defaultFiles = 253;
165 static const memory_size_type minimumFilesPhase1 = 1;
166 static const memory_size_type maximumFilesPhase1 = 1;
167 static const memory_size_type minimumFilesPhase2 = 5;
168 static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max();
169 static const memory_size_type minimumFilesPhase3 = 5;
170 static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max();
172 inline merge_sorter(pred_t pred = pred_t(), store_t store = store_t())
175 , m_state(stNotStarted)
177 , m_parametersSet(
false)
178 , m_store(store.template get_specific<element_type>())
179 , m_merger(pred, m_store, m_bucket)
180 , m_currentRunItems(m_bucket)
181 , m_maxItems(std::numeric_limits<stream_size_type>::max())
184 , m_finalMergeInitialized(
false)
185 , m_owning_node(
nullptr)
192 inline void set_parameters(memory_size_type runLength, memory_size_type fanout) {
193 tp_assert(m_state == stNotStarted,
"Merge sorting already begun");
196 m_parametersSet =
true;
197 log_debug() <<
"Manually set merge sort run length and fanout\n";
199 log_debug() <<
"Fanout = " << p.
fanout <<
" (uses memory " << fanout_memory_usage(p.
fanout) <<
")" << std::endl;
248 inline void check_not_started() {
249 if (m_state != stNotStarted) {
250 throw tpie::exception(
"Can't change parameters after merge sorting has started");
255 inline void set_phase_1_files(memory_size_type f1) {
260 inline void set_phase_2_files(memory_size_type f2) {
265 inline void set_phase_3_files(memory_size_type f3) {
270 inline void set_phase_1_memory(memory_size_type m1) {
275 inline void set_phase_2_memory(memory_size_type m2) {
280 inline void set_phase_3_memory(memory_size_type m3) {
289 tp_assert(m_state == stNotStarted,
"Merge sorting already begun");
290 if (!m_parametersSet) calculate_parameters();
291 log_debug() <<
"Start forming input runs" << std::endl;
294 m_runFiles.resize(p.
fanout*2);
295 m_currentRunItemCount = 0;
297 m_state = stRunFormation;
304 inline void push(item_type && item) {
305 tp_assert(m_state == stRunFormation,
"Wrong phase");
306 if (m_currentRunItemCount >= p.
runLength) {
310 m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
311 ++m_currentRunItemCount;
315 inline void push(
const item_type & item) {
316 tp_assert(m_state == stRunFormation,
"Wrong phase");
317 if (m_currentRunItemCount >= p.
runLength) {
321 m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(item);
322 ++m_currentRunItemCount;
330 tp_assert(m_state == stRunFormation,
"Wrong phase");
333 if (m_itemCount == 0) {
334 tp_assert(m_currentRunItemCount == 0,
"m_itemCount == 0, but m_currentRunItemCount != 0");
335 m_reportInternal =
true;
337 m_currentRunItems.
resize(0);
338 log_debug() <<
"Got no items. Internal reporting mode." << std::endl;
341 m_reportInternal =
true;
343 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode." << std::endl;
345 }
else if (m_finishedRuns == 0
353 for (
size_t i=0; i < m_currentRunItemCount; ++i)
354 currentRun[i] = std::move(m_currentRunItems[i]);
355 m_currentRunItems.
swap(currentRun);
357 m_reportInternal =
true;
359 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode "
360 <<
"after resizing item buffer." << std::endl;
363 m_reportInternal =
false;
365 m_currentRunItems.
resize(0);
366 log_debug() <<
"Got " << m_finishedRuns <<
" runs. External reporting mode." << std::endl;
371 inline bool is_calc_free()
const {
372 tp_assert(m_state == stMerge,
"Wrong phase");
373 return m_reportInternal || m_finishedRuns <= p.
fanout;
381 tp_assert(m_state == stMerge,
"Wrong phase");
382 if (!m_reportInternal) {
392 inline void evacuate() {
393 tp_assert(m_state == stMerge || m_state == stReport,
"Wrong phase");
394 if (m_reportInternal) {
395 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") in internal reporting mode" << std::endl;
396 m_reportInternal =
false;
397 memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
399 m_currentRunItems.
resize(0);
400 initialize_final_merger(0, runCount);
401 }
else if (m_state == stMerge) {
402 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before merge in external reporting mode (noop)" << std::endl;
406 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before reporting in external reporting mode" << std::endl;
412 inline void evacuate_before_merging() {
413 if (m_state == stMerge) evacuate();
416 inline void evacuate_before_reporting() {
417 if (m_state == stReport && (!m_reportInternal || m_itemsPulled == 0)) evacuate();
425 inline void sort_current_run() {
427 bits::store_pred<pred_t, specific_store_t>(pred));
431 inline void empty_current_run() {
432 if (m_finishedRuns < 10)
433 log_debug() <<
"Write " << m_currentRunItemCount <<
" items to run file " << m_finishedRuns << std::endl;
434 else if (m_finishedRuns == 10)
436 file_stream<element_type> fs;
437 open_run_file_write(fs, 0, m_finishedRuns);
438 for (memory_size_type i = 0; i < m_currentRunItemCount; ++i)
439 fs.write(m_store.store_to_element(std::move(m_currentRunItems[i])));
440 m_currentRunItemCount = 0;
448 inline void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
453 array<file_stream<element_type> > in(runCount);
454 for (memory_size_type i = 0; i < runCount; ++i) {
455 open_run_file_read(in[i], mergeLevel, runNumber+i);
457 stream_size_type runLength = calculate_run_length(p.
runLength, p.
fanout, mergeLevel);
459 m_merger.reset(in, runLength);
465 inline void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) {
466 if (m_finalMergeInitialized) {
467 reinitialize_final_merger();
471 m_finalMergeInitialized =
true;
472 m_finalMergeLevel = finalMergeLevel;
473 m_finalRunCount = runCount;
477 log_debug() <<
"Run count in final level (" << runCount <<
") is greater than the final fanout (" << p.
finalFanout <<
")\n";
480 memory_size_type n = runCount-i;
481 log_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
482 dummy_progress_indicator pi;
483 m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi);
485 log_debug() <<
"Run count in final level (" << runCount <<
") is less or equal to the final fanout (" << p.
finalFanout <<
")" << std::endl;
486 m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max();
488 reinitialize_final_merger();
492 inline void reinitialize_final_merger() {
493 tp_assert(m_finalMergeInitialized,
"reinitialize_final_merger while !m_finalMergeInitialized");
495 if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) {
496 array<file_stream<element_type> > in(p.
finalFanout);
497 for (memory_size_type i = 0; i < p.
finalFanout-1; ++i) {
498 open_run_file_read(in[i], m_finalMergeLevel, i);
499 log_debug() <<
"Run " << i <<
" is at offset " << in[i].offset() <<
" and has size " << in[i].size() << std::endl;
501 open_run_file_read(in[p.
finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber);
503 stream_size_type runLength = calculate_run_length(p.
runLength, p.
fanout, m_finalMergeLevel+1);
504 log_debug() <<
"Run length " << runLength << std::endl;
505 m_merger.reset(in, runLength);
507 initialize_merger(m_finalMergeLevel, 0, m_finalRunCount);
516 static inline stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
517 stream_size_type runLength = initialRunLength;
518 for (memory_size_type i = 0; i < mergeLevel; ++i) {
529 template <
typename ProgressIndicator>
530 inline memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
531 initialize_merger(mergeLevel, runNumber, runCount);
532 file_stream<element_type> out;
533 memory_size_type nextRunNumber = runNumber/p.
fanout;
534 open_run_file_write(out, mergeLevel+1, nextRunNumber);
535 while (m_merger.can_pull()) {
537 out.write(m_store.store_to_element(m_merger.pull()));
539 return nextRunNumber;
545 inline void prepare_pull(
typename Progress::base & pi) {
549 int treeHeight=
static_cast<int>(ceil(log(static_cast<float>(m_finishedRuns)) /
550 log(static_cast<float>(p.
fanout))));
551 pi.init(item_count()*treeHeight);
553 memory_size_type mergeLevel = 0;
554 memory_size_type runCount = m_finishedRuns;
555 while (runCount > p.
fanout) {
556 log_debug() <<
"Merge " << runCount <<
" runs in merge level " << mergeLevel <<
'\n';
558 memory_size_type newRunCount = 0;
559 for (memory_size_type i = 0; i < runCount; i += p.
fanout) {
560 memory_size_type n = std::min(runCount-i, p.
fanout);
562 if (newRunCount < 10)
563 log_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
564 else if (newRunCount == 10)
567 merge_runs(mergeLevel, i, n, pi);
571 runCount = newRunCount;
573 log_debug() <<
"Final merge level " << mergeLevel <<
" has " << runCount <<
" runs" << std::endl;
574 initialize_final_merger(mergeLevel, runCount);
586 tp_assert(m_state == stReport,
"Wrong phase");
587 if (m_reportInternal)
return m_itemsPulled < m_currentRunItemCount;
589 if (m_evacuated) reinitialize_final_merger();
590 return m_merger.can_pull();
598 tp_assert(m_state == stReport,
"Wrong phase");
599 if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
600 store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
602 return m_store.store_to_outer(std::move(el));
604 if (m_evacuated) reinitialize_final_merger();
605 m_runPositions.
close();
606 return m_store.store_to_outer(m_merger.pull());
610 inline stream_size_type item_count() {
614 static memory_size_type memory_usage_phase_1(
const sort_parameters & params) {
615 return params.runLength * item_size
617 + file_stream<element_type>::memory_usage()
618 + 2*params.fanout*
sizeof(temp_file);
621 static memory_size_type minimum_memory_phase_1() {
629 sort_parameters tmp_p((sort_parameters()));
631 tmp_p.fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max(), 0);
632 return memory_usage_phase_1(tmp_p);
635 static memory_size_type memory_usage_phase_2(
const sort_parameters & params) {
636 return fanout_memory_usage(params.fanout);
639 static memory_size_type minimum_memory_phase_2() {
640 return fanout_memory_usage(calculate_fanout(0, 0));
643 static memory_size_type memory_usage_phase_3(
const sort_parameters & params) {
644 return fanout_memory_usage(params.finalFanout);
647 static memory_size_type minimum_memory_phase_3() {
648 return fanout_memory_usage(calculate_fanout(0, 0));
651 static memory_size_type maximum_memory_phase_3() {
652 return std::numeric_limits<memory_size_type>::max();
655 memory_size_type actual_memory_phase_3() {
656 tp_assert(m_state == stReport,
"Wrong phase");
657 if (m_reportInternal)
658 return m_runFiles.memory_usage(m_runFiles.size())
661 return fanout_memory_usage(m_finalRunCount);
664 inline memory_size_type evacuated_memory_usage()
const {
665 return 2*p.
fanout*
sizeof(temp_file);
669 static memory_size_type clamp(memory_size_type lo, memory_size_type val, memory_size_type hi) {
670 return std::max(lo, std::min(val, hi));
676 inline void calculate_parameters() {
677 tp_assert(m_state == stNotStarted,
"Merge sorting already begun");
680 p.
filesPhase1 = clamp(minimumFilesPhase1, defaultFiles, maximumFilesPhase1);
682 p.
filesPhase2 = clamp(minimumFilesPhase2, defaultFiles, maximumFilesPhase2);
684 p.
filesPhase3 = clamp(minimumFilesPhase3, defaultFiles, maximumFilesPhase3);
687 throw tpie::exception(
"file limit for phase 1 too small (" + std::to_string(p.
filesPhase1) +
" < " + std::to_string(minimumFilesPhase1) +
")");
689 throw tpie::exception(
"file limit for phase 2 too small (" + std::to_string(p.
filesPhase2) +
" < " + std::to_string(minimumFilesPhase2) +
")");
691 throw tpie::exception(
"file limit for phase 3 too small (" + std::to_string(p.
filesPhase3) +
" < " + std::to_string(minimumFilesPhase3) +
")");
731 memory_size_type streamMemory = file_stream<element_type>::memory_usage();
732 memory_size_type tempFileMemory = 2*p.
fanout*
sizeof(temp_file);
734 log_debug() <<
"Phase 1: " << p.
memoryPhase1 <<
" b available memory; " << streamMemory <<
" b for a single stream; " << tempFileMemory <<
" b for temp_files\n";
737 log_warning() <<
"Not enough phase 1 memory for 128 KB items and an open stream! (" << p.
memoryPhase1 <<
" < " << min_m1 <<
")\n";
745 - tempFileMemory)/item_size;
749 m_parametersSet =
true;
753 log_debug() <<
"Calculated merge sort parameters\n";
758 << p.
memoryPhase1 <<
" b available, " << memory_usage_phase_1(p) <<
" b expected" << std::endl;
760 log_warning() <<
"Merge sort phase 1 exceeds the alloted memory usage: "
761 << p.
memoryPhase1 <<
" b available, but " << memory_usage_phase_1(p) <<
" b expected" << std::endl;
764 << p.
memoryPhase2 <<
" b available, " << memory_usage_phase_2(p) <<
" b expected" << std::endl;
766 log_warning() <<
"Merge sort phase 2 exceeds the alloted memory usage: "
767 << p.
memoryPhase2 <<
" b available, but " << memory_usage_phase_2(p) <<
" b expected" << std::endl;
770 << p.
memoryPhase3 <<
" b available, " << memory_usage_phase_3(p) <<
" b expected" << std::endl;
772 log_warning() <<
"Merge sort phase 3 exceeds the alloted memory usage: "
773 << p.
memoryPhase3 <<
" b available, but " << memory_usage_phase_3(p) <<
" b expected" << std::endl;
780 static inline memory_size_type calculate_fanout(memory_size_type availableMemory, memory_size_type availableFiles) {
781 memory_size_type fanout_lo = 2;
782 memory_size_type fanout_hi = availableFiles - 2;
784 while (fanout_lo < fanout_hi - 1) {
785 memory_size_type mid = fanout_lo + (fanout_hi-fanout_lo)/2;
786 if (fanout_memory_usage(mid) <= availableMemory) {
798 static inline memory_size_type fanout_memory_usage(memory_size_type fanout) {
799 return merger<specific_store_t, pred_t>::memory_usage(fanout)
801 + file_stream<element_type>::memory_usage()
802 + 2*
sizeof(temp_file);
815 if (m_state != stNotStarted)
816 throw exception(
"Wrong state in set_items: state is not stNotStarted");
820 if (!m_parametersSet) {
829 memory_size_type newRunLength =
832 <<
" to " << newRunLength
833 <<
" since at most " << m_maxItems <<
" items will be pushed,"
834 <<
" and the internal report threshold is "
836 <<
". New merge sort parameters:\n";
850 if (m_owning_node !=
nullptr)
851 m_bucketPtr = std::move(m_owning_node->
bucket(0));
854 n->
bucket(0) = std::move(m_bucketPtr);
864 inline memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
868 return (mergeLevel % 2)*p.
fanout + (runNumber % p.
fanout);
874 void open_run_file_write(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
877 memory_size_type idx = run_file_index(mergeLevel, runNumber);
878 if (runNumber < p.
fanout) m_runFiles[idx].free();
880 fs.seek(0, file_stream_base::end);
881 m_runPositions.
set_position(mergeLevel, runNumber, fs.get_position());
887 void open_run_file_read(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
890 memory_size_type idx = run_file_index(mergeLevel, runNumber);
892 fs.set_position(m_runPositions.
get_position(mergeLevel, runNumber));
903 std::unique_ptr<memory_bucket> m_bucketPtr;
904 memory_bucket_ref m_bucket;
906 array<temp_file> m_runFiles;
911 bool m_parametersSet;
913 specific_store_t m_store;
914 merger<specific_store_t, pred_t> m_merger;
916 bits::run_positions m_runPositions;
921 stream_size_type m_finishedRuns;
924 array<store_type> m_currentRunItems;
928 memory_size_type m_currentRunItemCount;
930 bool m_reportInternal;
934 memory_size_type m_itemsPulled;
936 stream_size_type m_itemCount;
938 stream_size_type m_maxItems;
942 bool m_finalMergeInitialized;
943 memory_size_type m_finalMergeLevel;
944 memory_size_type m_finalRunCount;
945 memory_size_type m_finalMergeSpecialRunNumber;
952 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__
Sequential access is intended.
Encapsulation of two pointers from any random access container.
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
void unevacuate()
Switch from any state to the corresponding non-evacuated state.
void set_available_memory(memory_size_type m)
Calculate parameters from given memory amount.
Class to maintain the positions where sorted runs start.
item_type pull()
In phase 3, fetch next item in the final merge phase.
The base class for indicating the progress of some task.
memory_size_type filesPhase3
Files available during output phase.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
A generic array with a fixed size.
void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3)
Calculate parameters from given memory amount.
static memory_size_type memory_usage(memory_size_type size)
Return the number of bytes required to create a data structure supporting a given number of elements...
void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3)
Calculate parameters from given amount of files.
Merge sorting consists of three phases.
virtual void done()
Advance the indicator to the end.
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
Compressed stream public API.
static memory_size_type memory_usage()
Memory usage when open and not evacuated.
memory_size_type memoryPhase3
Memory available during output phase.
POD object indicating the position of an item in a stream.
memory_size_type filesPhase2
Files available while merging runs.
Simple parallel quick sort implementation with progress tracking.
An allocator object usable in STL containers, using the TPIE memory manager.
Class storing a reference to a memory bucket.
void set_items(stream_size_type n)
Set upper bound on number of items pushed.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
memory_size_type finalFanout
Fanout of merge tree during phase 3.
void final_level(memory_size_type fanout)
Set this to be the final level in the merge heap - see class docstring.
Class representing a reference to a temporary file.
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Bucket used for memory counting.
void set_position(memory_size_type mergeLevel, memory_size_type runNumber, stream_position pos)
Store a stream position - see class docstring.
logstream & log_debug()
Return logstream for writing debug log messages.
memory_size_type memoryPhase2
Memory available while merging runs.
void next_level()
Go to next level in the merge heap - see class docstring.
void resize(size_t size, const T &elm)
Change the size of the array.
void close()
Switch from any state to closed state.
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
void open()
Switch from closed to open state.
void swap(array &other)
Swap two arrays.
Compress some blocks according to available resources (time, memory).
iterator begin()
Return an iterator to the beginning of the array.
size_type size() const
Return the size of the array.
void evacuate()
Switch from any state to the corresponding evacuated state.
void begin()
Initiate phase 1: Formation of input runs.
void push(item_type &&item)
Push item to merge sorter during phase 1.
void set_parameters(memory_size_type runLength, memory_size_type fanout)
Enable setting run length and fanout manually (for testing purposes).
memory_size_type filesPhase1
Files available while forming sorted runs.
#define tp_assert(condition, message)
void calc(typename Progress::base &pi)
Perform phase 2: Performing all merges in the merge tree except the last one.
For applications where you wish to disable progress indicators via a template parameter, refer to the progress_types members named sub, fp and base.
stream_position get_position(memory_size_type mergeLevel, memory_size_type runNumber)
Fetch a stream position - see class docstring.
logstream & log_warning()
Return logstream for writing warning log messages.
void set_available_files(memory_size_type f)
Calculate parameters from given amount of files.
memory_size_type fanout
Fanout of merge tree during phase 2.
Progress indicator concept in an efficient non-inheritance way.
Open a file for reading or writing.
virtual void init(stream_size_type range=0)
Initialize progress indicator.
memory_size_type memoryPhase1
Memory available while forming sorted runs.