From 12dd646692c4e39ac9842cb91f359c4408d9de8e Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Fri, 29 Nov 2024 09:26:38 +0100 Subject: [PATCH 1/2] improve output writing speed only convert to pandas when writing to the database this improves all simulation speeds by a factor of at least two Co-authored-by: Nick Harder --- assume/common/outputs.py | 223 +++++++++++++++++++++------------- assume/markets/base_market.py | 6 +- docs/source/release_notes.rst | 1 + tests/test_outputs.py | 32 ++--- 4 files changed, 157 insertions(+), 105 deletions(-) diff --git a/assume/common/outputs.py b/assume/common/outputs.py index c5f9ac7a..559c082e 100644 --- a/assume/common/outputs.py +++ b/assume/common/outputs.py @@ -63,7 +63,7 @@ def __init__( learning_mode: bool = False, perform_evaluation: bool = False, additional_kpis: dict[str, OutputDef] = {}, - max_dfs_size_mb: int = 250, + max_dfs_size_mb: int = 300, ): super().__init__() @@ -105,7 +105,7 @@ def __init__( self.current_dfs_size = 0 # initializes dfs for storing and writing asynchronous - self.write_dfs: dict = defaultdict(list) + self.write_buffers: dict = defaultdict(list) self.locks = defaultdict(lambda: Lock()) self.kpi_defs: dict[str, OutputDef] = { @@ -221,70 +221,62 @@ def handle_output_message(self, content: dict, meta: MetaDict): meta (MetaDict): The metadata associated with the message. """ content_data = content.get("data") + content_type = content.get("type") + market_id = content.get("market_id") - if content.get("type") == "store_order_book": - self.write_market_orders(content_data, content.get("market_id")) - - elif content.get("type") == "store_market_results": - self.write_market_results(content_data) - - elif content.get("type") == "store_units": - self.write_units_definition(content_data) - - elif content.get("type") == "market_dispatch": - self.write_market_dispatch(content_data) - - elif content.get("type") == "unit_dispatch": - self.write_unit_dispatch(content_data) - - elif content.get("type") == "rl_learning_params": - self.write_rl_params(content_data) - - elif content.get("type") == "grid_topology": - self.store_grid(content_data, content.get("market_id")) - - elif content.get("type") == "store_flows": - self.write_flows(content_data) + if not content_data: + return - # # keep track of the memory usage of the data + if content_type in ["market_meta", "market_dispatch", "unit_dispatch", "rl_learning_params"]: + # these can be processed as a single dataframe + self.write_buffers[content_type].extend(content_data) + elif content_type == "store_units": + table_name = content_data["unit_type"] + "_meta" + self.write_buffers[table_name].append(content_data) + + elif content_type == "grid_flows": + # these need to be converted to df individually + self.write_buffers[content_type].append(content_data) + elif content_type in ["market_orders", "grid_topology"]: + # here we need an additional market_id + self.write_buffers[content_type].append((content_data, market_id)) + + # keep track of the memory usage of the data self.current_dfs_size += calculate_content_size(content_data) # if the current size is larger than self.max_dfs_size, store the data if self.current_dfs_size > self.max_dfs_size: logger.debug("storing output data due to size limit") self.context.schedule_instant_task(coroutine=self.store_dfs()) - def write_rl_params(self, rl_params: dict): + def convert_rl_params(self, rl_params: list[dict]): """ - Writes the RL parameters such as reward, regret, and profit to the corresponding data frame. + Convert the RL parameters such as reward, regret, and profit to a dataframe. Args: rl_params (dict): The RL parameters. """ df = pd.DataFrame.from_records(rl_params, index="datetime") - if df.empty: - return - df["simulation"] = self.simulation_id df["learning_mode"] = self.learning_mode df["perform_evaluation"] = self.perform_evaluation df["episode"] = self.episode - self.write_dfs["rl_params"].append(df) + return df - def write_market_results(self, market_meta: dict): + def convert_market_results(self, market_results: list[dict]): """ - Writes market results to the corresponding data frame. + Convert market results to a dataframe. Args: market_meta (dict): The market metadata, which includes the clearing price and volume. """ - - df = pd.DataFrame(market_meta) - if df.empty: + if len(market_results) == 0: return + + df = pd.DataFrame(market_results) df["simulation"] = self.simulation_id - self.write_dfs["market_meta"].append(df) + return df async def store_dfs(self): """ @@ -293,15 +285,52 @@ async def store_dfs(self): if not self.db and not self.export_csv_path: return - for table in self.write_dfs.keys(): - if len(self.write_dfs[table]) == 0: + for table, data_list in self.write_buffers.items(): + + if len(data_list) == 0: continue + df = None with self.locks[table]: - # concat all dataframes - # use join='outer' to keep all columns and fill missing values with NaN - df = pd.concat(self.write_dfs[table], axis=0, join="outer") - self.write_dfs[table] = [] - + if table == "grid_topology": + for grid_data, market_id in data_list: + self.store_grid(grid_data, market_id) + data_list.clear() + continue + + match table: + case "market_meta": + df = self.convert_market_results(data_list) + case "market_dispatch": + df = self.convert_market_dispatch(data_list) + case "unit_dispatch": + df = self.convert_unit_dispatch(data_list) + case "rl_learning_params": + df = self.convert_rl_params(data_list) + case "grid_flows": + dfs = [] + for data in data_list: + df = self.convert_flows(data) + dfs.append(df) + df = pd.concat(dfs, axis=0, join="outer") + case "market_orders": + dfs = [] + for market_data, market_id in data_list: + df = self.convert_market_orders(market_data, market_id) + dfs.append(df) + df = pd.concat(dfs, axis=0, join="outer") + case _: + # store_units has the name of the units_meta + dfs = [] + for data in data_list: + df = self.convert_units_definition(data) + dfs.append(df) + df = pd.concat(dfs, axis=0, join="outer") + data_list.clear() + # concat all dataframes + # use join='outer' to keep all columns and fill missing values with NaN + if df is None: + continue + df.reset_index() if df.empty: continue @@ -432,86 +461,109 @@ def check_columns(self, table: str, df: pd.DataFrame, index: bool = True): with self.db.begin() as db: db.execute(text(query)) - def write_market_orders(self, market_orders: any, market_id: str): + def convert_market_orders(self, market_orders: any, market_id: str): """ - Writes market orders to the corresponding data frame. + Convert market orders to a dataframe. Args: market_orders (any): The market orders. market_id (str): The id of the market. """ - # check if market results list is empty and skip the function and raise a warning + # Check if market orders are empty and exit early if not market_orders: return + # Separate orders outside of lock to reduce locking time market_orders = separate_orders(market_orders) + + # Construct DataFrame and perform vectorized operations df = pd.DataFrame.from_records(market_orders, index="start_time") + + # Replace lambda functions with vectorized operations if "eligible_lambda" in df.columns: - df["eligible_lambda"] = df["eligible_lambda"].apply(lambda x: x.__name__) - if "evaluation_frequency" in df.columns: - df["evaluation_frequency"] = df["evaluation_frequency"].apply( - lambda x: repr(x) + df["eligible_lambda"] = df["eligible_lambda"].map( + lambda x: getattr(x, "__name__", None) ) + if "evaluation_frequency" in df.columns: + df["evaluation_frequency"] = df["evaluation_frequency"].astype(str) - del df["only_hours"] - del df["agent_addr"] - - if "bid_type" not in df.columns: - df["bid_type"] = None + # Remove unnecessary columns (use a list to minimize deletion calls) + df.drop(columns=["only_hours", "agent_addr"], inplace=True, errors=False) - if "node" not in df.columns: - df["node"] = None + # Add missing columns with defaults + for col in ["bid_type", "node"]: + if col not in df.columns: + df[col] = None + # Add constant columns df["simulation"] = self.simulation_id df["market_id"] = market_id - with self.locks["market_orders"]: - self.write_dfs["market_orders"].append(df) + # Append to the shared DataFrame within lock + return df - def write_units_definition(self, unit_info: dict): + def convert_units_definition(self, unit_info: dict): """ - Writes unit definitions to the corresponding data frame and directly stores it in the database and CSV. + Convert unit definitions to a dataframe. Args: unit_info (dict): The unit information. """ - - table_name = unit_info["unit_type"] + "_meta" - - if table_name is None: - logger.info(f"unknown {unit_info['unit_type']} is not exported") - return False del unit_info["unit_type"] unit_info["simulation"] = self.simulation_id u_info = {unit_info["id"]: unit_info} del unit_info["id"] - with self.locks[table_name]: - self.write_dfs[table_name].append(pd.DataFrame(u_info).T) + return pd.DataFrame(u_info).T - def write_market_dispatch(self, data: any): + def convert_market_dispatch(self, market_dispatch: list[dict]): """ - Writes the planned dispatch of the units after the market clearing to a CSV and database. + Convert the planned dispatch of the units to a DataFrame. Args: data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id". """ - df = pd.DataFrame(data, columns=["datetime", "power", "market_id", "unit_id"]) + + df = pd.DataFrame( + market_dispatch, + columns=["datetime", "power", "market_id", "unit_id"], + ) if not df.empty: df["simulation"] = self.simulation_id - self.write_dfs["market_dispatch"].append(df) + return df - def write_unit_dispatch(self, unit_dispatch: dict): + def convert_unit_dispatch(self, unit_dispatch: list[dict]): """ - Writes the actual dispatch of the units to a CSV and database. + Convert the actual dispatch of the units to a DataFrame. Args: - data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id". - """ - data = pd.concat([pd.DataFrame.from_dict(d) for d in unit_dispatch]) - data = data.set_index("time") + unit_dispatch (list): A list of dictionaries containing unit dispatch data. + Each dictionary includes arrays for multiple values (e.g., power, costs) and other metadata. + """ + + # Flatten and expand the arrays in `unit_dispatch` into a list of records for DataFrame construction + records = [] + for dispatch in unit_dispatch: + time_values = dispatch["time"] + num_records = len(time_values) + + # Create a record for each time step, expanding array-based fields + for i in range(num_records): + record = { + key: (value[i] if isinstance(value, (list | np.ndarray)) else value) + for key, value in dispatch.items() + } + record["time"] = time_values[i] + records.append(record) + + # Convert the list of records into a DataFrame + data = pd.DataFrame.from_records(records) + + # Set the index and add the simulation ID + data.set_index("time", inplace=True) data["simulation"] = self.simulation_id - self.write_dfs["unit_dispatch"].append(data) + + return data async def on_stop(self): """ @@ -598,9 +650,9 @@ def get_sum_reward(self): return rewards_by_unit - def write_flows(self, data: dict[tuple[datetime, str], float]): + def convert_flows(self, data: dict[tuple[datetime, str], float]): """ - Writes the flows of the grid results into the database. + Convert the flows of the grid results into a dataframe. Args: data: The records to be put into the table. Formatted like, "(datetime, line), flow" if generated by pyomo or df if it comes from pypsa. @@ -630,5 +682,4 @@ def write_flows(self, data: dict[tuple[datetime, str], float]): df["simulation"] = self.simulation_id - with self.locks["flows"]: - self.write_dfs["flows"].append(df) + return df diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py index 6f6eaf8d..86ca4c35 100644 --- a/assume/markets/base_market.py +++ b/assume/markets/base_market.py @@ -702,7 +702,7 @@ async def store_order_book(self, orderbook: Orderbook): if db_addr: message = { "context": "write_results", - "type": "store_order_book", + "type": "market_orders", "market_id": self.marketconfig.market_id, "data": orderbook, } @@ -724,7 +724,7 @@ async def store_market_results(self, market_meta): if db_addr: message = { "context": "write_results", - "type": "store_market_results", + "type": "market_meta", "market_id": self.marketconfig.market_id, "data": market_meta, } @@ -746,7 +746,7 @@ async def store_flows(self, flows: dict[tuple, float]): if db_addr: message = { "context": "write_results", - "type": "store_flows", + "type": "grid_flows", "market_id": self.marketconfig.market_id, "data": flows, } diff --git a/docs/source/release_notes.rst b/docs/source/release_notes.rst index c3e90bc4..101da5c2 100644 --- a/docs/source/release_notes.rst +++ b/docs/source/release_notes.rst @@ -20,6 +20,7 @@ Upcoming Release pandas Series. The `FastSeries` class retains a close resemblance to the pandas Series, including core functionalities like indexing, slicing, and arithmetic operations. This ensures seamless integration, allowing users to work with the new class without requiring significant code adaptation. +- **Performance Optimization:** Output role handles dict data directly and only converts to DataFrame on Database write. **Bugfixes:** - **Tutorials**: General fixes of the tutorials, to align with updated functionalitites of Assume diff --git a/tests/test_outputs.py b/tests/test_outputs.py index 3b50c42a..fbcc5976 100644 --- a/tests/test_outputs.py +++ b/tests/test_outputs.py @@ -19,16 +19,16 @@ def test_output_market_orders(): start = datetime(2020, 1, 1) end = datetime(2020, 1, 2) output_writer = WriteOutput("test_sim", start, end, engine) - assert len(output_writer.write_dfs.keys()) == 0 + assert len(output_writer.write_buffers.keys()) == 0 meta = {"sender_id": None} content = { "context": "write_results", - "type": "store_order_book", + "type": "market_orders", "sender": "CRM_pos", "data": [], } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["market_orders"]) == 0 + assert len(output_writer.write_buffers["market_orders"]) == 0 orderbook = [ { @@ -67,12 +67,12 @@ def test_output_market_orders(): content = { "context": "write_results", - "type": "store_order_book", + "type": "market_orders", "sender": "CRM_pos", "data": orderbook, } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["market_orders"]) == 1 + assert len(output_writer.write_buffers["market_orders"]) == 1 def test_output_market_results(): @@ -80,11 +80,11 @@ def test_output_market_results(): start = datetime(2020, 1, 1) end = datetime(2020, 1, 2) output_writer = WriteOutput("test_sim", start, end, engine) - assert len(output_writer.write_dfs.keys()) == 0 + assert len(output_writer.write_buffers.keys()) == 0 meta = {"sender_id": None} content = { "context": "write_results", - "type": "store_market_results", + "type": "market_meta", "sender": "CRM_pos", "data": [ { @@ -105,7 +105,7 @@ def test_output_market_results(): ], } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["market_meta"]) == 1, "market_meta" + assert len(output_writer.write_buffers["market_meta"]) == 1, "market_meta" def test_output_market_dispatch(): @@ -113,12 +113,12 @@ def test_output_market_dispatch(): start = datetime(2020, 1, 1) end = datetime(2020, 1, 2) output_writer = WriteOutput("test_sim", start, end, engine) - assert len(output_writer.write_dfs.keys()) == 0 + assert len(output_writer.write_buffers.keys()) == 0 meta = {"sender_id": None} content = {"context": "write_results", "type": "market_dispatch", "data": []} output_writer.handle_output_message(content, meta) # empty dfs are discarded - assert len(output_writer.write_dfs["market_dispatch"]) == 0, "market_dispatch" + assert len(output_writer.write_buffers["market_dispatch"]) == 0, "market_dispatch" content = { "context": "write_results", @@ -126,7 +126,7 @@ def test_output_market_dispatch(): "data": [[start, 90, "EOM", "TestUnit"]], } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["market_dispatch"]) == 1, "market_dispatch" + assert len(output_writer.write_buffers["market_dispatch"]) == 1, "market_dispatch" def test_output_unit_dispatch(): @@ -134,7 +134,7 @@ def test_output_unit_dispatch(): start = datetime(2020, 1, 1) end = datetime(2020, 1, 2) output_writer = WriteOutput("test_sim", start, end, engine) - assert len(output_writer.write_dfs.keys()) == 0 + assert len(output_writer.write_buffers.keys()) == 0 meta = {"sender_id": None} content = { "context": "write_results", @@ -150,7 +150,7 @@ def test_output_unit_dispatch(): } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["unit_dispatch"]) == 1, "unit_dispatch" + assert len(output_writer.write_buffers["unit_dispatch"]) == 1, "unit_dispatch" def test_output_write_flows(): @@ -158,13 +158,13 @@ def test_output_write_flows(): start = datetime(2020, 1, 1) end = datetime(2020, 1, 2) output_writer = WriteOutput("test_sim", start, end, engine) - assert len(output_writer.write_dfs.keys()) == 0 + assert len(output_writer.write_buffers.keys()) == 0 meta = {"sender_id": None} content = { "context": "write_results", - "type": "store_flows", + "type": "grid_flows", "data": {(datetime(2019, 1, 1, 0, 0), "north_south_example"): 0.0}, } output_writer.handle_output_message(content, meta) - assert len(output_writer.write_dfs["flows"]) == 1, "flows" + assert len(output_writer.write_buffers["grid_flows"]) == 1, "grid_flows" From d21a4e566d2c7c6c65555d6b76ae14c64a2e8683 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Fri, 29 Nov 2024 10:41:45 +0100 Subject: [PATCH 2/2] restructure output role and lint --- assume/common/outputs.py | 305 +++++++++--------- assume/scenario/loader_csv.py | 2 +- assume/units/demand.py | 2 +- assume/units/powerplant.py | 2 +- ...forcement_learning_algorithm_example.ipynb | 2 +- .../04_reinforcement_learning_example.ipynb | 5 +- .../notebooks/09_example_Sim_and_xRL.ipynb | 2 +- 7 files changed, 163 insertions(+), 157 deletions(-) diff --git a/assume/common/outputs.py b/assume/common/outputs.py index 559c082e..d758cddc 100644 --- a/assume/common/outputs.py +++ b/assume/common/outputs.py @@ -95,7 +95,7 @@ def __init__( # check if episode=0 and delete all similar runs if self.episode == 0: - self.del_similar_runs() + self.delete_similar_runs() # construct all timeframe under which hourly values are written to excel and db self.start = start @@ -165,7 +165,7 @@ def delete_db_scenario(self, simulation_id: str): f"could not clear old scenarios from table {table_name} - {e}" ) - def del_similar_runs(self): + def delete_similar_runs(self): """ Deletes all similar runs from the database based on the simulation ID. This ensures that we overwrite simulations results when restarting one. Please note that a simulation which you also want to keep need to be assigned anew ID. """ @@ -209,7 +209,10 @@ def on_ready(self): cache=True, ) self.context.schedule_recurrent_task( - self.store_dfs, recurrency_task, src="no_wait" + self.store_dfs, + recurrency_task, + src="no_wait", + # this should not wait for the task to finish to block the simulation ) def handle_output_message(self, content: dict, meta: MetaDict): @@ -227,20 +230,25 @@ def handle_output_message(self, content: dict, meta: MetaDict): if not content_data: return - if content_type in ["market_meta", "market_dispatch", "unit_dispatch", "rl_learning_params"]: + if content_type in [ + "market_meta", + "market_dispatch", + "unit_dispatch", + "rl_learning_params", + ]: # these can be processed as a single dataframe self.write_buffers[content_type].extend(content_data) elif content_type == "store_units": table_name = content_data["unit_type"] + "_meta" self.write_buffers[table_name].append(content_data) - + elif content_type == "grid_flows": # these need to be converted to df individually self.write_buffers[content_type].append(content_data) elif content_type in ["market_orders", "grid_topology"]: # here we need an additional market_id self.write_buffers[content_type].append((content_data, market_id)) - + # keep track of the memory usage of the data self.current_dfs_size += calculate_content_size(content_data) # if the current size is larger than self.max_dfs_size, store the data @@ -278,6 +286,144 @@ def convert_market_results(self, market_results: list[dict]): df["simulation"] = self.simulation_id return df + def convert_market_orders(self, market_orders: any, market_id: str): + """ + Convert market orders to a dataframe. + + Args: + market_orders (any): The market orders. + market_id (str): The id of the market. + """ + # Check if market orders are empty and exit early + if not market_orders: + return + + # Separate orders outside of lock to reduce locking time + market_orders = separate_orders(market_orders) + + # Construct DataFrame and perform vectorized operations + df = pd.DataFrame.from_records(market_orders, index="start_time") + + # Replace lambda functions with vectorized operations + if "eligible_lambda" in df.columns: + df["eligible_lambda"] = df["eligible_lambda"].map( + lambda x: getattr(x, "__name__", None) + ) + if "evaluation_frequency" in df.columns: + df["evaluation_frequency"] = df["evaluation_frequency"].astype(str) + + # Remove unnecessary columns (use a list to minimize deletion calls) + df.drop(columns=["only_hours", "agent_addr"], inplace=True, errors=False) + + # Add missing columns with defaults + for col in ["bid_type", "node"]: + if col not in df.columns: + df[col] = None + + # Add constant columns + df["simulation"] = self.simulation_id + df["market_id"] = market_id + + # Append to the shared DataFrame within lock + return df + + def convert_units_definition(self, unit_info: dict): + """ + Convert unit definitions to a dataframe. + + Args: + unit_info (dict): The unit information. + """ + del unit_info["unit_type"] + unit_info["simulation"] = self.simulation_id + u_info = {unit_info["id"]: unit_info} + del unit_info["id"] + + return pd.DataFrame(u_info).T + + def convert_market_dispatch(self, market_dispatch: list[dict]): + """ + Convert the planned dispatch of the units to a DataFrame. + + Args: + data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id". + """ + + df = pd.DataFrame( + market_dispatch, + columns=["datetime", "power", "market_id", "unit_id"], + ) + if not df.empty: + df["simulation"] = self.simulation_id + return df + + def convert_unit_dispatch(self, unit_dispatch: list[dict]): + """ + Convert the actual dispatch of the units to a DataFrame. + + Args: + unit_dispatch (list): A list of dictionaries containing unit dispatch data. + Each dictionary includes arrays for multiple values (e.g., power, costs) and other metadata. + """ + + # Flatten and expand the arrays in `unit_dispatch` into a list of records for DataFrame construction + records = [] + for dispatch in unit_dispatch: + time_values = dispatch["time"] + num_records = len(time_values) + + # Create a record for each time step, expanding array-based fields + for i in range(num_records): + record = { + key: (value[i] if isinstance(value, (list | np.ndarray)) else value) + for key, value in dispatch.items() + } + record["time"] = time_values[i] + records.append(record) + + # Convert the list of records into a DataFrame + data = pd.DataFrame.from_records(records) + + # Set the index and add the simulation ID + data.set_index("time", inplace=True) + data["simulation"] = self.simulation_id + + return data + + def convert_flows(self, data: dict[tuple[datetime, str], float]): + """ + Convert the flows of the grid results into a dataframe. + + Args: + data: The records to be put into the table. Formatted like, "(datetime, line), flow" if generated by pyomo or df if it comes from pypsa. + """ + # Daten in ein DataFrame umwandeln depending on the data format which differs when different solver are used + # transformation done here to avoid adapting format during clearing + + # if data is dataframe + if isinstance(data, pd.DataFrame): + df = data + + # if data is dict + elif isinstance(data, dict): + # Convert the dictionary to a DataFrame + df = pd.DataFrame.from_dict( + data, orient="index", columns=["flow"] + ).reset_index() + # Split the 'index' column into 'timestamp' and 'line' + df[["timestamp", "line"]] = pd.DataFrame( + df["index"].tolist(), index=df.index + ) + # Rename the columns + df = df.drop(columns=["index"]) + + # set timestamp to index + df.set_index("timestamp", inplace=True) + + df["simulation"] = self.simulation_id + + return df + async def store_dfs(self): """ Stores the data frames to CSV files and the database. Is scheduled as a recurrent task based on the frequency. @@ -286,7 +432,6 @@ async def store_dfs(self): return for table, data_list in self.write_buffers.items(): - if len(data_list) == 0: continue df = None @@ -296,7 +441,7 @@ async def store_dfs(self): self.store_grid(grid_data, market_id) data_list.clear() continue - + match table: case "market_meta": df = self.convert_market_results(data_list) @@ -327,10 +472,10 @@ async def store_dfs(self): df = pd.concat(dfs, axis=0, join="outer") data_list.clear() # concat all dataframes - # use join='outer' to keep all columns and fill missing values with NaN + # use join='outer' to keep all columns and fill missing values with NaN if df is None: - continue - + continue + df.reset_index() if df.empty: continue @@ -461,110 +606,6 @@ def check_columns(self, table: str, df: pd.DataFrame, index: bool = True): with self.db.begin() as db: db.execute(text(query)) - def convert_market_orders(self, market_orders: any, market_id: str): - """ - Convert market orders to a dataframe. - - Args: - market_orders (any): The market orders. - market_id (str): The id of the market. - """ - # Check if market orders are empty and exit early - if not market_orders: - return - - # Separate orders outside of lock to reduce locking time - market_orders = separate_orders(market_orders) - - # Construct DataFrame and perform vectorized operations - df = pd.DataFrame.from_records(market_orders, index="start_time") - - # Replace lambda functions with vectorized operations - if "eligible_lambda" in df.columns: - df["eligible_lambda"] = df["eligible_lambda"].map( - lambda x: getattr(x, "__name__", None) - ) - if "evaluation_frequency" in df.columns: - df["evaluation_frequency"] = df["evaluation_frequency"].astype(str) - - # Remove unnecessary columns (use a list to minimize deletion calls) - df.drop(columns=["only_hours", "agent_addr"], inplace=True, errors=False) - - # Add missing columns with defaults - for col in ["bid_type", "node"]: - if col not in df.columns: - df[col] = None - - # Add constant columns - df["simulation"] = self.simulation_id - df["market_id"] = market_id - - # Append to the shared DataFrame within lock - return df - - def convert_units_definition(self, unit_info: dict): - """ - Convert unit definitions to a dataframe. - - Args: - unit_info (dict): The unit information. - """ - del unit_info["unit_type"] - unit_info["simulation"] = self.simulation_id - u_info = {unit_info["id"]: unit_info} - del unit_info["id"] - - return pd.DataFrame(u_info).T - - def convert_market_dispatch(self, market_dispatch: list[dict]): - """ - Convert the planned dispatch of the units to a DataFrame. - - Args: - data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id". - """ - - df = pd.DataFrame( - market_dispatch, - columns=["datetime", "power", "market_id", "unit_id"], - ) - if not df.empty: - df["simulation"] = self.simulation_id - return df - - def convert_unit_dispatch(self, unit_dispatch: list[dict]): - """ - Convert the actual dispatch of the units to a DataFrame. - - Args: - unit_dispatch (list): A list of dictionaries containing unit dispatch data. - Each dictionary includes arrays for multiple values (e.g., power, costs) and other metadata. - """ - - # Flatten and expand the arrays in `unit_dispatch` into a list of records for DataFrame construction - records = [] - for dispatch in unit_dispatch: - time_values = dispatch["time"] - num_records = len(time_values) - - # Create a record for each time step, expanding array-based fields - for i in range(num_records): - record = { - key: (value[i] if isinstance(value, (list | np.ndarray)) else value) - for key, value in dispatch.items() - } - record["time"] = time_values[i] - records.append(record) - - # Convert the list of records into a DataFrame - data = pd.DataFrame.from_records(records) - - # Set the index and add the simulation ID - data.set_index("time", inplace=True) - data["simulation"] = self.simulation_id - - return data - async def on_stop(self): """ This function makes it possible to calculate Key Performance Indicators. @@ -649,37 +690,3 @@ def get_sum_reward(self): rewards_by_unit = np.array(rewards_by_unit) return rewards_by_unit - - def convert_flows(self, data: dict[tuple[datetime, str], float]): - """ - Convert the flows of the grid results into a dataframe. - - Args: - data: The records to be put into the table. Formatted like, "(datetime, line), flow" if generated by pyomo or df if it comes from pypsa. - """ - # Daten in ein DataFrame umwandeln depending on the data format which differs when different solver are used - # transformation done here to avoid adapting format during clearing - - # if data is dataframe - if isinstance(data, pd.DataFrame): - df = data - - # if data is dict - elif isinstance(data, dict): - # Convert the dictionary to a DataFrame - df = pd.DataFrame.from_dict( - data, orient="index", columns=["flow"] - ).reset_index() - # Split the 'index' column into 'timestamp' and 'line' - df[["timestamp", "line"]] = pd.DataFrame( - df["index"].tolist(), index=df.index - ) - # Rename the columns - df = df.drop(columns=["index"]) - - # set timestamp to index - df.set_index("timestamp", inplace=True) - - df["simulation"] = self.simulation_id - - return df diff --git a/assume/scenario/loader_csv.py b/assume/scenario/loader_csv.py index fcb36ca2..afed7fe0 100644 --- a/assume/scenario/loader_csv.py +++ b/assume/scenario/loader_csv.py @@ -878,7 +878,7 @@ def run_learning( # initialize policies already here to set the obs_dim and act_dim in the learning role actors_and_critics = None world.learning_role.initialize_policy(actors_and_critics=actors_and_critics) - world.output_role.del_similar_runs() + world.output_role.delete_similar_runs() # check if we already stored policies for this simulation save_path = world.learning_config["trained_policies_save_path"] diff --git a/assume/units/demand.py b/assume/units/demand.py index 37a02684..a435de14 100644 --- a/assume/units/demand.py +++ b/assume/units/demand.py @@ -101,7 +101,7 @@ def calculate_min_max_power( Returns: tuple[pandas.Series, pandas.Series]: The bid colume as both the minimum and maximum power output of the unit. """ - + # end includes the end of the last product, to get the last products' start time we deduct the frequency once end_excl = end - self.index.freq bid_volume = (self.volume - self.outputs[product_type]).loc[start:end_excl] diff --git a/assume/units/powerplant.py b/assume/units/powerplant.py index 5ab7fbff..55580e2c 100644 --- a/assume/units/powerplant.py +++ b/assume/units/powerplant.py @@ -260,7 +260,7 @@ def calculate_min_max_power( Note: The calculation does not include ramping constraints and can be used for arbitrary start times in the future. """ - # end includes the end of the last product, to get the last products' start time we deduct the frequency once + # end includes the end of the last product, to get the last products' start time we deduct the frequency once end_excl = end - self.index.freq base_load = self.outputs["energy"].loc[start:end_excl] diff --git a/examples/notebooks/04_reinforcement_learning_algorithm_example.ipynb b/examples/notebooks/04_reinforcement_learning_algorithm_example.ipynb index 291fcf34..a44fcd7e 100644 --- a/examples/notebooks/04_reinforcement_learning_algorithm_example.ipynb +++ b/examples/notebooks/04_reinforcement_learning_algorithm_example.ipynb @@ -261,7 +261,7 @@ " # initialize policies already here to set the obs_dim and act_dim in the learning role\n", " actors_and_critics = None\n", " world.learning_role.initialize_policy(actors_and_critics=actors_and_critics)\n", - " world.output_role.del_similar_runs()\n", + " world.output_role.delete_similar_runs()\n", "\n", " # check if we already stored policies for this simulation\n", " save_path = world.learning_config[\"trained_policies_save_path\"]\n", diff --git a/examples/notebooks/04_reinforcement_learning_example.ipynb b/examples/notebooks/04_reinforcement_learning_example.ipynb index d70c78d4..cda4a1c0 100644 --- a/examples/notebooks/04_reinforcement_learning_example.ipynb +++ b/examples/notebooks/04_reinforcement_learning_example.ipynb @@ -1073,7 +1073,6 @@ "\n", " duration = (end - start) / timedelta(hours=1)\n", "\n", - "\n", " # calculate profit as income - running_cost from this event\n", " order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n", " order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n", @@ -2091,6 +2090,7 @@ "\n", " return bids\n", "\n", + "\n", "# we define the class again and inherit from the initial class just to add the additional method to the original class\n", "# this is a workaround to have different methods of the class in different cells\n", "# which is good for the purpose of this tutorial\n", @@ -2125,7 +2125,7 @@ " for order in orderbook:\n", " start = order[\"start_time\"]\n", " end = order[\"end_time\"]\n", - " \n", + "\n", " # end includes the end of the last product, to get the last products' start time we deduct the frequency once\n", " end_excl = end - unit.index.freq\n", "\n", @@ -2136,7 +2136,6 @@ "\n", " duration = (end - start) / timedelta(hours=1)\n", "\n", - "\n", " # calculate profit as income - running_cost from this event\n", " order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n", " order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n", diff --git a/examples/notebooks/09_example_Sim_and_xRL.ipynb b/examples/notebooks/09_example_Sim_and_xRL.ipynb index dfa41a09..a0e9b250 100644 --- a/examples/notebooks/09_example_Sim_and_xRL.ipynb +++ b/examples/notebooks/09_example_Sim_and_xRL.ipynb @@ -529,7 +529,7 @@ " # initialize policies already here to set the obs_dim and act_dim in the learning role\n", " actors_and_critics = None\n", " world.learning_role.initialize_policy(actors_and_critics=actors_and_critics)\n", - " world.output_role.del_similar_runs()\n", + " world.output_role.delete_similar_runs()\n", "\n", " # check if we already stored policies for this simulation\n", " save_path = world.learning_config[\"trained_policies_save_path\"]\n",