1
0
mirror of https://gitlab.com/obbart/universal_robots_ros_driver.git synced 2026-04-10 01:50:46 +02:00
Files
universal_robots_ros_driver/include/ur_modern_driver/pipeline.h
Michael Görner e4a503fe5f various improvements and fixes for use_ros_control=true (#6)
* Find matching hardware_interface using the required type

The name of the controller was used in order to find and start
the matching hardware interface.
In consequence this meant that one could only define one controller
for each hardware interface.

Now, the controller's required type of hardware interface is used
to find and start the matching hardware interface.

* separate read & update in controller

consume is defined as read+update, but update
does not include read in ros_control terminology.

* Handle latency in pipeline loop

The controllers need to update at a rate of *at least* 125Hz,
but the wait_dequeue_timed call could in theory slow the loop down to 62.5Hz.
The old ur_modern_driver worked around this problem by sending goals
at 4*125Hz.

This patch exploits the onTimeout method of a consumer to update with
the specified frequency of the control loop, even if no new state message
arrived after the previous command.

* publish wrench w.r.t. tcp frame

The messages had an empty frame_id before and could not be displayed in RViz

* support ros_control in indigo
2017-12-08 14:05:07 +01:00

196 lines
3.5 KiB
C++

#pragma once
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include "ur_modern_driver/log.h"
#include "ur_modern_driver/queue/readerwriterqueue.h"
using namespace moodycamel;
using namespace std;
template <typename T>
class IConsumer
{
public:
virtual void setupConsumer()
{
}
virtual void teardownConsumer()
{
}
virtual void stopConsumer()
{
}
virtual void onTimeout()
{
}
virtual bool consume(shared_ptr<T> product) = 0;
};
template <typename T>
class MultiConsumer : public IConsumer<T>
{
private:
std::vector<IConsumer<T>*> consumers_;
public:
MultiConsumer(std::vector<IConsumer<T>*> consumers) : consumers_(consumers)
{
}
virtual void setupConsumer()
{
for (auto& con : consumers_)
{
con->setupConsumer();
}
}
virtual void teardownConsumer()
{
for (auto& con : consumers_)
{
con->teardownConsumer();
}
}
virtual void stopConsumer()
{
for (auto& con : consumers_)
{
con->stopConsumer();
}
}
virtual void onTimeout()
{
for (auto& con : consumers_)
{
con->onTimeout();
}
}
bool consume(shared_ptr<T> product)
{
bool res = true;
for (auto& con : consumers_)
{
if (!con->consume(product))
res = false;
}
return res;
}
};
template <typename T>
class IProducer
{
public:
virtual void setupProducer()
{
}
virtual void teardownProducer()
{
}
virtual void stopProducer()
{
}
virtual bool tryGet(std::vector<unique_ptr<T>>& products) = 0;
};
template <typename T>
class Pipeline
{
private:
typedef std::chrono::high_resolution_clock Clock;
typedef Clock::time_point Time;
IProducer<T>& producer_;
IConsumer<T>& consumer_;
BlockingReaderWriterQueue<unique_ptr<T>> queue_;
atomic<bool> running_;
thread pThread_, cThread_;
void run_producer()
{
producer_.setupProducer();
std::vector<unique_ptr<T>> products;
while (running_)
{
if (!producer_.tryGet(products))
{
break;
}
for (auto& p : products)
{
if (!queue_.try_enqueue(std::move(p)))
{
LOG_ERROR("Pipeline producer overflowed!");
}
}
products.clear();
}
producer_.teardownProducer();
LOG_DEBUG("Pipline producer ended");
consumer_.stopConsumer();
}
void run_consumer()
{
consumer_.setupConsumer();
unique_ptr<T> product;
while (running_)
{
// timeout was chosen because we should receive messages
// at roughly 125hz (every 8ms) and have to update
// the controllers (i.e. the consumer) with *at least* 125Hz
// So we update the consumer more frequently via onTimeout
if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8)))
{
consumer_.onTimeout();
continue;
}
if (!consumer_.consume(std::move(product)))
break;
}
consumer_.teardownConsumer();
LOG_DEBUG("Pipline consumer ended");
producer_.stopProducer();
}
public:
Pipeline(IProducer<T>& producer, IConsumer<T>& consumer)
: producer_(producer), consumer_(consumer), queue_{ 32 }, running_{ false }
{
}
void run()
{
if (running_)
return;
running_ = true;
pThread_ = thread(&Pipeline::run_producer, this);
cThread_ = thread(&Pipeline::run_consumer, this);
}
void stop()
{
if (!running_)
return;
LOG_DEBUG("Stopping pipeline");
consumer_.stopConsumer();
producer_.stopProducer();
running_ = false;
pThread_.join();
cThread_.join();
}
};