#pragma once #include #include #include "ur_modern_driver/queue/readerwriterqueue.h" using namespace moodycamel; using namespace std; template class IProducer { public: virtual void setup_producer() = 0; virtual void teardown_producer() = 0; virtual void stop_producer() = 0; virtual unique_ptr try_get() = 0; }; template class IConsumer { public: virtual void setup_consumer() = 0; virtual void teardown_consumer() = 0; virtual void stop_consumer() = 0; virtual bool push(unique_ptr product) = 0; }; template class Pipeline { private: IProducer &_producer; IConsumer &_consumer; BlockingReaderWriterQueue> _queue; atomic _running; thread _pThread, _cThread; void run_producer() { _producer.setup_producer(); while(_running) { unique_ptr product(_producer.try_get()); if(product == nullptr) break; if(!_queue.try_enqueue(std::move(product))) { //log dropped product } } _producer.teardown_producer(); //todo cleanup } void run_consumer() { _consumer.setup_consumer(); while(_running) { unique_ptr product; _queue.wait_dequeue(product); if(!_consumer.push(std::move(product))) break; } _consumer.teardown_consumer(); //todo cleanup } public: Pipeline(IProducer &producer, IConsumer &consumer) : _producer(producer), _consumer(consumer), _queue{32}, _running{false} { } void run() { if(_running) return; _running = true; _pThread = thread(&Pipeline::run_producer, this); _cThread = thread(&Pipeline::run_consumer, this); } void stop() { if(!_running) return; _consumer.stop_consumer(); _producer.stop_producer(); _pThread.join(); _cThread.join(); } };