diff --git a/include/ur_rtde_driver/comm/pipeline.h b/include/ur_rtde_driver/comm/pipeline.h index 4d0c6b6..cf35d38 100644 --- a/include/ur_rtde_driver/comm/pipeline.h +++ b/include/ur_rtde_driver/comm/pipeline.h @@ -20,13 +20,13 @@ #pragma once +#include "ur_rtde_driver/comm/package.h" +#include "ur_rtde_driver/log.h" +#include "ur_rtde_driver/queue/readerwriterqueue.h" #include #include #include #include -#include "ur_rtde_driver/log.h" -#include "ur_rtde_driver/queue/readerwriterqueue.h" -#include "ur_rtde_driver/comm/package.h" namespace ur_driver { @@ -139,9 +139,15 @@ template class Pipeline { public: + typedef std::chrono::high_resolution_clock Clock; + typedef Clock::time_point Time; using _package_type = URPackage; Pipeline(IProducer& producer, IConsumer<_package_type>& consumer, std::string name, INotifier& notifier) - : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false } + : producer_(producer), consumer_(&consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false } + { + } + Pipeline(IProducer& producer, std::string name, INotifier& notifier) + : producer_(producer), consumer_(nullptr), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false } { } @@ -151,8 +157,9 @@ public: return; running_ = true; - pThread_ = std::thread(&Pipeline::run_producer, this); - cThread_ = std::thread(&Pipeline::run_consumer, this); + pThread_ = std::thread(&Pipeline::runProducer, this); + if (consumer_ != nullptr) + cThread_ = std::thread(&Pipeline::runConsumer, this); notifier_.started(name_); } @@ -161,9 +168,10 @@ public: if (!running_) return; - LOG_INFO("Stopping pipeline! <%s>", name_.c_str()); + LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str()); - consumer_.stopConsumer(); + if (consumer_ != nullptr) + consumer_->stopConsumer(); producer_.stopProducer(); running_ = false; @@ -173,18 +181,21 @@ public: notifier_.stopped(name_); } + bool getLatestProduct(std::unique_ptr> product, std::chrono::milliseconds timeout) + { + return !queue_.wait_dequeue_timed(product, timeout); + } + private: - typedef std::chrono::high_resolution_clock Clock; - typedef Clock::time_point Time; IProducer& producer_; - IConsumer<_package_type>& consumer_; + IConsumer<_package_type>* consumer_; std::string name_; INotifier& notifier_; BlockingReaderWriterQueue> queue_; std::atomic running_; std::thread pThread_, cThread_; - void run_producer() + void runProducer() { producer_.setupProducer(); std::vector> products; @@ -207,14 +218,15 @@ private: } producer_.teardownProducer(); LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str()); - consumer_.stopConsumer(); + if (consumer_ != nullptr) + consumer_->stopConsumer(); running_ = false; notifier_.stopped(name_); } - void run_consumer() + void runConsumer() { - consumer_.setupConsumer(); + consumer_->setupConsumer(); std::unique_ptr<_package_type> product; while (running_) { @@ -224,14 +236,14 @@ private: // So we update the consumer more frequently via onTimeout if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8))) { - consumer_.onTimeout(); + consumer_->onTimeout(); continue; } - if (!consumer_.consume(std::move(product))) + if (!consumer_->consume(std::move(product))) break; } - consumer_.teardownConsumer(); + consumer_->teardownConsumer(); LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str()); producer_.stopProducer(); running_ = false;