From 3f599b25fe89f1d91b1c24a227fadcd57c2e2e51 Mon Sep 17 00:00:00 2001 From: Fall Byrd Date: Sat, 12 Oct 2024 22:06:37 -0700 Subject: [PATCH] Pushing up code for poking at and debugging --- backend/api/core.py | 5 +- backend/api/routes.py | 14 ++ backend/api/server.py | 35 +++++ backend/hardware/core.py | 2 +- backend/main.py | 124 ++++++++++++++--- backend/microlab/core.py | 254 ++++++++++++++++++++++++++-------- backend/microlab/interface.py | 66 +++++++-- backend/recipes/core.py | 4 +- 8 files changed, 410 insertions(+), 94 deletions(-) create mode 100644 backend/api/server.py diff --git a/backend/api/core.py b/backend/api/core.py index 69758e4..2b9b775 100644 --- a/backend/api/core.py +++ b/backend/api/core.py @@ -5,7 +5,8 @@ import logging import api.routes -from config import microlabConfig as config +from api.server import APIServer +# from config import microlabConfig as config from api.app import app from microlab.interface import MicrolabInterface @@ -18,4 +19,4 @@ def runFlask(in_queue, out_queue): werkzeugLogger.setLevel(logging.WARNING) api.routes.microlabInterface = MicrolabInterface(in_queue, out_queue) - app.run(host="0.0.0.0", port=config.apiPort) + APIServer(app).run() diff --git a/backend/api/routes.py b/backend/api/routes.py index fd08ee3..62059ac 100644 --- a/backend/api/routes.py +++ b/backend/api/routes.py @@ -3,6 +3,7 @@ """ from api.app import app +from api.server import APIServer from flask import jsonify, request, send_file from werkzeug.utils import secure_filename from os.path import join @@ -388,3 +389,16 @@ def fetchLogs(): mostRecent = logFiles[-1] data = data + Path(mostRecent).read_text() return (jsonify({'logs': data}), 200) + + +@app.route('/shutdown', methods=['PUT']) +def shutdown_server(): + """Stops server """ + # At first read this seems excessive to kill the server however after + # digging deeper into it this may be the cleanest way to deal with this. + print('Hit shutdown endpoint') + try: + return (jsonify({'response': 'ok'}), 200) + finally: + print('Calling server shutdown now') + APIServer.shutdown() diff --git a/backend/api/server.py b/backend/api/server.py new file mode 100644 index 0000000..4c15579 --- /dev/null +++ b/backend/api/server.py @@ -0,0 +1,35 @@ +import logging + +from waitress.server import create_server +from flask import Flask + +from config import microlabConfig as config + + +class APIServer: + + _server = None + + def __init__(self, app: Flask): + # self._server = make_server('0.0.0.0', config.apiPort, app) + self._app = app + # self.get_server(app) + # ctx = app.app_context() + # ctx.push() + + def run(self): + logging.info('Starting backend server') + # self._server.serve_forever() + self.get_server(self._app).run() + + @classmethod + def get_server(cls, app: Flask): + if cls._server is None: + cls._server = create_server(app, host='0.0.0.0', port=config.apiPort) + return cls._server + + @classmethod + def shutdown(cls): + logging.info('Shutting down backend server') + cls._server.close() + logging.info('Shutting down backend server complete') diff --git a/backend/hardware/core.py b/backend/hardware/core.py index 9f84fa0..3ba2e99 100644 --- a/backend/hardware/core.py +++ b/backend/hardware/core.py @@ -51,7 +51,7 @@ def get_microlab_hardware_controller(cls): return cls._microlabHardware - def loadHardware(self, deviceDefinition: list[dict]): + def loadHardware(self, deviceDefinition: list[dict]) -> tuple[bool, str]: """ Loads and initializes the hardware devices diff --git a/backend/main.py b/backend/main.py index d77cdd3..68dc3ac 100644 --- a/backend/main.py +++ b/backend/main.py @@ -5,8 +5,12 @@ Look in api.routes for the actual api code """ -from multiprocessing import Process, Queue -from microlab.core import startMicrolabProcess +import signal + +import requests + +from multiprocessing import Process, Queue, Value +from microlab.core import MicrolabHardwareManager from api.core import runFlask import config import multiprocessing_logging @@ -16,6 +20,8 @@ from util.logFormatter import MultiLineFormatter import sys +from hardware.core import MicroLabHardware + def setupLogging(): logHandlers = [] @@ -37,25 +43,101 @@ def setupLogging(): multiprocessing_logging.install_mp_handler() -if __name__ == "__main__": - config.initialSetup() - setupLogging() +class BackendManager: - logging.info("### STARTING MAIN MICROLAB SERVICE ###") + def __init__(self): + self._microlab_manager_should_run = Value('i', 1) + self._q1 = Queue() + self._q2 = Queue() - q1 = Queue() - q2 = Queue() + def _shutdown_flask(self): + shutdown_url = f'http://localhost:{config.microlabConfig.apiPort}/shutdown' + print(f'_shutdown_flask: shutdown_url: {shutdown_url}') + response = requests.put(shutdown_url, timeout=1) + print(f'_shutdown_flask: response code: {response.status_code}') + # try: + # self._flaskProcess.terminate() + # except ValueError as e: + # print(e) + # # When you terminate the process this way it will kill it however it will + # # raise a secondary exception when killing the process, claiming there is no process to kill + # # even though there certainly is. I suspect this may be Flasks behavior in how it handles signals + # pass - microlabProcess = Process( - target=startMicrolabProcess, args=(q1, q2), name="microlab" - ) - microlabProcess.start() - flaskProcess = Process(target=runFlask, args=(q2, q1), name="flask") - flaskProcess.start() - - microlabProcess.join() - flaskProcess.join() - q1.close() - q2.close() - q1.join_thread() - q2.join_thread() + # except AttributeError as e: + # print(e) + # pass + + def _handle_exit_signals(self, signum, frame): + print('in _handle_exit_signals') + self._microlab_manager_should_run.value = 0 + self._shutdown_flask() + # print('in _handle_exit_signals closing queue 1') + # self._q1.close() + # print('in _handle_exit_signals closing queue 2') + # self._q2.close() + # print('in _handle_exit_signals queues closed') + + def run(self): + config.initialSetup() + setupLogging() + + logging.info("### STARTING MAIN MICROLAB SERVICE ###") + + print('MAIN before') + # We're setting up a shared memory value here so that if the main process recieves a signal to terminate + # we can update the value to indicate to the process that it needs to terminate + + self._microlab_hardware = MicroLabHardware.get_microlab_hardware_controller() + # self._microlab_manager = MicrolabHardwareManager( + # self._microlab_hardware, q1, q2, self._microlab_manager_should_run + # ) + self._microlab_manager_process = MicrolabHardwareManager( + self._microlab_hardware, self._q1, self._q2, self._microlab_manager_should_run + ) + print('MAIN before lab start') + # self._microlab_manager_process = Process(target=self._microlab_manager.run, name="microlab") + self._microlab_manager_process.start() + print(f'MAIN self._microlab_manager_process pid: {self._microlab_manager_process.pid}') + + self._flaskProcess = Process(target=runFlask, args=(self._q2, self._q1), name="flask", daemon=True) + print('MAIN before flask start') + self._flaskProcess.start() + + print(f'MAIN self._flaskProcess pid: {self._flaskProcess.pid}') + signal.signal(signal.SIGINT, self._handle_exit_signals) + signal.signal(signal.SIGTERM, self._handle_exit_signals) + + print('MAIN before flask join') + self._flaskProcess.join() + + print('MAIN before lab join') + try: + self._microlab_manager_process.join() + except Exception as e: + # We re-raise any exceptions in execution and if we see them we need to shut down flask + print(f'Hit lab exception: {e}') + self._shutdown_flask() + # raise + import sys + sys.exit(1) + + print('MAIN before q1 close') + self._q1.close() + print('MAIN before q1 join') + self._q1.join_thread() + print('MAIN before q2 close') + self._q2.close() + print('MAIN before q2 join') + self._q2.join_thread() + print('MAIN run done') + # sys.exit(0) + + +def main(): + backend_manager = BackendManager() + backend_manager.run() + + +if __name__ == "__main__": + main() diff --git a/backend/microlab/core.py b/backend/microlab/core.py index 461c570..eb7da84 100644 --- a/backend/microlab/core.py +++ b/backend/microlab/core.py @@ -2,23 +2,161 @@ Contains function for starting up the microlab process """ import logging -import sys import time -import threading +import traceback import signal import hardware.devicelist import recipes.core import recipes.state +# from threading import Thread, Event, Lock +from multiprocessing import Queue, Process +from typing import Optional + from config import microlabConfig as config from hardware.core import MicroLabHardware -HALT = threading.Event() -MUTEX = threading.Lock() +# HALT = Event() +# MUTEX = Lock() + + +class MicrolabHardwareManager(Process): + + def __init__( + self, microlab_hardware: MicroLabHardware, in_queue: Queue, out_queue: Queue, should_run: int, + *args, **kwargs + ): + super().__init__(*args, **kwargs) + self._microlab_hardware: MicroLabHardware = microlab_hardware + self._in_queue = in_queue + self._out_queue = out_queue + # should_run is actually a multiprocessing.Value that is an int, if the main thread recieves a signal + # to exit it will update this shared value. It is typed as an int as opposed to multiprocessing.Value + # because when typed as a 'Value' (by mypy at least) it shows as an invalid type + self._should_run = should_run + + self._command_dict = { + "start": recipes.core.start, + "status": recipes.core.status, + "stop": recipes.core.stop, + "selectOption": recipes.core.selectOption, + "reloadConfig": config.reloadConfig, + "reloadHardware": self._reload_hardware, + } + + self._execution_exception = None + + def _setup_signal_handlers(self): + signal.signal(signal.SIGINT, self._shutdown) + signal.signal(signal.SIGTERM, self._shutdown) + + def _shutdown(self, signum, frame): + logging.info('Begining microlab shutdown process.') + self._should_run = 0 + + def _cleanup(self): + logging.info("") + logging.info("Shutting down microlab.") + self._microlab_hardware.turnOffEverything() + logging.info("Shutdown completed.") + # import sys + # sys.exit() + # self._out_queue.close() + + def _run_command(self, command_string: str, command_args: Optional[str]) -> Optional[str]: + result = None + command = self._command_dict[command_string] + + if command_args: + result = command(command_args) + else: + result = command() + + return result + + def _update_queue_data(self): + if not self._in_queue.empty(): + data = self._in_queue.get(timeout=5) # Receive data + # status just fetches data and so doesn't need a lock, everything + # else is a mutation and needs a lock to prevent conflicts with + # the other thread + result = self._run_command(data["command"], data.get("args", None)) + if result is not None: + self._out_queue.put(result, timeout=5) # Send data back + + # while True: + # time.sleep(0.01) + # if not self._in_queue.empty(): + # data = self._in_queue.get() # Receive data + # # status just fetches data and so doesn't need a lock, everything + # # else is a mutation and needs a lock to prevent conflicts with + # # the other thread + # if data["command"] == "status": + # result = self._command_dict[data["command"]](data["args"]) + # else: + # with self._lock: + # if data["args"]: + # result = self._command_dict[data["command"]](data["args"]) + # else: + # result = self._command_dict[data["command"]] + + # if result is not None: + # self._out_queue.put(result) # Send data back + + def _update_microlab(self): + if recipes.state.currentRecipe: + recipes.state.currentRecipe.tickTasks() + recipes.state.currentRecipe.checkStepCompletion() + + # if HALT.is_set(): + # self._microlab_hardware.turnOffEverything() + # break + + def _reload_hardware(self): + logging.info("Reloading microlab device configuration") + hardwareConfig = hardware.devicelist.loadHardwareConfiguration() + deviceDefinitions = hardwareConfig['devices'] + return self._microlab_hardware.loadHardware(deviceDefinitions) + + def run(self): + # We setup signal handlers here as this is what is the target of (multiprocessing) Process.run. + # If signal handlers are setup before existing in their own process they will be setup for the + # base context *and* be inherited by the new Process being started + self._setup_signal_handlers() + + # Any non-zero value will evalutate to True, self._should_run can be changed by either + # the spawning process recieving an exit signal and updating the value (it is a multiprocess.Value), + # or this process recieving an exit signal directly. This way we should make a best effort to shutdown + # the associated hardware + while self._should_run: + # print('run loop pre sleep') + time.sleep(0.01) + try: + # print('run loop pre _update_queue_data') + self._update_queue_data() + # print('run loop pre _update_microlab') + self._update_microlab() + # print('run loop post _update_microlab') + except Exception as e: + self._execution_exception = e + logging.error(f'While running microlab hardware encountered exception: {e}. Shutting down microlab.') + logging.debug(traceback.print_exc()) + break -def startMicrolabProcess(in_queue, out_queue): + # print('run loop pre _cleanup') + self._cleanup() + # print('run loop post _cleanup') + + # def join(self, *args, **kwargs): + # super().join(*args, **kwargs) + + # if self._execution_exception: + # raise self._execution_exception + + +# def startMicrolabProcess(in_queue, out_queue): """ Starts up the microlab process @@ -73,62 +211,62 @@ def startMicrolabProcess(in_queue, out_queue): (False, message) on failure. reference "selectOption" in /recipes/__init__.py for more info """ - microlabHardware = MicroLabHardware.get_microlab_hardware_controller() + # microlabHardware = MicroLabHardware.get_microlab_hardware_controller() - def runMicrolab(): - while True: - time.sleep(0.01) - MUTEX.acquire() - if recipes.state.currentRecipe: - recipes.state.currentRecipe.tickTasks() - recipes.state.currentRecipe.checkStepCompletion() - MUTEX.release() + # def runMicrolab(): + # while True: + # time.sleep(0.01) + # MUTEX.acquire() + # if recipes.state.currentRecipe: + # recipes.state.currentRecipe.tickTasks() + # recipes.state.currentRecipe.checkStepCompletion() + # MUTEX.release() - if HALT.is_set(): - microlabHardware.turnOffEverything() - break + # if HALT.is_set(): + # microlabHardware.turnOffEverything() + # break - microlab = threading.Thread(target=runMicrolab) - microlab.start() + # microlab = threading.Thread(target=runMicrolab) + # microlab.start() - def handleSignal(_a, _b): - logging.info("") - logging.info("Shutting down microlab.") - HALT.set() - microlab.join() - logging.info("Shutdown completed.") - sys.exit() + # def handleSignal(_a, _b): + # logging.info("") + # logging.info("Shutting down microlab.") + # HALT.set() + # microlab.join() + # logging.info("Shutdown completed.") + # sys.exit() - signal.signal(signal.SIGINT, handleSignal) - signal.signal(signal.SIGTERM, handleSignal) + # signal.signal(signal.SIGINT, handleSignal) + # signal.signal(signal.SIGTERM, handleSignal) - def reloadHardware(): - logging.info("Reloading microlab device configuration") - hardwareConfig = hardware.devicelist.loadHardwareConfiguration() - deviceDefinitions = hardwareConfig['devices'] - return microlabHardware.loadHardware(deviceDefinitions) - - commandDict = { - "start": recipes.core.start, - "status": recipes.core.status, - "stop": recipes.core.stop, - "selectOption": recipes.core.selectOption, - "reloadConfig": lambda x: config.reloadConfig(), - "reloadHardware": lambda x: reloadHardware(), - } - - while True: - time.sleep(0.01) - if not in_queue.empty(): - data = in_queue.get() # Receive data - # status just fetches data and so doesn't need a lock, everything - # else is a mutation and needs a lock to prevent conflicts with - # the other thread - if data["command"] == "status": - result = commandDict[data["command"]](data["args"]) - else: - MUTEX.acquire() - result = commandDict[data["command"]](data["args"]) - MUTEX.release() - if result is not None: - out_queue.put(result) # Send data back + # def reloadHardware(): + # logging.info("Reloading microlab device configuration") + # hardwareConfig = hardware.devicelist.loadHardwareConfiguration() + # deviceDefinitions = hardwareConfig['devices'] + # return microlabHardware.loadHardware(deviceDefinitions) + + # commandDict = { + # "start": recipes.core.start, + # "status": recipes.core.status, + # "stop": recipes.core.stop, + # "selectOption": recipes.core.selectOption, + # "reloadConfig": lambda x: config.reloadConfig(), + # "reloadHardware": lambda x: reloadHardware(), + # } + + # while True: + # time.sleep(0.01) + # if not in_queue.empty(): + # data = in_queue.get() # Receive data + # # status just fetches data and so doesn't need a lock, everything + # # else is a mutation and needs a lock to prevent conflicts with + # # the other thread + # if data["command"] == "status": + # result = commandDict[data["command"]](data["args"]) + # else: + # MUTEX.acquire() + # result = commandDict[data["command"]](data["args"]) + # MUTEX.release() + # if result is not None: + # out_queue.put(result) # Send data back diff --git a/backend/microlab/interface.py b/backend/microlab/interface.py index 214e9c8..659aa5c 100644 --- a/backend/microlab/interface.py +++ b/backend/microlab/interface.py @@ -1,3 +1,4 @@ +from queue import Full, Empty class MicrolabInterface: @@ -28,9 +29,15 @@ def start(self, name): (False, message) on failure. """ # Validate that the microlab hardware controller has initialized - self.toMicrolab.put({"command": "start", "args": name}) + print('MicrolabInterface start pre-put:') + self.toMicrolab.put({"command": "start", "args": name}, timeout=1) + print('MicrolabInterface start post-put:') - return self.fromMicrolab.get() + print('MicrolabInterface start pre-get:') + get_val = self.fromMicrolab.get(timeout=1) + print('MicrolabInterface start post-get:') + + return get_val def status(self): """ @@ -63,8 +70,33 @@ def status(self): An ISO date string for when the current step is expected to be completed, or null if unknown. """ - self.toMicrolab.put({"command": "status", "args": None}) - res = self.fromMicrolab.get() + # self.toMicrolab.put({"command": "status", "args": None}) + # res = self.fromMicrolab.get() + print('MicrolabInterface status pre-put:') + try: + self.toMicrolab.put({"command": "status", "args": None}, timeout=1) + except Full: + # Unable to put + pass + + except ValueError: + # Queue has been closed + pass + print('MicrolabInterface status post-put:') + + res = 'Unable to get status data' + print('MicrolabInterface status pre-get:') + try: + res = self.fromMicrolab.get(timeout=1) + except Empty: + # Unable to get + pass + + except ValueError: + # Queue has been closed + pass + print('MicrolabInterface status post-get:') + return res def stop(self): @@ -74,7 +106,9 @@ def stop(self): :return: None ... at least for now. """ - self.toMicrolab.put({"command": "stop", "args": None}) + print('MicrolabInterface stop pre-put:') + self.toMicrolab.put({"command": "stop", "args": None}, timeout=1) + print('MicrolabInterface stop post-put:') def selectOption(self, option): """ @@ -88,8 +122,13 @@ def selectOption(self, option): (True,'') on success (False,message) on failure """ - self.toMicrolab.put({"command": "selectOption", "args": option}) - res = self.fromMicrolab.get() + print('MicrolabInterface selectOption pre-put:') + self.toMicrolab.put({"command": "selectOption", "args": option}, timeout=1) + print('MicrolabInterface selectOption post-put:') + + print('MicrolabInterface selectOption pre-get:') + res = self.fromMicrolab.get(timeout=1) + print('MicrolabInterface selectOption post-get:') return res def reloadConfig(self): @@ -99,7 +138,9 @@ def reloadConfig(self): :return: None """ - self.toMicrolab.put({"command": "reloadConfig", "args": None}) + print('MicrolabInterface reloadConfig pre-put:') + self.toMicrolab.put({"command": "reloadConfig", "args": None}, timeout=1) + print('MicrolabInterface reloadConfig post-put:') def reloadHardware(self): """ @@ -109,6 +150,11 @@ def reloadHardware(self): (True, '') on success (False, message) on failure """ - self.toMicrolab.put({"command": "reloadHardware", "args": None}) - res = self.fromMicrolab.get() + print('MicrolabInterface reloadHardware pre-put:') + self.toMicrolab.put({"command": "reloadHardware", "args": None}, timeout=1) + print('MicrolabInterface reloadHardware post-put:') + + print('MicrolabInterface reloadHardware pre-get:') + res = self.fromMicrolab.get(timeout=1) + print('MicrolabInterface reloadHardware post-get:') return res diff --git a/backend/recipes/core.py b/backend/recipes/core.py index e17f739..88a5ad8 100644 --- a/backend/recipes/core.py +++ b/backend/recipes/core.py @@ -89,7 +89,7 @@ def start(name): return True, '' -def status(_): +def status(): """ Get the status of the machine. :return: @@ -151,7 +151,7 @@ def status(_): return message -def stop(_): +def stop(): """ Stop the currently running recipe.