#include #define STACK_DEPTH 8192 #define BUFFER_SIZE 2048 #define PRIORITY 2 MQTTwrapper::MQTTwrapper() : m_config(Config::getInstance()), m_tcp(NetworkClient()), m_client(PubSubClient(m_tcp)), m_loopHandle(NULL) { m_client.setServer(m_config.m_mqttHost.c_str(), m_config.m_mqttPort); m_client.setKeepAlive(m_config.m_mqttKeepalive); m_client.setBufferSize(BUFFER_SIZE); getInstance(this); } MQTTwrapper::~MQTTwrapper() { disconnect(); } const bool MQTTwrapper::connect() { if (!m_client.connect(m_config.m_mqttClientName.c_str())) { LOG_ERROR("MQTT unable to connect to host", m_config.m_mqttHost.c_str()); return false; } LOG_INFO("MQTT client connected to", m_config.m_mqttHost.c_str()); if (m_loopHandle == NULL) { xTaskCreate(clientLoop, "mqttLoop", STACK_DEPTH, this, PRIORITY, &m_loopHandle); m_client.setCallback(MQTTwrapper::callback); } return true; } const bool MQTTwrapper::disconnect() { m_client.disconnect(); if (m_loopHandle) { vTaskDelete(m_loopHandle); // immediate terminate loop m_loopHandle = NULL; } return true; } const bool MQTTwrapper::subscribe(const topic_t &topic, const action_t action) { if (m_actionMap.contains(topic)) { LOG_WARN("MQTT was already subscribed to", topic.c_str()); return true; } if (m_client.subscribe(topic.c_str())) { m_actionMap[topic] = action; LOG_INFO("MQTT subscribed to", topic.c_str()); return true; } LOG_ERROR("MQTT unable to subscribe to", topic.c_str()); return false; } const bool MQTTwrapper::unsubscribe(const topic_t &topic) { if (!m_actionMap.contains(topic)) { LOG_WARN("MQTT was NOT subscribed to", topic.c_str()); return false; } if (m_client.unsubscribe(topic.c_str())) { LOG_INFO("MQTT unsubscribed to", topic.c_str()); m_actionMap.erase(topic); return true; } LOG_ERROR("MQTT unable to unsubscribe to", topic.c_str()); return false; } const bool MQTTwrapper::connected() { return m_loopHandle != NULL; } const bool MQTTwrapper::publish(const topic_t &topic, const ArduinoJson::JsonDocument obj) { std::string message; if (!m_client.connected()) { LOG_ERROR("MQTT client not connected"); return false; } if (!ArduinoJson::serializeJson(obj, message)) { LOG_ERROR("MQTT failed to serialize object"); return false; } if (m_client.publish(topic.c_str(), message.c_str())) { LOG_DEBUG("MQTT published topic [", topic.c_str(), "] - message [", message.c_str(), "]"); return true; } LOG_ERROR("MQTT failed to publish topic [", topic.c_str(), "] - message [", message.c_str(), "]"); return false; } void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length) { std::string pl; pl.resize(length + 1); std::snprintf(pl.data(), length + 1, "%s", payload); auto inst = getInstance(); if (inst) { inst->onMessage(std::string(topic), pl); return; } LOG_ERROR("MQTT no client instance set"); return; } void MQTTwrapper::onMessage(const std::string topic, const std::string message) { ArduinoJson::JsonDocument obj; LOG_DEBUG("MQTT received topic [", topic.c_str(), "] - message [", message.c_str(), "]"); if (ArduinoJson::deserializeJson(obj, message) == ArduinoJson::DeserializationError::Ok) { m_actionMap[topic](obj); return; } LOG_ERROR("MQTT failed to deserialize message\n", message.c_str()); return; } void MQTTwrapper::clientLoop(void *params) { auto wrapper = (MQTTwrapper *)(params); auto &client = wrapper->m_client; auto &config = wrapper->m_config; auto &stateMap = wrapper->stateMap; const auto loopTime = config.m_mqttLoopTime; const auto mqttRetries = config.m_mqttRetries; const auto clientName = config.m_mqttClientName; uint8_t connectAttempt(0); LOG_INFO("MQTT starting client loop"); while (connectAttempt++ < mqttRetries) { while (client.connected()) { client.loop(); delay(loopTime); } if (client.state() != MQTT_CONNECTED) { LOG_ERROR("MQTT disconnect reason ", stateMap.at(client.state()).c_str()); delay(loopTime * 50); const bool ok = client.connect(clientName.c_str()); LOG_WARN("MQTT reconnected", ok ? "True" : "False"); if (ok) { for (auto &v : wrapper->m_actionMap) { const std::string &topic(v.first); LOG_WARN("MQTT resubscribing to", topic.c_str()); if (!wrapper->m_client.subscribe(topic.c_str())) { LOG_ERROR("Unable to resubscribe to", topic.c_str()); } } connectAttempt = 0; } } } LOG_ERROR("MQTT client loop terminated, disconnected"); wrapper->m_loopHandle = NULL; vTaskDelete(NULL); // delete the current task }