From 72bfbf307fa9dd17582af9f566ae4af321e26ce4 Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Mon, 22 Apr 2024 12:14:46 -0600 Subject: [PATCH] Performance optimizations and tweaks - GTNET-SKT: Don't log every GTNET-SKT tag update - GTNET-SKT: Log all GTNET-SKT values, only when debugging - GTNET-SKT: calculate all the typecasts, THEN update the state all at once (instead of taking the lock 20 times and incrementally updating the state...) - Move a lot of logging to if statements so we avoid function call - asynchronous logging so log calls don't block threads (especially for file writes) --- .../pybennu/providers/power/solvers/rtds.py | 61 +++++++++++++------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/src/pybennu/pybennu/providers/power/solvers/rtds.py b/src/pybennu/pybennu/providers/power/solvers/rtds.py index d4a51c7..57e3c0b 100644 --- a/src/pybennu/pybennu/providers/power/solvers/rtds.py +++ b/src/pybennu/pybennu/providers/power/solvers/rtds.py @@ -138,6 +138,7 @@ from configparser import ConfigParser from datetime import datetime, timezone from io import TextIOWrapper +from logging.handlers import QueueHandler, QueueListener from pathlib import Path from pprint import pformat from subprocess import check_output @@ -516,12 +517,30 @@ def __init__( if len(self.gtnet_skt_tag_names) > 30: # max of 30 data points per channel, 10 channels total for GTNET card raise ValueError(f"maximum of 30 points allowed per GTNET-SKT channel, {len(self.gtnet_skt_tag_names)} are defined in config") + # asynchronous logging. Log messages are placed into a queue, + # which is serviced by a thread that runs each handler with + # messages from the queue. This isn't needed as much for stdout, + # but if we enable logging to file or elastic then it will be. + self.log_queue = queue.Queue() + queue_handler = QueueHandler(self.log_queue) + + formatter = logging.Formatter( + fmt="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + console_handler.setLevel(logging.DEBUG if self.debug else logging.INFO) + queue_listener = QueueListener(self.log_queue, console_handler, respect_handler_level=True) + queue_listener.start() + # Configure logging to stdout (includes debug messages from pypmu) logging.basicConfig( level=logging.DEBUG if self.debug else logging.INFO, - format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - stream=sys.stdout + handlers=[queue_handler], + # format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s", + # datefmt="%Y-%m-%d %H:%M:%S", + # stream=sys.stdout ) self.log = logging.getLogger(self.__class__.__name__) self.log.setLevel(logging.DEBUG if self.debug else logging.INFO) @@ -533,7 +552,6 @@ def __init__( self.log.warning("No limit set in 'csv-max-files', CSV files won't be cleaned up and could fill all available disk space") # Get bennu version - # TODO: this is broken self.bennu_version = "unknown" try: apt_result = check_output("apt-cache show bennu", shell=True).decode() @@ -589,7 +607,7 @@ def __init__( # Allow gtnet-skt fields to be read self.current_values.update(self.gtnet_skt_state) - # !! Hack !! + # Hack # TODO: rotate out values to keep this from eating up memory if running for a long time self.time_map = {} @@ -1040,7 +1058,8 @@ def query(self) -> str: BUS6_VA.real BUS6_VA.angle """ - self.log.debug("Processing query request") + if self.debug: + self.log.debug("Processing query request") if not self.current_values: msg = "ERR=No data points have been read yet from the RTDS" @@ -1055,7 +1074,8 @@ def query(self) -> str: return msg def read(self, tag: str) -> str: - self.log.debug(f"Processing read request for tag '{tag}'") + if self.debug: + self.log.debug(f"Processing read request for tag '{tag}'") if not self.current_values: msg = "ERR=Data points have not been initialized yet from the RTDS" @@ -1073,7 +1093,8 @@ def read(self, tag: str) -> str: return msg def write(self, tags: dict) -> str: - self.log.debug(f"Processing write request for tags: {tags}") + if self.debug: + self.log.debug(f"Processing write request for tags: {tags}") if not tags: msg = "ERR=No tags provided for write to RTDS" @@ -1099,7 +1120,8 @@ def write(self, tags: dict) -> str: self.log.error(f"{msg} (tags being written: {tags})") return msg - # Update the state tracker with the values from the tags being written + # Typecast values (e.g. bool to 0/1) + typecasted = {} for tag, val in tags.items(): # Validate types match what's in config, if they don't, then warn and typecast # NOTE: these will be strings if coming from pybennu-probe @@ -1116,15 +1138,18 @@ def write(self, tags: dict) -> str: val = "1" if self.gtnet_skt_tags[tag] == "int": - typecasted_value = int(val) + typecasted[tag] = int(val) else: - typecasted_value = float(val) + typecasted[tag] = float(val) - # NOTE: don't need to sort incoming values, since they're updating the - # dict (gtnet_skt_state), which is already in the proper order. - self.log.info(f"Updating GTNET-SKT tag '{tag}' to {typecasted_value} (previous value: {self.gtnet_skt_state[tag]})") - with self.__gtnet_lock: - self.gtnet_skt_state[tag] = typecasted_value + if self.debug: + self.log.debug(f"Updating GTNET-SKT tags to: {typecasted}") + + # Update the state tracker with the values from the tags being written. + # NOTE: don't need to sort incoming values, since they're updating the + # gtnet_skt_state dict, which is already in the proper order. + with self.__gtnet_lock: + self.gtnet_skt_state.update(typecasted) # If TCP, build and send, and retry connection if it fails # For UDP, the writer thread will handle sending the updated values @@ -1152,7 +1177,9 @@ def write(self, tags: dict) -> str: self.current_values.update(self.gtnet_skt_state) msg = f"ACK=Wrote {len(tags)} tags to RTDS via GTNET-SKT" - self.log.debug(msg) + + if self.debug: + self.log.debug(msg) return msg