1
0
mirror of https://gitlab.com/obbart/universal_robots_ros_driver.git synced 2026-04-10 10:00:48 +02:00

reworked pipeline to allow for use without consumer

This commit is contained in:
Tristan Schnell
2019-04-10 13:25:48 +02:00
parent 75d73c1c28
commit 89ee72ac2d

View File

@@ -20,13 +20,13 @@
#pragma once
#include "ur_rtde_driver/comm/package.h"
#include "ur_rtde_driver/log.h"
#include "ur_rtde_driver/queue/readerwriterqueue.h"
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include "ur_rtde_driver/log.h"
#include "ur_rtde_driver/queue/readerwriterqueue.h"
#include "ur_rtde_driver/comm/package.h"
namespace ur_driver
{
@@ -139,9 +139,15 @@ template <typename HeaderT>
class Pipeline
{
public:
typedef std::chrono::high_resolution_clock Clock;
typedef Clock::time_point Time;
using _package_type = URPackage<HeaderT>;
Pipeline(IProducer<HeaderT>& producer, IConsumer<_package_type>& consumer, std::string name, INotifier& notifier)
: producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
: producer_(producer), consumer_(&consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
{
}
Pipeline(IProducer<HeaderT>& producer, std::string name, INotifier& notifier)
: producer_(producer), consumer_(nullptr), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
{
}
@@ -151,8 +157,9 @@ public:
return;
running_ = true;
pThread_ = std::thread(&Pipeline::run_producer, this);
cThread_ = std::thread(&Pipeline::run_consumer, this);
pThread_ = std::thread(&Pipeline::runProducer, this);
if (consumer_ != nullptr)
cThread_ = std::thread(&Pipeline::runConsumer, this);
notifier_.started(name_);
}
@@ -161,9 +168,10 @@ public:
if (!running_)
return;
LOG_INFO("Stopping pipeline! <%s>", name_.c_str());
LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
consumer_.stopConsumer();
if (consumer_ != nullptr)
consumer_->stopConsumer();
producer_.stopProducer();
running_ = false;
@@ -173,18 +181,21 @@ public:
notifier_.stopped(name_);
}
bool getLatestProduct(std::unique_ptr<URPackage<HeaderT>> product, std::chrono::milliseconds timeout)
{
return !queue_.wait_dequeue_timed(product, timeout);
}
private:
typedef std::chrono::high_resolution_clock Clock;
typedef Clock::time_point Time;
IProducer<HeaderT>& producer_;
IConsumer<_package_type>& consumer_;
IConsumer<_package_type>* consumer_;
std::string name_;
INotifier& notifier_;
BlockingReaderWriterQueue<std::unique_ptr<_package_type>> queue_;
std::atomic<bool> running_;
std::thread pThread_, cThread_;
void run_producer()
void runProducer()
{
producer_.setupProducer();
std::vector<std::unique_ptr<_package_type>> products;
@@ -207,14 +218,15 @@ private:
}
producer_.teardownProducer();
LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
consumer_.stopConsumer();
if (consumer_ != nullptr)
consumer_->stopConsumer();
running_ = false;
notifier_.stopped(name_);
}
void run_consumer()
void runConsumer()
{
consumer_.setupConsumer();
consumer_->setupConsumer();
std::unique_ptr<_package_type> product;
while (running_)
{
@@ -224,14 +236,14 @@ private:
// So we update the consumer more frequently via onTimeout
if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8)))
{
consumer_.onTimeout();
consumer_->onTimeout();
continue;
}
if (!consumer_.consume(std::move(product)))
if (!consumer_->consume(std::move(product)))
break;
}
consumer_.teardownConsumer();
consumer_->teardownConsumer();
LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
producer_.stopProducer();
running_ = false;