From 600cda98a2af341e9eeb078adb2beaebedae0f14 Mon Sep 17 00:00:00 2001 From: Cabooman <81711263+Cabooman@users.noreply.github.com> Date: Mon, 8 Jan 2024 13:17:46 +0100 Subject: [PATCH] Wireless stuff on its separate task, separate modules for general wifi+OTA, webserver and MQTT, some messing around with the settings --- Software/Software.ino | 24 +- Software/USER_SETTINGS.cpp | 19 +- Software/USER_SETTINGS.h | 47 +- Software/src/devboard/mqtt/mqtt.cpp | 97 +++ Software/src/devboard/mqtt/mqtt.h | 55 ++ Software/src/devboard/webserver/webserver.cpp | 107 +-- Software/src/devboard/webserver/webserver.h | 46 +- Software/src/devboard/wifi/wifi.cpp | 97 +++ Software/src/devboard/wifi/wifi.h | 36 + .../knolleary-pubsubclient/PubSubClient.cpp | 769 ++++++++++++++++++ .../lib/knolleary-pubsubclient/PubSubClient.h | 184 +++++ 11 files changed, 1328 insertions(+), 153 deletions(-) create mode 100644 Software/src/devboard/mqtt/mqtt.cpp create mode 100644 Software/src/devboard/mqtt/mqtt.h create mode 100644 Software/src/devboard/wifi/wifi.cpp create mode 100644 Software/src/devboard/wifi/wifi.h create mode 100644 Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp create mode 100644 Software/src/lib/knolleary-pubsubclient/PubSubClient.h diff --git a/Software/Software.ino b/Software/Software.ino index bea32ea4..ef307faf 100644 --- a/Software/Software.ino +++ b/Software/Software.ino @@ -6,6 +6,8 @@ #include "USER_SETTINGS.h" #include "src/battery/BATTERIES.h" #include "src/devboard/config.h" +#include "src/devboard/webserver/webserver.h" +#include "src/devboard/wifi/wifi.h" #include "src/inverter/INVERTERS.h" #include "src/lib/adafruit-Adafruit_NeoPixel/Adafruit_NeoPixel.h" #include "src/lib/eModbus-eModbus/Logging.h" @@ -14,10 +16,6 @@ #include "src/lib/miwagner-ESP32-Arduino-CAN/CAN_config.h" #include "src/lib/miwagner-ESP32-Arduino-CAN/ESP32CAN.h" -#ifdef WEBSERVER -#include "src/devboard/webserver/webserver.h" -#endif - // Interval settings int intervalUpdateValues = 4800; // Interval at which to update inverter values / Modbus registers const int interval10 = 10; // Interval for 10ms tasks @@ -113,10 +111,6 @@ bool inverterAllowsContactorClosing = true; void setup() { init_serial(); -#ifdef WEBSERVER - init_webserver(); -#endif - init_CAN(); init_LED(); @@ -130,16 +124,12 @@ void setup() { inform_user_on_inverter(); inform_user_on_battery(); + + init_wireless(); } // Perform main program functions void loop() { - -#ifdef WEBSERVER - // Over-the-air updates by ElegantOTA - ElegantOTA.loop(); -#endif - // Input receive_can(); // Receive CAN messages. Runs as fast as possible #ifdef DUAL_CAN @@ -170,6 +160,12 @@ void loop() { #ifdef DUAL_CAN send_can2(); #endif + if (webserver_ota_started()) { + Serial.println("OTA started, stopping CAN traffic"); + ESP32Can.CANStop(); + bms_status = 5; //Inform inverter that we are updating + LEDcolor = BLUE; + } } // Initialization functions diff --git a/Software/USER_SETTINGS.cpp b/Software/USER_SETTINGS.cpp index ea1418d3..2c25efab 100644 --- a/Software/USER_SETTINGS.cpp +++ b/Software/USER_SETTINGS.cpp @@ -1,9 +1,18 @@ +#include #include "USER_SETTINGS.h" -#ifdef WEBSERVER -#define ENABLE_AP //Comment out this line to turn off the broadcasted AP -const char* ssid = "REPLACE_WITH_YOUR_SSID"; // maximum of 63 characters; -const char* password = "REPLACE_WITH_YOUR_PASSWORD"; // minimum of 8 characters; +#ifdef USE_WIFI + +// MQTT +// For more detailed settings, see mqtt.h +#ifdef USE_MQTT +const char* mqtt_user = "REDACTED"; +const char* mqtt_password = "REDACTED"; +#endif // USE_MQTT + +const char* ssid = "REDACTED"; // maximum of 63 characters; +const char* password = "REDACTED"; // minimum of 8 characters; const char* ssidAP = "Battery Emulator"; // maximum of 63 characters; const char* passwordAP = "123456789"; // minimum of 8 characters; set to NULL if you want the access point to be open -#endif + +#endif \ No newline at end of file diff --git a/Software/USER_SETTINGS.h b/Software/USER_SETTINGS.h index 9a677d4d..2d95415b 100644 --- a/Software/USER_SETTINGS.h +++ b/Software/USER_SETTINGS.h @@ -10,14 +10,14 @@ //#define CHADEMO_BATTERY //#define IMIEV_CZERO_ION_BATTERY //#define KIA_HYUNDAI_64_BATTERY -//#define NISSAN_LEAF_BATTERY +#define NISSAN_LEAF_BATTERY //#define RENAULT_ZOE_BATTERY //#define TESLA_MODEL_3_BATTERY //#define TEST_FAKE_BATTERY /* Select inverter communication protocol. See Wiki for which to use with your inverter: https://github.com/dalathegreat/BYD-Battery-Emulator-For-Gen24/wiki */ //#define BYD_CAN //Enable this line to emulate a "BYD Battery-Box Premium HVS" over CAN Bus -//#define BYD_MODBUS //Enable this line to emulate a "BYD 11kWh HVM battery" over Modbus RTU +#define BYD_MODBUS //Enable this line to emulate a "BYD 11kWh HVM battery" over Modbus RTU //#define LUNA2000_MODBUS //Enable this line to emulate a "Luna2000 battery" over Modbus RTU //#define PYLON_CAN //Enable this line to emulate a "Pylontech battery" over CAN bus //#define SMA_CAN //Enable this line to emulate a "BYD Battery-Box H 8.9kWh, 7 mod" over CAN bus @@ -38,12 +38,41 @@ //define INTERLOCK_REQUIRED //Nissan LEAF specific setting, if enabled requires both high voltage conenctors to be seated before starting /* Other options */ -#define DEBUG_VIA_USB //Enable this line to have the USB port output serial diagnostic data while program runs -//#define CONTACTOR_CONTROL //Enable this line to have pins 25,32,33 handle automatic precharge/contactor+/contactor- closing sequence -//#define PWM_CONTACTOR_CONTROL //Enable this line to use PWM logic for contactors, which lower power consumption and heat generation -//#define DUAL_CAN //Enable this line to activate an isolated secondary CAN Bus using add-on MCP2515 controller (Needed for FoxESS inverters) -//#define SERIAL_LINK_RECEIVER //Enable this line to receive battery data over RS485 pins from another Lilygo (This LilyGo interfaces with inverter) -//#define SERIAL_LINK_TRANSMITTER //Enable this line to send battery data over RS485 pins to another Lilygo (This LilyGo interfaces with battery) -//#define WEBSERVER //Enable this line to enable WiFi, and to run the webserver. See USER_SETTINGS.cpp for the Wifi settings. +//#define DEBUG_VIA_USB //Enable this line to have the USB port output serial diagnostic data while program runs +//#define CONTACTOR_CONTROL //Enable this line to have pins 25,32,33 handle automatic precharge/contactor+/contactor- closing sequence +//#define PWM_CONTACTOR_CONTROL //Enable this line to use PWM logic for contactors, which lower power consumption and heat generation +//#define DUAL_CAN //Enable this line to activate an isolated secondary CAN Bus using add-on MCP2515 controller (Needed for FoxESS inverters) +//#define SERIAL_LINK_RECEIVER //Enable this line to receive battery data over RS485 pins from another Lilygo (This LilyGo interfaces with inverter) +//#define SERIAL_LINK_TRANSMITTER //Enable this line to send battery data over RS485 pins to another Lilygo (This LilyGo interfaces with battery) + +// High level connectivity settings below. See USER_SETTINGS.cpp for detailed settings. + +// Enabling any define below enables Wifi functionality. See USER_SETTINGS.cpp for the Wifi settings. +#define USE_WEBSERVER //Enable this line to run the webserver and enable OTA updates. +#define USE_MQTT //Enable this line to enable MQTT functionality. See USER_SETTINGS.cpp for the MQTT settings. + +// Various Wifi specific defines, placed here to remove the need for messier code and issues with optimizations/linking. +//#define ENABLE_AP //Enable this line to run Wifi in AP mode (only broadcasting the LilyGO access point SSID). +#define ENABLE_STA //Enable this line to run Wifi in STA mode (connected to your home network). + +#define MQTT_SUBSCRIPTIONS {"my/topic/abc", "my/other/topic"} +#define MQTT_SERVER "192.168.xxx.yyy" +#define MQTT_PORT 1883 + +// ---- Derived defines below ---- + +#if defined(USE_WEBSERVER) || defined(USE_MQTT) + #define USE_WIFI +#endif + +#if defined(ENABLE_AP) && defined(ENABLE_STA) + #define WIFI_MODE WIFI_AP_STA +#elif defined(ENABLE_AP) + #define WIFI_MODE WIFI_AP +#elif defined(ENABLE_STA) + #define WIFI_MODE WIFI_STA +#else + #define WIFI_MODE WIFI_OFF // Just in case +#endif #endif diff --git a/Software/src/devboard/mqtt/mqtt.cpp b/Software/src/devboard/mqtt/mqtt.cpp new file mode 100644 index 00000000..cf6bc1ce --- /dev/null +++ b/Software/src/devboard/mqtt/mqtt.cpp @@ -0,0 +1,97 @@ +#include "mqtt.h" +#include +#include +#include +#include "../../../USER_SETTINGS.h" +#include "../../lib/knolleary-pubsubclient/PubSubClient.h" +#include "../wifi/wifi.h" + +const char* mqtt_subscriptions[] = MQTT_SUBSCRIPTIONS; +const size_t mqtt_nof_subscriptions = sizeof(mqtt_subscriptions) / sizeof(mqtt_subscriptions[0]); + +WiFiClient espClient; +PubSubClient client(espClient); +char msg[MSG_BUFFER_SIZE]; +int value = 0; +static unsigned long previousMillisUpdateVal; + +#ifdef USE_MQTT +/** Publish global values and call callbacks for specific modules */ +static void publish_values(void) { + + snprintf(msg, sizeof(msg), + "{\n" + " \"SOC\": %.3f,\n" + " \"StateOfHealth\": %.3f,\n" + " \"temperature_min\": %.3f,\n" + " \"temperature_max\": %.3f,\n" + " \"cell_max_voltage\": %d,\n" + " \"cell_min_voltage\": %d\n" + "}\n", + ((float)SOC) / 100.0, ((float)StateOfHealth) / 100.0, ((float)((int16_t)temperature_min)) / 10.0, + ((float)((int16_t)temperature_max)) / 10.0, cell_max_voltage, cell_min_voltage); + client.publish("battery/info", msg); + // Serial.println(msg); // Uncomment to print the payload on serial +} + +/* This is called whenever a subscribed topic changes (hopefully) */ +static void callback(char* topic, byte* payload, unsigned int length) { + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (unsigned int i = 0; i < length; i++) { + Serial.print((char)payload[i]); + } + Serial.println(); +} + +/* If we lose the connection, get it back and re-sub */ +static void reconnect() { + // attempt one reconnection + Serial.print("Attempting MQTT connection... "); + // Create a random client ID + String clientId = "LilyGoClient-"; + clientId += String(random(0xffff), HEX); + // Attempt to connect + if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) { + Serial.println("connected"); + + for (int i = 0; i < mqtt_nof_subscriptions; i++) { + client.subscribe(mqtt_subscriptions[i]); + Serial.print("Subscribed to: "); + Serial.println(mqtt_subscriptions[i]); + } + } else { + Serial.print("failed, rc="); + Serial.print(client.state()); + Serial.println(" try again in 5 seconds"); + // Wait 5 seconds before retrying + } +} +#endif // + +void init_mqtt(void) { + client.setServer(MQTT_SERVER, MQTT_PORT); + client.setCallback(callback); + + previousMillisUpdateVal = millis(); + reconnect(); + Serial.println("MQTT initialized"); +} + +void mqtt_loop(void) { + if (client.connected()) { + client.loop(); + if (millis() - previousMillisUpdateVal >= 5000) // Every 5s + { + previousMillisUpdateVal = millis(); + publish_values(); // Update values heading towards inverter. Prepare for sending on CAN, or write directly to Modbus. + } + } else { + if (millis() - previousMillisUpdateVal >= 5000) // Every 5s + { + previousMillisUpdateVal = millis(); + reconnect(); // Update values heading towards inverter. Prepare for sending on CAN, or write directly to Modbus. + } + } +} \ No newline at end of file diff --git a/Software/src/devboard/mqtt/mqtt.h b/Software/src/devboard/mqtt/mqtt.h new file mode 100644 index 00000000..0bdca5dc --- /dev/null +++ b/Software/src/devboard/mqtt/mqtt.h @@ -0,0 +1,55 @@ +/** + * MQTT add-on for the battery emulator + * + * Usage: + * + * Subscription - Add topics to MQTT_SUBSCRIPTIONS in USER_SETTINGS.h and handle the messages in mqtt.cpp:callback() + * + * Publishing - See example in mqtt.cpp:publish_values() for constructing the payload + * + * Home assistant - See below for an example, and the official documentation is quite good (https://www.home-assistant.io/integrations/sensor.mqtt/) + * in configuration.yaml: + * mqtt: !include mqtt.yaml + * + * in mqtt.yaml: + * sensor: + * - name: "Cell max" + * state_topic: "battery/info" + * unit_of_measurement: "mV" + * value_template: "{{ value_json.cell_max_voltage | int }}" + * - name: "Cell min" + * state_topic: "battery/info" + * unit_of_measurement: "mV" + * value_template: "{{ value_json.cell_min_voltage | int }}" + * - name: "Temperature max" + * state_topic: "battery/info" + * unit_of_measurement: "C" + * value_template: "{{ value_json.temperature_max | float }}" + * - name: "Temperature min" + * state_topic: "battery/info" + * unit_of_measurement: "C" + * value_template: "{{ value_json.temperature_min | float }}" + */ + +#ifndef __MQTT_H__ +#define __MQTT_H__ + +#include +#include "../../../USER_SETTINGS.h" + +#define MSG_BUFFER_SIZE (256) + +extern uint16_t SOC; +extern uint16_t StateOfHealth; +extern uint16_t temperature_min; //C+1, Goes thru convert2unsignedint16 function (15.0C = 150, -15.0C = 65385) +extern uint16_t temperature_max; //C+1, Goes thru convert2unsignedint16 function (15.0C = 150, -15.0C = 65385) +extern uint16_t cell_max_voltage; //mV, 0-4350 +extern uint16_t cell_min_voltage; //mV, 0-4350 + +extern const char* mqtt_user; +extern const char* mqtt_password; + +void init_mqtt(void); +void mqtt_loop(void); + +#endif diff --git a/Software/src/devboard/webserver/webserver.cpp b/Software/src/devboard/webserver/webserver.cpp index a97a1b83..bc7f4939 100644 --- a/Software/src/devboard/webserver/webserver.cpp +++ b/Software/src/devboard/webserver/webserver.cpp @@ -1,3 +1,4 @@ +#include #include "webserver.h" // Create AsyncWebServer object on port 80 @@ -5,6 +6,7 @@ AsyncWebServer server(80); // Measure OTA progress unsigned long ota_progress_millis = 0; +bool ota_started = false; const char index_html[] PROGMEM = R"rawliteral( @@ -33,93 +35,25 @@ const char index_html[] PROGMEM = R"rawliteral( )rawliteral"; -String wifi_state; -bool wifi_connected; - -// Wifi connect time declarations and definition -unsigned long wifi_connect_start_time; -unsigned long wifi_connect_current_time; -const long wifi_connect_timeout = 5000; // Timeout for WiFi connect in milliseconds - -void init_webserver() { -// Configure WiFi -#ifdef ENABLE_AP - WiFi.mode(WIFI_AP_STA); // Simultaneous WiFi AP and Router connection - init_WiFi_AP(); - init_WiFi_STA(ssid, password); -#else - WiFi.mode(WIFI_STA); // Only Router connection - init_WiFi_STA(ssid, password); -#endif +static void init_ElegantOTA() { + ElegantOTA.begin(&server); // Start ElegantOTA + // ElegantOTA callbacks + ElegantOTA.onStart(onOTAStart); + ElegantOTA.onProgress(onOTAProgress); + ElegantOTA.onEnd(onOTAEnd); +} +void init_webserver(void) { // Route for root / web page server.on("/", HTTP_GET, [](AsyncWebServerRequest* request) { request->send_P(200, "text/html", index_html, processor); }); - // Send a GET request to /update - server.on("/debug", HTTP_GET, - [](AsyncWebServerRequest* request) { request->send(200, "text/plain", "Debug: all OK."); }); - // Initialize ElegantOTA init_ElegantOTA(); // Start server server.begin(); -} - -void init_WiFi_AP() { - Serial.print("Creating Access Point: "); - Serial.println(ssidAP); - Serial.print("With password: "); - Serial.println(passwordAP); - - WiFi.softAP(ssidAP, passwordAP); - - IPAddress IP = WiFi.softAPIP(); - Serial.println("Access Point created."); - Serial.print("IP address: "); - Serial.println(IP); -} - -void init_WiFi_STA(const char* ssid, const char* password) { - // Connect to Wi-Fi network with SSID and password - Serial.print("Connecting to "); - Serial.println(ssid); - WiFi.begin(ssid, password); - - wifi_connect_start_time = millis(); - wifi_connect_current_time = wifi_connect_start_time; - while ((wifi_connect_current_time - wifi_connect_start_time) <= wifi_connect_timeout && - WiFi.status() != WL_CONNECTED) { // do this loop for up to 5000ms - // to break the loop when the connection is not established (wrong ssid or password). - delay(500); - Serial.print("."); - wifi_connect_current_time = millis(); - } - if (WiFi.status() == WL_CONNECTED) { // WL_CONNECTED is assigned when connected to a WiFi network - wifi_connected = true; - wifi_state = "Connected"; - // Print local IP address and start web server - Serial.println(""); - Serial.print("Connected to WiFi network: "); - Serial.println(ssid); - Serial.print("IP address: "); - Serial.println(WiFi.localIP()); - } else { - wifi_connected = false; - wifi_state = "Not connected"; - Serial.print("Not connected to WiFi network: "); - Serial.println(ssid); - Serial.println("Please check WiFi network name and password, and if WiFi network is available."); - } -} - -void init_ElegantOTA() { - ElegantOTA.begin(&server); // Start ElegantOTA - // ElegantOTA callbacks - ElegantOTA.onStart(onOTAStart); - ElegantOTA.onProgress(onOTAProgress); - ElegantOTA.onEnd(onOTAEnd); + Serial.println("Webserver and OTA handling online"); } String processor(const String& var) { @@ -156,8 +90,9 @@ String processor(const String& var) { } // Display ssid of network connected to and, if connected to the WiFi, its own IP content += "

