20 #ifndef __TPIE_PIPELINING_HELPERS_H__
21 #define __TPIE_PIPELINING_HELPERS_H__
24 #include <tpie/pipelining/node.h>
25 #include <tpie/pipelining/pipe_base.h>
26 #include <tpie/pipelining/factory_helpers.h>
32 namespace pipelining {
36 template <
typename dest_t>
41 inline ostream_logger_t(dest_t dest, std::ostream & log) : dest(std::move(dest)), log(log), begun(
false), ended(
false) {
43 set_name(
"Log", PRIORITY_INSIGNIFICANT);
49 virtual void end()
override {
53 inline void push(
const item_type & item) {
55 log <<
"WARNING: push() called before begin(). Calling begin on rest of pipeline." << std::endl;
59 log <<
"WARNING: push() called after end()." << std::endl;
62 log <<
"pushing " << item << std::endl;
72 template <
typename source_t>
77 pull_peek_t(source_t source) : source(std::move(source)) {
83 could_pull = source.can_pull();
84 if (could_pull) item=source.pull();
89 could_pull = source.can_pull();
90 if (could_pull) item=source.pull();
94 const item_type & peek()
const {
98 bool can_pull()
const {
108 template <
typename T>
114 std::shared_ptr<T>
buffer;
115 inline void push(
const T & el) {
123 template <
typename fact2_t>
126 typedef typename fact2_t::constructed_type dest2_t;
128 template <
typename dest_t>
133 type(dest_t dest, fact2_t fact2) : dest(std::move(dest)), dest2(fact2.construct()) {
138 inline void push(
const item_type & item) {
150 template <
typename source_t,
typename dest_fact_t>
155 pull_fork_t(source_t source, dest_fact_t dest_fact)
156 : dest(dest_fact.construct())
157 , source(std::move(source)) {
/// Reports whether another item is available by forwarding the query
/// to source.can_pull() and returning its answer unchanged.
162 bool can_pull() {
return source.can_pull();}
165 item_type i=source.pull();
171 typename dest_fact_t::constructed_type dest;
176 template <
typename T>
/// Accepts an item and does nothing with it: the parameter is unnamed
/// and the body is empty, so every pushed item is dropped.
181 void push(
const T &) {}
185 template <
typename T>
/// Not expected to be called: can_pull() below always answers false, so
/// there is never an item to hand out. The body only invokes
/// tpie_unreachable().
190 T pull() {tpie_unreachable();}
/// Always reports that nothing can be pulled.
191 bool can_pull() {
return false;}
195 template <
typename IT>
200 typedef typename IT::value_type item_type;
216 template <
typename IT>
219 template <
typename dest_t>
225 type(dest_t dest, IT from, IT to)
228 , dest(std::move(dest))
233 virtual void go()
override {
242 template <
typename Iterator,
typename Item =
void>
245 template <
typename Iterator>
249 typedef typename Iterator::value_type item_type;
255 void push(
const item_type & item) {
261 template <
typename Iterator,
typename Item>
265 typedef Item item_type;
271 void push(
const item_type & item) {
277 template <
typename IT>
280 template <
typename dest_t>
285 type(dest_t dest, IT to)
287 , dest(std::move(dest))
292 virtual void go()
override {
293 while (dest.can_pull()) {
301 template <
typename F>
304 template <
typename dest_t>
311 type(dest_t dest,
const F & functor): functor(functor), dest(std::move(dest)) {
316 functor(*static_cast<node*>(
this));
/// Identity step: hands the received item to dest.push() unchanged.
319 void push(
const item_type & item) {dest.push(item);}
324 template <
typename F>
327 template <
typename dest_t>
334 type(dest_t dest,
const F & functor): functor(functor), dest(std::move(dest)) {
339 functor(*static_cast<node*>(
this));
/// Identity step: hands the received item to dest.push() unchanged.
342 void push(
const item_type & item) {dest.push(item);}
346 template <
typename src_fact_t>
348 typedef typename src_fact_t::constructed_type src_t;
349 template <
typename dest_t>
354 type(dest_t dest, src_fact_t src_fact)
355 : src(src_fact.construct()), dest(std::move(dest)) {
360 void push(
const item_type & item) {
361 tp_assert(src.can_pull(),
"We should be able to pull");
362 dest.push(std::make_pair(item, src.pull()));
370 template <
typename fact2_t>
372 typedef typename fact2_t::constructed_type dest2_t;
374 template <
typename dest1_t>
377 typedef typename push_type<dest1_t>::type first_type;
378 typedef typename push_type<dest2_t>::type second_type;
379 typedef std::pair<first_type, second_type> item_type;
381 type(dest1_t dest1, fact2_t fact2) : dest1(std::move(dest1)), dest2(fact2.construct()) {
386 void push(
const item_type & item) {
387 dest2.push(item.second);
388 dest1.push(item.first);
396 template <
typename T>
399 template <
typename dest_t>
/// Takes the downstream node by value and moves it into the `dest` member.
405 type(dest_t dest): dest(std::move(dest)) {}
/// Passes each item on to dest.push() without modifying it.
406 void push(
const item_type & item) {dest.push(item);}
410 template <
typename fact_t>
413 template <
typename dest_t>
416 typedef typename fact_t::constructed_type source_t;
419 type(dest_t dest, fact_t fact)
420 : dest(std::move(dest)), src(fact.construct()) {
426 size_t size = fetch<stream_size_type>(
"items");
431 while (src.can_pull()) {
432 dest.push(src.pull());
443 template <
typename dest_t,
typename equal_t>
448 unique_t(dest_t dest, equal_t equal)
449 : equal(equal), dest(std::move(dest)) {}
455 void push(
const item_type & item) {
456 if (!first && equal(prev, item))
476 inline pipe_middle<factory<bits::ostream_logger_t, std::ostream &> >
493 template <
typename fact_t>
496 return {std::move(to.factory)};
505 template <
typename dest_fact_t>
506 pullpipe_middle<tfactory<bits::pull_fork_t, Args<dest_fact_t>, dest_fact_t> >
508 return {std::move(dest_fact)};
517 template <
typename fact_t>
518 pipe_middle<tempfactory<bits::unzip_t<fact_t>, fact_t> >
520 return {std::move(to.factory)};
530 template <
typename fact_t>
531 pipe_middle<tempfactory<bits::zip_t<fact_t>, fact_t> >
533 return {std::move(from.factory)};
541 template <
typename T>
542 inline pipe_end<termfactory<bits::null_sink_t<T> > >
550 template <
typename T>
551 inline pullpipe_begin<termfactory<bits::zero_source_t<T> > >
555 template <
template <
typename dest_t>
class Fact,
typename... T>
556 pipe_begin<factory<Fact, T...> > make_pipe_begin(T... t) {
560 template <
template <
typename dest_t>
class Fact,
typename... T>
561 pipe_middle<factory<Fact, T...> > make_pipe_middle(T... t) {
565 template <
typename Fact,
typename... T>
566 pipe_end<termfactory<Fact, T...> > make_pipe_end(T ... t) {
575 template <
typename IT>
586 template <
typename IT>
597 template <
typename IT>
609 template <
typename Item,
typename IT>
620 template <
typename IT>
631 template <
typename F>
642 template <
typename F>
653 template <
typename T>
663 template <
typename fact_t>
664 pipe_begin<tempfactory<bits::pull_source_t<fact_t>, fact_t> >
666 return {std::move(from.factory)};
675 template <
typename equal_t>
Defines the tp_assert macro.
pipe_end< termfactory< bits::push_output_iterator_t< IT, Item >, IT > > typed_push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
virtual void begin()
Begin pipeline processing phase.
Memory management subsystem.
pipe_middle< tempfactory< bits::preparer_t< F >, F > > preparer(const F &functor)
Create preparer callback identity pipe node.
pipe_end< termfactory< bits::push_output_iterator_t< IT >, IT > > push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
pullpipe_end< tempfactory< bits::pull_output_iterator_t< IT >, IT > > pull_output_iterator(IT to)
A pull-pipe node that writes its given items to the destination pointed to by the given iterator.
pipe_middle< tfactory< bits::unique_t, Args< equal_t >, equal_t > > unique(equal_t equal)
Filter consecutive duplicates out.
void propagate() override
Propagate stream metadata.
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
pipe_middle< tempfactory< bits::propagater_t< F >, F > > propagater(const F &functor)
Create propagate callback identity pipe node.
virtual void begin() override
Begin pipeline processing phase.
pullpipe_middle< factory< bits::pull_peek_t > > pull_peek
A node that allows peeking at the next item in the pipeline.
pipe_middle< tempfactory< bits::zip_t< fact_t >, fact_t > > zip(pullpipe_begin< fact_t > from)
Create a zip pipe node.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Node factory for variadic argument templated generators.
virtual void end() override
End pipeline processing phase.
void propagate() override
Propagate stream metadata.
virtual void begin() override
Begin pipeline processing phase.
void begin() override
Begin pipeline processing phase.
pipe_end< termfactory< bits::null_sink_t< T > > > null_sink()
Create a dummy end pipe node.
pipe_middle< tempfactory< bits::unzip_t< fact_t >, fact_t > > unzip(pipe_end< fact_t > to)
Create unzip pipe node.
pullpipe_middle< tfactory< bits::pull_fork_t, Args< dest_fact_t >, dest_fact_t > > pull_fork(dest_fact_t dest_fact)
Create a pulling fork pipe node.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Node factory for variadic argument terminators.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
pipe_begin< tempfactory< bits::push_input_iterator_t< IT >, IT, IT > > push_input_iterator(IT begin, IT end)
A pipelining node that pushes the items in the range given by two iterators.
virtual void end()
End pipeline processing phase.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void step(stream_size_type steps=1)
Step the progress indicator.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
pipe_middle< tempfactory< bits::fork_t< fact_t >, fact_t > > fork(pipe_end< fact_t > to)
Create a fork pipe node.
A pipe_middle class pushes input down the pipeline.
pullpipe_begin< termfactory< bits::zero_source_t< T > > > zero_source()
Create a dummy pull begin pipe node.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
#define tp_assert(condition, message)
pipe_begin< tempfactory< bits::pull_source_t< fact_t >, fact_t > > pull_source(pullpipe_begin< fact_t > from)
A node that pulls items from source and pushes them into dest.
pipe_middle< factory< bits::ostream_logger_t, std::ostream & > > cout_logger()
A pipelining node that writes items to standard out and then pushes them to the next node.
pullpipe_begin< termfactory< bits::pull_input_iterator_t< IT >, IT, IT > > pull_input_iterator(IT begin, IT end)
A pull-pipe that returns items in the range given by two iterators.