Improved reconnection
This commit is contained in:
37
src/mqtt.cpp
37
src/mqtt.cpp
@@ -37,6 +37,7 @@ const bool MQTTwrapper::disconnect()
|
||||
if (m_loopHandle)
|
||||
{
|
||||
vTaskDelete(m_loopHandle); // immediate terminate loop
|
||||
m_loopHandle = NULL;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -75,6 +76,10 @@ const bool MQTTwrapper::unsubscribe(topic_t topic)
|
||||
return false;
|
||||
}
|
||||
|
||||
const bool MQTTwrapper::connected() {
|
||||
return m_loopHandle != NULL;
|
||||
}
|
||||
|
||||
const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj)
|
||||
{
|
||||
std::string message;
|
||||
@@ -127,35 +132,43 @@ void MQTTwrapper::onMessage(const std::string topic, const std::string message)
|
||||
|
||||
void MQTTwrapper::clientLoop(void *params)
|
||||
{
|
||||
auto client = (MQTTwrapper *)(params);
|
||||
auto loopTime = client->m_config.m_mqttLoopTime;
|
||||
auto wrapper = (MQTTwrapper *)(params);
|
||||
auto &client = wrapper->m_client;
|
||||
auto &config = wrapper->m_config;
|
||||
auto &stateMap = wrapper->stateMap;
|
||||
const auto loopTime = config.m_mqttLoopTime;
|
||||
const auto mqttRetries = config.m_mqttRetries;
|
||||
const auto clientName = config.m_mqttClientName;
|
||||
uint8_t connectAttempt(0);
|
||||
LOG_INFO("MQTT starting client loop");
|
||||
while (connectAttempt++ < client->m_config.m_mqttRetries)
|
||||
while (connectAttempt++ < mqttRetries)
|
||||
{
|
||||
while (client->m_client.connected())
|
||||
while (client.connected())
|
||||
{
|
||||
client->m_client.loop();
|
||||
client.loop();
|
||||
vTaskDelay(pdMS_TO_TICKS(loopTime));
|
||||
}
|
||||
if (client->m_client.state() <= MQTT_CONNECTED)
|
||||
if (client.state() != MQTT_CONNECTED)
|
||||
{
|
||||
LOG_ERROR("MQTT disconnect reason ", client->m_client.state());
|
||||
LOG_ERROR("MQTT disconnect reason ", stateMap.at(client.state()));
|
||||
vTaskDelay(pdMS_TO_TICKS(loopTime * 50));
|
||||
const bool ok = client->m_client.connect(client->m_config.m_mqttClientName.c_str());
|
||||
const bool ok = client.connect(clientName.c_str());
|
||||
LOG_WARN("MQTT reconnected", ok ? "True" : "False");
|
||||
if (ok)
|
||||
{
|
||||
for (auto v : client->m_actionMap)
|
||||
for (auto &v : wrapper->m_actionMap)
|
||||
{
|
||||
LOG_WARN("MQTT resubscribing to",v.first.c_str());
|
||||
client->m_client.subscribe(v.first.c_str());
|
||||
const std::string &topic(v.first);
|
||||
LOG_WARN("MQTT resubscribing to", topic.c_str());
|
||||
if(!wrapper->m_client.subscribe(topic.c_str())){
|
||||
LOG_ERROR("Unable to resubscribe to", topic.c_str());
|
||||
}
|
||||
}
|
||||
connectAttempt = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_ERROR("MQTT client loop terminated, disconnected");
|
||||
client->m_loopHandle = NULL;
|
||||
wrapper->m_loopHandle = NULL;
|
||||
vTaskDelete(NULL); // delete the current task
|
||||
}
|
||||
|
||||
18
src/mqtt.h
18
src/mqtt.h
@@ -21,7 +21,22 @@ class MQTTwrapper
|
||||
{
|
||||
|
||||
private:
|
||||
static MQTTwrapper *getInstance(MQTTwrapper *inst = nullptr)
|
||||
const std::map<int, std::string> stateMap = {
|
||||
{-4, "MQTT_CONNECTION_TIMEOUT"},
|
||||
{-3, "MQTT_CONNECTION_LOST"},
|
||||
{-2, "MQTT_CONNECT_FAILED"},
|
||||
{-1, "MQTT_DISCONNECTED"},
|
||||
{0, "MQTT_CONNECTED"},
|
||||
{1, "MQTT_CONNECT_BAD_PROTOCOL"},
|
||||
{2, "MQTT_CONNECT_BAD_CLIENT_ID"},
|
||||
{3, "MQTT_CONNECT_UNAVAILABLE"},
|
||||
{4, "MQTT_CONNECT_BAD_CREDENTIALS"},
|
||||
{5, "MQTT_CONNECT_UNAUTHORIZED"}
|
||||
};
|
||||
|
||||
private:
|
||||
static MQTTwrapper *
|
||||
getInstance(MQTTwrapper *inst = nullptr)
|
||||
{
|
||||
static std::unique_ptr<MQTTwrapper> m_instance;
|
||||
if (inst)
|
||||
@@ -37,6 +52,7 @@ public:
|
||||
|
||||
const bool connect();
|
||||
const bool disconnect();
|
||||
const bool connected();
|
||||
|
||||
const bool subscribe(topic_t topic, action_t action);
|
||||
const bool unsubscribe(topic_t topic);
|
||||
|
||||
Reference in New Issue
Block a user