2 Commits

Author SHA1 Message Date
Emanuele Trabattoni
16bb029e93 Fix lock on MODBUS 2025-07-22 11:15:57 +02:00
Emanuele Trabattoni
146a2b558b Improved reconnection 2025-07-22 11:15:36 +02:00
7 changed files with 63 additions and 21 deletions

View File

@@ -71,7 +71,6 @@ namespace drivers
const bool MODBUS::readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils) const bool MODBUS::readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils)
{ {
constexpr uint8_t func = 0x01; constexpr uint8_t func = 0x01;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Read coils: dev[", device, "], reg[", reg, "], num[", num, "]"); LOG_DEBUG("Read coils: dev[", device, "], reg[", reg, "], num[", num, "]");
return readBinary(device, func, reg, num, coils); return readBinary(device, func, reg, num, coils);
} }
@@ -80,7 +79,6 @@ namespace drivers
const bool MODBUS::readInputs(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<bool> &inputs) const bool MODBUS::readInputs(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<bool> &inputs)
{ {
constexpr uint8_t func = 0x02; constexpr uint8_t func = 0x02;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Read multi inputs: dev[", device, "], reg[", reg, "], num[", num, "]"); LOG_DEBUG("Read multi inputs: dev[", device, "], reg[", reg, "], num[", num, "]");
return readBinary(device, func, reg, num, inputs); return readBinary(device, func, reg, num, inputs);
} }
@@ -89,7 +87,6 @@ namespace drivers
const bool MODBUS::readHoldingRegisters(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<uint16_t> &values) const bool MODBUS::readHoldingRegisters(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<uint16_t> &values)
{ {
constexpr uint8_t func = 0x03; constexpr uint8_t func = 0x03;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Read multi holding registers: dev[", device, "], reg[", reg, "], num[", num, "]"); LOG_DEBUG("Read multi holding registers: dev[", device, "], reg[", reg, "], num[", num, "]");
return readInteger(device, func, reg, num, values); return readInteger(device, func, reg, num, values);
} }
@@ -98,7 +95,6 @@ namespace drivers
const bool MODBUS::readInputRegisters(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<uint16_t> &values) const bool MODBUS::readInputRegisters(const uint8_t device, const uint16_t reg, const uint8_t num, std::vector<uint16_t> &values)
{ {
constexpr uint8_t func = 0x04; constexpr uint8_t func = 0x04;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Read multi input registers: dev[", device, "], reg[", reg, "], num[", num, "]"); LOG_DEBUG("Read multi input registers: dev[", device, "], reg[", reg, "], num[", num, "]");
return readInteger(device, func, reg, num, values); return readInteger(device, func, reg, num, values);
} }
@@ -107,7 +103,6 @@ namespace drivers
const bool MODBUS::writeCoil(const uint8_t device, const uint16_t coil, const bool value) const bool MODBUS::writeCoil(const uint8_t device, const uint16_t coil, const bool value)
{ {
constexpr uint8_t func = 0x05; constexpr uint8_t func = 0x05;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Write single coil: dev[", device, "], coil[", coil, "], value[", value ? "true" : "false", "]"); LOG_DEBUG("Write single coil: dev[", device, "], coil[", coil, "], value[", value ? "true" : "false", "]");
return writeBinary(device, func, coil, {value}); return writeBinary(device, func, coil, {value});
} }
@@ -116,7 +111,6 @@ namespace drivers
const bool MODBUS::writeRegister(const uint8_t device, const uint16_t reg, const uint16_t value) const bool MODBUS::writeRegister(const uint8_t device, const uint16_t reg, const uint16_t value)
{ {
constexpr uint8_t func = 0x06; constexpr uint8_t func = 0x06;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Write single register: dev[", device, "], reg[", reg, "], value[", value, "]"); LOG_DEBUG("Write single register: dev[", device, "], reg[", reg, "], value[", value, "]");
return writeInteger(device, func, reg, {value}, false); return writeInteger(device, func, reg, {value}, false);
} }
@@ -125,7 +119,6 @@ namespace drivers
const bool MODBUS::writeCoils(const uint8_t device, const uint16_t coils, const std::vector<bool> &values) const bool MODBUS::writeCoils(const uint8_t device, const uint16_t coils, const std::vector<bool> &values)
{ {
constexpr uint8_t func = 0x0F; constexpr uint8_t func = 0x0F;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Write multi coils: dev[", device, "], start[", coils, "], num[", values.size(), "]"); LOG_DEBUG("Write multi coils: dev[", device, "], start[", coils, "], num[", values.size(), "]");
return writeBinary(device, func, coils, values); return writeBinary(device, func, coils, values);
} }
@@ -134,7 +127,6 @@ namespace drivers
const bool MODBUS::writeRegisters(const uint8_t device, const uint16_t reg, const std::vector<uint16_t> &values) const bool MODBUS::writeRegisters(const uint8_t device, const uint16_t reg, const std::vector<uint16_t> &values)
{ {
constexpr uint8_t func = 0x10; constexpr uint8_t func = 0x10;
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG("Write multi registers: dev[", device, "], start[", reg, "], num[", values.size(), "]"); LOG_DEBUG("Write multi registers: dev[", device, "], start[", reg, "], num[", values.size(), "]");
return writeInteger(device, func, reg, values, true); return writeInteger(device, func, reg, values, true);
} }

