diff --git a/Software/Software.ino b/Software/Software.ino index 832b6938..2ec566d6 100644 --- a/Software/Software.ino +++ b/Software/Software.ino @@ -71,6 +71,7 @@ uint16_t stat_batt_power = 0; // Power going in/out of battery uint16_t cell_max_voltage = 3700; // Stores the highest cell voltage value in the system uint16_t cell_min_voltage = 3700; // Stores the minimum cell voltage value in the system uint16_t cellvoltages[120]; // Stores all cell voltages +uint8_t nof_cellvoltages = 0; // Total number of cell voltages, set by each battery. bool LFP_Chemistry = false; // Common charger parameters @@ -141,6 +142,10 @@ void setup() { inform_user_on_inverter(); inform_user_on_battery(); + +#ifdef BATTERY_HAS_INIT + init_battery(); +#endif } // Perform main program functions @@ -150,6 +155,9 @@ void loop() { // Over-the-air updates by ElegantOTA ElegantOTA.loop(); WiFi_monitor_loop(); +#ifdef MQTT + mqtt_loop(); +#endif #endif // Input diff --git a/Software/USER_SETTINGS.cpp b/Software/USER_SETTINGS.cpp index 2b513933..9d826a48 100644 --- a/Software/USER_SETTINGS.cpp +++ b/Software/USER_SETTINGS.cpp @@ -16,6 +16,12 @@ volatile uint16_t MAXDISCHARGEAMP = 300; //30.0A , BYD CAN specific setting, Max discharge speed in Amp (Some inverters needs to be artificially limited) /* Charger settings (Optional, when generator charging) */ +// MQTT +#ifdef MQTT +const char* mqtt_user = "REDACTED"; +const char* mqtt_password = "REDACTED"; +#endif // USE_MQTT +/* Charger settings */ volatile float CHARGER_SET_HV = 384; // Reasonably appropriate 4.0v per cell charging of a 96s pack volatile float CHARGER_MAX_HV = 420; // Max permissible output (VDC) of charger volatile float CHARGER_MIN_HV = 200; // Min permissible output (VDC) of charger diff --git a/Software/USER_SETTINGS.h b/Software/USER_SETTINGS.h index 506806f8..0b4fe1c1 100644 --- a/Software/USER_SETTINGS.h +++ b/Software/USER_SETTINGS.h @@ -21,7 +21,7 @@ /* Select inverter communication protocol. See Wiki for which to use with your inverter: https://github.com/dalathegreat/BYD-Battery-Emulator-For-Gen24/wiki */ //#define BYD_CAN //Enable this line to emulate a "BYD Battery-Box Premium HVS" over CAN Bus -//#define BYD_MODBUS //Enable this line to emulate a "BYD 11kWh HVM battery" over Modbus RTU +//#define BYD_MODBUS //Enable this line to emulate a "BYD 11kWh HVM battery" over Modbus RTU //#define LUNA2000_MODBUS //Enable this line to emulate a "Luna2000 battery" over Modbus RTU //#define PYLON_CAN //Enable this line to emulate a "Pylontech battery" over CAN bus //#define SMA_CAN //Enable this line to emulate a "BYD Battery-Box H 8.9kWh, 7 mod" over CAN bus @@ -30,14 +30,21 @@ /* Other options */ #define DEBUG_VIA_USB //Enable this line to have the USB port output serial diagnostic data while program runs -//define INTERLOCK_REQUIRED //Nissan LEAF specific setting, if enabled requires both high voltage conenctors to be seated before starting +//#define INTERLOCK_REQUIRED //Nissan LEAF specific setting, if enabled requires both high voltage conenctors to be seated before starting //#define CONTACTOR_CONTROL //Enable this line to have pins 25,32,33 handle automatic precharge/contactor+/contactor- closing sequence //#define PWM_CONTACTOR_CONTROL //Enable this line to use PWM logic for contactors, which lower power consumption and heat generation //#define DUAL_CAN //Enable this line to activate an isolated secondary CAN Bus using add-on MCP2515 controller (Needed for FoxESS inverters) //#define SERIAL_LINK_RECEIVER //Enable this line to receive battery data over RS485 pins from another Lilygo (This LilyGo interfaces with inverter) //#define SERIAL_LINK_TRANSMITTER //Enable this line to send battery data over RS485 pins to another Lilygo (This LilyGo interfaces with battery) #define WEBSERVER //Enable this line to enable WiFi, and to run the webserver. See USER_SETTINGS.cpp for the Wifi settings. -#define LOAD_SAVED_SETTINGS_ON_BOOT //Enable this line to read settings stored via the webserver on boot +//#define LOAD_SAVED_SETTINGS_ON_BOOT //Enable this line to read settings stored via the webserver on boot + +/* MQTT options */ +//#define MQTT // Enable this line to enable MQTT +#define MQTT_SUBSCRIPTIONS \ + { "my/topic/abc", "my/other/topic" } +#define MQTT_SERVER "192.168.xxx.yyy" +#define MQTT_PORT 1883 /* Select charger used (Optional) */ //#define CHEVYVOLT_CHARGER //Enable this line to control a Chevrolet Volt charger connected to battery - for example, when generator charging or using an inverter without a charging function. diff --git a/Software/src/battery/BATTERIES.h b/Software/src/battery/BATTERIES.h index 87c1c8d6..2f823ab3 100644 --- a/Software/src/battery/BATTERIES.h +++ b/Software/src/battery/BATTERIES.h @@ -1,6 +1,8 @@ #ifndef BATTERIES_H #define BATTERIES_H +#include "../../USER_SETTINGS.h" + #ifdef BMW_I3_BATTERY #include "BMW-I3-BATTERY.h" //See this file for more i3 battery settings #endif @@ -19,6 +21,7 @@ #ifdef NISSAN_LEAF_BATTERY #include "NISSAN-LEAF-BATTERY.h" //See this file for more LEAF battery settings +#define BATTERY_HAS_INIT #endif #ifdef RENAULT_KANGOO_BATTERY diff --git a/Software/src/battery/NISSAN-LEAF-BATTERY.cpp b/Software/src/battery/NISSAN-LEAF-BATTERY.cpp index 9d20e2b9..17a28679 100644 --- a/Software/src/battery/NISSAN-LEAF-BATTERY.cpp +++ b/Software/src/battery/NISSAN-LEAF-BATTERY.cpp @@ -1,4 +1,7 @@ #include "NISSAN-LEAF-BATTERY.h" +#ifdef MQTT +#include "../devboard/mqtt/mqtt.h" +#endif #include "../lib/miwagner-ESP32-Arduino-CAN/CAN_config.h" #include "../lib/miwagner-ESP32-Arduino-CAN/ESP32CAN.h" @@ -936,3 +939,7 @@ uint16_t Temp_fromRAW_to_F(uint16_t temperature) { //This function feels horrib } return static_cast(1094 + (309 - temperature) * 2.5714285714285715); } + +void init_battery(void) { + nof_cellvoltages = 96; +} diff --git a/Software/src/battery/NISSAN-LEAF-BATTERY.h b/Software/src/battery/NISSAN-LEAF-BATTERY.h index 046bd3ca..fa88a959 100644 --- a/Software/src/battery/NISSAN-LEAF-BATTERY.h +++ b/Software/src/battery/NISSAN-LEAF-BATTERY.h @@ -27,6 +27,7 @@ extern uint16_t cell_max_voltage; //mV, 0-4350 extern uint16_t cell_min_voltage; //mV, 0-4350 extern uint8_t LEDcolor; //Enum, 0-10 extern uint16_t cellvoltages[120]; //mV 0-4350 per cell +extern uint8_t nof_cellvoltages; // Total number of cell voltages, set by each battery. extern bool batteryAllowsContactorClosing; //Bool, 1=true, 0=false void update_values_leaf_battery(); @@ -36,4 +37,6 @@ uint16_t convert2unsignedint16(int16_t signed_value); uint16_t Temp_fromRAW_to_F(uint16_t temperature); bool is_message_corrupt(CAN_frame_t rx_frame); +void init_battery(void); + #endif diff --git a/Software/src/battery/TESLA-MODEL-3-BATTERY.cpp b/Software/src/battery/TESLA-MODEL-3-BATTERY.cpp index aded7618..59aa7c18 100644 --- a/Software/src/battery/TESLA-MODEL-3-BATTERY.cpp +++ b/Software/src/battery/TESLA-MODEL-3-BATTERY.cpp @@ -455,6 +455,8 @@ void receive_can_tesla_model_3_battery(CAN_frame_t rx_frame) { mux = (rx_frame.data.u8[0]); static uint16_t volts; + static uint8_t mux_zero_counter = 0u; + static uint8_t mux_max = 0u; if (rx_frame.data.u8[1] == 0x2A) // status byte must be 0x2A to read cellvoltages { @@ -465,6 +467,19 @@ void receive_can_tesla_model_3_battery(CAN_frame_t rx_frame) { cellvoltages[1 + mux * 3] = volts; volts = ((rx_frame.data.u8[7] << 8) | rx_frame.data.u8[6]) / 10; cellvoltages[2 + mux * 3] = volts; + + // Track the max value of mux. If we've seen two 0 values for mux, we've probably gathered all + // cell voltages. Then, 2 + mux_max * 3 + 1 is the number of cell voltages. + mux_max = (mux > mux_max) ? mux : mux_max; + if (mux_zero_counter < 2 && mux == 0u) { + mux_zero_counter++; + if (mux_zero_counter == 2u) { + // The max index will be 2 + mux_max * 3 (see above), so "+ 1" for the number of cells + nof_cellvoltages = 2 + 3 * mux_max + 1; + // Increase the counter arbitrarily another time to make the initial if-statement evaluate to false + mux_zero_counter++; + } + } } break; case 0x2d2: diff --git a/Software/src/battery/TESLA-MODEL-3-BATTERY.h b/Software/src/battery/TESLA-MODEL-3-BATTERY.h index 4de5457e..a10d6cc7 100644 --- a/Software/src/battery/TESLA-MODEL-3-BATTERY.h +++ b/Software/src/battery/TESLA-MODEL-3-BATTERY.h @@ -29,6 +29,7 @@ extern uint16_t temperature_max; //C+1, Goes thru convert2unsignedint16 func extern uint16_t cell_max_voltage; //mV, 0-4350 extern uint16_t cell_min_voltage; //mV, 0-4350 extern uint16_t cellvoltages[120]; //mV 0-4350 per cell +extern uint8_t nof_cellvoltages; // Total number of cell voltages, set by each battery. extern uint8_t LEDcolor; //Enum, 0-10 extern bool batteryAllowsContactorClosing; //Bool, 1=true, 0=false extern bool inverterAllowsContactorClosing; //Bool, 1=true, 0=false diff --git a/Software/src/devboard/mqtt/mqtt.cpp b/Software/src/devboard/mqtt/mqtt.cpp new file mode 100644 index 00000000..7b7de371 --- /dev/null +++ b/Software/src/devboard/mqtt/mqtt.cpp @@ -0,0 +1,185 @@ +#include "mqtt.h" +#include +#include +#include +#include "../../../USER_SETTINGS.h" +#include "../../battery/BATTERIES.h" +#include "../../lib/knolleary-pubsubclient/PubSubClient.h" +#include "../utils/timer.h" + +const char* mqtt_subscriptions[] = MQTT_SUBSCRIPTIONS; +const size_t mqtt_nof_subscriptions = sizeof(mqtt_subscriptions) / sizeof(mqtt_subscriptions[0]); + +WiFiClient espClient; +PubSubClient client(espClient); +char mqtt_msg[MQTT_MSG_BUFFER_SIZE]; +int value = 0; +static unsigned long previousMillisUpdateVal; +MyTimer publish_global_timer(5000); + +static void publish_common_info(void); +static void publish_cell_voltages(void); + +/** Publish global values and call callbacks for specific modules */ +static void publish_values(void) { + publish_common_info(); + publish_cell_voltages(); +} + +static void publish_common_info(void) { + snprintf(mqtt_msg, sizeof(mqtt_msg), + "{\n" + " \"SOC\": %.3f,\n" + " \"StateOfHealth\": %.3f,\n" + " \"temperature_min\": %.3f,\n" + " \"temperature_max\": %.3f,\n" + " \"cell_max_voltage\": %d,\n" + " \"cell_min_voltage\": %d\n" + "}\n", + ((float)SOC) / 100.0, ((float)StateOfHealth) / 100.0, ((float)((int16_t)temperature_min)) / 10.0, + ((float)((int16_t)temperature_max)) / 10.0, cell_max_voltage, cell_min_voltage); + bool result = client.publish("battery/info", mqtt_msg, true); + //Serial.println(mqtt_msg); // Uncomment to print the payload on serial +} + +static void publish_cell_voltages(void) { + static bool mqtt_first_transmission = true; + + // If the cell voltage number isn't initialized... + if (nof_cellvoltages == 0u) { + return; + } + // At startup, re-post the discovery message for home assistant + if (mqtt_first_transmission == true) { + mqtt_first_transmission = false; + + // Base topic for any cell voltage "sensor" + String topic = "homeassistant/sensor/battery-emulator/cell_voltage"; + for (int i = 0; i < nof_cellvoltages; i++) { + // Build JSON message with device configuration for each cell voltage + // Probably shouldn't be BatteryEmulator here, instead "LeafBattery" + // or similar but hey, it works. + // mqtt_msg is a global buffer, should be fine since we run too much + // in a single thread :) + snprintf(mqtt_msg, sizeof(mqtt_msg), + "{" + "\"device\": {" + "\"identifiers\": [" + "\"battery-emulator\"" + "]," + "\"manufacturer\": \"DalaTech\"," + "\"model\": \"BatteryEmulator\"," + "\"name\": \"BatteryEmulator\"" + "}," + "\"device_class\": \"voltage\"," + "\"enabled_by_default\": true," + "\"object_id\": \"sensor_battery_voltage_cell%d\"," + "\"origin\": {" + "\"name\": \"BatteryEmulator\"," + "\"sw\": \"4.4.0-mqtt\"," + "\"url\": \"https://github.com/dalathegreat/Battery-Emulator\"" + "}," + "\"state_class\": \"measurement\"," + "\"name\": \"Battery Cell Voltage %d\"," + "\"state_topic\": \"battery/spec_data\"," + "\"unique_id\": \"battery-emulator_battery_voltage_cell%d\"," + "\"unit_of_measurement\": \"V\"," + "\"value_template\": \"{{ value_json.cell_voltages[%d] }}\"" + "}", + i + 1, i + 1, i + 1, i); + // End each discovery topic with cell number and '/config' + String cell_topic = topic + String(i + 1) + "/config"; + mqtt_publish_retain(cell_topic.c_str()); + } + } else { + // Every 5-ish seconds, build the JSON payload for the state topic. This requires + // some annoying formatting due to C++ not having nice Python-like string formatting. + // msg_length is a cumulative variable to track start position (param 1) and for + // modifying the maxiumum amount of characters to write (param 2). The third parameter + // is the string content + + // If cell voltages haven't been populated... + if (cellvoltages[0] == 0u) { + return; + } + + size_t msg_length = snprintf(mqtt_msg, sizeof(mqtt_msg), "{\n\"cell_voltages\":["); + for (size_t i = 0; i < nof_cellvoltages; ++i) { + msg_length += + snprintf(mqtt_msg + msg_length, sizeof(mqtt_msg) - msg_length, "%s%d", (i == 0) ? "" : ", ", cellvoltages[i]); + } + snprintf(mqtt_msg + msg_length, sizeof(mqtt_msg) - msg_length, "]\n}\n"); + + // Publish and print error if not OK + if (mqtt_publish_retain("battery/spec_data") == false) { + Serial.println("Cell voltage MQTT msg could not be sent"); + } + } +} + +/* This is called whenever a subscribed topic changes (hopefully) */ +static void callback(char* topic, byte* payload, unsigned int length) { + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (unsigned int i = 0; i < length; i++) { + Serial.print((char)payload[i]); + } + Serial.println(); +} + +/* If we lose the connection, get it back and re-sub */ +static void reconnect() { + // attempt one reconnection + Serial.print("Attempting MQTT connection... "); + // Create a random client ID + String clientId = "LilyGoClient-"; + clientId += String(random(0xffff), HEX); + // Attempt to connect + if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) { + Serial.println("connected"); + + for (int i = 0; i < mqtt_nof_subscriptions; i++) { + client.subscribe(mqtt_subscriptions[i]); + Serial.print("Subscribed to: "); + Serial.println(mqtt_subscriptions[i]); + } + } else { + Serial.print("failed, rc="); + Serial.print(client.state()); + Serial.println(" try again in 5 seconds"); + // Wait 5 seconds before retrying + } +} + +void init_mqtt(void) { + client.setServer(MQTT_SERVER, MQTT_PORT); + client.setCallback(callback); + Serial.println("MQTT initialized"); + + previousMillisUpdateVal = millis(); + reconnect(); +} + +void mqtt_loop(void) { + if (client.connected()) { + client.loop(); + if (publish_global_timer.elapsed() == true) // Every 5s + { + publish_values(); // Update values heading towards inverter. Prepare for sending on CAN, or write directly to Modbus. + } + } else { + if (millis() - previousMillisUpdateVal >= 5000) // Every 5s + { + previousMillisUpdateVal = millis(); + reconnect(); // Update values heading towards inverter. Prepare for sending on CAN, or write directly to Modbus. + } + } +} + +bool mqtt_publish_retain(const char* topic) { + if (client.connected() == true) { + return client.publish(topic, mqtt_msg, true); + } + return false; +} diff --git a/Software/src/devboard/mqtt/mqtt.h b/Software/src/devboard/mqtt/mqtt.h new file mode 100644 index 00000000..b23b5ca6 --- /dev/null +++ b/Software/src/devboard/mqtt/mqtt.h @@ -0,0 +1,60 @@ +/** + * MQTT add-on for the battery emulator + * + * Usage: + * + * Subscription - Add topics to MQTT_SUBSCRIPTIONS in USER_SETTINGS.h and handle the messages in mqtt.cpp:callback() + * + * Publishing - See example in mqtt.cpp:publish_values() for constructing the payload + * + * Home assistant - See below for an example, and the official documentation is quite good (https://www.home-assistant.io/integrations/sensor.mqtt/) + * in configuration.yaml: + * mqtt: !include mqtt.yaml + * + * in mqtt.yaml: + * sensor: + * - name: "Cell max" + * state_topic: "battery/info" + * unit_of_measurement: "mV" + * value_template: "{{ value_json.cell_max_voltage | int }}" + * - name: "Cell min" + * state_topic: "battery/info" + * unit_of_measurement: "mV" + * value_template: "{{ value_json.cell_min_voltage | int }}" + * - name: "Temperature max" + * state_topic: "battery/info" + * unit_of_measurement: "C" + * value_template: "{{ value_json.temperature_max | float }}" + * - name: "Temperature min" + * state_topic: "battery/info" + * unit_of_measurement: "C" + * value_template: "{{ value_json.temperature_min | float }}" + */ + +#ifndef __MQTT_H__ +#define __MQTT_H__ + +#include +#include "../../../USER_SETTINGS.h" + +#define MQTT_MSG_BUFFER_SIZE (1024) + +extern uint16_t SOC; +extern uint16_t StateOfHealth; +extern uint16_t temperature_min; //C+1, Goes thru convert2unsignedint16 function (15.0C = 150, -15.0C = 65385) +extern uint16_t temperature_max; //C+1, Goes thru convert2unsignedint16 function (15.0C = 150, -15.0C = 65385) +extern uint16_t cell_max_voltage; //mV, 0-4350 +extern uint16_t cell_min_voltage; //mV, 0-4350 +extern uint16_t cellvoltages[120]; //mV 0-4350 per cell +extern uint8_t nof_cellvoltages; // Total number of cell voltages, set by each battery. + +extern const char* mqtt_user; +extern const char* mqtt_password; + +extern char mqtt_msg[MQTT_MSG_BUFFER_SIZE]; + +void init_mqtt(void); +void mqtt_loop(void); +bool mqtt_publish_retain(const char* topic); + +#endif diff --git a/Software/src/devboard/utils/timer.cpp b/Software/src/devboard/utils/timer.cpp new file mode 100644 index 00000000..5b4d59bf --- /dev/null +++ b/Software/src/devboard/utils/timer.cpp @@ -0,0 +1,12 @@ +#include "timer.h" + +MyTimer::MyTimer(unsigned long interval) : interval(interval), previousMillis(0) {} + +bool MyTimer::elapsed() { + unsigned long currentMillis = millis(); + if (currentMillis - previousMillis >= interval) { + previousMillis = currentMillis; + return true; + } + return false; +} diff --git a/Software/src/devboard/utils/timer.h b/Software/src/devboard/utils/timer.h new file mode 100644 index 00000000..829ea1e5 --- /dev/null +++ b/Software/src/devboard/utils/timer.h @@ -0,0 +1,18 @@ +#ifndef __MYTIMER_H__ +#define __MYTIMER_H__ + +#include + +class MyTimer { + public: + /** interval in ms */ + MyTimer(unsigned long interval); + /** Returns true and resets the timer if it has elapsed */ + bool elapsed(); + + private: + unsigned long interval; + unsigned long previousMillis; +}; + +#endif // __MYTIMER_H__ diff --git a/Software/src/devboard/webserver/webserver.cpp b/Software/src/devboard/webserver/webserver.cpp index 21056f0e..039257a6 100644 --- a/Software/src/devboard/webserver/webserver.cpp +++ b/Software/src/devboard/webserver/webserver.cpp @@ -242,6 +242,11 @@ void init_webserver() { // Start server server.begin(); + +#ifdef MQTT + // Init MQTT + init_mqtt(); +#endif } void WiFi_monitor_loop() { diff --git a/Software/src/devboard/webserver/webserver.h b/Software/src/devboard/webserver/webserver.h index 1aca876d..f9043c6f 100644 --- a/Software/src/devboard/webserver/webserver.h +++ b/Software/src/devboard/webserver/webserver.h @@ -5,10 +5,16 @@ #include #include "../../../USER_SETTINGS.h" // Needed for WiFi ssid and password #include "../../lib/ayushsharma82-ElegantOTA/src/ElegantOTA.h" +#ifdef MQTT +#include "../../lib/knolleary-pubsubclient/PubSubClient.h" +#endif #include "../../lib/me-no-dev-AsyncTCP/src/AsyncTCP.h" #include "../../lib/me-no-dev-ESPAsyncWebServer/src/ESPAsyncWebServer.h" #include "../../lib/miwagner-ESP32-Arduino-CAN/ESP32CAN.h" #include "../config.h" // Needed for LED defines +#ifdef MQTT +#include "../mqtt/mqtt.h" +#endif extern uint16_t SOC; //SOC%, 0-100.00 (0-10000) extern uint16_t StateOfHealth; //SOH%, 0-100.00 (0-10000) diff --git a/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp b/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp new file mode 100644 index 00000000..2b48d2b6 --- /dev/null +++ b/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp @@ -0,0 +1,769 @@ +/* + + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include "Arduino.h" + +PubSubClient::PubSubClient() { + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(Client& client) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::~PubSubClient() { + free(this->buffer); +} + +boolean PubSubClient::connect(const char *id) { + return connect(id,NULL,NULL,0,0,0,0,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { + return connect(id,user,pass,0,0,0,0,1); +} + +boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { + if (!connected()) { + int result = 0; + + + if(_client->connected()) { + result = 1; + } else { + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + } + + if (result == 1) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + unsigned int j; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (j = 0;jbuffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x04|(willQos<<3)|(willRetain<<5); + } else { + v = 0x00; + } + if (cleanSession) { + v = v|0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + this->buffer[length++] = v; + + this->buffer[length++] = ((this->keepAlive) >> 8); + this->buffer[length++] = ((this->keepAlive) & 0xFF); + + CHECK_STRING_LENGTH(length,id) + length = writeString(id,this->buffer,length); + if (willTopic) { + CHECK_STRING_LENGTH(length,willTopic) + length = writeString(willTopic,this->buffer,length); + CHECK_STRING_LENGTH(length,willMessage) + length = writeString(willMessage,this->buffer,length); + } + + if(user != NULL) { + CHECK_STRING_LENGTH(length,user) + length = writeString(user,this->buffer,length); + if(pass != NULL) { + CHECK_STRING_LENGTH(length,pass) + length = writeString(pass,this->buffer,length); + } + } + + write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } + } + uint8_t llen; + uint32_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } + } + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; + } + return false; + } + return true; +} + +// reads a byte into result +boolean PubSubClient::readByte(uint8_t * result) { + uint32_t previousMillis = millis(); + while(!_client->available()) { + yield(); + uint32_t currentMillis = millis(); + if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){ + return false; + } + } + *result = _client->read(); + return true; +} + +// reads a byte into result[*index] and increments index +boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ + uint16_t current_index = *index; + uint8_t * write_address = &(result[current_index]); + if(readByte(write_address)){ + *index = current_index + 1; + return true; + } + return false; +} + +uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; + if(!readByte(this->buffer, &len)) return 0; + bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint32_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint32_t start = 0; + + do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + _state = MQTT_DISCONNECTED; + _client->stop(); + return 0; + } + if(!readByte(&digit)) return 0; + this->buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier <<=7; //multiplier *= 128 + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + if(!readByte(this->buffer, &len)) return 0; + if(!readByte(this->buffer, &len)) return 0; + skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; + start = 2; + if (this->buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + uint32_t idx = len; + + for (uint32_t i = start;istream) { + if (isPublish && idx-*lengthLength-2>skip) { + this->stream->write(digit); + } + } + + if (len < this->bufferSize) { + this->buffer[len] = digit; + len++; + } + idx++; + } + + if (!this->stream && idx > this->bufferSize) { + len = 0; // This will cause the packet to be ignored. + } + return len; +} + +boolean PubSubClient::loop() { + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + this->buffer[0] = MQTTPINGREQ; + this->buffer[1] = 0; + _client->write(this->buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = this->buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ + memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ + this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) this->buffer+llen+2; + // msgId only present for QOS>0 + if ((this->buffer[0]&0x06) == MQTTQOS1) { + msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; + payload = this->buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + this->buffer[0] = MQTTPUBACK; + this->buffer[1] = 2; + this->buffer[2] = (msgId >> 8); + this->buffer[3] = (msgId & 0xFF); + _client->write(this->buffer,4); + lastOutActivity = t; + + } else { + payload = this->buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + this->buffer[0] = MQTTPINGRESP; + this->buffer[1] = 0; + _client->write(this->buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } else if (!connected()) { + // readPacket has closed the connection + return false; + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(const char* topic, const char* payload) { + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); +} + +boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + + // Add payload + uint16_t i; + for (i=0;ibuffer[length++] = payload[i]; + } + + // Write the header + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); +} + +boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + uint8_t llen = 0; + uint8_t digit; + unsigned int rc = 0; + uint16_t tlen; + unsigned int pos = 0; + unsigned int i; + uint8_t header; + unsigned int len; + int expectedLength; + + if (!connected()) { + return false; + } + + tlen = strnlen(topic, this->bufferSize); + + header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + this->buffer[pos++] = header; + len = plength + 2 + tlen; + do { + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; + } + this->buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,this->buffer,pos); + + rc += _client->write(this->buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + + expectedLength = 1 + llen + 2 + tlen + plength; + + return (rc == expectedLength); +} + +boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { + if (connected()) { + // Send the header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + lastOutActivity = millis(); + return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); + } + return false; +} + +int PubSubClient::endPublish() { + return 1; +} + +size_t PubSubClient::write(uint8_t data) { + lastOutActivity = millis(); + return _client->write(data); +} + +size_t PubSubClient::write(const uint8_t *buffer, size_t size) { + lastOutActivity = millis(); + return _client->write(buffer,size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint16_t len = length; + do { + + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;i 0) && result) { + bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining; + rc = _client->write(writeBuf,bytesToWrite); + result = (rc == bytesToWrite); + bytesRemaining -= rc; + writeBuf += rc; + } + return result; +#else + rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); + lastOutActivity = millis(); + return (rc == hlen+length); +#endif +} + +boolean PubSubClient::subscribe(const char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (qos > 1) { + return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->buffer,length); + this->buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +boolean PubSubClient::unsubscribe(const char* topic) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +void PubSubClient::disconnect() { + this->buffer[0] = MQTTDISCONNECT; + this->buffer[1] = 0; + _client->write(this->buffer,2); + _state = MQTT_DISCONNECTED; + _client->flush(); + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + boolean rc; + if (_client == NULL ) { + rc = false; + } else { + rc = (int)_client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + _client->flush(); + _client->stop(); + } + } else { + return this->_state == MQTT_CONNECTED; + } + } + return rc; +} + +PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { + IPAddress addr(ip[0],ip[1],ip[2],ip[3]); + return setServer(addr,port); +} + +PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; +} + +PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; +} + +PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { + this->callback = callback; + return *this; +} + +PubSubClient& PubSubClient::setClient(Client& client){ + this->_client = &client; + return *this; +} + +PubSubClient& PubSubClient::setStream(Stream& stream){ + this->stream = &stream; + return *this; +} + +int PubSubClient::state() { + return this->_state; +} + +boolean PubSubClient::setBufferSize(uint16_t size) { + if (size == 0) { + // Cannot set it back to 0 + return false; + } + if (this->bufferSize == 0) { + this->buffer = (uint8_t*)malloc(size); + } else { + uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + if (newBuffer != NULL) { + this->buffer = newBuffer; + } else { + return false; + } + } + this->bufferSize = size; + return (this->buffer != NULL); +} + +uint16_t PubSubClient::getBufferSize() { + return this->bufferSize; +} +PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { + this->keepAlive = keepAlive; + return *this; +} +PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { + this->socketTimeout = timeout; + return *this; +} diff --git a/Software/src/lib/knolleary-pubsubclient/PubSubClient.h b/Software/src/lib/knolleary-pubsubclient/PubSubClient.h new file mode 100644 index 00000000..0a950acf --- /dev/null +++ b/Software/src/lib/knolleary-pubsubclient/PubSubClient.h @@ -0,0 +1,184 @@ +/* + PubSubClient.h - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#ifndef PubSubClient_h +#define PubSubClient_h + +#include +#include "IPAddress.h" +#include "Client.h" +#include "Stream.h" + +#define MQTT_VERSION_3_1 3 +#define MQTT_VERSION_3_1_1 4 + +// MQTT_VERSION : Pick the version +//#define MQTT_VERSION MQTT_VERSION_3_1 +#ifndef MQTT_VERSION +#define MQTT_VERSION MQTT_VERSION_3_1_1 +#endif + +// MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize(). +#ifndef MQTT_MAX_PACKET_SIZE +#define MQTT_MAX_PACKET_SIZE 1024 +#endif + +// MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive() +#ifndef MQTT_KEEPALIVE +#define MQTT_KEEPALIVE 15 +#endif + +// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds. Override with setSocketTimeout() +#ifndef MQTT_SOCKET_TIMEOUT +#define MQTT_SOCKET_TIMEOUT 15 +#endif + +// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client +// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to +// pass the entire MQTT packet in each write call. +//#define MQTT_MAX_TRANSFER_SIZE 80 + +// Possible values for client.state() +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 + +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +// Maximum size of fixed header and variable length size header +#define MQTT_MAX_HEADER_SIZE 5 + +#if defined(ESP8266) || defined(ESP32) +#include +#define MQTT_CALLBACK_SIGNATURE std::function callback +#else +#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) +#endif + +#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} + +class PubSubClient : public Print { +private: + Client* _client; + uint8_t* buffer; + uint16_t bufferSize; + uint16_t keepAlive; + uint16_t socketTimeout; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + MQTT_CALLBACK_SIGNATURE; + uint32_t readPacket(uint8_t*); + boolean readByte(uint8_t * result); + boolean readByte(uint8_t * result, uint16_t * index); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + // Build up the header ready to send + // Returns the size of the header + // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start + // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); + IPAddress ip; + const char* domain; + uint16_t port; + Stream* stream; + int _state; +public: + PubSubClient(); + PubSubClient(Client& client); + PubSubClient(IPAddress, uint16_t, Client& client); + PubSubClient(IPAddress, uint16_t, Client& client, Stream&); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, Client& client); + PubSubClient(uint8_t *, uint16_t, Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(const char*, uint16_t, Client& client); + PubSubClient(const char*, uint16_t, Client& client, Stream&); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + + ~PubSubClient(); + + PubSubClient& setServer(IPAddress ip, uint16_t port); + PubSubClient& setServer(uint8_t * ip, uint16_t port); + PubSubClient& setServer(const char * domain, uint16_t port); + PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); + PubSubClient& setClient(Client& client); + PubSubClient& setStream(Stream& stream); + PubSubClient& setKeepAlive(uint16_t keepAlive); + PubSubClient& setSocketTimeout(uint16_t timeout); + + boolean setBufferSize(uint16_t size); + uint16_t getBufferSize(); + + boolean connect(const char* id); + boolean connect(const char* id, const char* user, const char* pass); + boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession); + void disconnect(); + boolean publish(const char* topic, const char* payload); + boolean publish(const char* topic, const char* payload, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const char* payload, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + // Start to publish a message. + // This API: + // beginPublish(...) + // one or more calls to write(...) + // endPublish() + // Allows for arbitrarily large payloads to be sent without them having to be copied into + // a new buffer and held in memory at one time + // Returns 1 if the message was started successfully, 0 if there was an error + boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + // Finish off this publish message (started with beginPublish) + // Returns 1 if the packet was sent successfully, 0 if there was an error + int endPublish(); + // Write a single byte of payload (only to be used with beginPublish/endPublish) + virtual size_t write(uint8_t); + // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish) + // Returns the number of bytes written + virtual size_t write(const uint8_t *buffer, size_t size); + boolean subscribe(const char* topic); + boolean subscribe(const char* topic, uint8_t qos); + boolean unsubscribe(const char* topic); + boolean loop(); + boolean connected(); + int state(); + +}; + + +#endif