SSID: " + String(ssid) + "

"; + String wifi_state = (WiFi.status() == WL_CONNECTED) ? "Connected" : "Not connected"; content += "

Wifi status: " + wifi_state + "

"; - if (wifi_connected == true) { + if (WiFi.status() == WL_CONNECTED) { content += "

IP: " + WiFi.localIP().toString() + "

"; } // Close the block @@ -316,14 +251,10 @@ String processor(const String& var) { void onOTAStart() { // Log when OTA has started Serial.println("OTA update started!"); - ESP32Can.CANStop(); - bms_status = 5; //Inform inverter that we are updating - LEDcolor = BLUE; + ota_started = true; } void onOTAProgress(size_t current, size_t final) { - bms_status = 5; //Inform inverter that we are updating - LEDcolor = BLUE; // Log every 1 second if (millis() - ota_progress_millis > 1000) { ota_progress_millis = millis(); @@ -338,6 +269,12 @@ void onOTAEnd(bool success) { } else { Serial.println("There was an error during OTA update!"); } - bms_status = 5; //Inform inverter that we are updating - LEDcolor = BLUE; +} + +void webserver_loop(void) { + ElegantOTA.loop(); +} + +bool webserver_ota_started(void) { + return ota_started; } diff --git a/Software/src/devboard/webserver/webserver.h b/Software/src/devboard/webserver/webserver.h index 62a83096..feea26cb 100644 --- a/Software/src/devboard/webserver/webserver.h +++ b/Software/src/devboard/webserver/webserver.h @@ -30,46 +30,6 @@ extern bool batteryAllowsContactorClosing; //Bool, 1=true, 0=false extern bool inverterAllowsContactorClosing; //Bool, 1=true, 0=false extern const char* ssid; -extern const char* password; -extern const char* ssidAP; -extern const char* passwordAP; - -/** - * @brief Initialization function for the webserver. - * - * @param[in] void - * - * @return void - */ -void init_webserver(); - -/** - * @brief Initialization function that creates a WiFi Access Point. - * - * @param[in] void - * - * @return void - */ -void init_WiFi_AP(); - -/** - * @brief Initialization function that connects to an existing network. - * - * @param[in] ssid WiFi network name - * @param[in] password WiFi network password - * - * @return void - */ -void init_WiFi_STA(const char* ssid, const char* password); - -/** - * @brief Initialization function for ElegantOTA. - * - * @param[in] void - * - * @return void - */ -void init_ElegantOTA(); /** * @brief Replaces placeholder with content section in web page @@ -108,4 +68,10 @@ void onOTAProgress(size_t current, size_t final); */ void onOTAEnd(bool success); +void init_webserver(void); + +void webserver_loop(void); + +bool webserver_ota_started(void); + #endif diff --git a/Software/src/devboard/wifi/wifi.cpp b/Software/src/devboard/wifi/wifi.cpp new file mode 100644 index 00000000..6f78d5ca --- /dev/null +++ b/Software/src/devboard/wifi/wifi.cpp @@ -0,0 +1,97 @@ +#include +#include +#include "../../../USER_SETTINGS.h" +#include "../webserver/webserver.h" +#include "../mqtt/mqtt.h" +#include "wifi.h" + +// Wifi connect time declarations and definition +unsigned long wifi_connect_start_time; +unsigned long wifi_connect_current_time; +const long wifi_connect_timeout = 5000; // Timeout for WiFi connect in milliseconds + + +static void wifi_reconnect(void) { + if (WiFi.status() != WL_CONNECTED) { + WiFi.reconnect(); + } +} + +static void init_STA(void) { + + // Connect to Wi-Fi network with SSID and password + Serial.print("Connecting to "); + Serial.println(ssid); + WiFi.begin(ssid, password); + + wifi_connect_start_time = millis(); + wifi_connect_current_time = wifi_connect_start_time; + while ((wifi_connect_current_time - wifi_connect_start_time) <= wifi_connect_timeout && + WiFi.status() != WL_CONNECTED) { // do this loop for up to 5000ms + // to break the loop when the connection is not established (wrong ssid or password). + delay(500); + Serial.print("."); + wifi_connect_current_time = millis(); + } + if (WiFi.status() == WL_CONNECTED) { // WL_CONNECTED is assigned when connected to a WiFi network + // Print local IP address and start web server + Serial.println(""); + Serial.print("Connected to WiFi network: "); + Serial.println(ssid); + Serial.print("IP address: "); + Serial.println(WiFi.localIP()); + } else { + Serial.print("Not connected to WiFi network: "); + Serial.println(ssid); + Serial.println("Please check WiFi network name and password, and if WiFi network is available."); + } +} + +static void init_AP(void) { + Serial.print("Creating Access Point: "); + Serial.println(ssidAP); + Serial.print("With password: "); + Serial.println(passwordAP); + + WiFi.softAP(ssidAP, passwordAP); + + IPAddress IP = WiFi.softAPIP(); + Serial.println("Access Point created."); + Serial.print("IP address: "); + Serial.println(IP); +} + +static void init_wifi(void) { + WiFi.mode(WIFI_MODE); +#ifdef ENABLE_AP + init_AP(); +#endif +#ifdef ENABLE_STA + init_STA(); +#endif +} + +// Exported functions below + +void init_wireless(void) { + xTaskCreate(wifi_taskfunction, "Wifi task", 1024 * 8, NULL, 1, NULL); + // xTaskCreate(webserver_taskfunction, "Webserver task", 1024 * 4, NULL, 1, NULL); // Extra space for OTA, we should look at the high water marks for these tasks... + // xTaskCreate(mqtt_taskfunction, "MQTT task", 1024, NULL, 1, NULL); +} + +bool wifi_is_connected(void) { + return WiFi.status() == WL_CONNECTED; +} + +void wifi_taskfunction(void *pvParameters) { + init_wifi(); + init_webserver(); + init_mqtt(); + + while(true) { + wifi_reconnect(); + webserver_loop(); + mqtt_loop(); + delay(1); + } +} diff --git a/Software/src/devboard/wifi/wifi.h b/Software/src/devboard/wifi/wifi.h new file mode 100644 index 00000000..90e72c64 --- /dev/null +++ b/Software/src/devboard/wifi/wifi.h @@ -0,0 +1,36 @@ +#ifndef __WIFI_H__ +#define __WIFI_H__ + +extern const char* ssid; +extern const char* password; +extern const char* ssidAP; +extern const char* passwordAP; + +/** + * @brief Initialization function for Wifi + * + * @param[in] void + * + * @return void + */ +void init_wireless(void); + +/** + * @brief Returns the WiFi connections status + * + * @param[in] void + * + * @return bool: true = connected, false = disconnected + */ +bool wifi_is_connected(void); + +/** + * @brief FreeRTOS taskfunction + * + * @param[in] *pvParameters: Optional parameter pointer + * + * @return void + */ +void wifi_taskfunction(void *pvParameter); + +#endif \ No newline at end of file diff --git a/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp b/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp new file mode 100644 index 00000000..2b48d2b6 --- /dev/null +++ b/Software/src/lib/knolleary-pubsubclient/PubSubClient.cpp @@ -0,0 +1,769 @@ +/* + + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include "Arduino.h" + +PubSubClient::PubSubClient() { + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(Client& client) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::~PubSubClient() { + free(this->buffer); +} + +boolean PubSubClient::connect(const char *id) { + return connect(id,NULL,NULL,0,0,0,0,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { + return connect(id,user,pass,0,0,0,0,1); +} + +boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { + if (!connected()) { + int result = 0; + + + if(_client->connected()) { + result = 1; + } else { + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + } + + if (result == 1) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + unsigned int j; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (j = 0;jbuffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x04|(willQos<<3)|(willRetain<<5); + } else { + v = 0x00; + } + if (cleanSession) { + v = v|0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + this->buffer[length++] = v; + + this->buffer[length++] = ((this->keepAlive) >> 8); + this->buffer[length++] = ((this->keepAlive) & 0xFF); + + CHECK_STRING_LENGTH(length,id) + length = writeString(id,this->buffer,length); + if (willTopic) { + CHECK_STRING_LENGTH(length,willTopic) + length = writeString(willTopic,this->buffer,length); + CHECK_STRING_LENGTH(length,willMessage) + length = writeString(willMessage,this->buffer,length); + } + + if(user != NULL) { + CHECK_STRING_LENGTH(length,user) + length = writeString(user,this->buffer,length); + if(pass != NULL) { + CHECK_STRING_LENGTH(length,pass) + length = writeString(pass,this->buffer,length); + } + } + + write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } + } + uint8_t llen; + uint32_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } + } + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; + } + return false; + } + return true; +} + +// reads a byte into result +boolean PubSubClient::readByte(uint8_t * result) { + uint32_t previousMillis = millis(); + while(!_client->available()) { + yield(); + uint32_t currentMillis = millis(); + if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){ + return false; + } + } + *result = _client->read(); + return true; +} + +// reads a byte into result[*index] and increments index +boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ + uint16_t current_index = *index; + uint8_t * write_address = &(result[current_index]); + if(readByte(write_address)){ + *index = current_index + 1; + return true; + } + return false; +} + +uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; + if(!readByte(this->buffer, &len)) return 0; + bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint32_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint32_t start = 0; + + do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + _state = MQTT_DISCONNECTED; + _client->stop(); + return 0; + } + if(!readByte(&digit)) return 0; + this->buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier <<=7; //multiplier *= 128 + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + if(!readByte(this->buffer, &len)) return 0; + if(!readByte(this->buffer, &len)) return 0; + skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; + start = 2; + if (this->buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + uint32_t idx = len; + + for (uint32_t i = start;istream) { + if (isPublish && idx-*lengthLength-2>skip) { + this->stream->write(digit); + } + } + + if (len < this->bufferSize) { + this->buffer[len] = digit; + len++; + } + idx++; + } + + if (!this->stream && idx > this->bufferSize) { + len = 0; // This will cause the packet to be ignored. + } + return len; +} + +boolean PubSubClient::loop() { + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + this->buffer[0] = MQTTPINGREQ; + this->buffer[1] = 0; + _client->write(this->buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = this->buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ + memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ + this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) this->buffer+llen+2; + // msgId only present for QOS>0 + if ((this->buffer[0]&0x06) == MQTTQOS1) { + msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; + payload = this->buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + this->buffer[0] = MQTTPUBACK; + this->buffer[1] = 2; + this->buffer[2] = (msgId >> 8); + this->buffer[3] = (msgId & 0xFF); + _client->write(this->buffer,4); + lastOutActivity = t; + + } else { + payload = this->buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + this->buffer[0] = MQTTPINGRESP; + this->buffer[1] = 0; + _client->write(this->buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } else if (!connected()) { + // readPacket has closed the connection + return false; + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(const char* topic, const char* payload) { + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); +} + +boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + + // Add payload + uint16_t i; + for (i=0;ibuffer[length++] = payload[i]; + } + + // Write the header + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); +} + +boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + uint8_t llen = 0; + uint8_t digit; + unsigned int rc = 0; + uint16_t tlen; + unsigned int pos = 0; + unsigned int i; + uint8_t header; + unsigned int len; + int expectedLength; + + if (!connected()) { + return false; + } + + tlen = strnlen(topic, this->bufferSize); + + header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + this->buffer[pos++] = header; + len = plength + 2 + tlen; + do { + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; + } + this->buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,this->buffer,pos); + + rc += _client->write(this->buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + + expectedLength = 1 + llen + 2 + tlen + plength; + + return (rc == expectedLength); +} + +boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { + if (connected()) { + // Send the header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + lastOutActivity = millis(); + return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); + } + return false; +} + +int PubSubClient::endPublish() { + return 1; +} + +size_t PubSubClient::write(uint8_t data) { + lastOutActivity = millis(); + return _client->write(data); +} + +size_t PubSubClient::write(const uint8_t *buffer, size_t size) { + lastOutActivity = millis(); + return _client->write(buffer,size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint16_t len = length; + do { + + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;i 0) && result) { + bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining; + rc = _client->write(writeBuf,bytesToWrite); + result = (rc == bytesToWrite); + bytesRemaining -= rc; + writeBuf += rc; + } + return result; +#else + rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); + lastOutActivity = millis(); + return (rc == hlen+length); +#endif +} + +boolean PubSubClient::subscribe(const char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (qos > 1) { + return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->buffer,length); + this->buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +boolean PubSubClient::unsubscribe(const char* topic) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + } + return false; +} + +void PubSubClient::disconnect() { + this->buffer[0] = MQTTDISCONNECT; + this->buffer[1] = 0; + _client->write(this->buffer,2); + _state = MQTT_DISCONNECTED; + _client->flush(); + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + boolean rc; + if (_client == NULL ) { + rc = false; + } else { + rc = (int)_client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + _client->flush(); + _client->stop(); + } + } else { + return this->_state == MQTT_CONNECTED; + } + } + return rc; +} + +PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { + IPAddress addr(ip[0],ip[1],ip[2],ip[3]); + return setServer(addr,port); +} + +PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; +} + +PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; +} + +PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { + this->callback = callback; + return *this; +} + +PubSubClient& PubSubClient::setClient(Client& client){ + this->_client = &client; + return *this; +} + +PubSubClient& PubSubClient::setStream(Stream& stream){ + this->stream = &stream; + return *this; +} + +int PubSubClient::state() { + return this->_state; +} + +boolean PubSubClient::setBufferSize(uint16_t size) { + if (size == 0) { + // Cannot set it back to 0 + return false; + } + if (this->bufferSize == 0) { + this->buffer = (uint8_t*)malloc(size); + } else { + uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + if (newBuffer != NULL) { + this->buffer = newBuffer; + } else { + return false; + } + } + this->bufferSize = size; + return (this->buffer != NULL); +} + +uint16_t PubSubClient::getBufferSize() { + return this->bufferSize; +} +PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { + this->keepAlive = keepAlive; + return *this; +} +PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { + this->socketTimeout = timeout; + return *this; +} diff --git a/Software/src/lib/knolleary-pubsubclient/PubSubClient.h b/Software/src/lib/knolleary-pubsubclient/PubSubClient.h new file mode 100644 index 00000000..c70d9fd3 --- /dev/null +++ b/Software/src/lib/knolleary-pubsubclient/PubSubClient.h @@ -0,0 +1,184 @@ +/* + PubSubClient.h - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#ifndef PubSubClient_h +#define PubSubClient_h + +#include +#include "IPAddress.h" +#include "Client.h" +#include "Stream.h" + +#define MQTT_VERSION_3_1 3 +#define MQTT_VERSION_3_1_1 4 + +// MQTT_VERSION : Pick the version +//#define MQTT_VERSION MQTT_VERSION_3_1 +#ifndef MQTT_VERSION +#define MQTT_VERSION MQTT_VERSION_3_1_1 +#endif + +// MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize(). +#ifndef MQTT_MAX_PACKET_SIZE +#define MQTT_MAX_PACKET_SIZE 256 +#endif + +// MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive() +#ifndef MQTT_KEEPALIVE +#define MQTT_KEEPALIVE 15 +#endif + +// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds. Override with setSocketTimeout() +#ifndef MQTT_SOCKET_TIMEOUT +#define MQTT_SOCKET_TIMEOUT 15 +#endif + +// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client +// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to +// pass the entire MQTT packet in each write call. +//#define MQTT_MAX_TRANSFER_SIZE 80 + +// Possible values for client.state() +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 + +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +// Maximum size of fixed header and variable length size header +#define MQTT_MAX_HEADER_SIZE 5 + +#if defined(ESP8266) || defined(ESP32) +#include +#define MQTT_CALLBACK_SIGNATURE std::function callback +#else +#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) +#endif + +#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} + +class PubSubClient : public Print { +private: + Client* _client; + uint8_t* buffer; + uint16_t bufferSize; + uint16_t keepAlive; + uint16_t socketTimeout; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + MQTT_CALLBACK_SIGNATURE; + uint32_t readPacket(uint8_t*); + boolean readByte(uint8_t * result); + boolean readByte(uint8_t * result, uint16_t * index); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + // Build up the header ready to send + // Returns the size of the header + // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start + // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); + IPAddress ip; + const char* domain; + uint16_t port; + Stream* stream; + int _state; +public: + PubSubClient(); + PubSubClient(Client& client); + PubSubClient(IPAddress, uint16_t, Client& client); + PubSubClient(IPAddress, uint16_t, Client& client, Stream&); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, Client& client); + PubSubClient(uint8_t *, uint16_t, Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(const char*, uint16_t, Client& client); + PubSubClient(const char*, uint16_t, Client& client, Stream&); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + + ~PubSubClient(); + + PubSubClient& setServer(IPAddress ip, uint16_t port); + PubSubClient& setServer(uint8_t * ip, uint16_t port); + PubSubClient& setServer(const char * domain, uint16_t port); + PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); + PubSubClient& setClient(Client& client); + PubSubClient& setStream(Stream& stream); + PubSubClient& setKeepAlive(uint16_t keepAlive); + PubSubClient& setSocketTimeout(uint16_t timeout); + + boolean setBufferSize(uint16_t size); + uint16_t getBufferSize(); + + boolean connect(const char* id); + boolean connect(const char* id, const char* user, const char* pass); + boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession); + void disconnect(); + boolean publish(const char* topic, const char* payload); + boolean publish(const char* topic, const char* payload, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const char* payload, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + // Start to publish a message. + // This API: + // beginPublish(...) + // one or more calls to write(...) + // endPublish() + // Allows for arbitrarily large payloads to be sent without them having to be copied into + // a new buffer and held in memory at one time + // Returns 1 if the message was started successfully, 0 if there was an error + boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + // Finish off this publish message (started with beginPublish) + // Returns 1 if the packet was sent successfully, 0 if there was an error + int endPublish(); + // Write a single byte of payload (only to be used with beginPublish/endPublish) + virtual size_t write(uint8_t); + // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish) + // Returns the number of bytes written + virtual size_t write(const uint8_t *buffer, size_t size); + boolean subscribe(const char* topic); + boolean subscribe(const char* topic, uint8_t qos); + boolean unsubscribe(const char* topic); + boolean loop(); + boolean connected(); + int state(); + +}; + + +#endif