Skip to content

Commit

Permalink
Automatically extract MQTT credentials (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
berezhinskiy authored Jan 12, 2023
1 parent 69b3f8a commit e53eb19
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 146 deletions.
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
FROM python:3.10.5-buster
FROM python:3.11-alpine

RUN apt update -y && apt upgrade -y
LABEL maintainer="Yaroslav Berezhinskiy <[email protected]>"
LABEL description="An implementation of a Prometheus exporter for EcoFlow portable power stations"

RUN /usr/local/bin/python -m pip install --upgrade pip
RUN apk update && apk add py3-pip
ADD requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

Expand Down
49 changes: 9 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

An implementation of a Prometheus exporter for [EcoFlow](https://www.ecoflow.com/) products. To receive information from the device, exporter works the same way as the official mobile application by subscribing to EcoFlow MQTT Broker `mqtt.ecoflow.com`

Unlike REST API exporters, it is not required to request for `APP_KEY` and `SECRET_KEY` since MQTT credentials can be extracted from `api.ecoflow.com` (see [Usage](#usage) section). Another benefit of such implementation is that it provides much more device information:
Unlike REST API exporters, it is not required to request for `APP_KEY` and `SECRET_KEY` since MQTT credentials can be extracted from `api.ecoflow.com`. Another benefit of such implementation is that it provides much more device information:

[![Dashboard](images/EcoflowMQTT.png?raw=true)](https://grafana.com/grafana/dashboards/17812-ecoflow/)

The project provides:

- Bash script to extract EcoFlow MQTT credentials
- Python program that accepts a number of arguments to collect information about a device and exports the collected metrics to a prometheus endpoint
- [Python program](ecoflow_exporter.py) that accepts a number of arguments to collect information about a device and exports the collected metrics to a prometheus endpoint
- [Dashboard for Grafana](https://grafana.com/grafana/dashboards/17812-ecoflow/)
- [Docker image](https://github.com/berezhinskiy/ecoflow_exporter/pkgs/container/ecoflow_exporter) for your convenience

Exporter collects all metrics names and their values sent by the device to MQTT EcoFlow Broker. In case of any new objects in the queue, metrics will be generated automatically based on the JSON object key/value. For example, payload:
The exporter collects all possible metrics names and their values sent by the device to MQTT EcoFlow Broker. In case of any new objects in the queue, new metrics will be generated automatically based on the JSON object key/value. For example, payload:

```json
{
Expand All @@ -37,7 +36,7 @@ ecoflow_inv_ac_in_vol{device="XXXXXXXXXXXXXXXX"} 242182.0
ecoflow_inv_inv_out_vol{device="XXXXXXXXXXXXXXXX"} 244582.0
```

All metrics are prefixed with `ecoflow` and reports label `device` for multiple device support (see [Usage](#usage) section)
All metrics are prefixed with `ecoflow` and reports label `device` for multiple device support

## Disclaimers

Expand All @@ -54,60 +53,30 @@ Please, create an issue to let me know if exporter works well (or not) with your

## Usage

- Get your unit's serial number (displayed inside application)
- Get MQTT credentials for your EcoFlow account:

```bash
> bash get_mqtt_credentials.sh
Checking if jq is installed
Checking if base64 is installed
Checking if curl is installed
Checking if sed is installed

Everything is ready to extract the mqtt data
Please log in now:

Ecoflow email:
Ecoflow password:
{
"code": "0",
"message": "Success",
"data": {
"url": "mqtt.ecoflow.com",
"port": "8883",
"protocol": "mqtts",
"certificateAccount": "app-b12d847861bb84eaa103446f606d41bb",
"certificatePassword": "28dd5feff0bf4420bfcdaecfc18418a6"
}
}
```
- Connect the device to WiFi and register an EcoFlow account using the official mobile application
- Get your unit's serial number
- Exporter is parameterized via environment variables:

Required:

`DEVICE_SN` - the device serial number

`MQTT_USERNAME` - the username provided by script as `certificateAccount`
`ECOFLOW_USERNAME` - EcoFlow account username

`MQTT_PASSWORD` - the password provided by script as `certificatePassword`
`ECOFLOW_PASSWORD` - EcoFlow account password

Optional:

`DEVICE_NAME` - If given, this name will be exported as `device` label instead of the device serial number

`MQTT_BROKER` - (default: `mqtt.ecoflow.com`)
`MQTT_PORT` - (default: `8883`)
`EXPORTER_PORT` - (default: `9090`)

`LOG_LEVEL` - (default: `INFO`) Possible values: `DEBUG`, `INFO`, `WARNING`, `ERROR`

- Example of running docker image:

```bash
docker run -e DEVICE_SN=<your device SN> -e MQTT_USERNAME=<your MQTT username> -e MQTT_PASSWORD=<your MQTT password> -it -p 9090:9090 --network=host ghcr.io/berezhinskiy/ecoflow_exporter
docker run -e DEVICE_SN=<your device SN> -e ECOFLOW_USERNAME=<your username> -e ECOFLOW_PASSWORD=<your password> -it -p 9090:9090 --network=host ghcr.io/berezhinskiy/ecoflow_exporter
```

will run the image with the exporter on `*:9090`
Expand Down
202 changes: 137 additions & 65 deletions ecoflow_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import time
import json
import re
import requests
import base64
import paho.mqtt.client as mqtt
from queue import Queue
from prometheus_client import start_http_server, Gauge, Counter
Expand All @@ -15,6 +17,126 @@ class EcoflowMetricException(Exception):
pass


class EcoflowAuthentication:
def __init__(self, ecoflow_username, ecoflow_password):
self.ecoflow_username = ecoflow_username
self.ecoflow_password = ecoflow_password
self.mqtt_url = "mqtt.ecoflow.com"
self.mqtt_port = 8883
self.mqtt_username = None
self.mqtt_password = None
self.authorize()

def authorize(self):
url = "https://api.ecoflow.com/auth/login"
headers = {"lang": "en_US", "content-type": "application/json"}
data = {"email": self.ecoflow_username,
"password": base64.b64encode(self.ecoflow_password.encode()).decode(),
"scene": "IOT_APP",
"userType": "ECOFLOW"}

log.info(f"Login to EcoFlow API {url}")
request = requests.post(url, json=data, headers=headers)
response = self.get_json_response(request)

try:
token = response["data"]["token"]
user_id = response["data"]["user"]["userId"]
user_name = response["data"]["user"]["name"]
except KeyError as key:
raise Exception(f"Failed to extract key {key} from response: {response}")

log.info(f"Successfully logged in: {user_name}")

url = "https://api.ecoflow.com/iot-auth/app/certification"
headers = {"lang": "en_US", "authorization": f"Bearer {token}"}
data = {"userId": user_id}

log.info(f"Requesting IoT MQTT credentials {url}")
request = requests.get(url, data=data, headers=headers)
response = self.get_json_response(request)

try:
self.mqtt_url = response["data"]["url"]
self.mqtt_port = int(response["data"]["port"])
self.mqtt_username = response["data"]["certificateAccount"]
self.mqtt_password = response["data"]["certificatePassword"]
except KeyError as key:
raise Exception(f"Failed to extract key {key} from {response}")

log.info(f"Successfully extracted account: {self.mqtt_username}")

def get_json_response(self, request):
if request.status_code != 200:
raise Exception(f"Got HTTP status code {request.status_code}: {request.text}")

try:
response = json.loads(request.text)
response_message = response["message"]
except KeyError as key:
raise Exception(f"Failed to extract key {key} from {response}")
except Exception as error:
raise Exception(f"Failed to parse response: {request.text} Error: {error}")

if response_message.lower() != "success":
raise Exception(f"{response_message}")

return response


class EcoflowMQTT():

def __init__(self, message_queue, device_sn, username, password, addr, port):
self.message_queue = message_queue
self.addr = addr
self.port = port
self.username = username
self.password = password
self.topic = f"/app/device/property/{device_sn}"

self.client = mqtt.Client(f'python-mqtt-{random.randint(0, 100)}')
self.client.username_pw_set(self.username, self.password)
self.client.tls_set(certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED)
self.client.tls_insecure_set(False)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message

log.info(f"Connecting to MQTT Broker {self.addr}:{self.port}")
self.client.connect(self.addr, self.port)
self.client.loop_start()

def on_connect(self, client, userdata, flags, rc):
match rc:
case 0:
self.client.subscribe(self.topic)
log.info(f"Subscribed to MQTT topic {self.topic}")
case -1:
log.error("Failed to connect to MQTT: connection timed out")
case 1:
log.error("Failed to connect to MQTT: incorrect protocol version")
case 2:
log.error("Failed to connect to MQTT: invalid client identifier")
case 3:
log.error("Failed to connect to MQTT: server unavailable")
case 4:
log.error("Failed to connect to MQTT: bad username or password")
case 5:
log.error("Failed to connect to MQTT: not authorised")
case _:
log.error(f"Failed to connect to MQTT: another error occured: {rc}")

return client

def on_disconnect(self, client, userdata, rc):
if rc != 0:
log.error(f"Unexpected MQTT disconnection: {rc}. Will auto-reconnect")
time.sleep(5)

def on_message(self, client, userdata, message):
self.message_queue.put(message.payload.decode("utf-8"))


class EcoflowMetric:
def __init__(self, ecoflow_payload_key, device_name):
self.ecoflow_payload_key = ecoflow_payload_key
Expand Down Expand Up @@ -58,7 +180,7 @@ def __init__(self, message_queue, device_name, collecting_interval_seconds=5):
self.online = Gauge("ecoflow_online", "1 if device is online", labelnames=["device"])
self.mqtt_messages_receive_total = Counter("ecoflow_mqtt_messages_receive_total", "total MQTT messages", labelnames=["device"])

def run_metrics_loop(self):
def loop(self):
time.sleep(self.collecting_interval_seconds)
while True:
queue_size = self.message_queue.qsize()
Expand All @@ -82,6 +204,8 @@ def run_metrics_loop(self):
try:
payload = json.loads(payload)
params = payload['params']
except KeyError as key:
log.error(f"Failed to extract key {key} from payload: {payload}")
except Exception as error:
log.error(f"Failed to parse MQTT payload: {payload} Error: {error}")
continue
Expand Down Expand Up @@ -124,61 +248,6 @@ def process_payload(self, params):
ac_in_current.set(0)


class EcoflowMQTT():

def __init__(self, message_queue, device_sn, username, password, broker_addr, broker_port):
self.message_queue = message_queue
self.broker_addr = broker_addr
self.broker_port = broker_port
self.username = username
self.password = password
self.topic = f"/app/device/property/{device_sn}"
self.client = mqtt.Client(f'python-mqtt-{random.randint(0, 100)}')

def connect(self):
self.client.username_pw_set(self.username, self.password)
self.client.tls_set(certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED)
self.client.tls_insecure_set(False)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message

log.info(f"Connecting to EcoFlow MQTT Broker {self.broker_addr}:{self.broker_port}")
self.client.connect(self.broker_addr, self.broker_port)
self.client.loop_start()

def on_connect(self, client, userdata, flags, rc):
match rc:
case 0:
log.info(f"Successfully connected to MQTT")
self.client.subscribe(self.topic)
log.debug(f"Subscribed to topic {self.topic}")
case -1:
log.error("Failed to connect to MQTT: connection timed out")
case 1:
log.error("Failed to connect to MQTT: incorrect protocol version")
case 2:
log.error("Failed to connect to MQTT: invalid client identifier")
case 3:
log.error("Failed to connect to MQTT: server unavailable")
case 4:
log.error("Failed to connect to MQTT: bad username or password")
case 5:
log.error("Failed to connect to MQTT: not authorised")
case _:
log.error(f"Failed to connect to MQTT: another error occured: {rc}")

return client

def on_disconnect(self, client, userdata, rc):
if rc != 0:
log.error(f"Unexpected MQTT disconnection: {rc}. Will auto-reconnect")
time.sleep(5)

def on_message(self, client, userdata, message):
self.message_queue.put(message.payload.decode("utf-8"))


def main():
log_level = os.getenv("LOG_LEVEL", "INFO")

Expand All @@ -198,24 +267,27 @@ def main():

device_sn = os.getenv("DEVICE_SN")
device_name = os.getenv("DEVICE_NAME") or device_sn
mqtt_username = os.getenv("MQTT_USERNAME")
mqtt_password = os.getenv("MQTT_PASSWORD")
broker_addr = os.getenv("MQTT_BROKER", "mqtt.ecoflow.com")
broker_port = int(os.getenv("MQTT_PORT", "8883"))
ecoflow_username = os.getenv("ECOFLOW_USERNAME")
ecoflow_password = os.getenv("ECOFLOW_PASSWORD")
exporter_port = int(os.getenv("EXPORTER_PORT", "9090"))

if (not device_sn or not mqtt_username or not mqtt_password):
log.error("Please, provide all required environment variables: DEVICE_SN, MQTT_USERNAME, MQTT_PASSWORD")
if (not device_sn or not ecoflow_username or not ecoflow_password):
log.error("Please, provide all required environment variables: DEVICE_SN, ECOFLOW_USERNAME, ECOFLOW_PASSWORD")
sys.exit(1)

try:
auth = EcoflowAuthentication(ecoflow_username, ecoflow_password)
except Exception as error:
log.error(error)
sys.exit(1)

message_queue = Queue()

ecoflow_mqtt = EcoflowMQTT(message_queue, device_sn, mqtt_username, mqtt_password, broker_addr, broker_port)
ecoflow_mqtt.connect()
EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port)

metrics = Worker(message_queue, device_name)
start_http_server(exporter_port)
metrics.run_metrics_loop()
metrics.loop()


if __name__ == '__main__':
Expand Down
36 changes: 0 additions & 36 deletions get_mqtt_credentials.sh

This file was deleted.

Loading

0 comments on commit e53eb19

Please sign in to comment.