28 #ifndef _TPIE_AMI_SORT_MANAGER_H
29 #define _TPIE_AMI_SORT_MANAGER_H
33 #include <tpie/stream.h>
42 #include <boost/filesystem.hpp>
59 template <
class T,
class I,
class M>
87 void compute_sort_params();
104 TPIE_OS_OFFSET nInputItems;
105 TPIE_OS_SIZE_T mmBytesAvail;
106 TPIE_OS_SIZE_T mmBytesPerStream;
110 TPIE_OS_OFFSET progCount;
116 TPIE_OS_SIZE_T nItemsPerRun;
118 TPIE_OS_OFFSET nRuns;
126 TPIE_OS_OFFSET minRunsPerStream;
132 TPIE_OS_SIZE_T nItemsInLastRun;
134 TPIE_OS_SIZE_T nItemsInThisRun;
136 TPIE_OS_OFFSET runsInStream;
142 std::string working_disk;
149 template <
class T,
class I,
class M>
151 m_internalSorter(isort),
164 curOutputRunStream(NULL),
175 template<
class T,
class I,
class M>
178 m_indicator = indicator;
182 use2xSpace = (in == out);
188 if (in==NULL || out==NULL) {
189 if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
193 if (inStream->size() < 2) {
194 if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
199 out->write(in->
read());
208 template<
class T,
class I,
class M>
210 sort(in, in, indicator);
213 template<
class T,
class I,
class M>
230 mmBytesAvail -= 2 * mmBytesPerStream;
234 nInputItems = inStream->size();
238 if (nInputItems < TPIE_OS_OFFSET(m_internalSorter->MaxItemCount(mmBytesAvail))) {
241 fp.id() << __FILE__ << __FUNCTION__ <<
"internal_sort" <<
typeid(T) <<
typeid(I) <<
typeid(M);
245 allocate_progress.init(nInputItems);
246 m_internalSorter->allocate(static_cast<TPIE_OS_SIZE_T>(nInputItems));
247 allocate_progress.done();
253 m_internalSorter->sort(inStream,
255 static_cast<TPIE_OS_SIZE_T>(nInputItems),
258 m_internalSorter->deallocate();
268 compute_sort_params();
285 fractional_progress fp(m_indicator);
286 fp.id() << __FILE__ << __FUNCTION__ <<
"external_sort" <<
typeid(T) <<
typeid(I) <<
typeid(M);
287 fractional_subindicator run_progress(fp,
"run",
TPIE_FSI, nInputItems,
"",tpie::IMPORTANCE_LOG);
288 fractional_subindicator merge_progress(fp,
"merge",
TPIE_FSI, nInputItems,
"",tpie::IMPORTANCE_LOG);
294 TP_LOG_DEBUG_ID (
"Beginning general merge sort.");
295 partition_and_sort_runs(&run_progress, temporaries);
297 merge_to_output(&merge_progress, temporaries);
302 template<
class T,
class I,
class M>
303 void sort_manager<T,I,M>::compute_sort_params(
void){
343 TP_LOG_DEBUG_ID (
"Computing merge sort parameters.");
345 TPIE_OS_SIZE_T mmBytesAvailSort;
347 TP_LOG_DEBUG (
"Each object of size " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(
sizeof(T)) <<
" uses "
348 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(m_internalSorter->space_per_item ()) <<
" bytes "
349 <<
"for sorting in memory\n");
354 mmBytesAvailSort=mmBytesAvail - mmBytesPerStream;
356 nItemsPerRun=m_internalSorter->MaxItemCount(mmBytesAvailSort);
359 throw stream_exception(
"Insufficient Memory for forming sorted runs");
366 TPIE_OS_SIZE_T mmBytesPerMergeItem = mmBytesPerStream +
367 m_mergeHeap->space_per_item() +
sizeof(T*) +
368 sizeof(TPIE_OS_OFFSET)+
sizeof(ami::stream<T>*);
373 TPIE_OS_SIZE_T mmBytesFixedForMerge = m_mergeHeap->space_overhead() +
376 if (mmBytesFixedForMerge > mmBytesAvail) {
377 throw stream_exception(
"Insufficient memory for merge heap and output stream");
385 mrgArity =
static_cast<arity_t>(mmBytesAvail-mmBytesFixedForMerge) /
387 TP_LOG_DEBUG(
"mem avail=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesAvail-mmBytesFixedForMerge)
388 <<
" bytes per merge item=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesPerMergeItem)
389 <<
" initial mrgArity=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
"\n");
393 throw stream_exception(
"Merge arity < 2 -- Insufficient memory for a merge.");
406 if (availableStreams < 3) {
407 throw stream_exception(
"Not enough stream descriptors available to perform merge.");
413 if (mrgArity > static_cast<arity_t>(availableStreams - 1)) {
415 mrgArity =
static_cast<arity_t>(availableStreams - 1);
417 TP_LOG_DEBUG_ID (
"Reduced merge arity due to AMI restrictions.");
422 nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
424 #ifdef TPIE_SORT_SMALL_MRGARITY
429 TP_LOG_WARNING_ID(
"TPIE_SORT_SMALL_MRGARITY flag is set."
430 " Did you mean to do this?");
431 if(mrgArity > TPIE_SORT_SMALL_MRGARITY) {
432 TP_LOG_WARNING_ID(
"Reducing merge arity due to compiler specified flag");
433 mrgArity=TPIE_SORT_SMALL_MRGARITY;
435 #endif // TPIE_SORT_SMALL_MRGARITY
437 #ifdef TPIE_SORT_SMALL_RUNSIZE
442 TP_LOG_WARNING_ID(
"TPIE_SORT_SMALL_RUNSIZE flag is set."
443 " Did you mean to do this?");
444 if(nItemsPerRun > TPIE_SORT_SMALL_RUNSIZE) {
445 TP_LOG_WARNING_ID(
"Reducing run size due to compiler specified flag");
446 nItemsPerRun=TPIE_SORT_SMALL_RUNSIZE;
450 nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
451 #endif // TPIE_SORT_SMALL_RUNSIZE
454 #ifdef MINIMIZE_INITIAL_RUN_LENGTH
460 TP_LOG_DEBUG_ID (
"Minimizing initial run lengths without increasing" <<
461 " the height of the merge tree.");
465 double tree_height = log((
double)nRuns) / log((
double)mrgArity);
466 tp_assert (tree_height > 0,
"Negative or zero tree height!");
467 tree_height = ceil (tree_height);
471 double maxOrigRuns = pow ((
double) mrgArity, tree_height);
472 tp_assert (maxOrigRuns >= nRuns
"Number of permitted runs was reduced!");
475 double new_nItemsPerRun = ceil (nInputItems/ maxOrigRuns);
476 tp_assert (new_nItemsPerRun <= nItemsPerRun,
477 "Size of original runs increased!");
480 nItemsPerRun = (TPIE_OS_SIZE_T) new_nItemsPerRun;
482 TP_LOG_DEBUG_ID (
"With long internal memory runs, nRuns = "
485 nRuns = (nInputItems + nItemsPerRun - 1) / nItemsPerRun;
487 TP_LOG_DEBUG_ID (
"With shorter internal memory runs "
488 <<
"and the same merge tree height, nRuns = "
492 "We increased the merge height when we weren't supposed to do so!");
493 #endif // MINIMIZE_INITIAL_SUBSTREAM_LENGTH
499 if(static_cast<TPIE_OS_OFFSET>(mrgArity)>nRuns){
502 mrgArity=
static_cast<TPIE_OS_SIZE_T
>(nRuns);
507 tp_assert (nRuns > 1,
"Less than two runs to merge!");
509 tp_assert (nRuns * nItemsPerRun - nInputItems < nItemsPerRun,
510 "Total expected output size is too large.");
511 tp_assert (nInputItems - (nRuns - 1) * nItemsPerRun <= nItemsPerRun,
512 "Total expected output size is too small.");
514 TP_LOG_DEBUG_ID (
"Input stream has " << nInputItems <<
" items");
515 TP_LOG_DEBUG (
"Max number of items per runs " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nItemsPerRun) );
516 TP_LOG_DEBUG (
"\nInitial number of runs " << nRuns );
517 TP_LOG_DEBUG (
"\nMerge arity is " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
"\n" );
520 template<
class T,
class I,
class M>
521 void sort_manager<T,I,M>::partition_and_sort_runs(progress_indicator_base* indicator,
tpie::array<temp_file> & temporaries){
534 minRunsPerStream = nRuns/mrgArity;
538 nXtraRuns =
static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
539 tp_assert(nXtraRuns<mrgArity,
"Too many extra runs");
543 nItemsInLastRun =
static_cast<TPIE_OS_SIZE_T
>(nInputItems % nItemsPerRun);
544 if(nItemsInLastRun==0){
546 nItemsInLastRun=nItemsPerRun;
552 m_internalSorter->allocate(nItemsPerRun);
554 TP_LOG_DEBUG_ID (
"Partitioning and forming sorted runs.");
557 nItemsInThisRun=nItemsPerRun;
565 TPIE_OS_OFFSET check_size = 0;
568 indicator->init(nRuns*1000);
570 for(
arity_t ii=0; ii<mrgArity; ii++){
573 curOutputRunStream = tpie_new<file_stream<T> >();
574 curOutputRunStream->open(temporaries[ii],
access_write);
579 runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
581 for(TPIE_OS_OFFSET jj=0; jj < runsInStream; jj++ ) {
583 if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
584 nItemsInThisRun=nItemsInLastRun;
587 progress_indicator_subindicator sort_indicator(indicator, 1000);
588 m_internalSorter->sort(inStream, curOutputRunStream,
589 nItemsInThisRun, &sort_indicator);
593 TP_LOG_DEBUG_ID (
"Wrote " << runsInStream <<
" runs and "
594 << curOutputRunStream->size() <<
" items to file "
595 <<
static_cast<TPIE_OS_OUTPUT_SIZE_T
>(ii));
596 check_size+=curOutputRunStream->size();
600 tp_assert(check_size == nInputItems,
"item count mismatch");
604 m_internalSorter->deallocate();
607 inStream->truncate(0);
610 if (indicator) indicator->done();
613 template<
class T,
class I,
class M>
614 void sort_manager<T,I,M>::merge_to_output(progress_indicator_base* indicator,
tpie::array<temp_file> & temporaries){
628 TP_LOG_DEBUG_ID(
"Allocated " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(
sizeof(ami::stream<T>*)*mrgArity)
629 <<
" bytes for " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
" merge input stream pointers.\n"
642 m_mergeHeap->allocate( mrgArity );
656 treeHeight=
static_cast<int>(ceil(log(static_cast<float>(nRuns)) /
657 log(static_cast<float>(mrgArity))));
659 indicator->set_range( nInputItems * treeHeight);
667 while (nRuns > TPIE_OS_OFFSET(mrgArity)) {
679 TP_LOG_DEBUG (
"Intermediate merge. level="<<mrgHeight <<
"\n");
682 nRuns = (nRuns + mrgArity - 1)/mrgArity;
686 minRunsPerStream = nRuns/mrgArity;
691 arity_t mergeRunsInLastOutputRun=(nXtraRuns>0) ? nXtraRuns : mrgArity;
696 nXtraRuns =
static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
697 tp_assert(nXtraRuns<mrgArity,
"Too many extra runs");
700 arity_t nOutputStreams = (minRunsPerStream > 0) ? mrgArity : nXtraRuns;
702 arity_t nRunsToMerge = mrgArity;
705 for(ii = 0; ii < mrgArity; ii++){
709 file_stream<T> * stream = tpie_new<file_stream<T> >();
710 mergeInputStreams[ii].reset(stream);
711 stream->open(temporaries[mrgArity*(mrgHeight%2)+ii],
access_read);
715 TPIE_OS_OFFSET check_size=0;
721 TP_LOG_DEBUG(
"Writing " << nRuns <<
" runs to " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nOutputStreams)
722 <<
" output files.\nEach output file has at least "
723 << minRunsPerStream <<
" runs.\n");
725 for(ii = mrgArity-nOutputStreams; ii < mrgArity; ii++){
729 file_stream<T> curOutputRunStream;
730 curOutputRunStream.open(temporaries[mrgArity*((mrgHeight+1)%2)+ii],
access_write);
735 runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
736 TP_LOG_DEBUG(
"Writing " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) <<
" runs to output "
737 <<
" file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) <<
"\n");
738 for( jj=0; jj < runsInStream; jj++ ) {
740 if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
741 nRunsToMerge=mergeRunsInLastOutputRun;
744 single_merge(mergeInputStreams.find(mrgArity-nRunsToMerge),
745 mergeInputStreams.find(mrgArity),
747 nItemsPerRun, indicator);
751 TP_LOG_DEBUG(
"Wrote " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) <<
" runs and "
752 << curOutputRunStream.size() <<
" items "
753 <<
"to file " <<
static_cast<TPIE_OS_OUTPUT_SIZE_T
>(ii) <<
"\n");
754 check_size+=curOutputRunStream.size();
757 tp_assert(check_size==nInputItems,
"item count mismatch in merge");
762 for(ii = 0; ii < mrgArity; ii++) {
763 mergeInputStreams[ii].reset();
764 temporaries[mrgArity*(mrgHeight%2)+ii].free();
767 nItemsPerRun=mrgArity*nItemsPerRun;
771 tp_assert( nRuns > 1,
"Not enough runs to merge to final output");
772 tp_assert( nRuns <= TPIE_OS_OFFSET(mrgArity),
"Too many runs to merge to final output");
778 TP_LOG_DEBUG_ID (
"Final merge. level="<<mrgHeight);
779 TP_LOG_DEBUG(
"Merge runs left="<<nRuns<<
"\n");
780 for(ii = mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns); ii < mrgArity; ii++){
787 TP_LOG_DEBUG (
"Putting merge stream "<< static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) <<
" in slot "
788 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))) <<
"\n");
789 file_stream<T> * stream = tpie_new<file_stream<T> >();
790 mergeInputStreams[ii-(mrgArity-
static_cast<TPIE_OS_SIZE_T
>(nRuns))].reset(stream);
791 stream->open(temporaries[mrgArity*(mrgHeight%2)+ii],
access_read);
798 single_merge(mergeInputStreams.begin(),
799 mergeInputStreams.find((
size_t)nRuns),
800 outStream, -1, indicator);
802 if (indicator) indicator->done();
803 tp_assert((TPIE_OS_OFFSET)outStream->size() == nInputItems,
"item count mismatch");
805 TP_LOG_DEBUG(
"merge cleanup\n");
808 mergeInputStreams.resize(0);
811 m_mergeHeap->deallocate();
812 TP_LOG_DEBUG_ID (
"Number of passes incl run formation is " <<
814 TP_LOG_DEBUG(
"AMI_partition_and_merge END\n");
817 template<
class T,
class I,
class M>
818 void sort_manager<T,I,M>::single_merge(
821 file_stream < T >*outStream, TPIE_OS_OFFSET cutoff, progress_indicator_base* indicator)
831 #endif // _TPIE_AMI_SORT_MANAGER_H
void sort(uncompressed_stream< T > &instream, uncompressed_stream< T > &outstream, Compare comp, progress_indicator_base &indicator)
Sort elements of a stream using the given STL-style comparator object.
Defines the tp_assert macro.
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
The base class for indicating the progress of some task.
static std::string tpie_name(const std::string &post_base="", const std::string &dir="", const std::string &ext="")
Generate path for a new temporary file.
A generic array with a fixed size.
merge_sorted_runs as used in several of TPIE's merge variants
#define TPIE_FSI
For use when constructing a fractional subindicator.
const T & read()
Reads next item from stream if can_read() == true.
Fractional progress reporter.
This file contains a few deprecated definitions for legacy code.
void merge_sorted_runs(typename tpie::array< tpie::unique_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::unique_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merge_sorted variants.
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open(). Precondition: offset == 0.
void sort(file_stream< T > *in, file_stream< T > *out, progress_indicator_base *indicator=NULL)
Sort the in stream into the out stream, preserving the in stream (uses 3x space).
Generic internal array with known memory requirements.
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
file_manager & get_file_manager()
Return a reference to the file manager.
Open a file for writing only; existing content is truncated.
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
std::unique_ptr< T, tpie_deleter > unique_ptr
Like std::unique_ptr, but deletes the object with tpie_delete.
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
#define tp_assert(condition, message)
A class of manager objects for merge sorting objects of type T.
Subindicator for fractional progress reporting.