fixed auto reconnect and resubscribe inside mqtt wrapper loop
This commit is contained in:
@@ -17,6 +17,9 @@ namespace drivers
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
RS485(const uint32_t baud, const SerialConfig conf);
|
RS485(const uint32_t baud, const SerialConfig conf);
|
||||||
|
RS485(const RS485 &) = delete; // remove copy constructors
|
||||||
|
RS485 &operator=(const RS485 &) = delete;
|
||||||
|
|
||||||
const bool write(const std::vector<uint8_t> data);
|
const bool write(const std::vector<uint8_t> data);
|
||||||
const bool readAll(std::vector<uint8_t> &data);
|
const bool readAll(std::vector<uint8_t> &data);
|
||||||
const bool readN(const uint16_t nBytes, std::vector<uint8_t> &data);
|
const bool readN(const uint16_t nBytes, std::vector<uint8_t> &data);
|
||||||
@@ -54,6 +57,9 @@ namespace drivers
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
MODBUS(const uint32_t baud, const SerialConfig conf);
|
MODBUS(const uint32_t baud, const SerialConfig conf);
|
||||||
|
MODBUS(const MODBUS &) = delete; // remove copy constructors
|
||||||
|
MODBUS &operator=(const MODBUS &) = delete;
|
||||||
|
|
||||||
|
|
||||||
// Func 0x01
|
// Func 0x01
|
||||||
const bool readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils);
|
const bool readCoils(const uint8_t device, const uint16_t reg, const uint16_t num, std::vector<bool> &coils);
|
||||||
|
|||||||
13
src/main.cpp
13
src/main.cpp
@@ -34,12 +34,11 @@ void setup()
|
|||||||
{
|
{
|
||||||
Serial.begin(9600);
|
Serial.begin(9600);
|
||||||
LOG_ATTACH_SERIAL(Serial);
|
LOG_ATTACH_SERIAL(Serial);
|
||||||
conf.init();
|
conf.init(); // read the configuration from internal flash
|
||||||
}
|
}
|
||||||
|
|
||||||
void loop()
|
void loop()
|
||||||
{
|
{
|
||||||
const uint8_t baseRegister(0x00);
|
|
||||||
uint16_t k(0);
|
uint16_t k(0);
|
||||||
uint8_t sensors(0);
|
uint8_t sensors(0);
|
||||||
bool buzzing(false);
|
bool buzzing(false);
|
||||||
@@ -66,10 +65,17 @@ void loop()
|
|||||||
auto mqtt = MQTTwrapper();
|
auto mqtt = MQTTwrapper();
|
||||||
//////////////// NETWORK ////////////////
|
//////////////// NETWORK ////////////////
|
||||||
|
|
||||||
|
std::function<void(const ArduinoJson::JsonDocument &)> mycallback =
|
||||||
|
[&io](const ArduinoJson::JsonDocument &doc) {
|
||||||
|
io.digitalIOWrite(0, doc["stat"].as<bool>());
|
||||||
|
io.digitalIOWrite(15, doc["stat"].as<bool>());
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
//////////////// NETWORK ////////////////
|
//////////////// NETWORK ////////////////
|
||||||
/////////////// CALLBACK ////////////////
|
/////////////// CALLBACK ////////////////
|
||||||
Network.onEvent(
|
Network.onEvent(
|
||||||
[ð, &rtc, &mqtt, &buzzer, &led](arduino_event_id_t event, arduino_event_info_t info) -> void
|
[ð, &rtc, &mqtt, &buzzer, &led, &mycallback](arduino_event_id_t event, arduino_event_info_t info) -> void
|
||||||
{
|
{
|
||||||
eth.onEvent(event, info); // Arduino Ethernet event handler
|
eth.onEvent(event, info); // Arduino Ethernet event handler
|
||||||
if (!eth.isConnected())
|
if (!eth.isConnected())
|
||||||
@@ -95,6 +101,7 @@ void loop()
|
|||||||
if (mqtt.connect())
|
if (mqtt.connect())
|
||||||
{
|
{
|
||||||
mqtt.subscribe("test/esp32-in", testAction);
|
mqtt.subscribe("test/esp32-in", testAction);
|
||||||
|
mqtt.subscribe("test/esp32-functional", mycallback);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
delay(100);
|
delay(100);
|
||||||
|
|||||||
41
src/mqtt.cpp
41
src/mqtt.cpp
@@ -1,7 +1,7 @@
|
|||||||
#include <mqtt.h>
|
#include <mqtt.h>
|
||||||
|
|
||||||
#define STACK_DEPTH 4096
|
#define STACK_DEPTH 4096
|
||||||
#define PRIOTITY 1
|
#define PRIOTITY 0
|
||||||
|
|
||||||
MQTTwrapper::MQTTwrapper() : m_config(Config::getInstance()), m_tcp(NetworkClient()), m_client(PubSubClient(m_tcp)), m_loopHandle(NULL)
|
MQTTwrapper::MQTTwrapper() : m_config(Config::getInstance()), m_tcp(NetworkClient()), m_client(PubSubClient(m_tcp)), m_loopHandle(NULL)
|
||||||
{
|
{
|
||||||
@@ -19,7 +19,7 @@ const bool MQTTwrapper::connect()
|
|||||||
{
|
{
|
||||||
if (!m_client.connect(m_config.m_mqttClientName.c_str()))
|
if (!m_client.connect(m_config.m_mqttClientName.c_str()))
|
||||||
{
|
{
|
||||||
LOG_ERROR("Unable to connect to MQTT Host", m_config.m_mqttHost.c_str());
|
LOG_ERROR("MQTT unable to connect to host", m_config.m_mqttHost.c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
LOG_INFO("MQTT client connected to", m_config.m_mqttHost.c_str());
|
LOG_INFO("MQTT client connected to", m_config.m_mqttHost.c_str());
|
||||||
@@ -78,6 +78,11 @@ const bool MQTTwrapper::unsubscribe(topic_t topic)
|
|||||||
const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj)
|
const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj)
|
||||||
{
|
{
|
||||||
std::string message;
|
std::string message;
|
||||||
|
if (!m_client.connected())
|
||||||
|
{
|
||||||
|
LOG_ERROR("MQTT client not connected");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (!ArduinoJson::serializeJson(obj, message))
|
if (!ArduinoJson::serializeJson(obj, message))
|
||||||
{
|
{
|
||||||
LOG_ERROR("MQTT failed to serialize object");
|
LOG_ERROR("MQTT failed to serialize object");
|
||||||
@@ -95,8 +100,8 @@ const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument o
|
|||||||
void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length)
|
void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length)
|
||||||
{
|
{
|
||||||
std::string pl;
|
std::string pl;
|
||||||
pl.resize(length+1);
|
pl.resize(length + 1);
|
||||||
std::snprintf(pl.data(), length+1, "%s", payload);
|
std::snprintf(pl.data(), length + 1, "%s", payload);
|
||||||
auto inst = getInstance();
|
auto inst = getInstance();
|
||||||
if (inst)
|
if (inst)
|
||||||
{
|
{
|
||||||
@@ -124,11 +129,31 @@ void MQTTwrapper::clientLoop(void *params)
|
|||||||
{
|
{
|
||||||
auto client = (MQTTwrapper *)(params);
|
auto client = (MQTTwrapper *)(params);
|
||||||
auto loopTime = client->m_config.m_mqttLoopTime;
|
auto loopTime = client->m_config.m_mqttLoopTime;
|
||||||
LOG_INFO("Starting MQTT client loop");
|
uint8_t connectAttempt(0);
|
||||||
while (client->m_client.connected())
|
LOG_INFO("MQTT starting client loop");
|
||||||
|
while (connectAttempt++ < client->m_config.m_mqttRetries)
|
||||||
{
|
{
|
||||||
client->m_client.loop();
|
while (client->m_client.connected())
|
||||||
vTaskDelay(pdMS_TO_TICKS(loopTime));
|
{
|
||||||
|
client->m_client.loop();
|
||||||
|
vTaskDelay(pdMS_TO_TICKS(loopTime));
|
||||||
|
}
|
||||||
|
if (client->m_client.state() <= MQTT_CONNECTED)
|
||||||
|
{
|
||||||
|
LOG_ERROR("MQTT disconnect reason ", client->m_client.state());
|
||||||
|
vTaskDelay(pdMS_TO_TICKS(loopTime * 50));
|
||||||
|
const bool ok = client->m_client.connect(client->m_config.m_mqttClientName.c_str());
|
||||||
|
LOG_WARN("MQTT reconnected", ok ? "True" : "False");
|
||||||
|
if (ok)
|
||||||
|
{
|
||||||
|
for (auto v : client->m_actionMap)
|
||||||
|
{
|
||||||
|
LOG_WARN("MQTT resubscribing to",v.first.c_str());
|
||||||
|
client->m_client.subscribe(v.first.c_str());
|
||||||
|
}
|
||||||
|
connectAttempt = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
LOG_ERROR("MQTT client loop terminated, disconnected");
|
LOG_ERROR("MQTT client loop terminated, disconnected");
|
||||||
client->m_loopHandle = NULL;
|
client->m_loopHandle = NULL;
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
#include <config.h>
|
#include <config.h>
|
||||||
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
typedef std::string topic_t;
|
typedef std::string topic_t;
|
||||||
typedef std::function<void(const ArduinoJson::JsonDocument &)> action_t; // the actions receive a JsonObject containing the received message
|
typedef std::function<void(const ArduinoJson::JsonDocument &)> action_t; // the actions receive a JsonObject containing the received message
|
||||||
|
|||||||
Reference in New Issue
Block a user