mqtt wrapper first version working

This commit is contained in:
Emanuele Trabattoni
2025-07-18 02:00:58 +02:00
parent 52a89e58f7
commit e8f395f8ef
7 changed files with 217 additions and 195 deletions

136
src/mqtt.cpp Normal file
View File

@@ -0,0 +1,136 @@
#include <mqtt.h>
#define STACK_DEPTH 4096
#define PRIOTITY 1
MQTTwrapper::MQTTwrapper() : m_config(Config::getInstance()), m_tcp(NetworkClient()), m_client(PubSubClient(m_tcp)), m_loopHandle(NULL)
{
m_client.setServer(m_config.m_mqttHost.c_str(), m_config.m_mqttPort);
m_client.setKeepAlive(15);
getInstance(this);
}
MQTTwrapper::~MQTTwrapper()
{
disconnect();
}
const bool MQTTwrapper::connect()
{
if (!m_client.connect(m_config.m_mqttClientName.c_str()))
{
LOG_ERROR("Unable to connect to MQTT Host", m_config.m_mqttHost.c_str());
return false;
}
LOG_INFO("MQTT client connected to", m_config.m_mqttHost.c_str());
if (m_loopHandle == NULL)
{
xTaskCreate(clientLoop, "mqttLoop", STACK_DEPTH, this, PRIOTITY, &m_loopHandle);
m_client.setCallback(MQTTwrapper::callback);
}
return true;
}
const bool MQTTwrapper::disconnect()
{
m_client.disconnect();
if (m_loopHandle)
{
vTaskDelete(m_loopHandle); // immediate terminate loop
}
return true;
}
const bool MQTTwrapper::subscribe(topic_t topic, action_t action)
{
if (m_actionMap.contains(topic))
{
LOG_WARN("MQTT was already subscribed to", topic.c_str());
return true;
}
if (m_client.subscribe(topic.c_str()))
{
m_actionMap[topic] = action;
LOG_INFO("MQTT subscribed to", topic.c_str());
return true;
}
LOG_ERROR("MQTT unable to subscribe to", topic.c_str());
return false;
}
const bool MQTTwrapper::unsubscribe(topic_t topic)
{
if (!m_actionMap.contains(topic))
{
LOG_WARN("MQTT was NOT subscribed to", topic.c_str());
return false;
}
if (m_client.unsubscribe(topic.c_str()))
{
LOG_INFO("MQTT unsubscribed to", topic.c_str());
m_actionMap.erase(topic);
return true;
}
LOG_ERROR("MQTT unable to unsubscribe to", topic.c_str());
return false;
}
const bool MQTTwrapper::publish(topic_t topic, const ArduinoJson::JsonDocument obj)
{
std::string message;
if (!ArduinoJson::serializeJson(obj, message))
{
LOG_ERROR("MQTT failed to serialize object");
return false;
}
if (m_client.publish(topic.c_str(), message.c_str()))
{
LOG_DEBUG("MQTT published topic [", topic.c_str(), "] - message [", message.c_str(), "]");
return true;
}
LOG_ERROR("MQTT failed to publish topic [", topic.c_str(), "] - message [", message.c_str(), "]");
return false;
}
void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length)
{
std::string pl;
pl.resize(length+1);
std::snprintf(pl.data(), length+1, "%s", payload);
auto inst = getInstance();
if (inst)
{
inst->onMessage(std::string(topic), pl);
return;
}
LOG_ERROR("MQTT no client instance set");
return;
}
void MQTTwrapper::onMessage(const std::string topic, const std::string message)
{
ArduinoJson::JsonDocument obj;
LOG_DEBUG("MQTT received topic [", topic.c_str(), "] - message [", message.c_str(), "]");
if (ArduinoJson::deserializeJson(obj, message) == ArduinoJson::DeserializationError::Ok)
{
m_actionMap[topic](obj);
return;
}
LOG_ERROR("MQTT failed to deserialize message\n", message.c_str());
return;
}
void MQTTwrapper::clientLoop(void *params)
{
auto client = (MQTTwrapper *)(params);
auto loopTime = client->m_config.m_mqttLoopTime;
LOG_INFO("Starting MQTT client loop");
while (client->m_client.connected())
{
client->m_client.loop();
vTaskDelay(pdMS_TO_TICKS(loopTime));
}
LOG_ERROR("MQTT client loop terminated, disconnected");
client->m_loopHandle = NULL;
vTaskDelete(NULL); // delete the current task
}