Performance optimizations and tweaks
- GTNET-SKT: Don't log every GTNET-SKT tag update
- GTNET-SKT: Log all GTNET-SKT values only when debugging
- GTNET-SKT: calculate all the typecasts, THEN update the state all at once (instead of taking the lock 20 times and incrementally updating the state); a standalone sketch of this pattern follows the diff below
- Move a lot of logging behind if statements so we avoid the function call (and its string formatting) when debugging is disabled
- Asynchronous logging so log calls don't block threads (especially for file writes); see the sketch below
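
A minimal, self-contained sketch of the queue-based logging setup described above, using the standard-library QueueHandler/QueueListener. The atexit shutdown hook and the logger/handler names are illustrative additions, not part of this commit:

import atexit
import logging
import queue
import sys
from logging.handlers import QueueHandler, QueueListener

debug = False  # mirrors the provider's self.debug flag (illustrative)

log_queue = queue.Queue()

console = logging.StreamHandler(sys.stdout)
console.setFormatter(logging.Formatter(
    fmt="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))
console.setLevel(logging.DEBUG if debug else logging.INFO)

# A background thread pulls records off the queue and runs the real handlers,
# so emitting a log record never blocks the calling thread on I/O.
listener = QueueListener(log_queue, console, respect_handler_level=True)
listener.start()
atexit.register(listener.stop)  # illustrative: flush queued records at shutdown

# Callers only ever touch the cheap, non-blocking QueueHandler.
logging.basicConfig(
    level=logging.DEBUG if debug else logging.INFO,
    handlers=[QueueHandler(log_queue)],
)
log = logging.getLogger("rtds-demo")

tags = {"BRK1": 1}
if debug:  # same guard style as the commit; log.isEnabledFor(logging.DEBUG) also works
    log.debug(f"Processing write request for tags: {tags}")
log.info("handled on the listener thread, not the caller's")

Guarding the debug call means the formatted message is only built (and the call only made) when debug output will actually be emitted.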
GhostofGoes committed Apr 22, 2024
1 parent 705cce3 commit 72bfbf3
Showing 1 changed file with 44 additions and 17 deletions.
61 changes: 44 additions & 17 deletions src/pybennu/pybennu/providers/power/solvers/rtds.py
@@ -138,6 +138,7 @@
from configparser import ConfigParser
from datetime import datetime, timezone
from io import TextIOWrapper
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from pprint import pformat
from subprocess import check_output
@@ -516,12 +517,30 @@ def __init__(
if len(self.gtnet_skt_tag_names) > 30: # max of 30 data points per channel, 10 channels total for GTNET card
raise ValueError(f"maximum of 30 points allowed per GTNET-SKT channel, {len(self.gtnet_skt_tag_names)} are defined in config")

# asynchronous logging. Log messages are placed into a queue,
# which is serviced by a thread that runs each handler with
# messages from the queue. This isn't needed as much for stdout,
# but if we enable logging to file or elastic then it will be.
self.log_queue = queue.Queue()
queue_handler = QueueHandler(self.log_queue)

formatter = logging.Formatter(
fmt="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG if self.debug else logging.INFO)
queue_listener = QueueListener(self.log_queue, console_handler, respect_handler_level=True)
queue_listener.start()

# Configure logging to stdout (includes debug messages from pypmu)
logging.basicConfig(
level=logging.DEBUG if self.debug else logging.INFO,
format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
stream=sys.stdout
handlers=[queue_handler],
# format="%(asctime)s.%(msecs)03d [%(levelname)s] (%(name)s) %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S",
# stream=sys.stdout
)
self.log = logging.getLogger(self.__class__.__name__)
self.log.setLevel(logging.DEBUG if self.debug else logging.INFO)
@@ -533,7 +552,6 @@ def __init__(
self.log.warning("No limit set in 'csv-max-files', CSV files won't be cleaned up and could fill all available disk space")

# Get bennu version
# TODO: this is broken
self.bennu_version = "unknown"
try:
apt_result = check_output("apt-cache show bennu", shell=True).decode()
@@ -589,7 +607,7 @@ def __init__(
# Allow gtnet-skt fields to be read
self.current_values.update(self.gtnet_skt_state)

# !! Hack !!
# Hack
# TODO: rotate out values to keep this from eating up memory if running for a long time
self.time_map = {}

@@ -1040,7 +1058,8 @@ def query(self) -> str:
BUS6_VA.real
BUS6_VA.angle
"""
self.log.debug("Processing query request")
if self.debug:
self.log.debug("Processing query request")

if not self.current_values:
msg = "ERR=No data points have been read yet from the RTDS"
@@ -1055,7 +1074,8 @@
return msg

def read(self, tag: str) -> str:
self.log.debug(f"Processing read request for tag '{tag}'")
if self.debug:
self.log.debug(f"Processing read request for tag '{tag}'")

if not self.current_values:
msg = "ERR=Data points have not been initialized yet from the RTDS"
@@ -1073,7 +1093,8 @@ def read(self, tag: str) -> str:
return msg

def write(self, tags: dict) -> str:
self.log.debug(f"Processing write request for tags: {tags}")
if self.debug:
self.log.debug(f"Processing write request for tags: {tags}")

if not tags:
msg = "ERR=No tags provided for write to RTDS"
@@ -1099,7 +1120,8 @@ def write(self, tags: dict) -> str:
self.log.error(f"{msg} (tags being written: {tags})")
return msg

# Update the state tracker with the values from the tags being written
# Typecast values (e.g. bool to 0/1)
typecasted = {}
for tag, val in tags.items():
# Validate types match what's in config, if they don't, then warn and typecast
# NOTE: these will be strings if coming from pybennu-probe
@@ -1116,15 +1138,18 @@ def write(self, tags: dict) -> str:
val = "1"

if self.gtnet_skt_tags[tag] == "int":
typecasted_value = int(val)
typecasted[tag] = int(val)
else:
typecasted_value = float(val)
typecasted[tag] = float(val)

# NOTE: don't need to sort incoming values, since they're updating the
# dict (gtnet_skt_state), which is already in the proper order.
self.log.info(f"Updating GTNET-SKT tag '{tag}' to {typecasted_value} (previous value: {self.gtnet_skt_state[tag]})")
with self.__gtnet_lock:
self.gtnet_skt_state[tag] = typecasted_value
if self.debug:
self.log.debug(f"Updating GTNET-SKT tags to: {typecasted}")

# Update the state tracker with the values from the tags being written.
# NOTE: don't need to sort incoming values, since they're updating the
# gtnet_skt_state dict, which is already in the proper order.
with self.__gtnet_lock:
self.gtnet_skt_state.update(typecasted)

# If TCP, build and send, and retry connection if it fails
# For UDP, the writer thread will handle sending the updated values
@@ -1152,7 +1177,9 @@ def write(self, tags: dict) -> str:
self.current_values.update(self.gtnet_skt_state)

msg = f"ACK=Wrote {len(tags)} tags to RTDS via GTNET-SKT"
self.log.debug(msg)

if self.debug:
self.log.debug(msg)

return msg

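As a standalone illustration of the GTNET-SKT write path described in the commit message (typecast everything first, then take the lock once and apply the whole batch), here is a rough sketch; the class name, tag names, and types are hypothetical and do not come from rtds.py:

import threading

class GtnetSktStateSketch:
    """Illustrative only: mirrors the typecast-first, update-once pattern."""

    def __init__(self):
        # Tag types as they would be read from config; dict order matters for GTNET-SKT.
        self.tag_types = {"BRK1": "int", "SETPOINT1": "float"}
        self.state = {"BRK1": 0, "SETPOINT1": 0.0}
        self._lock = threading.Lock()

    def write(self, tags: dict) -> None:
        # 1) Validate and typecast every value outside the lock.
        typecasted = {}
        for tag, val in tags.items():
            if isinstance(val, bool):  # booleans become 0/1
                val = int(val)
            typecasted[tag] = int(val) if self.tag_types[tag] == "int" else float(val)

        # 2) Take the lock once and apply the whole batch, instead of
        #    locking and updating the shared state once per tag.
        with self._lock:
            self.state.update(typecasted)

sketch = GtnetSktStateSketch()
sketch.write({"BRK1": True, "SETPOINT1": "13.8"})  # state becomes {"BRK1": 1, "SETPOINT1": 13.8}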
