diff --git a/assume/common/outputs.py b/assume/common/outputs.py
index 10c40ed5..72dffa8a 100644
--- a/assume/common/outputs.py
+++ b/assume/common/outputs.py
@@ -105,12 +105,9 @@ def __init__(
         self.current_dfs_size = 0
 
         # initializes dfs for storing and writing asynchronous
-        self.write_dfs: dict = defaultdict(list)
+        self.write_buffers: dict = defaultdict(list)
         self.locks = defaultdict(lambda: Lock())
 
-        # Buffers for batching
-        self.buffers = defaultdict(list)
-
         self.kpi_defs: dict[str, OutputDef] = {
             "avg_price": {
                 "value": "avg(price)",
@@ -224,30 +221,30 @@ def handle_output_message(self, content: dict, meta: MetaDict):
             meta (MetaDict): The metadata associated with the message.
         """
         content_data = content.get("data")
+        content_type = content.get("type")
+        market_id = content.get("market_id")
 
-        if content.get("type") == "store_order_book":
-            self.write_market_orders(content_data, content.get("market_id"))
-
-        elif content.get("type") == "store_market_results":
-            self.buffers["market_results"].extend(content_data)
-
-        elif content.get("type") == "market_dispatch":
-            self.buffers["market_dispatch"].extend(content_data)
-
-        elif content.get("type") == "unit_dispatch":
-            self.buffers["unit_dispatch"].extend(content_data)
-
-        elif content.get("type") == "rl_learning_params":
-            self.buffers["rl_learning_params"].extend(content_data)
-
-        elif content.get("type") == "store_flows":
-            self.write_flows(content_data)
-
-        elif content.get("type") == "store_units":
-            self.write_units_definition(content_data)
+        if not content_data:
+            return
 
-        elif content.get("type") == "grid_topology":
-            self.store_grid(content_data, content.get("market_id"))
+        if content_type in [
+            "market_meta",
+            "market_dispatch",
+            "unit_dispatch",
+            "rl_learning_params",
+        ]:
+            # these can be processed as a single dataframe
+            self.write_buffers[content_type].extend(content_data)
+        elif content_type == "store_units":
+            table_name = content_data["unit_type"] + "_meta"
+            self.write_buffers[table_name].append(content_data)
+
+        elif content_type == "grid_flows":
+            # these need to be converted to a df individually
+            self.write_buffers[content_type].append(content_data)
+        elif content_type in ["market_orders", "grid_topology"]:
+            # here we need an additional market_id
+            self.write_buffers[content_type].append((content_data, market_id))
 
         # keep track of the memory usage of the data
         self.current_dfs_size += calculate_content_size(content_data)
@@ -256,9 +253,9 @@ def handle_output_message(self, content: dict, meta: MetaDict):
             logger.debug("storing output data due to size limit")
             self.context.schedule_instant_task(coroutine=self.store_dfs())
 
-    def write_rl_params(self, rl_params: list[dict]):
+    def convert_rl_params(self, rl_params: list[dict]):
         """
-        Writes the RL parameters such as reward, regret, and profit to the corresponding data frame.
+        Converts the RL parameters such as reward, regret, and profit to a DataFrame.
 
         Args:
             rl_params (dict): The RL parameters.
@@ -270,11 +267,11 @@ def write_rl_params(self, rl_params: list[dict]):
         df["perform_evaluation"] = self.perform_evaluation
         df["episode"] = self.episode
 
-        self.write_dfs["rl_params"].append(df)
+        return df
 
-    def write_market_results(self, market_results: list[dict]):
+    def convert_market_results(self, market_results: list[dict]):
         """
-        Writes market results to the corresponding data frame.
+        Converts market results to a DataFrame.
 
         Args:
             market_meta (dict): The market metadata, which includes the clearing price and volume.
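Reviewer note: the handler above no longer converts payloads on arrival, it only sorts them into per-type buffers. A minimal sketch of that dispatch logic, as a standalone mirror for illustration (the `handle` helper is hypothetical, not the actual `WriteOutput` method):

```python
from collections import defaultdict

# per-type buffers, mirroring self.write_buffers in outputs.py
write_buffers: dict = defaultdict(list)


def handle(content: dict) -> None:
    content_data = content.get("data")
    content_type = content.get("type")
    market_id = content.get("market_id")
    if not content_data:
        return  # empty payloads are dropped up front
    if content_type in ["market_meta", "market_dispatch", "unit_dispatch", "rl_learning_params"]:
        # list-of-dict payloads share one flat buffer per type
        write_buffers[content_type].extend(content_data)
    elif content_type in ["market_orders", "grid_topology"]:
        # these also need the market_id, so buffer (data, market_id) tuples
        write_buffers[content_type].append((content_data, market_id))


handle({"type": "market_dispatch", "data": [["2020-01-01", 90.0, "EOM", "Unit1"]]})
assert len(write_buffers["market_dispatch"]) == 1
```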
@@ -284,36 +281,7 @@ def write_market_results(self, market_results: list[dict]):
         df = pd.DataFrame(market_results)
         df["simulation"] = self.simulation_id
 
-        self.write_dfs["market_meta"].append(df)
-
-    def process_buffers(self):
-        """
-        Processes all message buffers in a batch manner, ensuring efficient handling of all data types.
-        """
-        for message_type, buffer in self.buffers.items():
-            if not buffer:
-                continue
-
-            # Process each message type in bulk
-            if message_type == "market_dispatch":
-                self.write_market_dispatch(buffer)
-            elif message_type == "unit_dispatch":
-                self.write_unit_dispatch(buffer)
-            elif message_type == "store_market_results":
-                self.write_market_results(buffer)
-            # elif message_type == "store_order_book":
-            #     self.process_market_orders(buffer)
-            # elif message_type == "store_units":
-            #     self.process_units_definition(buffer)
-            elif message_type == "rl_learning_params":
-                self.write_rl_params(buffer)
-            # elif message_type == "grid_topology":
-            #     self.process_grid_topology(buffer)
-            # elif message_type == "store_flows":
-            #     self.process_flows(buffer)
-
-            # Clear the buffer after processing
-            buffer.clear()
+        return df
 
     async def store_dfs(self):
         """
@@ -322,16 +290,50 @@ async def store_dfs(self):
         if not self.db and not self.export_csv_path:
             return
 
-        self.process_buffers()
-
-        for table in self.write_dfs.keys():
-            if len(self.write_dfs[table]) == 0:
+        for table, data_list in self.write_buffers.items():
+            if len(data_list) == 0:
                 continue
+            df = None
             with self.locks[table]:
-                # concat all dataframes
-                # use join='outer' to keep all columns and fill missing values with NaN
-                df = pd.concat(self.write_dfs[table], axis=0, join="outer")
-                self.write_dfs[table] = []
+                if table == "grid_topology":
+                    for grid_data, market_id in data_list:
+                        self.store_grid(grid_data, market_id)
+                    data_list.clear()
+                    continue
+
+                match table:
+                    case "market_meta":
+                        df = self.convert_market_results(data_list)
+                    case "market_dispatch":
+                        df = self.convert_market_dispatch(data_list)
+                    case "unit_dispatch":
+                        df = self.convert_unit_dispatch(data_list)
+                    case "rl_learning_params":
+                        df = self.convert_rl_params(data_list)
+                    case "grid_flows":
+                        dfs = []
+                        for data in data_list:
+                            df = self.convert_flows(data)
+                            dfs.append(df)
+                        df = pd.concat(dfs, axis=0, join="outer")
+                    case "market_orders":
+                        dfs = []
+                        for market_data, market_id in data_list:
+                            df = self.convert_market_orders(market_data, market_id)
+                            dfs.append(df)
+                        df = pd.concat(dfs, axis=0, join="outer")
+                    case _:
+                        # remaining buffers are store_units entries, keyed by the
+                        # unit-type-specific "*_meta" table name
+                        dfs = []
+                        for data in data_list:
+                            df = self.convert_units_definition(data)
+                            dfs.append(df)
+                        df = pd.concat(dfs, axis=0, join="outer")
+                data_list.clear()
+
+            # at this point all buffered entries are concatenated into one df
+            # (join='outer' keeps all columns and fills missing values with NaN)
+            if df is None:
+                continue
 
             df.reset_index()
             if df.empty:
@@ -463,9 +465,9 @@ def check_columns(self, table: str, df: pd.DataFrame, index: bool = True):
         with self.db.begin() as db:
             db.execute(text(query))
 
-    def write_market_orders(self, market_orders: any, market_id: str):
+    def convert_market_orders(self, market_orders: any, market_id: str):
         """
-        Writes market orders to the corresponding data frame.
+        Converts market orders to a DataFrame.
 
         Args:
             market_orders (any): The market orders.
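The `grid_flows` and `market_orders` branches merge the per-message frames with `pd.concat(..., join="outer")`. A toy sketch of why that join mode matters (made-up frames, not ASSUME data):

```python
import pandas as pd

# join="outer" keeps the union of columns; rows missing a column get NaN
df_a = pd.DataFrame([{"unit": "pp1", "power": 90.0}])
df_b = pd.DataFrame([{"unit": "pp2", "power": 80.0, "soc": 0.5}])

merged = pd.concat([df_a, df_b], axis=0, join="outer")
print(merged)  # the pp1 row shows NaN in the "soc" column
```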
@@ -502,33 +504,24 @@ def write_market_orders(self, market_orders: any, market_id: str):
         df["market_id"] = market_id
 
-        # Append to the shared DataFrame within lock
-        with self.locks["market_orders"]:
-            self.write_dfs["market_orders"].append(df)
+        return df
 
-    def write_units_definition(self, unit_info: dict):
+    def convert_units_definition(self, unit_info: dict):
         """
-        Writes unit definitions to the corresponding data frame and directly stores it in the database and CSV.
+        Converts unit definitions to a DataFrame.
 
         Args:
             unit_info (dict): The unit information.
         """
-
-        table_name = unit_info["unit_type"] + "_meta"
-
-        if table_name is None:
-            logger.info(f"unknown {unit_info['unit_type']} is not exported")
-            return False
         del unit_info["unit_type"]
 
         unit_info["simulation"] = self.simulation_id
         u_info = {unit_info["id"]: unit_info}
         del unit_info["id"]
 
-        with self.locks[table_name]:
-            self.write_dfs[table_name].append(pd.DataFrame(u_info).T)
+        return pd.DataFrame(u_info).T
 
-    def write_market_dispatch(self, market_dispatch: list[dict]):
+    def convert_market_dispatch(self, market_dispatch: list[dict]):
         """
-        Writes the planned dispatch of the units after the market clearing to a CSV and database.
+        Converts the planned dispatch of the units after market clearing to a DataFrame.
 
         Args:
             data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id".
@@ -540,11 +534,11 @@ def write_market_dispatch(self, market_dispatch: list[dict]):
         )
         if not df.empty:
             df["simulation"] = self.simulation_id
-            self.write_dfs["market_dispatch"].append(df)
+            return df
 
-    def write_unit_dispatch(self, unit_dispatch: list[dict]):
+    def convert_unit_dispatch(self, unit_dispatch: list[dict]):
         """
-        Writes the actual dispatch of the units to a DataFrame.
+        Converts the actual dispatch of the units to a DataFrame.
 
         Args:
             unit_dispatch (list): A list of dictionaries containing unit dispatch data.
@@ -573,8 +567,7 @@ def write_unit_dispatch(self, unit_dispatch: list[dict]):
         data.set_index("time", inplace=True)
         data["simulation"] = self.simulation_id
 
-        # Append to the unit_dispatch DataFrame
-        self.write_dfs["unit_dispatch"].append(data)
+        return data
 
     async def on_stop(self):
         """
@@ -661,9 +654,9 @@ def get_sum_reward(self):
 
         return rewards_by_unit
 
-    def write_flows(self, data: dict[tuple[datetime, str], float]):
+    def convert_flows(self, data: dict[tuple[datetime, str], float]):
         """
-        Writes the flows of the grid results into the database.
+        Converts the flows of the grid results to a DataFrame.
 
         Args:
             data: The records to be put into the table. Formatted like, "(datetime, line), flow" if generated by pyomo or df if it comes from pypsa.
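`convert_units_definition` builds a one-row frame keyed by the unit id. A quick sketch of the `pd.DataFrame(u_info).T` idiom it relies on (field names and values are made up):

```python
import pandas as pd

# wrapping {id: info} in a DataFrame puts the id on the columns;
# transposing yields a single row indexed by the unit id
unit_info = {"id": "pp1", "technology": "lignite", "max_power": 1000}
u_info = {unit_info.pop("id"): unit_info}
df = pd.DataFrame(u_info).T
print(df)  # one row, index "pp1", columns "technology" and "max_power"
```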
@@ -693,5 +686,4 @@ def write_flows(self, data: dict[tuple[datetime, str], float]):
 
         df["simulation"] = self.simulation_id
 
-        with self.locks["flows"]:
-            self.write_dfs["flows"].append(df)
+        return df
diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py
index 6f6eaf8d..86ca4c35 100644
--- a/assume/markets/base_market.py
+++ b/assume/markets/base_market.py
@@ -702,7 +702,7 @@ async def store_order_book(self, orderbook: Orderbook):
         if db_addr:
             message = {
                 "context": "write_results",
-                "type": "store_order_book",
+                "type": "market_orders",
                 "market_id": self.marketconfig.market_id,
                 "data": orderbook,
             }
@@ -724,7 +724,7 @@ async def store_market_results(self, market_meta):
         if db_addr:
             message = {
                 "context": "write_results",
-                "type": "store_market_results",
+                "type": "market_meta",
                 "market_id": self.marketconfig.market_id,
                 "data": market_meta,
             }
@@ -746,7 +746,7 @@ async def store_flows(self, flows: dict[tuple, float]):
         if db_addr:
             message = {
                 "context": "write_results",
-                "type": "store_flows",
+                "type": "grid_flows",
                 "market_id": self.marketconfig.market_id,
                 "data": flows,
             }
diff --git a/assume/units/powerplant.py b/assume/units/powerplant.py
index 5ab7fbff..55580e2c 100644
--- a/assume/units/powerplant.py
+++ b/assume/units/powerplant.py
@@ -260,7 +260,7 @@ def calculate_min_max_power(
         Note:
             The calculation does not include ramping constraints and can be used for arbitrary start times in the future.
         """
-        # end includes the end of the last product, to get the last products' start time we deduct the frequency once 
+        # end includes the end of the last product, to get the last products' start time we deduct the frequency once
         end_excl = end - self.index.freq
 
         base_load = self.outputs["energy"].loc[start:end_excl]
diff --git a/docs/source/release_notes.rst b/docs/source/release_notes.rst
index c3e90bc4..101da5c2 100644
--- a/docs/source/release_notes.rst
+++ b/docs/source/release_notes.rst
@@ -20,6 +20,7 @@ Upcoming Release
   pandas Series. The `FastSeries` class retains a close resemblance to the pandas Series, including core
   functionalities like indexing, slicing, and arithmetic operations. This ensures seamless integration, allowing
   users to work with the new class without requiring significant code adaptation.
+- **Performance Optimization:** The output role now buffers incoming dict data directly and only converts it to DataFrames when writing to the database.
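For reference, the `type` renames applied in `base_market.py` above, which make the message types line up with the output role's buffer keys (mapping taken directly from this diff):

```python
# old message "type" -> new message "type" (also used as buffer key)
MESSAGE_TYPE_RENAMES = {
    "store_order_book": "market_orders",
    "store_market_results": "market_meta",
    "store_flows": "grid_flows",
}
```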
 
 **Bugfixes:**
 - **Tutorials**: General fixes of the tutorials, to align with updated functionalitites of Assume
diff --git a/examples/notebooks/04_reinforcement_learning_example.ipynb b/examples/notebooks/04_reinforcement_learning_example.ipynb
index d70c78d4..cda4a1c0 100644
--- a/examples/notebooks/04_reinforcement_learning_example.ipynb
+++ b/examples/notebooks/04_reinforcement_learning_example.ipynb
@@ -1073,7 +1073,6 @@
     "\n",
     "        duration = (end - start) / timedelta(hours=1)\n",
     "\n",
-    "\n",
     "        # calculate profit as income - running_cost from this event\n",
     "        order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n",
     "        order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n",
@@ -2091,6 +2090,7 @@
     "\n",
     "    return bids\n",
     "\n",
+    "\n",
     "# we define the class again and inherit from the initial class just to add the additional method to the original class\n",
     "# this is a workaround to have different methods of the class in different cells\n",
     "# which is good for the purpose of this tutorial\n",
@@ -2125,7 +2125,7 @@
     "    for order in orderbook:\n",
     "        start = order[\"start_time\"]\n",
     "        end = order[\"end_time\"]\n",
-    "        \n",
+    "\n",
     "        # end includes the end of the last product, to get the last products' start time we deduct the frequency once\n",
     "        end_excl = end - unit.index.freq\n",
     "\n",
@@ -2136,7 +2136,6 @@
     "\n",
     "        duration = (end - start) / timedelta(hours=1)\n",
     "\n",
-    "\n",
     "        # calculate profit as income - running_cost from this event\n",
     "        order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n",
     "        order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n",
diff --git a/tests/test_outputs.py b/tests/test_outputs.py
index 86dc7db5..fbcc5976 100644
--- a/tests/test_outputs.py
+++ b/tests/test_outputs.py
@@ -19,16 +19,16 @@ def test_output_market_orders():
     start = datetime(2020, 1, 1)
     end = datetime(2020, 1, 2)
     output_writer = WriteOutput("test_sim", start, end, engine)
-    assert len(output_writer.write_dfs.keys()) == 0
+    assert len(output_writer.write_buffers.keys()) == 0
     meta = {"sender_id": None}
     content = {
         "context": "write_results",
-        "type": "store_order_book",
+        "type": "market_orders",
         "sender": "CRM_pos",
         "data": [],
     }
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.write_dfs["market_orders"]) == 0
+    assert len(output_writer.write_buffers["market_orders"]) == 0
 
     orderbook = [
         {
@@ -67,12 +67,12 @@ def test_output_market_orders():
 
     content = {
         "context": "write_results",
-        "type": "store_order_book",
+        "type": "market_orders",
        "sender": "CRM_pos",
         "data": orderbook,
     }
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.write_dfs["market_orders"]) == 1
+    assert len(output_writer.write_buffers["market_orders"]) == 1
 
 
 def test_output_market_results():
@@ -80,11 +80,11 @@ def test_output_market_results():
     start = datetime(2020, 1, 1)
     end = datetime(2020, 1, 2)
     output_writer = WriteOutput("test_sim", start, end, engine)
-    assert len(output_writer.write_dfs.keys()) == 0
+    assert len(output_writer.write_buffers.keys()) == 0
     meta = {"sender_id": None}
     content = {
         "context": "write_results",
-        "type": "store_market_results",
+        "type": "market_meta",
         "sender": "CRM_pos",
         "data": [
             {
@@ -105,7 +105,7 @@
         ],
     }
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.buffers["market_results"]) == 1, "market_results"
+    assert len(output_writer.write_buffers["market_meta"]) == 1, "market_meta"
 
 
 def test_output_market_dispatch():
@@ -113,12 +113,12 @@ def test_output_market_dispatch():
     start = datetime(2020, 1, 1)
     end = datetime(2020, 1, 2)
     output_writer = WriteOutput("test_sim", start, end, engine)
-    assert len(output_writer.write_dfs.keys()) == 0
+    assert len(output_writer.write_buffers.keys()) == 0
     meta = {"sender_id": None}
     content = {"context": "write_results", "type": "market_dispatch", "data": []}
     output_writer.handle_output_message(content, meta)
     # empty dfs are discarded
-    assert len(output_writer.write_dfs["market_dispatch"]) == 0, "market_dispatch"
+    assert len(output_writer.write_buffers["market_dispatch"]) == 0, "market_dispatch"
 
     content = {
         "context": "write_results",
@@ -126,7 +126,7 @@ def test_output_market_dispatch():
         "data": [[start, 90, "EOM", "TestUnit"]],
     }
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.buffers["market_dispatch"]) == 1, "market_dispatch"
+    assert len(output_writer.write_buffers["market_dispatch"]) == 1, "market_dispatch"
 
 
 def test_output_unit_dispatch():
@@ -134,7 +134,7 @@ def test_output_unit_dispatch():
     start = datetime(2020, 1, 1)
     end = datetime(2020, 1, 2)
     output_writer = WriteOutput("test_sim", start, end, engine)
-    assert len(output_writer.write_dfs.keys()) == 0
+    assert len(output_writer.write_buffers.keys()) == 0
     meta = {"sender_id": None}
     content = {
         "context": "write_results",
@@ -150,7 +150,7 @@ def test_output_unit_dispatch():
     }
 
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.buffers["unit_dispatch"]) == 1, "unit_dispatch"
+    assert len(output_writer.write_buffers["unit_dispatch"]) == 1, "unit_dispatch"
 
 
 def test_output_write_flows():
@@ -158,13 +158,13 @@ def test_output_write_flows():
     start = datetime(2020, 1, 1)
     end = datetime(2020, 1, 2)
     output_writer = WriteOutput("test_sim", start, end, engine)
-    assert len(output_writer.write_dfs.keys()) == 0
+    assert len(output_writer.write_buffers.keys()) == 0
     meta = {"sender_id": None}
     content = {
         "context": "write_results",
-        "type": "store_flows",
+        "type": "grid_flows",
         "data": {(datetime(2019, 1, 1, 0, 0), "north_south_example"): 0.0},
     }
 
     output_writer.handle_output_message(content, meta)
-    assert len(output_writer.write_dfs["flows"]) == 1, "flows"
+    assert len(output_writer.write_buffers["grid_flows"]) == 1, "grid_flows"
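Closing note: a hedged sketch of what the `grid_flows` conversion amounts to for the pyomo-style input used in `test_output_write_flows`. This is an assumption about `convert_flows`; the real method's column names may differ:

```python
from datetime import datetime

import pandas as pd

# pyomo-style flows: {(datetime, line): flow} -> long-format DataFrame
flows = {(datetime(2019, 1, 1), "north_south_example"): 0.0}
df = pd.DataFrame(
    [(t, line, flow) for (t, line), flow in flows.items()],
    columns=["datetime", "line", "flow"],
)
df["simulation"] = "test_sim"  # simulation id column, as added in outputs.py
print(df)
```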