View File

@@ -60,6 +60,10 @@ namespace drivers
MODBUS(const MODBUS &) = delete; // remove copy constructors MODBUS(const MODBUS &) = delete; // remove copy constructors
MODBUS &operator=(const MODBUS &) = delete; MODBUS &operator=(const MODBUS &) = delete;
// Get transaction lock
std::unique_lock<std::mutex> getLock() {
return std::unique_lock<std::mutex>(m_mutex);
}
// Func 0x01 // Func 0x01
const bool readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils); const bool readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils);

View File

@@ -76,6 +76,7 @@ namespace drivers
{ {
std::vector<uint16_t> value; std::vector<uint16_t> value;
delayRequest(); delayRequest();
auto lock = m_bus.getLock();
m_bus.readHoldingRegisters(m_address, REG_Regset, 2, value); m_bus.readHoldingRegisters(m_address, REG_Regset, 2, value);
if (value.empty()) if (value.empty())
return UINT8_MAX; return UINT8_MAX;
@@ -86,6 +87,7 @@ namespace drivers
{ {
std::vector<uint16_t> value; std::vector<uint16_t> value;
delayRequest(); delayRequest();
auto lock = m_bus.getLock();
m_bus.readHoldingRegisters(m_address, REG_PartCount, 2, value); m_bus.readHoldingRegisters(m_address, REG_PartCount, 2, value);
if (value.empty()) if (value.empty())
return UINT16_MAX; return UINT16_MAX;
@@ -103,6 +105,7 @@ namespace drivers
{ {
bool ok(true); bool ok(true);
delayRequest(); delayRequest();
auto lock = m_bus.getLock();
LOG_WARN("Powermeter Counter STOP"); LOG_WARN("Powermeter Counter STOP");
ok &= m_bus.writeRegisters(m_address, REG_PartCount, {nullVal, stopAll}); ok &= m_bus.writeRegisters(m_address, REG_PartCount, {nullVal, stopAll});
delayRequest(); delayRequest();
@@ -126,6 +129,7 @@ namespace drivers
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
delayRequest(); delayRequest();
auto lock = m_bus.getLock();
if (m_bus.readHoldingRegisters(m_address, reg, dataWords, values) && values.size() == dataWords) if (m_bus.readHoldingRegisters(m_address, reg, dataWords, values) && values.size() == dataWords)
{ {
floatval_t fv; // potrebbe essere il contrario, vedremo floatval_t fv; // potrebbe essere il contrario, vedremo

View File

@@ -22,6 +22,7 @@ namespace drivers
} }
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
auto lock = m_bus.getLock();
if (m_bus.readHoldingRegisters(m_address, REG_TEMP + ch, 1, rawT) && !rawT.empty()) if (m_bus.readHoldingRegisters(m_address, REG_TEMP + ch, 1, rawT) && !rawT.empty())
{ {
return rawT.front() / 10.0f; return rawT.front() / 10.0f;
@@ -40,6 +41,7 @@ namespace drivers
std::vector<float> out; std::vector<float> out;
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
auto lock = m_bus.getLock();
if (m_bus.readHoldingRegisters(m_address, REG_TEMP, getNum(), rawT) && !rawT.empty()) if (m_bus.readHoldingRegisters(m_address, REG_TEMP, getNum(), rawT) && !rawT.empty())
{ {
out.reserve(rawT.size()); out.reserve(rawT.size());
@@ -67,6 +69,7 @@ namespace drivers
{ // convert to decimal degreees to register value { // convert to decimal degreees to register value
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
auto lock = m_bus.getLock();
if (m_bus.writeRegister(m_address, REG_TEMPCORR + channel, v*10)) if (m_bus.writeRegister(m_address, REG_TEMPCORR + channel, v*10))
{ {
channel++; channel++;
@@ -88,6 +91,7 @@ namespace drivers
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
auto lock = m_bus.getLock();
if (m_bus.readHoldingRegisters(m_address, REG_TEMPCORR, getNum(), rawV)) if (m_bus.readHoldingRegisters(m_address, REG_TEMPCORR, getNum(), rawV))
{ {
out.reserve(rawV.size()); out.reserve(rawV.size());
@@ -114,6 +118,7 @@ namespace drivers
std::vector<uint16_t> rawT; std::vector<uint16_t> rawT;
while (retries++ < maxRetries) while (retries++ < maxRetries)
{ {
auto lock = m_bus.getLock();
if (m_bus.readHoldingRegisters(m_address, REG_TEMP, T_MAX, rawT)) if (m_bus.readHoldingRegisters(m_address, REG_TEMP, T_MAX, rawT))
{ {
for (auto v : rawT) for (auto v : rawT)

View File

@@ -37,6 +37,7 @@ const bool MQTTwrapper::disconnect()
if (m_loopHandle) if (m_loopHandle)
{ {
vTaskDelete(m_loopHandle); // immediate terminate loop vTaskDelete(m_loopHandle); // immediate terminate loop
m_loopHandle = NULL;
} }
return true; return true;
} }
@@ -75,6 +76,10 @@ const bool MQTTwrapper::unsubscribe(topic_t topic)
return false; return false;
} }
const bool MQTTwrapper::connected() {
return m_loopHandle != NULL;
}
const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj) const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj)
{ {
std::string message; std::string message;
@@ -127,35 +132,43 @@ void MQTTwrapper::onMessage(const std::string topic, const std::string message)
void MQTTwrapper::clientLoop(void *params) void MQTTwrapper::clientLoop(void *params)
{ {
auto client = (MQTTwrapper *)(params); auto wrapper = (MQTTwrapper *)(params);
auto loopTime = client->m_config.m_mqttLoopTime; auto &client = wrapper->m_client;
auto &config = wrapper->m_config;
auto &stateMap = wrapper->stateMap;
const auto loopTime = config.m_mqttLoopTime;
const auto mqttRetries = config.m_mqttRetries;
const auto clientName = config.m_mqttClientName;
uint8_t connectAttempt(0); uint8_t connectAttempt(0);
LOG_INFO("MQTT starting client loop"); LOG_INFO("MQTT starting client loop");
while (connectAttempt++ < client->m_config.m_mqttRetries) while (connectAttempt++ < mqttRetries)
{ {
while (client->m_client.connected()) while (client.connected())
{ {
client->m_client.loop(); client.loop();
vTaskDelay(pdMS_TO_TICKS(loopTime)); vTaskDelay(pdMS_TO_TICKS(loopTime));
} }
if (client->m_client.state() <= MQTT_CONNECTED) if (client.state() != MQTT_CONNECTED)
{ {
LOG_ERROR("MQTT disconnect reason ", client->m_client.state()); LOG_ERROR("MQTT disconnect reason ", stateMap.at(client.state()));
vTaskDelay(pdMS_TO_TICKS(loopTime * 50)); vTaskDelay(pdMS_TO_TICKS(loopTime * 50));
const bool ok = client->m_client.connect(client->m_config.m_mqttClientName.c_str()); const bool ok = client.connect(clientName.c_str());
LOG_WARN("MQTT reconnected", ok ? "True" : "False"); LOG_WARN("MQTT reconnected", ok ? "True" : "False");
if (ok) if (ok)
{ {
for (auto v : client->m_actionMap) for (auto &v : wrapper->m_actionMap)
{ {
LOG_WARN("MQTT resubscribing to",v.first.c_str()); const std::string &topic(v.first);
client->m_client.subscribe(v.first.c_str()); LOG_WARN("MQTT resubscribing to", topic.c_str());
if(!wrapper->m_client.subscribe(topic.c_str())){
LOG_ERROR("Unable to resubscribe to", topic.c_str());
}
} }
connectAttempt = 0; connectAttempt = 0;
} }
} }
} }
LOG_ERROR("MQTT client loop terminated, disconnected"); LOG_ERROR("MQTT client loop terminated, disconnected");
client->m_loopHandle = NULL; wrapper->m_loopHandle = NULL;
vTaskDelete(NULL); // delete the current task vTaskDelete(NULL); // delete the current task
} }

View File

@@ -21,7 +21,22 @@ class MQTTwrapper
{ {
private: private:
static MQTTwrapper *getInstance(MQTTwrapper *inst = nullptr) const std::map<int, std::string> stateMap = {
{-4, "MQTT_CONNECTION_TIMEOUT"},
{-3, "MQTT_CONNECTION_LOST"},
{-2, "MQTT_CONNECT_FAILED"},
{-1, "MQTT_DISCONNECTED"},
{0, "MQTT_CONNECTED"},
{1, "MQTT_CONNECT_BAD_PROTOCOL"},
{2, "MQTT_CONNECT_BAD_CLIENT_ID"},
{3, "MQTT_CONNECT_UNAVAILABLE"},
{4, "MQTT_CONNECT_BAD_CREDENTIALS"},
{5, "MQTT_CONNECT_UNAUTHORIZED"}
};
private:
static MQTTwrapper *
getInstance(MQTTwrapper *inst = nullptr)
{ {
static std::unique_ptr<MQTTwrapper> m_instance; static std::unique_ptr<MQTTwrapper> m_instance;
if (inst) if (inst)
@@ -37,6 +52,7 @@ public:
const bool connect(); const bool connect();
const bool disconnect(); const bool disconnect();
const bool connected();
const bool subscribe(topic_t topic, action_t action); const bool subscribe(topic_t topic, action_t action);
const bool unsubscribe(topic_t topic); const bool unsubscribe(topic_t topic);

View File

@@ -4,6 +4,7 @@ remoteIO::remoteIO(const uint8_t address, drivers::MODBUS &bus) : m_address(addr
{ {
LOG_INFO("Initializing relay module"); LOG_INFO("Initializing relay module");
std::vector<uint16_t> response; std::vector<uint16_t> response;
auto lock = m_bus.getLock();
if (!m_bus.readHoldingRegisters(m_address, REG_VERSION, 1, response)) if (!m_bus.readHoldingRegisters(m_address, REG_VERSION, 1, response))
{ {
LOG_ERROR("Unable to inizialize relay module"); LOG_ERROR("Unable to inizialize relay module");
@@ -21,6 +22,7 @@ remoteIO::~remoteIO()
const bool remoteIO::setOut(const channel_t ch, const bool value) const bool remoteIO::setOut(const channel_t ch, const bool value)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
LOG_DEBUG("Write Channel", ch, "->", value ? "True" : "False"); LOG_DEBUG("Write Channel", ch, "->", value ? "True" : "False");
@@ -29,6 +31,7 @@ const bool remoteIO::setOut(const channel_t ch, const bool value)
const bool remoteIO::toggleOut(const channel_t ch) const bool remoteIO::toggleOut(const channel_t ch)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
std::vector<bool> value; std::vector<bool> value;
@@ -40,6 +43,7 @@ const bool remoteIO::toggleOut(const channel_t ch)
const bool remoteIO::setOutPort(const std::vector<bool> values) const bool remoteIO::setOutPort(const std::vector<bool> values)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
LOG_DEBUG("Write Port", CH_MAX); LOG_DEBUG("Write Port", CH_MAX);
@@ -48,6 +52,7 @@ const bool remoteIO::setOutPort(const std::vector<bool> values)
const bool remoteIO::getOut(const channel_t ch, bool &value) const bool remoteIO::getOut(const channel_t ch, bool &value)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
std::vector<bool> values; std::vector<bool> values;
@@ -60,6 +65,7 @@ const bool remoteIO::getOut(const channel_t ch, bool &value)
const bool remoteIO::getOutPort(std::vector<bool> &values) const bool remoteIO::getOutPort(std::vector<bool> &values)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
LOG_DEBUG("Read Port", CH_MAX); LOG_DEBUG("Read Port", CH_MAX);
@@ -68,6 +74,7 @@ const bool remoteIO::getOutPort(std::vector<bool> &values)
const bool remoteIO::getIn(const channel_t input, bool &value) const bool remoteIO::getIn(const channel_t input, bool &value)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
std::vector<bool> values; std::vector<bool> values;
@@ -80,6 +87,7 @@ const bool remoteIO::getIn(const channel_t input, bool &value)
const bool remoteIO::getInPort(std::vector<bool> &values) const bool remoteIO::getInPort(std::vector<bool> &values)
{ {
auto lock = m_bus.getLock();
if (!m_initialized) if (!m_initialized)
return false; return false;
LOG_DEBUG("Read Inputs", CH_MAX); LOG_DEBUG("Read Inputs", CH_MAX);