TPIE 2362a60
base.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
25 #include <tpie/array_view.h>
26 #include <memory>
28 #include <tpie/pipelining/parallel/options.h>
29 #include <tpie/pipelining/parallel/worker_state.h>
30 #include <tpie/pipelining/parallel/aligned_array.h>
31 
32 namespace tpie {
33 
34 namespace pipelining {
35 
36 namespace parallel_bits {
37 
38 // predeclare
39 template <typename T>
40 class before;
41 template <typename dest_t>
42 class before_impl;
43 template <typename T>
44 class after;
45 template <typename T1, typename T2>
46 class state;
47 
56 template <typename Input, typename Output>
57 class threads {
58  typedef before<Input> before_t;
59 
60 protected:
61  static const size_t alignment = 64;
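 	// Editorial note: 64 bytes matches a common cache line size; aligning each
 	// worker's per-thread data to its own cache line presumably avoids false
 	// sharing between the worker threads.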
62 
63  /** Progress indicator type. */
64  typedef progress_indicator_null pi_t;
65  aligned_array<pi_t, alignment> m_progressIndicators;
66 
72  class progress_indicator_hook : public factory_init_hook {
73  threads * t;
74  public:
75  progress_indicator_hook(threads * t)
76  : t(t)
77  , index(0)
78  {
79  }
80 
81  virtual void init_node(node & r) override {
82  r.set_progress_indicator(t->m_progressIndicators.get(index));
83  }
84 
85  size_t index;
86  };
87 
88  friend class progress_indicator_hook;
89 
90  std::vector<before_t *> m_dests;
91 
92 public:
93  before_t & operator[](size_t idx) {
94  return *m_dests[idx];
95  }
96 
97  stream_size_type sum_steps() {
98  stream_size_type res = 0;
99  for (size_t i = 0; i < m_progressIndicators.size(); ++i) {
100  res += m_progressIndicators.get(i)->get_current();
101  }
102  return res;
103  }
104 
105  virtual ~threads() {}
106 };
107 
111 template <typename Input, typename Output, typename fact_t>
112 class threads_impl : public threads<Input, Output> {
113 private:
114  typedef threads<Input, Output> p_t;
115 
117  typedef typename p_t::pi_t pi_t;
118 
119  typedef after<Output> after_t;
120  typedef typename fact_t::template constructed<after_t>::type worker_t;
121  typedef typename push_type<worker_t>::type T1;
122  typedef Output T2;
123  typedef before_impl<worker_t> before_t;
124  static const size_t alignment = p_t::alignment;
126 
127  typedef aligned_array<before_t, alignment> aligned_before_t;
128  size_t numJobs;
129 
131  aligned_before_t m_data;
132 
133 public:
134  threads_impl(fact_t && fact,
135  state<T1, T2> & st)
136  : numJobs(st.opts.numJobs)
137  {
138  typename p_t::progress_indicator_hook hook(this);
139  fact.hook_initialization(&hook);
140  fact.set_destination_kind_push();
141  // uninitialized allocation
142  m_data.realloc(numJobs);
143  this->m_progressIndicators.realloc(numJobs);
144  this->m_dests.resize(numJobs);
145 
146  // construct elements manually
147  for (size_t i = 0; i < numJobs; ++i) {
148  // for debugging: check that pointer is aligned.
149  if (((size_t) m_data.get(i)) % alignment != 0) {
150  log_warning() << "Thread " << i << " is not aligned: Address "
151  << m_data.get(i) << " is off by " <<
152  (((size_t) m_data.get(i)) % alignment) << " bytes"
153  << std::endl;
154  }
155 
156  hook.index = i;
157  new (this->m_progressIndicators.get(i)) pi_t();
158 
159  auto n = fact.construct_copy(after_t(st, i));
160  if (i == 0)
161  n.set_plot_options(node::PLOT_PARALLEL);
162  else
163  n.set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
164  this->m_dests[i] =
165  new(m_data.get(i))
166  before_t(st, i, std::move(n));
167  }
168  }
169 
170  virtual ~threads_impl() {
171  for (size_t i = 0; i < numJobs; ++i) {
172  m_data.get(i)->~before_t();
173  this->m_progressIndicators.get(i)->~pi_t();
174  }
175  m_data.realloc(0);
176  this->m_progressIndicators.realloc(0);
177  }
178 };
179 
183 class after_base : public node {
184 public:
188  virtual void worker_initialize() = 0;
189 
195  virtual void flush_buffer() = 0;
196 
200  virtual void set_consumer(node *) = 0;
201 };
202 
211 class state_base {
212 public:
213  typedef std::mutex mutex_t;
214  typedef std::condition_variable cond_t;
215  typedef std::unique_lock<std::mutex> lock_t;
216 
217  const options opts;
218 
220  mutex_t mutex;
221 
228  cond_t producerCond;
229 
239  cond_t * workerCond;
240 
240 
241  /** Shared state, must have mutex to write. */
242  size_t runningWorkers;
243 
245  void set_input_ptr(size_t idx, node * v) {
246  m_inputs[idx] = v;
247  }
248 
250  void set_output_ptr(size_t idx, after_base * v) {
251  m_outputs[idx] = v;
252  }
253 
261  node & input(size_t idx) { return *m_inputs[idx]; }
262 
273  after_base & output(size_t idx) { return *m_outputs[idx]; }
274 
276  worker_state get_state(size_t idx) {
277  return m_states[idx];
278  }
279 
281  void transition_state(size_t idx, worker_state from, worker_state to) {
282  if (m_states[idx] != from) {
283  std::stringstream ss;
284  ss << idx << " Invalid state transition " << from << " -> " << to << "; current state is " << m_states[idx];
285  log_error() << ss.str() << std::endl;
286  throw exception(ss.str());
287  }
288  m_states[idx] = to;
289  }
290 
291 protected:
292  std::vector<node *> m_inputs;
293  std::vector<after_base *> m_outputs;
294  std::vector<worker_state> m_states;
295 
296  state_base(const options opts)
297  : opts(opts)
298  , runningWorkers(0)
299  , m_inputs(opts.numJobs, 0)
300  , m_outputs(opts.numJobs, 0)
301  , m_states(opts.numJobs, INITIALIZING)
302  {
303  workerCond = new cond_t[opts.numJobs];
304  }
305 
306  virtual ~state_base() {
307  delete[] workerCond;
308  }
309 };
310 
314 template <typename T>
315 class parallel_input_buffer {
316  memory_size_type m_inputSize;
317  array<T> m_inputBuffer;
318 
319 public:
320  array_view<T> get_input() {
321  return array_view<T>(&m_inputBuffer[0], m_inputSize);
322  }
323 
324  void set_input(array_view<T> input) {
325  if (input.size() > m_inputBuffer.size())
326  throw tpie::exception(m_inputBuffer.size() ? "Input too large" : "Input buffer not initialized");
327 
328  memory_size_type items =
329  std::copy(input.begin(), input.end(), m_inputBuffer.begin())
330  -m_inputBuffer.begin();
331 
332  m_inputSize = items;
333  }
334 
335  parallel_input_buffer(const options & opts)
336  : m_inputSize(0)
337  , m_inputBuffer(opts.bufSize)
338  {
339  }
340 };
341 
345 template <typename T>
346 class parallel_output_buffer {
347  memory_size_type m_outputSize;
348  array<T> m_outputBuffer;
349  friend class after<T>;
350 
351 public:
352  array_view<T> get_output() {
353  return array_view<T>(&m_outputBuffer[0], m_outputSize);
354  }
355 
356  parallel_output_buffer(const options & opts)
357  : m_outputSize(0)
358  , m_outputBuffer(opts.bufSize)
359  {
360  }
361 };
362 
370 template <typename T>
371 class consumer : public node {
372 public:
373  typedef T item_type;
374 
375  virtual void consume(array_view<T>) = 0;
376  // node has virtual dtor
377 };
378 
383 template <typename T1, typename T2>
384 class state : public state_base {
385 public:
386  typedef std::shared_ptr<state> ptr;
387  typedef state_base::mutex_t mutex_t;
388  typedef state_base::cond_t cond_t;
389  typedef state_base::lock_t lock_t;
390 
391  array<parallel_input_buffer<T1> *> m_inputBuffers;
392  array<parallel_output_buffer<T2> *> m_outputBuffers;
393 
394  consumer<T2> * m_cons;
395 
396  std::unique_ptr<threads<T1, T2> > pipes;
397 
398  template <typename fact_t>
399  state(const options opts, fact_t && fact)
400  : state_base(opts)
401  , m_inputBuffers(opts.numJobs)
402  , m_outputBuffers(opts.numJobs)
403  , m_cons(0)
404  {
405  typedef threads_impl<T1, T2, fact_t> pipes_impl_t;
406  pipes.reset(new pipes_impl_t(std::move(fact), *this));
407  }
408 
409  void set_consumer_ptr(consumer<T2> * cons) {
410  m_cons = cons;
411  }
412 
413  consumer<T2> * const * get_consumer_ptr_ptr() const {
414  return &m_cons;
415  }
416 };
417 
421 template <typename T>
422 class after : public after_base {
423 protected:
424  state_base & st;
425  size_t parId;
426  std::unique_ptr<parallel_output_buffer<T> > m_buffer;
427  array<parallel_output_buffer<T> *> & m_outputBuffers;
428  typedef state_base::lock_t lock_t;
429  consumer<T> * const * m_cons;
430 
431 public:
432  typedef T item_type;
433 
434  template <typename Input>
435  after(state<Input, T> & state,
436  size_t parId)
437  : st(state)
438  , parId(parId)
439  , m_outputBuffers(state.m_outputBuffers)
440  , m_cons(state.get_consumer_ptr_ptr())
441  {
442  state.set_output_ptr(parId, this);
443  set_name("Parallel after", PRIORITY_INSIGNIFICANT);
444  set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
445  if (m_cons == 0) throw tpie::exception("Unexpected nullptr");
446  if (*m_cons != 0) throw tpie::exception("Expected nullptr");
447  }
448 
449  virtual void set_consumer(node * cons) override {
450  this->add_push_destination(*cons);
451  }
452 
453  after(after && other)
454  : after_base(std::move(other))
455  , st(other.st)
456  , parId(std::move(other.parId))
457  , m_outputBuffers(other.m_outputBuffers)
458  , m_cons(std::move(other.m_cons)) {
459  st.set_output_ptr(parId, this);
460  if (m_cons == 0) throw tpie::exception("Unexpected nullptr in move");
461  if (*m_cons != 0) throw tpie::exception("Expected nullptr in move");
462  }
463 
467  void push(const T & item) {
468  if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
469  flush_buffer_impl(false);
470 
471  m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
472  }
473 
474  virtual void end() override {
475  flush_buffer_impl(true);
476  }
477 
481  virtual void worker_initialize() override {
482  m_buffer.reset(new parallel_output_buffer<T>(st.opts));
483  m_outputBuffers[parId] = m_buffer.get();
484  }
485 
490  virtual void flush_buffer() override {
491  flush_buffer_impl(true);
492  }
493 
494 private:
495  bool is_done() const {
496  switch (st.get_state(parId)) {
497  case INITIALIZING:
498  throw tpie::exception("INITIALIZING not expected in after::is_done");
499  case IDLE:
500  return true;
501  case PROCESSING:
502  // The main thread may transition us from Outputting to Idle to
503  // Processing without us noticing, or it may transition us from
504  // Partial_Output to Processing. In either case, we are done
505  // flushing the buffer.
506  return true;
507  case PARTIAL_OUTPUT:
508  case OUTPUTTING:
509  return false;
510  case DONE:
511  return true;
512  }
513  throw tpie::exception("Unknown state");
514  }
515 
531  void flush_buffer_impl(bool complete) {
532  // At this point, we could check if the output buffer is empty and
533  // short-circuit when it is without acquiring the lock; however, we
534  // must do a full PROCESSING -> OUTPUTTING -> IDLE transition in this
535  // case to let the main thread know that we are done processing the
536  // input.
537 
538  lock_t lock(st.mutex);
539  if (st.get_state(parId) == DONE) {
540  if (*m_cons == 0) throw tpie::exception("Unexpected nullptr in flush_buffer");
541  array_view<T> out = m_buffer->get_output();
542  (*m_cons)->consume(out);
543  } else {
544  st.transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
545  // notify producer that output is ready
546  st.producerCond.notify_one();
547  while (!is_done()) {
548  st.workerCond[parId].wait(lock);
549  }
550  }
551  m_buffer->m_outputSize = 0;
552  }
553 };
554 
560 template <typename T>
561 class before : public node {
562 protected:
563  state_base & st;
564  size_t parId;
565  std::unique_ptr<parallel_input_buffer<T> > m_buffer;
566  array<parallel_input_buffer<T> *> & m_inputBuffers;
567  std::thread m_worker;
568 
572  virtual void push_all(array_view<T> items) = 0;
573 
574  template <typename Output>
575  before(state<T, Output> & st, size_t parId)
576  : st(st)
577  , parId(parId)
578  , m_inputBuffers(st.m_inputBuffers)
579  {
580  set_name("Parallel before", PRIORITY_INSIGNIFICANT);
581  set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
582  }
583  // virtual dtor in node
584 
585  before(const before & other)
586  : st(other.st)
587  , parId(other.parId)
588  , m_inputBuffers(other.m_inputBuffers)
589  {
590  }
591 
592  ~before() {
593  // If we were destructed because of an exception,
594  // we should stop the worker thread
595  {
596  state_base::lock_t lock(st.mutex);
597  if (st.get_state(parId) == IDLE) {
598  st.transition_state(parId, IDLE, DONE);
599  }
600  }
601  st.workerCond[parId].notify_one();
602  if (m_worker.joinable()) {
603  m_worker.join();
604  }
605  }
606 
607 public:
608  typedef T item_type;
609 
610  virtual void begin() override {
611  node::begin();
612  std::thread t(run_worker, this);
613  m_worker.swap(t);
614  }
615 
616 private:
620  bool ready() {
621  switch (st.get_state(parId)) {
622  case INITIALIZING:
623  throw tpie::exception("INITIALIZING not expected in before::ready");
624  case IDLE:
625  return false;
626  case PROCESSING:
627  return true;
628  case PARTIAL_OUTPUT:
629  throw tpie::exception("State 'partial_output' was not expected in before::ready");
630  case OUTPUTTING:
631  throw tpie::exception("State 'outputting' was not expected in before::ready");
632  case DONE:
633  return false;
634  }
635  throw tpie::exception("Unknown state");
636  }
637 
641  class running_signal {
642  typedef state_base::cond_t cond_t;
643  memory_size_type & sig;
644  cond_t & producerCond;
645  public:
646  running_signal(memory_size_type & sig, cond_t & producerCond)
647  : sig(sig)
648  , producerCond(producerCond)
649  {
650  ++sig;
651  producerCond.notify_one();
652  }
653 
654  ~running_signal() {
655  --sig;
656  producerCond.notify_one();
657  }
658  };
659 
660  static void run_worker(before * self) {
661  self->worker();
662  }
663 
667  void worker() {
668  state_base::lock_t lock(st.mutex);
669 
670  m_buffer.reset(new parallel_input_buffer<T>(st.opts));
671  m_inputBuffers[parId] = m_buffer.get();
672 
673  // virtual invocation
674  st.output(parId).worker_initialize();
675 
676  st.transition_state(parId, INITIALIZING, IDLE);
677  running_signal _(st.runningWorkers, st.producerCond);
678  while (true) {
679  // wait for transition IDLE -> PROCESSING
680  while (!ready()) {
681  if (st.get_state(parId) == DONE) {
682  return;
683  }
684  st.workerCond[parId].wait(lock);
685  }
686  lock.unlock();
687 
688  // virtual invocation
689  push_all(m_buffer->get_input());
690 
691  lock.lock();
692  }
693  }
694 };
695 
699 template <typename dest_t>
700 class before_impl : public before<typename push_type<dest_t>::type> {
701  typedef typename push_type<dest_t>::type item_type;
702 
703  dest_t dest;
704 
705 public:
706  template <typename Output>
707  before_impl(state<item_type, Output> & st,
708  size_t parId,
709  dest_t dest)
710  : before<item_type>(st, parId)
711  , dest(std::move(dest))
712  {
713  this->add_push_destination(dest);
714  st.set_input_ptr(parId, this);
715  }
716 
723  virtual void push_all(array_view<item_type> items) {
724  for (size_t i = 0; i < items.size(); ++i) {
725  dest.push(items[i]);
726  }
727 
728  // virtual invocation
729  this->st.output(this->parId).flush_buffer();
730  }
731 };
732 
736 template <typename Input, typename Output, typename dest_t>
737 class consumer_impl : public consumer<typename push_type<dest_t>::type> {
738  typedef state<Input, Output> state_t;
739  typedef typename state_t::ptr stateptr;
740  dest_t dest;
741  stateptr st;
742 public:
743  typedef typename push_type<dest_t>::type item_type;
744 
745  consumer_impl(dest_t dest, stateptr st)
746  : dest(std::move(dest))
747  , st(st)
748  {
749  this->add_push_destination(dest);
750  this->set_name("Parallel output", PRIORITY_INSIGNIFICANT);
751  this->set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
752  for (size_t i = 0; i < st->opts.numJobs; ++i) {
753  st->output(i).set_consumer(this);
754  }
755  }
756 
760  virtual void consume(array_view<item_type> a) override {
761  for (size_t i = 0; i < a.size(); ++i) {
762  dest.push(a[i]);
763  }
764  }
765 };
766 
772 template <typename T1, typename T2>
773 class producer : public node {
774 public:
775  typedef T1 item_type;
776 
777 private:
778  typedef state<T1, T2> state_t;
779  typedef typename state_t::ptr stateptr;
780  stateptr st;
781  array<T1> inputBuffer;
782  size_t written;
783  size_t readyIdx;
784  std::shared_ptr<consumer<T2> > cons;
785  internal_queue<memory_size_type> m_outputOrder;
786  stream_size_type m_steps;
787 
795  bool has_ready_pipe() {
796  for (size_t i = 0; i < st->opts.numJobs; ++i) {
797  switch (st->get_state(i)) {
798  case INITIALIZING:
799  case PROCESSING:
800  break;
801  case PARTIAL_OUTPUT:
802  case OUTPUTTING:
803  // If we have to maintain order of items, the only
804  // outputting worker we consider to be waiting is the
805  // "front worker".
806  if (st->opts.maintainOrder && m_outputOrder.front() != i)
807  break;
808  // fallthrough
809  case IDLE:
810  readyIdx = i;
811  return true;
812  case DONE:
813  throw tpie::exception("State DONE not expected in has_ready_pipe().");
814  }
815  }
816  return false;
817  }
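 	// Editorial note on has_ready_pipe(), with hypothetical numbers: if
 	// maintainOrder is set and m_outputOrder currently holds {2, 0, 1}, a
 	// worker 0 that has reached OUTPUTTING is not reported as ready above;
 	// only worker 2, at the front of the queue, may hand over its output, so
 	// buffers are consumed in the same order their inputs were handed out.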
818 
829  bool has_outputting_pipe() {
830  for (size_t i = 0; i < st->opts.numJobs; ++i) {
831  switch (st->get_state(i)) {
832  case INITIALIZING:
833  case IDLE:
834  case PROCESSING:
835  break;
836  case PARTIAL_OUTPUT:
837  case OUTPUTTING:
838  if (st->opts.maintainOrder && m_outputOrder.front() != i)
839  break;
840  readyIdx = i;
841  return true;
842  case DONE:
843  throw tpie::exception("State DONE not expected in has_outputting_pipe().");
844  }
845  }
846  return false;
847  }
848 
859  bool has_processing_pipe() {
860  for (size_t i = 0; i < st->opts.numJobs; ++i) {
861  switch (st->get_state(i)) {
862  case INITIALIZING:
863  case IDLE:
864  case PARTIAL_OUTPUT:
865  case OUTPUTTING:
866  break;
867  case PROCESSING:
868  return true;
869  case DONE:
870  throw tpie::exception("State DONE not expected in has_processing_pipe().");
871  }
872  }
873  return false;
874  }
875 
879  void flush_steps() {
880  // The number of items has been forwarded along unchanged to all
881  // the workers (it is still a valid upper bound).
882  //
883  // This means the workers each expect to handle all the items,
884  // which means the number of steps reported in total is scaled up
885  // by the number of workers.
886  //
887  // Therefore, we similarly scale up the number of times we call step.
888  // In effect, every time step() is called once in a single worker,
889  // we process this as if all workers called step().
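 	// Editorial illustration (hypothetical numbers): with numJobs == 4 and an
 	// upstream estimate of 1000 items, each worker declares up to 1000 steps,
 	// so the progress system expects 4 * 1000 = 4000 steps from this section.
 	// The items are actually split roughly 250 per worker, so sum_steps()
 	// only reaches about 1000; multiplying each increment by numJobs below
 	// brings the reported total back in line with the expected 4000.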
890 
891  stream_size_type steps = st->pipes->sum_steps();
892  if (steps != m_steps) {
893  this->get_progress_indicator()->step(st->opts.numJobs*(steps - m_steps));
894  m_steps = steps;
895  }
896  }
897 
898 public:
899  template <typename consumer_t>
900  producer(stateptr st, consumer_t cons)
901  : st(st)
902  , written(0)
903  , cons(new consumer_t(std::move(cons)))
904  , m_steps(0)
905  {
906  for (size_t i = 0; i < st->opts.numJobs; ++i) {
907  this->add_push_destination(st->input(i));
908  }
909  this->set_name("Parallel input", PRIORITY_INSIGNIFICANT);
910  this->set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
911 
912  memory_size_type usage =
913  st->opts.numJobs * st->opts.bufSize * (sizeof(T1) + sizeof(T2)) // workers
914  + st->opts.bufSize * sizeof(item_type) // our buffer
915  ;
916  this->set_minimum_memory(usage);
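 	// Editorial illustration (hypothetical numbers): with numJobs = 4,
 	// bufSize = 2048 and 8-byte items on both sides, usage is
 	// 4 * 2048 * (8 + 8) + 2048 * 8 = 131072 + 16384 = 147456 bytes (144 KiB).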
917 
918  if (st->opts.maintainOrder) {
919  m_outputOrder.resize(st->opts.numJobs);
920  }
921  }
922 
923  virtual void begin() override {
924  inputBuffer.resize(st->opts.bufSize);
925 
926  state_base::lock_t lock(st->mutex);
927  while (st->runningWorkers != st->opts.numJobs) {
928  st->producerCond.wait(lock);
929  }
930  }
931 
941  void push(item_type item) {
942  inputBuffer[written++] = item;
943  if (written < st->opts.bufSize) {
944  // Wait for more items before doing anything expensive such as
945  // locking.
946  return;
947  }
948  state_base::lock_t lock(st->mutex);
949 
950  flush_steps();
951 
952  empty_input_buffer(lock);
953  }
954 
955 private:
956  void empty_input_buffer(state_base::lock_t & lock) {
957  while (written > 0) {
958  while (!has_ready_pipe()) {
959  st->producerCond.wait(lock);
960  }
961  switch (st->get_state(readyIdx)) {
962  case INITIALIZING:
963  throw tpie::exception("State 'INITIALIZING' not expected at this point");
964  case IDLE:
965  {
966  // Send buffer to ready worker
967  item_type * first = &inputBuffer[0];
968  item_type * last = first + written;
969  parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
970  dest.set_input(array_view<T1>(first, last));
971  st->transition_state(readyIdx, IDLE, PROCESSING);
972  st->workerCond[readyIdx].notify_one();
973  written = 0;
974  if (st->opts.maintainOrder)
975  m_outputOrder.push(readyIdx);
976  break;
977  }
978  case PROCESSING:
979  throw tpie::exception("State 'processing' not expected at this point");
980  case PARTIAL_OUTPUT:
981  // Receive buffer (virtual invocation)
982  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
983  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
984  st->workerCond[readyIdx].notify_one();
985  break;
986  case OUTPUTTING:
987  // Receive buffer (virtual invocation)
988  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
989 
990  st->transition_state(readyIdx, OUTPUTTING, IDLE);
991  st->workerCond[readyIdx].notify_one();
992  if (st->opts.maintainOrder) {
993  if (m_outputOrder.front() != readyIdx) {
994  log_error() << "Producer: Expected " << readyIdx << " in front; got "
995  << m_outputOrder.front() << std::endl;
996  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
997  }
998  m_outputOrder.pop();
999  }
1000  break;
1001  case DONE:
1002  throw tpie::exception("State 'DONE' not expected at this point");
1003  }
1004  }
1005  }
1006 
1007 public:
1008  virtual void end() override {
1009  state_base::lock_t lock(st->mutex);
1010 
1011  flush_steps();
1012 
1013  empty_input_buffer(lock);
1014 
1015  inputBuffer.resize(0);
1016 
1017  st->set_consumer_ptr(cons.get());
1018 
1019  bool done = false;
1020  while (!done) {
1021  while (!has_outputting_pipe()) {
1022  if (!has_processing_pipe()) {
1023  done = true;
1024  break;
1025  }
1026  // All items pushed; wait for processors to complete
1027  st->producerCond.wait(lock);
1028  }
1029  if (done) break;
1030 
1031  // virtual invocation
1032  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1033 
1034  if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1035  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1036  st->workerCond[readyIdx].notify_one();
1037  continue;
1038  }
1039  st->transition_state(readyIdx, OUTPUTTING, IDLE);
1040  if (st->opts.maintainOrder) {
1041  if (m_outputOrder.front() != readyIdx) {
1042  log_error() << "Producer: Expected " << readyIdx << " in front; got "
1043  << m_outputOrder.front() << std::endl;
1044  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
1045  }
1046  m_outputOrder.pop();
1047  }
1048  }
1049  // Notify all workers that all processing is done
1050  for (size_t i = 0; i < st->opts.numJobs; ++i) {
1051  st->transition_state(i, IDLE, DONE);
1052  st->workerCond[i].notify_one();
1053  }
1054  while (st->runningWorkers > 0) {
1055  st->producerCond.wait(lock);
1056  }
1057  // All workers terminated
1058 
1059  flush_steps();
1060  }
1061 };
1062 
1063 } // namespace parallel_bits
1064 
1065 } // namespace pipelining
1066 
1067 } // namespace tpie
1068 
1069 #endif
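
The following sketch is editorial and not part of base.h. It models, using only the standard library, the handshake the classes above implement: the producer fills a buffer, hands it to a worker by transitioning IDLE -> PROCESSING, and the worker hands results back via OUTPUTTING -> IDLE, with DONE ending the worker loop. All names in it are hypothetical, and it deliberately ignores multiple workers, PARTIAL_OUTPUT and progress tracking.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

namespace demo {

enum class worker_state { idle, processing, outputting, done };

struct shared_state {
	std::mutex mutex;
	std::condition_variable producerCond;
	std::condition_variable workerCond;
	worker_state state = worker_state::idle;
	std::vector<int> buffer; // input when processing, output when outputting
};

// Worker: wait for PROCESSING, transform the buffer, then signal OUTPUTTING
// and wait until the producer has consumed the output (back to IDLE).
void worker(shared_state & st) {
	std::unique_lock<std::mutex> lock(st.mutex);
	while (true) {
		st.workerCond.wait(lock, [&] {
			return st.state == worker_state::processing
				|| st.state == worker_state::done;
		});
		if (st.state == worker_state::done) return;
		for (int & x : st.buffer) x *= 2;     // the "parallel" computation
		st.state = worker_state::outputting;  // cf. transition_state(...)
		st.producerCond.notify_one();
		st.workerCond.wait(lock, [&] {
			return st.state != worker_state::outputting;
		});
	}
}

} // namespace demo

int main() {
	demo::shared_state st;
	std::thread t(demo::worker, std::ref(st));
	for (int batch = 0; batch < 3; ++batch) {
		std::unique_lock<std::mutex> lock(st.mutex);
		st.buffer = {batch, batch + 1, batch + 2};
		st.state = demo::worker_state::processing;    // IDLE -> PROCESSING
		st.workerCond.notify_one();
		st.producerCond.wait(lock, [&] {
			return st.state == demo::worker_state::outputting;
		});
		for (int x : st.buffer) std::cout << x << ' '; // consume the output
		std::cout << '\n';
		st.state = demo::worker_state::idle;           // OUTPUTTING -> IDLE
		st.workerCond.notify_one();
	}
	{
		std::lock_guard<std::mutex> lock(st.mutex);
		st.state = demo::worker_state::done;           // IDLE -> DONE
	}
	st.workerCond.notify_one();
	t.join();
}

The essential points carried over from the real implementation are that every state transition happens while holding the shared mutex, and that each side signals the other's condition variable after a transition, just as state_base, before::worker and producer do above.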
Symbol cross-references from the Doxygen source listing:

after_base & output(size_t idx) - Get the specified after instance. (base.h:273)
void set_input_ptr(size_t idx, node *v) - Must not be used concurrently. (base.h:245)
pipe_begin<factory<bits::input_t, file_stream<T> &, stream_options>> input(file_stream<T> &fs, stream_options options = stream_options()) - Pipelining node that pushes the contents of the given file stream to the next node in the pipeline. (file_stream.h:379)
virtual void begin() - Begin pipeline processing phase. (node.h:299)
virtual void flush_buffer() = 0 - Called by before::worker after a batch of items has been pushed.
virtual void worker_initialize() = 0 - Called by before::worker to initialize buffers.
virtual void set_consumer(node *) = 0 - For internal use in order to construct the pipeline graph.
virtual void flush_buffer() override - Invoked by before::push_all when all input items have been pushed. (base.h:490)
const T & front() const - Return the item that has been in the queue for the longest time.
void set_progress_indicator(progress_indicator_base *pi) - Used internally. Set the progress indicator to use. (node.h:408)
class before - Accepts input items from the main thread and sends them down the pipeline. (base.h:40)
class progress_indicator_hook - Factory hook that sets the progress indicator of the nodes run in parallel to the null progress indicator. (base.h:72)
cond_t producerCond - Condition variable. (base.h:228)
class threads - Class containing an array of node instances. (base.h:57)
void set_output_ptr(size_t idx, after_base *v) - Must not be used concurrently. (base.h:250)
progress_indicator_null pi_t - Progress indicator type. (base.h:64)
class node - Base class of all nodes. (node.h:78)
void add_push_destination(const node_token &dest) - Called by implementers to declare a push destination.
push_type - Class to deduce the item_type of a node of type T. (node_traits.h:152)
class after - Accepts output items and sends them to the main thread. (base.h:44)
cond_t * workerCond - Condition variable, one per worker. (base.h:239)
void push(T val) - Add an element to the front of the queue.
progress_indicator_null - A dummy progress indicator that produces no output.
class aligned_array - Aligned, uninitialized storage. (aligned_array.h:43)
void push(item_type item) - Accumulate input buffer and send off to workers. (base.h:941)
virtual void push_all(array_view<T> items) = 0 - Overridden in subclass to push a buffer of items.
progress_indicator_base * get_progress_indicator() - Used internally. Get the progress indicator used. (node.h:415)
size_t runningWorkers - Shared state, must have mutex to write. (base.h:242)
class array_view - Encapsulation of two pointers from any random access container. (array_view.h:47)
void pop() - Remove an element from the back of the queue.
class state_base - Common state in parallel pipelining library. (base.h:211)
void set_name(const std::string &name, priority_type priority = PRIORITY_USER) - Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory) - Called by implementers to declare minimum memory requirements. (node.h:207)
virtual void push_all(array_view<item_type> items) - Push all items from buffer and flush output buffer afterwards. (base.h:723)
virtual void set_consumer(node *cons) override - For internal use in order to construct the pipeline graph. (base.h:449)
void step(stream_size_type step = 1) - Record an increment to the indicator and advance the indicator.
class threads_impl - Subclass of threads instantiating and managing the pipelines. (base.h:112)
class consumer - Node running in the main thread, accepting an output buffer from the managing producer and forwarding them ... (base.h:371)
class after_base - Non-templated virtual base class of after. (base.h:183)
size_t size() const - Get number of elements in the array.
void set_plot_options(flags<PLOT> options) - Set options specified for plot(), as a combination of node::PLOT values. (node.h:459)
void resize(size_t size, const T &elm) - Change the size of the array. (array.h:485)
iterator begin() const - Return an iterator to the beginning of the array.
virtual void begin() override - Begin pipeline processing phase. (base.h:610)
void push(const T &item) - Push to thread-local buffer; flush it when full. (base.h:467)
virtual void end() override - End pipeline processing phase. (base.h:474)
options - User-supplied options to the parallelism framework. (options.h:34)
class state - State subclass containing the item type specific state, i.e. ... (base.h:46)
iterator begin() - Return an iterator to the beginning of the array. (array.h:307)
size_type size() const - Return the size of the array. (array.h:526)
worker_state get_state(size_t idx) - Shared state, must have mutex to use. (base.h:276)
class producer - Producer, running in main thread, managing the parallel execution. (base.h:773)
virtual void end() override - End pipeline processing phase. (base.h:1008)
void transition_state(size_t idx, worker_state from, worker_state to) - Shared state, must have mutex to use. (base.h:281)
virtual void worker_initialize() override - Invoked by before::worker (in worker thread context). (base.h:481)
virtual void consume(array_view<item_type> a) override - Push all items from output buffer to the rest of the pipeline. (base.h:760)
iterator end() const - Return an iterator to the end of the array.
Whether to maintain order in parallel or not.
logstream & log_error() - Return logstream for writing error log messages. (tpie_log.h:147)
logstream & log_warning() - Return logstream for writing warning log messages. (tpie_log.h:157)
virtual void begin() override - Begin pipeline processing phase. (base.h:923)
void resize(size_t size = 0) - Resize the queue; all data is lost.
class consumer_impl - Concrete consumer implementation. (base.h:737)
node & input(size_t idx) - Get the specified before instance. (base.h:261)