
Commit

- add max size check to the output role
- save dfs when size limit is reached
nick-harder committed Nov 25, 2024
1 parent 6e32a59 commit 3cc85e0
Showing 1 changed file with 33 additions and 8 deletions.
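
In short, the commit makes the output role keep a running estimate of how much data it has buffered in memory and flush the buffered dataframes as soon as that estimate crosses a configurable limit (300 MB by default), rather than waiting only for the next scheduled write. A minimal, self-contained sketch of this accumulate-and-flush pattern, with illustrative names (SizeLimitedBuffer, add, flush are not the repository's API):

import sys
from collections import defaultdict


class SizeLimitedBuffer:
    """Illustrative stand-in for the output role's buffering; not the real class."""

    def __init__(self, max_size_mb: int = 300):
        self.max_size_mb = max_size_mb
        self.current_size_bytes = 0
        self.buffers: dict[str, list] = defaultdict(list)

    def add(self, table: str, rows: list[dict]) -> None:
        # buffer the incoming rows and add their approximate size to the counter
        self.buffers[table].append(rows)
        self.current_size_bytes += sys.getsizeof(rows) + sum(
            sys.getsizeof(row) for row in rows
        )
        # flush once the running estimate exceeds the configured limit
        if self.current_size_bytes > self.max_size_mb * 1024 * 1024:
            self.flush()

    def flush(self) -> None:
        # the real role persists the buffered dataframes (store_dfs); here we
        # simply drop them and reset the counter, as the commit does after writing
        self.buffers.clear()
        self.current_size_bytes = 0
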
41 changes: 33 additions & 8 deletions assume/common/outputs.py
@@ -4,6 +4,7 @@
 
 import logging
 import shutil
+import sys
 from collections import defaultdict
 from datetime import datetime
 from multiprocessing import Lock
@@ -94,6 +95,9 @@ def __init__(
         # construct all timeframe under which hourly values are written to excel and db
         self.start = start
         self.end = end
+        self.current_size_bytes = 0
+        self.max_size_mb = 300
+
         # initializes dfs for storing and writing asynchronous
         self.write_dfs: dict = defaultdict(list)
         self.locks = defaultdict(lambda: Lock())
@@ -210,30 +214,38 @@ def handle_output_message(self, content: dict, meta: MetaDict):
             content (dict): The content of the message.
             meta (MetaDict): The metadata associated with the message.
         """
+        content_data = content.get("data")
+
         if content.get("type") == "store_order_book":
-            self.write_market_orders(content.get("data"), content.get("market_id"))
+            self.write_market_orders(content_data, content.get("market_id"))
 
         elif content.get("type") == "store_market_results":
-            self.write_market_results(content.get("data"))
+            self.write_market_results(content_data)
 
         elif content.get("type") == "store_units":
-            self.write_units_definition(content.get("data"))
+            self.write_units_definition(content_data)
 
         elif content.get("type") == "market_dispatch":
-            self.write_market_dispatch(content.get("data"))
+            self.write_market_dispatch(content_data)
 
         elif content.get("type") == "unit_dispatch":
-            self.write_unit_dispatch(content.get("data"))
+            self.write_unit_dispatch(content_data)
 
         elif content.get("type") == "rl_learning_params":
-            self.write_rl_params(content.get("data"))
+            self.write_rl_params(content_data)
 
         elif content.get("type") == "grid_topology":
-            self.store_grid(content.get("data"), content.get("market_id"))
+            self.store_grid(content_data, content.get("market_id"))
 
         elif content.get("type") == "store_flows":
-            self.write_flows(content.get("data"))
+            self.write_flows(content_data)
 
+        # keep track of the memory usage of the data
+        self.current_size_bytes += self.calculate_content_size(content_data)
+        # if the current size is larger than self.max_size_mb, store the data
+        if self.current_size_bytes > self.max_size_mb * 1024 * 1024:
+            logger.debug("storing output data due to size limit")
+            self.context.schedule_instant_task(coroutine=self.store_dfs())
 
     def write_rl_params(self, rl_params: dict):
         """
@@ -311,6 +323,8 @@ async def store_dfs(self):
 
                 self.write_dfs[table] = []
 
+        self.current_size_bytes = 0
+
     def store_grid(
         self,
         grid: dict[str, pd.DataFrame],
@@ -614,3 +628,14 @@ def write_flows(self, data: dict[tuple[datetime, str], float]):
 
         with self.locks["flows"]:
             self.write_dfs["flows"].append(df)
+
+    def calculate_content_size(self, content):
+        if isinstance(content, dict):  # For dictionaries
+            return sys.getsizeof(content) + sum(
+                sys.getsizeof(value) for value in content.values()
+            )
+        elif isinstance(content, list):  # For lists, including lists of dicts
+            return sys.getsizeof(content) + sum(
+                self.calculate_content_size(item) for item in content
+            )
+        return sys.getsizeof(content)
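
A note on the size helper: sys.getsizeof() is shallow, so calculate_content_size adds the sizes of a dict's values and recurses into list items to get a usable estimate; anything nested more deeply (for example containers stored as dict values) is still only counted shallowly, which is acceptable for a flush heuristic. A small illustration of the difference, using made-up data (absolute byte counts are platform-dependent):

import sys

rows = [{"unit_id": f"unit_{i}", "power": float(i)} for i in range(1000)]

# shallow: only the list object and its array of pointers
shallow = sys.getsizeof(rows)

# estimate in the spirit of calculate_content_size for a list of flat dicts:
# list footprint + each row dict + each dict's values
deeper = sys.getsizeof(rows) + sum(
    sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row.values()) for row in rows
)

print(shallow, deeper)  # deeper is far larger and closer to the real memory cost
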
