Merge branch 'improve_output_role' into improve-fast-pandas
nick-harder committed Nov 29, 2024
2 parents 08c57e6 + 12dd646 commit 5f9517f
Showing 6 changed files with 109 additions and 117 deletions.
180 changes: 86 additions & 94 deletions assume/common/outputs.py
@@ -105,12 +105,9 @@ def __init__(
self.current_dfs_size = 0

# initializes dfs for storing and writing asynchronous
self.write_dfs: dict = defaultdict(list)
self.write_buffers: dict = defaultdict(list)
self.locks = defaultdict(lambda: Lock())

# Buffers for batching
self.buffers = defaultdict(list)

self.kpi_defs: dict[str, OutputDef] = {
"avg_price": {
"value": "avg(price)",
@@ -224,30 +221,30 @@ def handle_output_message(self, content: dict, meta: MetaDict):
meta (MetaDict): The metadata associated with the message.
"""
content_data = content.get("data")
content_type = content.get("type")
market_id = content.get("market_id")

if content.get("type") == "store_order_book":
self.write_market_orders(content_data, content.get("market_id"))

elif content.get("type") == "store_market_results":
self.buffers["market_results"].extend(content_data)

elif content.get("type") == "market_dispatch":
self.buffers["market_dispatch"].extend(content_data)

elif content.get("type") == "unit_dispatch":
self.buffers["unit_dispatch"].extend(content_data)

elif content.get("type") == "rl_learning_params":
self.buffers["rl_learning_params"].extend(content_data)

elif content.get("type") == "store_flows":
self.write_flows(content_data)

elif content.get("type") == "store_units":
self.write_units_definition(content_data)
if not content_data:
return

elif content.get("type") == "grid_topology":
self.store_grid(content_data, content.get("market_id"))
if content_type in [
"market_meta",
"market_dispatch",
"unit_dispatch",
"rl_learning_params",
]:
# these can be processed as a single dataframe
self.write_buffers[content_type].extend(content_data)
elif content_type == "store_units":
table_name = content_data["unit_type"] + "_meta"
self.write_buffers[table_name].append(content_data)

elif content_type == "grid_flows":
# these need to be converted to df individually
self.write_buffers[content_type].append(content_data)
elif content_type in ["market_orders", "grid_topology"]:
# here we need an additional market_id
self.write_buffers[content_type].append((content_data, market_id))

# keep track of the memory usage of the data
self.current_dfs_size += calculate_content_size(content_data)
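For reference, a producer-side message that lands in one of the list-typed buffers might look like the sketch below; the payload fields are illustrative, not the exact market_meta schema.

    message = {
        "context": "write_results",
        "type": "market_meta",  # buffered together with dispatch and RL params
        "market_id": "EOM",     # hypothetical market id
        "data": [{"price": 42.0, "supply_volume": 1000.0, "demand_volume": 1000.0}],
    }
    # handle_output_message extends write_buffers["market_meta"] with this list;
    # conversion to a DataFrame is deferred until store_dfs() runs.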
@@ -256,9 +253,9 @@ def handle_output_message(self, content: dict, meta: MetaDict):
logger.debug("storing output data due to size limit")
self.context.schedule_instant_task(coroutine=self.store_dfs())

def write_rl_params(self, rl_params: list[dict]):
def convert_rl_params(self, rl_params: list[dict]):
"""
Writes the RL parameters such as reward, regret, and profit to the corresponding data frame.
Convert the RL parameters such as reward, regret, and profit to a dataframe.
Args:
rl_params (dict): The RL parameters.
@@ -270,11 +267,11 @@ def write_rl_params(self, rl_params: list[dict]):
df["perform_evaluation"] = self.perform_evaluation
df["episode"] = self.episode

self.write_dfs["rl_params"].append(df)
return df

def write_market_results(self, market_results: list[dict]):
def convert_market_results(self, market_results: list[dict]):
"""
Writes market results to the corresponding data frame.
Convert market results to a dataframe.
Args:
market_meta (dict): The market metadata, which includes the clearing price and volume.
@@ -284,36 +281,7 @@ def write_market_results(self, market_results: list[dict]):

df = pd.DataFrame(market_results)
df["simulation"] = self.simulation_id
self.write_dfs["market_meta"].append(df)

def process_buffers(self):
"""
Processes all message buffers in a batch manner, ensuring efficient handling of all data types.
"""
for message_type, buffer in self.buffers.items():
if not buffer:
continue

# Process each message type in bulk
if message_type == "market_dispatch":
self.write_market_dispatch(buffer)
elif message_type == "unit_dispatch":
self.write_unit_dispatch(buffer)
elif message_type == "store_market_results":
self.write_market_results(buffer)
# elif message_type == "store_order_book":
# self.process_market_orders(buffer)
# elif message_type == "store_units":
# self.process_units_definition(buffer)
elif message_type == "rl_learning_params":
self.write_rl_params(buffer)
# elif message_type == "grid_topology":
# self.process_grid_topology(buffer)
# elif message_type == "store_flows":
# self.process_flows(buffer)

# Clear the buffer after processing
buffer.clear()
return df

async def store_dfs(self):
"""
@@ -322,16 +290,50 @@ async def store_dfs(self):
if not self.db and not self.export_csv_path:
return

self.process_buffers()

for table in self.write_dfs.keys():
if len(self.write_dfs[table]) == 0:
for table, data_list in self.write_buffers.items():
if len(data_list) == 0:
continue
df = None
with self.locks[table]:
# concat all dataframes
# use join='outer' to keep all columns and fill missing values with NaN
df = pd.concat(self.write_dfs[table], axis=0, join="outer")
self.write_dfs[table] = []
if table == "grid_topology":
for grid_data, market_id in data_list:
self.store_grid(grid_data, market_id)
data_list.clear()
continue

match table:
case "market_meta":
df = self.convert_market_results(data_list)
case "market_dispatch":
df = self.convert_market_dispatch(data_list)
case "unit_dispatch":
df = self.convert_unit_dispatch(data_list)
case "rl_learning_params":
df = self.convert_rl_params(data_list)
case "grid_flows":
dfs = []
for data in data_list:
df = self.convert_flows(data)
dfs.append(df)
df = pd.concat(dfs, axis=0, join="outer")

case "market_orders":
dfs = []
for market_data, market_id in data_list:
df = self.convert_market_orders(market_data, market_id)
dfs.append(df)
df = pd.concat(dfs, axis=0, join="outer")
case _:
# store_units has the name of the units_meta
dfs = []
for data in data_list:
df = self.convert_units_definition(data)
dfs.append(df)
df = pd.concat(dfs, axis=0, join="outer")
data_list.clear()
# concat all dataframes
# use join='outer' to keep all columns and fill missing values with NaN
if df is None:
continue

df.reset_index()
if df.empty:
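The join="outer" used in the concats above matters whenever buffered records do not share identical columns; a minimal standalone pandas illustration (unrelated to the actual simulation data):

    import pandas as pd

    a = pd.DataFrame({"unit_id": ["u1"], "power": [10.0]})
    b = pd.DataFrame({"unit_id": ["u2"], "heat": [5.0]})

    # join="outer" keeps the union of columns and fills gaps with NaN,
    # so no record is silently dropped on the batched write
    merged = pd.concat([a, b], axis=0, join="outer")
    #   unit_id  power  heat
    # 0      u1   10.0   NaN
    # 0      u2    NaN   5.0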
@@ -463,9 +465,9 @@ def check_columns(self, table: str, df: pd.DataFrame, index: bool = True):
with self.db.begin() as db:
db.execute(text(query))

def write_market_orders(self, market_orders: any, market_id: str):
def convert_market_orders(self, market_orders: any, market_id: str):
"""
Writes market orders to the corresponding data frame.
Convert market orders to a dataframe.
Args:
market_orders (any): The market orders.
@@ -502,33 +504,25 @@ def write_market_orders(self, market_orders: any, market_id: str):
df["market_id"] = market_id

# Append to the shared DataFrame within lock
with self.locks["market_orders"]:
self.write_dfs["market_orders"].append(df)
return df

def write_units_definition(self, unit_info: dict):
def convert_units_definition(self, unit_info: dict):
"""
Writes unit definitions to the corresponding data frame and directly stores it in the database and CSV.
Convert unit definitions to a dataframe.
Args:
unit_info (dict): The unit information.
"""

table_name = unit_info["unit_type"] + "_meta"

if table_name is None:
logger.info(f"unknown {unit_info['unit_type']} is not exported")
return False
del unit_info["unit_type"]
unit_info["simulation"] = self.simulation_id
u_info = {unit_info["id"]: unit_info}
del unit_info["id"]

with self.locks[table_name]:
self.write_dfs[table_name].append(pd.DataFrame(u_info).T)
return pd.DataFrame(u_info).T
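The {id: info} wrapping followed by a transpose turns a single unit record into a one-row frame indexed by the unit id; a quick sketch with made-up fields:

    import pandas as pd

    # hypothetical record after "unit_type" and "id" have been popped
    unit_info = {"max_power": 100.0, "technology": "hard coal", "simulation": "sim_1"}
    u_info = {"unit_1": unit_info}

    df = pd.DataFrame(u_info).T
    #         max_power technology simulation
    # unit_1      100.0  hard coal      sim_1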

def write_market_dispatch(self, market_dispatch: list[dict]):
def convert_market_dispatch(self, market_dispatch: list[dict]):
"""
Writes the planned dispatch of the units after the market clearing to a CSV and database.
Convert the planned dispatch of the units to a DataFrame.
Args:
data (any): The records to be put into the table. Formatted like, "datetime, power, market_id, unit_id".
@@ -540,11 +534,11 @@ def write_market_dispatch(self, market_dispatch: list[dict]):
)
if not df.empty:
df["simulation"] = self.simulation_id
self.write_dfs["market_dispatch"].append(df)
return df

def write_unit_dispatch(self, unit_dispatch: list[dict]):
def convert_unit_dispatch(self, unit_dispatch: list[dict]):
"""
Writes the actual dispatch of the units to a DataFrame.
Convert the actual dispatch of the units to a DataFrame.
Args:
unit_dispatch (list): A list of dictionaries containing unit dispatch data.
@@ -573,8 +567,7 @@ def write_unit_dispatch(self, unit_dispatch: list[dict]):
data.set_index("time", inplace=True)
data["simulation"] = self.simulation_id

# Append to the unit_dispatch DataFrame
self.write_dfs["unit_dispatch"].append(data)
return data

async def on_stop(self):
"""
@@ -661,9 +654,9 @@ def get_sum_reward(self):

return rewards_by_unit

def write_flows(self, data: dict[tuple[datetime, str], float]):
def convert_flows(self, data: dict[tuple[datetime, str], float]):
"""
Writes the flows of the grid results into the database.
Convert the flows of the grid results into a dataframe.
Args:
data: The records to be put into the table. Formatted like, "(datetime, line), flow" if generated by pyomo or df if it comes from pypsa.
@@ -693,5 +686,4 @@ def write_flows(self, data: dict[tuple[datetime, str], float]):

df["simulation"] = self.simulation_id

with self.locks["flows"]:
self.write_dfs["flows"].append(df)
return df
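For the pyomo-style input described in the docstring, the mapping is keyed by (datetime, line) tuples; one plausible way such a dict becomes a frame is sketched below (the real conversion may differ in detail):

    from datetime import datetime

    import pandas as pd

    data = {
        (datetime(2024, 1, 1, 0), "line_1"): 12.5,
        (datetime(2024, 1, 1, 0), "line_2"): -3.0,
    }

    # tuple keys become a MultiIndex; reset_index flattens it into columns
    df = pd.Series(data).rename_axis(["datetime", "line"]).reset_index(name="flow")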

6 changes: 3 additions & 3 deletions assume/markets/base_market.py
@@ -702,7 +702,7 @@ async def store_order_book(self, orderbook: Orderbook):
if db_addr:
message = {
"context": "write_results",
"type": "store_order_book",
"type": "market_orders",
"market_id": self.marketconfig.market_id,
"data": orderbook,
}
@@ -724,7 +724,7 @@ async def store_market_results(self, market_meta):
if db_addr:
message = {
"context": "write_results",
"type": "store_market_results",
"type": "market_meta",
"market_id": self.marketconfig.market_id,
"data": market_meta,
}
@@ -746,7 +746,7 @@ async def store_flows(self, flows: dict[tuple, float]):
if db_addr:
message = {
"context": "write_results",
"type": "store_flows",
"type": "grid_flows",
"market_id": self.marketconfig.market_id,
"data": flows,
}
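Taken together, these renames align the producer-side message types with the buffer keys used in handle_output_message. A flow message, for instance, now goes out as sketched below (payload values illustrative):

    from datetime import datetime

    message = {
        "context": "write_results",
        "type": "grid_flows",  # was "store_flows"
        "market_id": "EOM",    # hypothetical market id
        "data": {(datetime(2024, 1, 1, 0), "line_1"): 12.5},
    }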
2 changes: 1 addition & 1 deletion assume/units/powerplant.py
@@ -260,7 +260,7 @@ def calculate_min_max_power(
Note:
The calculation does not include ramping constraints and can be used for arbitrary start times in the future.
"""
# end includes the end of the last product, to get the last products' start time we deduct the frequency once
# end includes the end of the last product, to get the last products' start time we deduct the frequency once
end_excl = end - self.index.freq

base_load = self.outputs["energy"].loc[start:end_excl]
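The comment touched above encodes a subtlety worth spelling out: .loc slicing on a datetime index is inclusive at both ends, so the end of the last product must be pulled back by one frequency step. A minimal pandas sketch (hourly index assumed):

    import pandas as pd

    index = pd.date_range("2024-01-01", periods=24, freq="h")
    series = pd.Series(0.0, index=index)

    start = pd.Timestamp("2024-01-01 06:00")
    end = pd.Timestamp("2024-01-01 10:00")  # end of the last product

    # .loc is inclusive on both ends, so deduct the frequency once
    end_excl = end - index.freq
    base_load = series.loc[start:end_excl]  # covers 06:00 through 09:00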
1 change: 1 addition & 0 deletions docs/source/release_notes.rst
@@ -20,6 +20,7 @@ Upcoming Release
pandas Series. The `FastSeries` class retains a close resemblance to the pandas Series, including core
functionalities like indexing, slicing, and arithmetic operations. This ensures seamless integration,
allowing users to work with the new class without requiring significant code adaptation.
- **Performance Optimization:** Output role handles dict data directly and only converts to DataFrame on Database write.

**Bugfixes:**
- **Tutorials**: General fixes of the tutorials, to align with updated functionalities of Assume
5 changes: 2 additions & 3 deletions examples/notebooks/04_reinforcement_learning_example.ipynb
@@ -1073,7 +1073,6 @@
"\n",
" duration = (end - start) / timedelta(hours=1)\n",
"\n",
"\n",
" # calculate profit as income - running_cost from this event\n",
" order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n",
" order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n",
@@ -2091,6 +2090,7 @@
"\n",
" return bids\n",
"\n",
"\n",
"# we define the class again and inherit from the initial class just to add the additional method to the original class\n",
"# this is a workaround to have different methods of the class in different cells\n",
"# which is good for the purpose of this tutorial\n",
@@ -2125,7 +2125,7 @@
" for order in orderbook:\n",
" start = order[\"start_time\"]\n",
" end = order[\"end_time\"]\n",
" \n",
"\n",
" # end includes the end of the last product, to get the last products' start time we deduct the frequency once\n",
" end_excl = end - unit.index.freq\n",
"\n",
@@ -2136,7 +2136,6 @@
"\n",
" duration = (end - start) / timedelta(hours=1)\n",
"\n",
"\n",
" # calculate profit as income - running_cost from this event\n",
" order_profit = order[\"accepted_price\"] * order[\"accepted_volume\"] * duration\n",
" order_cost = marginal_cost * order[\"accepted_volume\"] * duration\n",
