20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
28 #include <tpie/pipelining/parallel/options.h>
29 #include <tpie/pipelining/parallel/worker_state.h>
30 #include <tpie/pipelining/parallel/aligned_array.h>
34 namespace pipelining {
36 namespace parallel_bits {
41 template <
typename dest_t>
45 template <
typename T1,
typename T2>
56 template <
typename Input,
typename Output>
61 static const size_t alignment = 64;
81 virtual void init_node(
node & r)
override {
90 std::vector<before_t *> m_dests;
97 stream_size_type sum_steps() {
98 stream_size_type res = 0;
99 for (
size_t i = 0; i < m_progressIndicators.size(); ++i) {
100 res += m_progressIndicators.get(i)->get_current();
105 virtual ~threads() {}
111 template <
typename Input,
typename Output,
typename fact_t>
120 typedef typename fact_t::template constructed<after_t>::type worker_t;
121 typedef typename push_type<worker_t>::type T1;
124 static const size_t alignment = p_t::alignment;
136 : numJobs(st.opts.numJobs)
139 fact.hook_initialization(&hook);
140 fact.set_destination_kind_push();
142 m_data.realloc(numJobs);
143 this->m_progressIndicators.realloc(numJobs);
144 this->m_dests.resize(numJobs);
147 for (
size_t i = 0; i < numJobs; ++i) {
149 if (((
size_t) m_data.get(i)) % alignment != 0) {
150 log_warning() <<
"Thread " << i <<
" is not aligned: Address "
151 << m_data.get(i) <<
" is off by " <<
152 (((size_t) m_data.get(i)) % alignment) <<
" bytes"
157 new (this->m_progressIndicators.get(i))
pi_t();
159 auto n = fact.construct_copy(
after_t(st, i));
161 n.set_plot_options(node::PLOT_PARALLEL);
163 n.set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
171 for (
size_t i = 0; i < numJobs; ++i) {
172 m_data.get(i)->~before_t();
173 this->m_progressIndicators.get(i)->~pi_t();
176 this->m_progressIndicators.realloc(0);
213 typedef std::mutex mutex_t;
214 typedef std::condition_variable cond_t;
215 typedef std::unique_lock<std::mutex> lock_t;
277 return m_states[idx];
282 if (m_states[idx] != from) {
283 std::stringstream ss;
284 ss << idx <<
" Invalid state transition " << from <<
" -> " << to <<
"; current state is " << m_states[idx];
292 std::vector<node *> m_inputs;
293 std::vector<after_base *> m_outputs;
294 std::vector<worker_state> m_states;
299 , m_inputs(opts.numJobs, 0)
300 , m_outputs(opts.numJobs, 0)
301 , m_states(opts.numJobs, INITIALIZING)
306 virtual ~state_base() {
314 template <
typename T>
316 memory_size_type m_inputSize;
325 if (input.
size() > m_inputBuffer.
size())
326 throw tpie::exception(m_inputBuffer.
size() ?
"Input too large" :
"Input buffer not initialized");
328 memory_size_type items =
330 -m_inputBuffer.
begin();
337 , m_inputBuffer(opts.bufSize)
345 template <
typename T>
347 memory_size_type m_outputSize;
349 friend class after<T>;
358 , m_outputBuffer(opts.bufSize)
370 template <
typename T>
383 template <
typename T1,
typename T2>
386 typedef std::shared_ptr<state> ptr;
387 typedef state_base::mutex_t mutex_t;
388 typedef state_base::cond_t cond_t;
389 typedef state_base::lock_t lock_t;
396 std::unique_ptr<threads<T1, T2> > pipes;
398 template <
typename fact_t>
401 , m_inputBuffers(opts.numJobs)
402 , m_outputBuffers(opts.numJobs)
406 pipes.reset(
new pipes_impl_t(std::move(fact), *
this));
409 void set_consumer_ptr(consumer<T2> * cons) {
413 consumer<T2> *
const * get_consumer_ptr_ptr()
const {
421 template <
typename T>
422 class after :
public after_base {
426 std::unique_ptr<parallel_output_buffer<T> > m_buffer;
427 array<parallel_output_buffer<T> *> & m_outputBuffers;
428 typedef state_base::lock_t lock_t;
429 consumer<T> *
const * m_cons;
434 template <
typename Input>
435 after(state<Input, T> & state,
439 , m_outputBuffers(state.m_outputBuffers)
440 , m_cons(state.get_consumer_ptr_ptr())
442 state.set_output_ptr(parId,
this);
443 set_name(
"Parallel after", PRIORITY_INSIGNIFICANT);
456 , parId(std::move(other.parId))
457 , m_outputBuffers(other.m_outputBuffers)
458 , m_cons(std::move(other.m_cons)) {
468 if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
469 flush_buffer_impl(
false);
471 m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
474 virtual void end()
override {
475 flush_buffer_impl(
true);
483 m_outputBuffers[parId] = m_buffer.get();
491 flush_buffer_impl(
true);
495 bool is_done()
const {
531 void flush_buffer_impl(
bool complete) {
538 lock_t lock(st.
mutex);
540 if (*m_cons == 0)
throw tpie::exception(
"Unexpected nullptr in flush_buffer");
542 (*m_cons)->consume(out);
544 st.
transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
551 m_buffer->m_outputSize = 0;
560 template <
typename T>
561 class before :
public node {
565 std::unique_ptr<parallel_input_buffer<T> > m_buffer;
566 array<parallel_input_buffer<T> *> & m_inputBuffers;
567 std::thread m_worker;
572 virtual void push_all(array_view<T> items) = 0;
574 template <
typename Output>
575 before(state<T, Output> & st,
size_t parId)
578 , m_inputBuffers(st.m_inputBuffers)
580 set_name(
"Parallel before", PRIORITY_INSIGNIFICANT);
585 before(
const before & other)
588 , m_inputBuffers(other.m_inputBuffers)
596 state_base::lock_t lock(st.
mutex);
602 if (m_worker.joinable()) {
612 std::thread t(run_worker,
this);
629 throw tpie::exception(
"State 'partial_output' was not expected in before::ready");
631 throw tpie::exception(
"State 'outputting' was not expected in before::ready");
641 class running_signal {
642 typedef state_base::cond_t cond_t;
643 memory_size_type & sig;
644 cond_t & producerCond;
646 running_signal(memory_size_type & sig, cond_t & producerCond)
648 , producerCond(producerCond)
651 producerCond.notify_one();
656 producerCond.notify_one();
660 static void run_worker(before *
self) {
668 state_base::lock_t lock(st.
mutex);
670 m_buffer.reset(
new parallel_input_buffer<T>(st.opts));
671 m_inputBuffers[parId] = m_buffer.get();
699 template <
typename dest_t>
700 class before_impl :
public before<typename push_type<dest_t>::type> {
701 typedef typename push_type<dest_t>::type item_type;
706 template <
typename Output>
707 before_impl(state<item_type, Output> & st,
710 : before<item_type>(st, parId)
711 , dest(std::move(dest))
714 st.set_input_ptr(parId,
this);
724 for (
size_t i = 0; i < items.
size(); ++i) {
729 this->st.output(this->parId).flush_buffer();
736 template <
typename Input,
typename Output,
typename dest_t>
739 typedef typename state_t::ptr stateptr;
746 : dest(std::move(dest))
750 this->
set_name(
"Parallel output", PRIORITY_INSIGNIFICANT);
752 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
753 st->output(i).set_consumer(
this);
761 for (
size_t i = 0; i < a.
size(); ++i) {
772 template <
typename T1,
typename T2>
775 typedef T1 item_type;
779 typedef typename state_t::ptr stateptr;
784 std::shared_ptr<consumer<T2> > cons;
786 stream_size_type m_steps;
795 bool has_ready_pipe() {
796 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
797 switch (st->get_state(i)) {
806 if (st->opts.maintainOrder && m_outputOrder.
front() != i)
829 bool has_outputting_pipe() {
830 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
831 switch (st->get_state(i)) {
838 if (st->opts.maintainOrder && m_outputOrder.
front() != i)
843 throw tpie::exception(
"State DONE not expected in has_outputting_pipe().");
859 bool has_processing_pipe() {
860 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
861 switch (st->get_state(i)) {
870 throw tpie::exception(
"State DONE not expected in has_processing_pipe().");
891 stream_size_type steps = st->pipes->sum_steps();
892 if (steps != m_steps) {
899 template <
typename consumer_t>
900 producer(stateptr st, consumer_t cons)
903 , cons(
new consumer_t(std::move(cons)))
906 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
909 this->
set_name(
"Parallel input", PRIORITY_INSIGNIFICANT);
912 memory_size_type usage =
913 st->opts.numJobs * st->opts.bufSize * (
sizeof(T1) +
sizeof(T2))
914 + st->opts.bufSize *
sizeof(item_type)
918 if (st->opts.maintainOrder) {
919 m_outputOrder.
resize(st->opts.numJobs);
924 inputBuffer.
resize(st->opts.bufSize);
926 state_base::lock_t lock(st->mutex);
927 while (st->runningWorkers != st->opts.numJobs) {
928 st->producerCond.wait(lock);
942 inputBuffer[written++] = item;
943 if (written < st->opts.bufSize) {
948 state_base::lock_t lock(st->mutex);
952 empty_input_buffer(lock);
956 void empty_input_buffer(state_base::lock_t & lock) {
957 while (written > 0) {
958 while (!has_ready_pipe()) {
959 st->producerCond.wait(lock);
961 switch (st->get_state(readyIdx)) {
963 throw tpie::exception(
"State 'INITIALIZING' not expected at this point");
967 item_type * first = &inputBuffer[0];
968 item_type * last = first + written;
969 parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
971 st->transition_state(readyIdx, IDLE, PROCESSING);
972 st->workerCond[readyIdx].notify_one();
974 if (st->opts.maintainOrder)
975 m_outputOrder.
push(readyIdx);
979 throw tpie::exception(
"State 'processing' not expected at this point");
982 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
983 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
984 st->workerCond[readyIdx].notify_one();
988 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
990 st->transition_state(readyIdx, OUTPUTTING, IDLE);
991 st->workerCond[readyIdx].notify_one();
992 if (st->opts.maintainOrder) {
993 if (m_outputOrder.
front() != readyIdx) {
994 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
995 << m_outputOrder.
front() << std::endl;
1009 state_base::lock_t lock(st->mutex);
1013 empty_input_buffer(lock);
1017 st->set_consumer_ptr(cons.get());
1021 while (!has_outputting_pipe()) {
1022 if (!has_processing_pipe()) {
1027 st->producerCond.wait(lock);
1032 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1034 if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1035 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1036 st->workerCond[readyIdx].notify_one();
1039 st->transition_state(readyIdx, OUTPUTTING, IDLE);
1040 if (st->opts.maintainOrder) {
1041 if (m_outputOrder.
front() != readyIdx) {
1042 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
1043 << m_outputOrder.
front() << std::endl;
1044 throw tpie::exception(
"Producer got wrong entry from has_ready_pipe");
1046 m_outputOrder.
pop();
1050 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
1051 st->transition_state(i, IDLE, DONE);
1052 st->workerCond[i].notify_one();
1054 while (st->runningWorkers > 0) {
1055 st->producerCond.wait(lock);
after_base & output(size_t idx)
Get the specified after instance.
void set_input_ptr(size_t idx, node *v)
Must not be used concurrently.
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining node that pushes the contents of the given file stream to the next node in the pipeline.
Encapsulation of two pointers from any random access container.
virtual void begin()
Begin pipeline processing phase.
virtual void flush_buffer()=0
Called by before::worker after a batch of items has been pushed.
virtual void worker_initialize()=0
Called by before::worker to initialize buffers.
virtual void set_consumer(node *)=0
For internal use in order to construct the pipeline graph.
virtual void flush_buffer() override
Invoked by before::push_all when all input items have been pushed.
const T & front() const
Return the item that has been in the queue for the longest time.
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Accepts input items from the main thread and sends them down the pipeline.
Factory hook that sets the progress indicator of the nodes run in parallel to the null progress indicator.
cond_t producerCond
Condition variable.
Class containing an array of node instances.
void set_output_ptr(size_t idx, after_base *v)
Must not be used concurrently.
progress_indicator_null pi_t
Progress indicator type.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
Accepts output items and sends them to the main thread.
cond_t * workerCond
Condition variable, one per worker.
void push(T val)
Add an element to the front of the queue.
a dummy progress indicator that produces no output
Aligned, uninitialized storage.
void push(item_type item)
Accumulate input buffer and send off to workers.
virtual void push_all(array_view< T > items)=0
Overridden in subclass to push a buffer of items.
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
size_t runningWorkers
Shared state, must have mutex to write.
Encapsulation of two pointers from any random access container.
void pop()
Remove an element from the back of the queue.
Common state in parallel pipelining library.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
virtual void push_all(array_view< item_type > items)
Push all items from buffer and flush output buffer afterwards.
virtual void set_consumer(node *cons) override
For internal use in order to construct the pipeline graph.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Subclass of threads instantiating and managing the pipelines.
Node running in the main thread, accepting an output buffer from the managing producer and forwarding the buffered items onward.
Non-templated virtual base class of after.
size_t size() const
Get number of elements in the array.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
void resize(size_t size, const T &elm)
Change the size of the array.
mutex_t mutex
Single mutex.
iterator begin() const
Return an iterator to the beginning of the array.
virtual void begin() override
Begin pipeline processing phase.
void push(const T &item)
Push to thread-local buffer; flush it when full.
virtual void end() override
End pipeline processing phase.
User-supplied options to the parallelism framework.
State subclass containing the item type specific state, i.e.
iterator begin()
Return an iterator to the beginning of the array.
size_type size() const
Return the size of the array.
worker_state get_state(size_t idx)
Shared state, must have mutex to use.
Producer, running in main thread, managing the parallel execution.
virtual void end() override
End pipeline processing phase.
void transition_state(size_t idx, worker_state from, worker_state to)
Shared state, must have mutex to use.
virtual void worker_initialize() override
Invoked by before::worker (in worker thread context).
virtual void consume(array_view< item_type > a) override
Push all items from output buffer to the rest of the pipeline.
iterator end() const
Return an iterator to the end of the array.
Whether to maintain order in parallel or not.
Instantiated in each thread.
logstream & log_error()
Return logstream for writing error log messages.
logstream & log_warning()
Return logstream for writing warning log messages.
virtual void begin() override
Begin pipeline processing phase.
void resize(size_t size=0)
Resize the queue; all data is lost.
Concrete consumer implementation.
node & input(size_t idx)
Get the specified before instance.