diff --git a/assume/common/outputs.py b/assume/common/outputs.py
index f441b1cb..04ea6f40 100644
--- a/assume/common/outputs.py
+++ b/assume/common/outputs.py
@@ -4,6 +4,7 @@
 
 import logging
 import shutil
+import sys
 from collections import defaultdict
 from datetime import datetime
 from multiprocessing import Lock
@@ -94,6 +95,9 @@ def __init__(
         # construct all timeframe under which hourly values are written to excel and db
         self.start = start
         self.end = end
+        self.current_size_bytes = 0
+        self.max_size_mb = 300
+
         # initializes dfs for storing and writing asynchronous
         self.write_dfs: dict = defaultdict(list)
         self.locks = defaultdict(lambda: Lock())
@@ -210,30 +214,38 @@ def handle_output_message(self, content: dict, meta: MetaDict):
             content (dict): The content of the message.
             meta (MetaDict): The metadata associated with the message.
         """
+        content_data = content.get("data")
         if content.get("type") == "store_order_book":
-            self.write_market_orders(content.get("data"), content.get("market_id"))
+            self.write_market_orders(content_data, content.get("market_id"))
         elif content.get("type") == "store_market_results":
-            self.write_market_results(content.get("data"))
+            self.write_market_results(content_data)
         elif content.get("type") == "store_units":
-            self.write_units_definition(content.get("data"))
+            self.write_units_definition(content_data)
         elif content.get("type") == "market_dispatch":
-            self.write_market_dispatch(content.get("data"))
+            self.write_market_dispatch(content_data)
         elif content.get("type") == "unit_dispatch":
-            self.write_unit_dispatch(content.get("data"))
+            self.write_unit_dispatch(content_data)
         elif content.get("type") == "rl_learning_params":
-            self.write_rl_params(content.get("data"))
+            self.write_rl_params(content_data)
         elif content.get("type") == "grid_topology":
-            self.store_grid(content.get("data"), content.get("market_id"))
+            self.store_grid(content_data, content.get("market_id"))
         elif content.get("type") == "store_flows":
-            self.write_flows(content.get("data"))
+            self.write_flows(content_data)
+
+        # keep track of the memory usage of the data
+        self.current_size_bytes += self.calculate_content_size(content_data)
+        # if the current size is larger than self.max_size_mb, store the data
+        if self.current_size_bytes > self.max_size_mb * 1024 * 1024:
+            logger.debug("storing output data due to size limit")
+            self.context.schedule_instant_task(coroutine=self.store_dfs())
 
     def write_rl_params(self, rl_params: dict):
         """
@@ -311,6 +323,8 @@ async def store_dfs(self):
 
                 self.write_dfs[table] = []
 
+        self.current_size_bytes = 0
+
     def store_grid(
         self,
         grid: dict[str, pd.DataFrame],
@@ -614,3 +628,14 @@ def write_flows(self, data: dict[tuple[datetime, str], float]):
 
         with self.locks["flows"]:
             self.write_dfs["flows"].append(df)
+
+    def calculate_content_size(self, content):
+        if isinstance(content, dict):  # For dictionaries
+            return sys.getsizeof(content) + sum(
+                sys.getsizeof(value) for value in content.values()
+            )
+        elif isinstance(content, list):  # For lists, including lists of dicts
+            return sys.getsizeof(content) + sum(
+                self.calculate_content_size(item) for item in content
+            )
+        return sys.getsizeof(content)
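
Note on the flush mechanism: handle_output_message now accumulates an estimated byte count for every incoming message and schedules store_dfs() once the buffer exceeds max_size_mb (300 MB by default); store_dfs() resets the counter after writing. Below is a minimal standalone sketch of this accumulate-and-flush pattern, with OutputBuffer, on_message, and flush as illustrative names and a synchronous flush for simplicity:

    import sys

    MAX_SIZE_MB = 300  # mirrors self.max_size_mb from the diff

    class OutputBuffer:
        # illustrative stand-in for the output role's buffering behaviour
        def __init__(self):
            self.rows = []
            self.current_size_bytes = 0

        def on_message(self, data):
            self.rows.append(data)
            # grow the running estimate with each message
            self.current_size_bytes += sys.getsizeof(data)
            # flush once the estimate crosses the configured threshold
            if self.current_size_bytes > MAX_SIZE_MB * 1024 * 1024:
                self.flush()

        def flush(self):
            # the real store_dfs() writes the buffered dataframes to the
            # database; here we just drop the rows and reset the counter,
            # as the diff does after writing
            self.rows.clear()
            self.current_size_bytes = 0

The real code schedules the flush via self.context.schedule_instant_task rather than calling it inline, since store_dfs is a coroutine.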
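
Note on the size estimate: calculate_content_size relies on sys.getsizeof, which counts only the container and its immediate values; dict keys and objects nested inside dict values are not traversed, so the result is a shallow approximation rather than true deep memory usage. A quick standalone illustration of the same logic (the sample orders data is hypothetical):

    import sys

    def calculate_content_size(content):
        # dicts: container size plus the size of each immediate value
        if isinstance(content, dict):
            return sys.getsizeof(content) + sum(
                sys.getsizeof(value) for value in content.values()
            )
        # lists: recurse per item, so lists of dicts are handled
        elif isinstance(content, list):
            return sys.getsizeof(content) + sum(
                calculate_content_size(item) for item in content
            )
        return sys.getsizeof(content)

    orders = [{"price": 50.0, "volume": 100.0} for _ in range(1000)]
    print(calculate_content_size(orders))  # rough byte estimate, not deep usage

For the threshold check above, this shallow estimate is enough to bound buffer growth without the cost of a full deep-size traversal.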