diff --git a/CMakeLists.txt b/CMakeLists.txt index 459aef7..18ee84e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,7 +67,6 @@ include_directories( ) add_library(ur_rtde_driver - src/comm/stream.cpp src/comm/tcp_socket.cpp src/ros/service_stopper.cpp src/ur/commander.cpp diff --git a/include/ur_rtde_driver/comm/pipeline.h b/include/ur_rtde_driver/comm/pipeline.h index b32145f..0a55c95 100644 --- a/include/ur_rtde_driver/comm/pipeline.h +++ b/include/ur_rtde_driver/comm/pipeline.h @@ -1,4 +1,6 @@ /* + * Copyright 2019, FZI Forschungszentrum Informatik (templating) + * * Copyright 2017, 2018 Simon Rasmussen (refactor) * * Copyright 2015, 2016 Thomas Timm Andersen (original version) @@ -24,6 +26,7 @@ #include #include "ur_rtde_driver/log.h" #include "ur_rtde_driver/queue/readerwriterqueue.h" +#include "ur_rtde_driver/comm/package.h" namespace ur_driver { @@ -104,7 +107,7 @@ public: } }; -template +template class IProducer { public: @@ -118,7 +121,7 @@ public: { } - virtual bool tryGet(std::vector>& products) = 0; + virtual bool tryGet(std::vector>>& products) = 0; }; class INotifier @@ -132,24 +135,25 @@ public: } }; -template +template class Pipeline { private: typedef std::chrono::high_resolution_clock Clock; typedef Clock::time_point Time; - IProducer& producer_; - IConsumer& consumer_; + using _package_type = URPackage; + IProducer& producer_; + IConsumer<_package_type>& consumer_; std::string name_; INotifier& notifier_; - BlockingReaderWriterQueue> queue_; + BlockingReaderWriterQueue> queue_; std::atomic running_; std::thread pThread_, cThread_; void run_producer() { producer_.setupProducer(); - std::vector> products; + std::vector> products; while (running_) { if (!producer_.tryGet(products)) @@ -177,7 +181,7 @@ private: void run_consumer() { consumer_.setupConsumer(); - std::unique_ptr product; + std::unique_ptr<_package_type> product; while (running_) { // timeout was chosen because we should receive messages @@ -201,7 +205,7 @@ private: } public: - Pipeline(IProducer& producer, IConsumer& consumer, std::string name, INotifier& notifier) + Pipeline(IProducer& producer, IConsumer<_package_type>& consumer, std::string name, INotifier& notifier) : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false } { } diff --git a/include/ur_rtde_driver/ur/producer.h b/include/ur_rtde_driver/comm/producer.h similarity index 79% rename from include/ur_rtde_driver/ur/producer.h rename to include/ur_rtde_driver/comm/producer.h index 186e8f9..3d49650 100644 --- a/include/ur_rtde_driver/ur/producer.h +++ b/include/ur_rtde_driver/comm/producer.h @@ -1,4 +1,6 @@ /* + * Copyright 2019, FZI Forschungszentrum Informatik (templating) + * * Copyright 2017, 2018 Simon Rasmussen (refactor) * * Copyright 2015, 2016 Thomas Timm Andersen (original version) @@ -21,19 +23,22 @@ #include "ur_rtde_driver/comm/pipeline.h" #include "ur_rtde_driver/comm/parser.h" #include "ur_rtde_driver/comm/stream.h" +#include "ur_rtde_driver/comm/package.h" namespace ur_driver { -template -class URProducer : public comm::IProducer +namespace comm +{ +template +class URProducer : public IProducer { private: - comm::URStream& stream_; - comm::URParser& parser_; + URStream& stream_; + URParser& parser_; std::chrono::seconds timeout_; public: - URProducer(comm::URStream& stream, comm::URParser& parser) : stream_(stream), parser_(parser), timeout_(1) + URProducer(URStream& stream, URParser& parser) : stream_(stream), parser_(parser), timeout_(1) { } @@ -50,7 +55,7 @@ public: stream_.disconnect(); } - bool tryGet(std::vector>& products) + bool tryGet(std::vector>>& products) { // 4KB should be enough to hold any packet received from UR uint8_t buf[4096]; @@ -83,4 +88,5 @@ public: return parser_.parse(bp, products); } }; +} // namespace comm } // namespace ur_driver diff --git a/include/ur_rtde_driver/comm/stream.h b/include/ur_rtde_driver/comm/stream.h index 9f8c3ff..7cfd37d 100644 --- a/include/ur_rtde_driver/comm/stream.h +++ b/include/ur_rtde_driver/comm/stream.h @@ -1,4 +1,6 @@ /* + * Copyright 2019, FZI Forschungszentrum Informatik (templating) + * * Copyright 2017, 2018 Simon Rasmussen (refactor) * * Copyright 2015, 2016 Thomas Timm Andersen (original version) @@ -30,6 +32,7 @@ namespace ur_driver { namespace comm { +template class URStream : public TCPSocket { private: @@ -66,5 +69,44 @@ public: bool read(uint8_t* buf, size_t buf_len, size_t& read); bool write(const uint8_t* buf, size_t buf_len, size_t& written); }; + +template +bool URStream::write(const uint8_t* buf, size_t buf_len, size_t& written) +{ + std::lock_guard lock(write_mutex_); + return TCPSocket::write(buf, buf_len, written); +} + +template +bool URStream::read(uint8_t* buf, size_t buf_len, size_t& total) +{ + std::lock_guard lock(read_mutex_); + + bool initial = true; + uint8_t* buf_pos = buf; + size_t remainder = sizeof(HeaderT::_package_size_type); + size_t read = 0; + + while (remainder > 0 && TCPSocket::read(buf_pos, remainder, read)) + { + TCPSocket::setOptions(getSocketFD()); + if (initial) + { + remainder = HeaderT::getPackageLength(buf); + if (remainder >= (buf_len - sizeof(HeaderT::_package_size_type))) + { + LOG_ERROR("Packet size %zd is larger than buffer %zu, discarding.", remainder, buf_len); + return false; + } + initial = false; + } + + total += read; + buf_pos += read; + remainder -= read; + } + + return remainder == 0; +} } // namespace comm } // namespace ur_driver diff --git a/include/ur_rtde_driver/ur/commander.h b/include/ur_rtde_driver/ur/commander.h index 6aaff47..1b14143 100644 --- a/include/ur_rtde_driver/ur/commander.h +++ b/include/ur_rtde_driver/ur/commander.h @@ -1,4 +1,6 @@ /* + * Copyright 2019, FZI Forschungszentrum Informatik (templating) + * * Copyright 2017, 2018 Simon Rasmussen (refactor) * * Copyright 2015, 2016 Thomas Timm Andersen (original version) @@ -21,20 +23,21 @@ #include #include #include "ur_rtde_driver/comm/stream.h" +#include "ur_rtde_driver/primary/package_header.h" namespace ur_driver { class URCommander { private: - comm::URStream& stream_; + comm::URStream& stream_; protected: bool write(const std::string& s); void formatArray(std::ostringstream& out, std::array& values); public: - URCommander(comm::URStream& stream) : stream_(stream) + URCommander(comm::URStream& stream) : stream_(stream) { } @@ -53,7 +56,7 @@ public: class URCommander_V1_X : public URCommander { public: - URCommander_V1_X(comm::URStream& stream) : URCommander(stream) + URCommander_V1_X(comm::URStream& stream) : URCommander(stream) { } @@ -65,7 +68,7 @@ public: class URCommander_V3_X : public URCommander { public: - URCommander_V3_X(comm::URStream& stream) : URCommander(stream) + URCommander_V3_X(comm::URStream& stream) : URCommander(stream) { } @@ -77,7 +80,7 @@ public: class URCommander_V3_1__2 : public URCommander_V3_X { public: - URCommander_V3_1__2(comm::URStream& stream) : URCommander_V3_X(stream) + URCommander_V3_1__2(comm::URStream& stream) : URCommander_V3_X(stream) { } @@ -87,7 +90,7 @@ public: class URCommander_V3_3 : public URCommander_V3_X { public: - URCommander_V3_3(comm::URStream& stream) : URCommander_V3_X(stream) + URCommander_V3_3(comm::URStream& stream) : URCommander_V3_X(stream) { } diff --git a/src/comm/stream.cpp b/src/comm/stream.cpp deleted file mode 100644 index 9ecb880..0000000 --- a/src/comm/stream.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2017, 2018 Simon Rasmussen (refactor) - * - * Copyright 2015, 2016 Thomas Timm Andersen (original version) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include "ur_rtde_driver/log.h" -#include "ur_rtde_driver/comm/stream.h" - -namespace ur_driver -{ -namespace comm -{ -bool URStream::write(const uint8_t* buf, size_t buf_len, size_t& written) -{ - std::lock_guard lock(write_mutex_); - return TCPSocket::write(buf, buf_len, written); -} - -bool URStream::read(uint8_t* buf, size_t buf_len, size_t& total) -{ - std::lock_guard lock(read_mutex_); - - bool initial = true; - uint8_t* buf_pos = buf; - size_t remainder = sizeof(int32_t); - size_t read = 0; - - while (remainder > 0 && TCPSocket::read(buf_pos, remainder, read)) - { - TCPSocket::setOptions(getSocketFD()); - if (initial) - { - remainder = be32toh(*(reinterpret_cast(buf))); - if (remainder >= (buf_len - sizeof(int32_t))) - { - LOG_ERROR("Packet size %zd is larger than buffer %zu, discarding.", remainder, buf_len); - return false; - } - initial = false; - } - - total += read; - buf_pos += read; - remainder -= read; - } - - return remainder == 0; -} -} // namespace comm -} // namespace ur_driver