diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 63d4f6a..ca5216f 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -44,7 +44,7 @@ const bool MQTTwrapper::disconnect() return true; } -const bool MQTTwrapper::subscribe(const topic_t &topic, const action_t action) +const bool MQTTwrapper::subscribe(const Topic &topic, const ActionCallback action) { if (m_actionMap.contains(topic)) { @@ -61,7 +61,7 @@ const bool MQTTwrapper::subscribe(const topic_t &topic, const action_t action) return false; } -const bool MQTTwrapper::unsubscribe(const topic_t &topic) +const bool MQTTwrapper::unsubscribe(const Topic &topic) { if (!m_actionMap.contains(topic)) { @@ -83,7 +83,7 @@ const bool MQTTwrapper::connected() return m_loopHandle != NULL; } -const bool MQTTwrapper::publish(const topic_t &topic, const ArduinoJson::JsonDocument obj) +const bool MQTTwrapper::publish(const Topic &topic, const ArduinoJson::JsonDocument obj) { std::string message; if (!m_client.connected()) @@ -99,12 +99,32 @@ const bool MQTTwrapper::publish(const topic_t &topic, const ArduinoJson::JsonDoc if (m_client.publish(topic.c_str(), message.c_str())) { LOG_DEBUG("MQTT published topic [", topic.c_str(), "] - message [", message.c_str(), "]"); + if (m_onPublish) + { + m_onPublish(topic, message); + } return true; } LOG_ERROR("MQTT failed to publish topic [", topic.c_str(), "] - message [", message.c_str(), "]"); return false; } +void MQTTwrapper::setOnMessageCb(MessageCallback cb) +{ + if (cb) + m_onReceive = cb; + else + LOG_ERROR("MQTT invalid onReceive Callback"); +} + +void MQTTwrapper::setOnPublishCb(MessageCallback cb) +{ + if (cb) + m_onPublish = cb; + else + LOG_ERROR("MQTT invalid onPublish Callback"); +} + void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length) { std::string pl; @@ -120,13 +140,15 @@ void MQTTwrapper::callback(char *topic, uint8_t *payload, unsigned int length) return; } -void MQTTwrapper::onMessage(const std::string topic, const std::string message) +void MQTTwrapper::onMessage(const Topic topic, const Message message) { ArduinoJson::JsonDocument obj; LOG_DEBUG("MQTT received topic [", topic.c_str(), "] - message [", message.c_str(), "]"); if (ArduinoJson::deserializeJson(obj, message) == ArduinoJson::DeserializationError::Ok) { m_actionMap[topic](obj); + if (m_onReceive) + m_onReceive(topic, message); return; } LOG_ERROR("MQTT failed to deserialize message\n", message.c_str()); diff --git a/src/mqtt.h b/src/mqtt.h index f199ac5..be90107 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -13,12 +13,16 @@ #include #include -typedef std::string topic_t; -typedef std::function action_t; // the actions receive a JsonObject containing the received message -typedef std::map action_map_t; - class MQTTwrapper { +public: + using Topic = std::string; + using Message = std::string; + using MessageCallback = std::function; + using ActionCallback = std::function; // the actions receive a JsonObject containing the received message + using StateChangeCallback = std::function; + + using ActionMap = std::map; private: const std::map stateMap = { @@ -31,8 +35,7 @@ private: {2, "MQTT_CONNECT_BAD_CLIENT_ID"}, {3, "MQTT_CONNECT_UNAVAILABLE"}, {4, "MQTT_CONNECT_BAD_CREDENTIALS"}, - {5, "MQTT_CONNECT_UNAUTHORIZED"} - }; + {5, "MQTT_CONNECT_UNAUTHORIZED"}}; private: static MQTTwrapper * @@ -54,10 +57,13 @@ public: const bool disconnect(); const bool connected(); - const bool subscribe(const topic_t &topic, const action_t action); - const bool unsubscribe(const topic_t &topic); + const bool subscribe(const Topic &topic, const ActionCallback action); + const bool unsubscribe(const Topic &topic); - const bool publish(const topic_t &topic, const ArduinoJson::JsonDocument obj); + const bool publish(const Topic &topic, const ArduinoJson::JsonDocument obj); + + void setOnMessageCb(MessageCallback cb); + void setOnPublishCb(MessageCallback cb); private: static void callback(char *topic, uint8_t *payload, unsigned int length); // C-style callback only to invoke onMessage @@ -68,8 +74,11 @@ private: private: const Config &m_config; - action_map_t m_actionMap; + ActionMap m_actionMap; NetworkClient m_tcp; PubSubClient m_client; TaskHandle_t m_loopHandle; + + MessageCallback m_onPublish; + MessageCallback m_onReceive; };