diff --git a/Pipfile b/Pipfile index 0f921336..3de0d303 100644 --- a/Pipfile +++ b/Pipfile @@ -6,10 +6,6 @@ name = "pypi" [packages] bitmath = "~=1.3" certifi = "==2023.7.22" -# The latest connexion is 2.14.2 which requires Flask < 2.3. -# So the latest Flask we can install is 2.2.5. (If you install 2.3.0 you'll -# get `AttributeError: module 'flask.json' has no attribute 'JSONEncoder'` -# b/c Flask 2.3.0 removed JSONEncoder. "connexion[swagger-ui]" = "~=2.14" click = "~=8.1" crate = "~=0.22" @@ -28,9 +24,9 @@ redis = "~=4.6" requests = "~=2.31" rq = "~=1.8" geopy = "~=2.2.0" +flask-mqtt = "*" [dev-packages] -# run `pipenv install --dev` to get the packages below in your env aiohttp = "~=3.8" backoff = "~=1.1" matplotlib = "~=3.3" diff --git a/src/server/mqtt_client.py b/src/server/mqtt_client.py new file mode 100644 index 00000000..f3998c33 --- /dev/null +++ b/src/server/mqtt_client.py @@ -0,0 +1,52 @@ +import logging +from utils.cfgreader import EnvReader, BoolVar +from flask_mqtt import Mqtt +import json +import requests + +class MqttConfig: + def __init__(self): + pass + + def if_mqtt_enabled(self) -> bool: + env_var = BoolVar('USE_MQTT', False) + return EnvReader().safe_read(env_var) + +def run_if_enabled(application, host, port, username, password, keepalive, tls, topic, ql_host, ql_port): + application.config['MQTT_BROKER_URL'] = host + application.config['MQTT_BROKER_PORT'] = port + application.config['MQTT_USERNAME'] = username + application.config['MQTT_PASSWORD'] = password + application.config['MQTT_KEEPALIVE'] = keepalive + application.config['MQTT_TLS_ENABLED'] = tls + topic = topic + + mqtt_client = Mqtt(application) + + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + @mqtt_client.on_connect() + def handle_connect(client, userdata, flags, rc): + if rc == 0: + logger.info('MQTT Connected successfully') + mqtt_client.subscribe(topic) # subscribe topic + else: + logger.info('Bad connection. Code:', rc) + + @mqtt_client.on_message() + def handle_mqtt_message(client, userdata, message): + data = dict( + topic=message.topic, + payload=message.payload.decode() + ) + logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) + try: + payload = json.loads(message.payload) + except ValueError: + payload = None + + if payload: + url = f'http://{ql_host}:{ql_port}/v2/notify' + headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} + r = requests.post(url, data=json.dumps(payload), headers=headers) diff --git a/src/server/wsgi.py b/src/server/wsgi.py index 64af6d61..bab29f08 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -1,6 +1,7 @@ from connexion import FlaskApp import logging import server +from server.mqtt_client import MqttConfig, run_if_enabled from utils.cfgreader import EnvReader, BoolVar from flask.logging import default_handler @@ -36,49 +37,9 @@ def new_wrapper() -> FlaskApp: application = quantumleap.app -def use_mqtt() -> bool: - env_var = BoolVar('USE_MQTT', False) - print(EnvReader().safe_read(env_var)) - return EnvReader().safe_read(env_var) - -if use_mqtt(): - application.config['MQTT_BROKER_URL'] = server.MQTT_HOST - application.config['MQTT_BROKER_PORT'] = server.MQTT_PORT - application.config['MQTT_USERNAME'] = server.MQTT_USERNAME - application.config['MQTT_PASSWORD'] = server.MQTT_PASSWORD - application.config['MQTT_KEEPALIVE'] = server.MQTT_KEEPALIVE - application.config['MQTT_TLS_ENABLED'] = server.MQTT_TLS_ENABLED - topic = server.MQTT_TOPIC - - mqtt_client = Mqtt(application) - - logger = logging.getLogger(__name__) - logger.setLevel(logging.INFO) - - @mqtt_client.on_connect() - def handle_connect(client, userdata, flags, rc): - if rc == 0: - logger.info('MQTT Connected successfully') - mqtt_client.subscribe(topic) # subscribe topic - else: - logger.info('Bad connection. Code:', rc) - - @mqtt_client.on_message() - def handle_mqtt_message(client, userdata, message): - data = dict( - topic=message.topic, - payload=message.payload.decode() - ) - logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) - try: - payload = json.loads(message.payload) - except ValueError: - payload = None - - if payload: - url = f'http://{server.DEFAULT_HOST}:{server.DEFAULT_PORT}/v2/notify' - headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} - r = requests.post(url, data=json.dumps(payload), headers=headers) +mqttConfig = MqttConfig() +if mqttConfig.if_mqtt_enabled(): + run_if_enabled(application, server.MQTT_HOST, server.MQTT_PORT, server.MQTT_USERNAME, server.MQTT_PASSWORD, server.MQTT_KEEPALIVE, server.MQTT_TLS_ENABLED, server.MQTT_TOPIC, server.DEFAULT_HOST, server.DEFAULT_PORT) """