mirror of
https://gitlab.com/obbart/universal_robots_ros_driver.git
synced 2026-04-12 11:00:47 +02:00
Tested producer part with real data
This commit is contained in:
@@ -46,8 +46,8 @@ public:
|
||||
* \brief Creates a new URPackage object.
|
||||
*/
|
||||
|
||||
URPackage();
|
||||
virtual ~URPackage();
|
||||
URPackage() = default;
|
||||
virtual ~URPackage() = default;
|
||||
|
||||
using _header_type = HeaderT;
|
||||
|
||||
|
||||
@@ -35,6 +35,9 @@ class Parser
|
||||
|
||||
{
|
||||
public:
|
||||
Parser() = default;
|
||||
virtual ~Parser() = default;
|
||||
|
||||
/*!
|
||||
* \brief declares the parse function.
|
||||
*
|
||||
|
||||
@@ -138,10 +138,44 @@ public:
|
||||
template <typename HeaderT>
|
||||
class Pipeline
|
||||
{
|
||||
public:
|
||||
using _package_type = URPackage<HeaderT>;
|
||||
Pipeline(IProducer<HeaderT>& producer, IConsumer<_package_type>& consumer, std::string name, INotifier& notifier)
|
||||
: producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
|
||||
{
|
||||
}
|
||||
|
||||
void run()
|
||||
{
|
||||
if (running_)
|
||||
return;
|
||||
|
||||
running_ = true;
|
||||
pThread_ = std::thread(&Pipeline::run_producer, this);
|
||||
cThread_ = std::thread(&Pipeline::run_consumer, this);
|
||||
notifier_.started(name_);
|
||||
}
|
||||
|
||||
void stop()
|
||||
{
|
||||
if (!running_)
|
||||
return;
|
||||
|
||||
LOG_INFO("Stopping pipeline! <%s>", name_.c_str());
|
||||
|
||||
consumer_.stopConsumer();
|
||||
producer_.stopProducer();
|
||||
|
||||
running_ = false;
|
||||
|
||||
pThread_.join();
|
||||
cThread_.join();
|
||||
notifier_.stopped(name_);
|
||||
}
|
||||
|
||||
private:
|
||||
typedef std::chrono::high_resolution_clock Clock;
|
||||
typedef Clock::time_point Time;
|
||||
using _package_type = URPackage<HeaderT>;
|
||||
IProducer<HeaderT>& producer_;
|
||||
IConsumer<_package_type>& consumer_;
|
||||
std::string name_;
|
||||
@@ -203,40 +237,6 @@ private:
|
||||
running_ = false;
|
||||
notifier_.stopped(name_);
|
||||
}
|
||||
|
||||
public:
|
||||
Pipeline(IProducer<HeaderT>& producer, IConsumer<_package_type>& consumer, std::string name, INotifier& notifier)
|
||||
: producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
|
||||
{
|
||||
}
|
||||
|
||||
void run()
|
||||
{
|
||||
if (running_)
|
||||
return;
|
||||
|
||||
running_ = true;
|
||||
pThread_ = std::thread(&Pipeline::run_producer, this);
|
||||
cThread_ = std::thread(&Pipeline::run_consumer, this);
|
||||
notifier_.started(name_);
|
||||
}
|
||||
|
||||
void stop()
|
||||
{
|
||||
if (!running_)
|
||||
return;
|
||||
|
||||
LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
|
||||
|
||||
consumer_.stopConsumer();
|
||||
producer_.stopProducer();
|
||||
|
||||
running_ = false;
|
||||
|
||||
pThread_.join();
|
||||
cThread_.join();
|
||||
notifier_.stopped(name_);
|
||||
}
|
||||
};
|
||||
} // namespace comm
|
||||
} // namespace ur_driver
|
||||
|
||||
@@ -34,11 +34,11 @@ class URProducer : public IProducer<HeaderT>
|
||||
{
|
||||
private:
|
||||
URStream<HeaderT>& stream_;
|
||||
URParser<HeaderT>& parser_;
|
||||
Parser<HeaderT>& parser_;
|
||||
std::chrono::seconds timeout_;
|
||||
|
||||
public:
|
||||
URProducer(URStream<HeaderT>& stream, URParser<HeaderT>& parser) : stream_(stream), parser_(parser), timeout_(1)
|
||||
URProducer(URStream<HeaderT>& stream, Parser<HeaderT>& parser) : stream_(stream), parser_(parser), timeout_(1)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#ifndef UR_RTDE_DRIVER_SHELL_CONSUMER_H_INCLUDED
|
||||
#define UR_RTDE_DRIVER_SHELL_CONSUMER_H_INCLUDED
|
||||
|
||||
#include "ur_rtde_driver/log.h"
|
||||
#include "ur_rtde_driver/comm/pipeline.h"
|
||||
#include "ur_rtde_driver/comm/package.h"
|
||||
|
||||
@@ -42,7 +43,11 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
virtual bool consume(std::shared_ptr<URPackage<HeaderT>> product);
|
||||
virtual bool consume(std::shared_ptr<URPackage<HeaderT>> product)
|
||||
{
|
||||
LOG_INFO("%s", product->toString().c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
/* data */
|
||||
|
||||
@@ -130,7 +130,7 @@ bool URStream<HeaderT>::read(uint8_t* buf, const size_t buf_len, size_t& total)
|
||||
|
||||
bool initial = true;
|
||||
uint8_t* buf_pos = buf;
|
||||
size_t remainder = sizeof(HeaderT::_package_size_type);
|
||||
size_t remainder = sizeof(typename HeaderT::_package_size_type);
|
||||
size_t read = 0;
|
||||
|
||||
while (remainder > 0 && TCPSocket::read(buf_pos, remainder, read))
|
||||
@@ -139,7 +139,7 @@ bool URStream<HeaderT>::read(uint8_t* buf, const size_t buf_len, size_t& total)
|
||||
if (initial)
|
||||
{
|
||||
remainder = HeaderT::getPackageLength(buf);
|
||||
if (remainder >= (buf_len - sizeof(HeaderT::_package_size_type)))
|
||||
if (remainder >= (buf_len - sizeof(typename HeaderT::_package_size_type)))
|
||||
{
|
||||
LOG_ERROR("Packet size %zd is larger than buffer %zu, discarding.", remainder, buf_len);
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user