Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pandas wrapper #483

Merged
merged 53 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
227ea34
initial code for the pandas wrapper
nick-harder Nov 13, 2024
c3c591d
first functional version
nick-harder Nov 14, 2024
3661e40
-fixing tests
nick-harder Nov 14, 2024
ffc53f9
-improved functionality of fastseries
nick-harder Nov 14, 2024
4d09b6d
-small fix in flexable strategy
nick-harder Nov 14, 2024
73cb253
set default save frequency to None to speed up default simulation runs
nick-harder Nov 14, 2024
b94b089
-fix advanced strategies
nick-harder Nov 14, 2024
ad02294
-rename FastDateTimeIndex to FastIndex
nick-harder Nov 15, 2024
649ece2
-small improvements
nick-harder Nov 15, 2024
8a8a0c2
-improve speed of fastindex class
nick-harder Nov 15, 2024
1ce53a3
-improve structure and performance of the fast pandas
nick-harder Nov 15, 2024
94741db
-add iloc methods
nick-harder Nov 15, 2024
f92c105
-fix dmas storage strategy
nick-harder Nov 15, 2024
b1e9ff0
-create a TensorSeries to store tensor values without conversion to n…
nick-harder Nov 15, 2024
7346184
-add as_datetimeindex to convert FastIndex to pandas index
nick-harder Nov 15, 2024
5442e17
-adjust typing
nick-harder Nov 15, 2024
9a9afee
-fixing several tests
nick-harder Nov 15, 2024
9e59388
-remove initializing TensorFastSeries only when RL strategy is defined
nick-harder Nov 20, 2024
5685d39
fix test_get_actual_dispatch
maurerle Nov 20, 2024
217cfee
-fix check for learning strategy
nick-harder Nov 20, 2024
fdf2902
-fix tests and a small bug
nick-harder Nov 20, 2024
9ea05d6
-remove index from unit initialization
nick-harder Nov 20, 2024
94b8ad1
move convert_forecasts_to_fast_series to csvforecast
maurerle Nov 20, 2024
54cfe30
update notebook_03 to suit new usage
maurerle Nov 20, 2024
bfda716
Improve pandas mapper (#490)
maurerle Nov 21, 2024
bca37cc
Merge branch 'main' into pandas_wrapper
nick-harder Nov 21, 2024
a5dd426
-add a check to asign values to all fastseries
nick-harder Nov 21, 2024
e421030
allow setting an index part with a list or a series
maurerle Nov 21, 2024
4e93615
Merge branch 'main' into pandas_wrapper
nick-harder Nov 21, 2024
fb2e71a
fix type hints in Forecaster
maurerle Nov 21, 2024
9652686
Merge branch 'main' into pandas_wrapper
nick-harder Nov 22, 2024
03e69b1
fix tutorial 03
maurerle Nov 22, 2024
028b551
update some type hints
maurerle Nov 22, 2024
13afb66
update some type hints
nick-harder Nov 22, 2024
dcef5e7
-update usage of loc and at for consistency across the whole code
nick-harder Nov 22, 2024
5484eda
-switch naming in bidding startegies to more understandable ones
nick-harder Nov 22, 2024
f69ba69
-add iat method to fastseries just in case
nick-harder Nov 22, 2024
c761216
run all non-learning tests and fix errors
nick-harder Nov 22, 2024
564b7c7
add test for parse_duration
maurerle Nov 22, 2024
259043b
fix warning in dmas powerplant
maurerle Nov 22, 2024
db2db0e
-add release notes
nick-harder Nov 22, 2024
f5f0431
-fix learning strategies and broken config
nick-harder Nov 22, 2024
79dd828
reduce log level for async run status
maurerle Nov 22, 2024
efbaa8f
-adjust save_forecast function to reflect new code
nick-harder Nov 22, 2024
f70c7dc
-add a bit clarity to examples.py and remove creating a new local db …
nick-harder Nov 22, 2024
764ea28
Merge branch 'main' into pandas_wrapper
nick-harder Nov 25, 2024
6e32a59
revert increasing default save_frequency_hours to reduce memory usage
maurerle Nov 25, 2024
3cc85e0
-add max size check to the output role
nick-harder Nov 25, 2024
1eaf736
-rename things
nick-harder Nov 25, 2024
f01fd09
move calculate_content_size to utils
maurerle Nov 25, 2024
47f37c4
use fork multiprocessing with mango 2.x
maurerle Nov 25, 2024
dc19710
slicing out of range should not through but return empty iterable
maurerle Nov 25, 2024
445b168
fix output moving to utils
maurerle Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 86 additions & 115 deletions assume/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@
from typing import TypedDict

import numpy as np
import pandas as pd

from assume.common.fast_pandas import FastSeries, TensorFastSeries
from assume.common.forecasts import Forecaster
from assume.common.market_objects import MarketConfig, Orderbook, Product

try:
import torch as th
except ImportError:
th = None


class BaseStrategy:
pass
Expand All @@ -26,22 +21,12 @@
"""
A base class for a unit. This class is used as a foundation for all units.

Attributes:
id (str): The ID of the unit.
unit_operator (str): The operator of the unit.
technology (str): The technology of the unit.
bidding_strategies (dict[str, BaseStrategy]): The bidding strategies of the unit.
index (pandas.DatetimeIndex): The index of the unit.
node (str, optional): The node of the unit. Defaults to "".
forecaster (Forecaster, optional): The forecast of the unit. Defaults to None.
**kwargs: Additional keyword arguments.

Args:
id (str): The ID of the unit.
unit_operator (str): The operator of the unit.
technology (str): The technology of the unit.
bidding_strategies (dict[str, BaseStrategy]): The bidding strategies of the unit.
index (pandas.DatetimeIndex): The index of the unit.
index (FastIndex): The index of the unit.
node (str, optional): The node of the unit. Defaults to "".
forecaster (Forecaster, optional): The forecast of the unit. Defaults to None.
location (tuple[float, float], optional): The location of the unit. Defaults to (0.0, 0.0).
Expand All @@ -55,38 +40,41 @@
unit_operator: str,
technology: str,
bidding_strategies: dict[str, BaseStrategy],
index: pd.DatetimeIndex,
forecaster: Forecaster,
node: str = "node0",
forecaster: Forecaster = None,
location: tuple[float, float] = (0.0, 0.0),
**kwargs,
):
self.id = id
self.unit_operator = unit_operator
self.technology = technology
self.bidding_strategies: dict[str, BaseStrategy] = bidding_strategies
self.forecaster = forecaster
self.index = forecaster.index

self.node = node
self.location = location
self.bidding_strategies: dict[str, BaseStrategy] = bidding_strategies
self.index = index
self.outputs = defaultdict(lambda: pd.Series(0.0, index=self.index))
# series does not like to convert from tensor to float otherwise

# RL data stored as lists to simplify storing to the buffer
self.outputs["rl_observations"] = []
self.outputs["rl_actions"] = []
self.outputs["rl_rewards"] = []
self.outputs = defaultdict(lambda: FastSeries(value=0.0, index=self.index))
# series does not like to convert from tensor to float otherwise

# some data is stored as series to allow to store it in the outputs
self.outputs["actions"] = pd.Series(0.0, index=self.index, dtype=object)
self.outputs["exploration_noise"] = pd.Series(
0.0, index=self.index, dtype=object
)
self.outputs["reward"] = pd.Series(0.0, index=self.index, dtype=object)
# check if any bidding strategy is using the RL strategy
if any(
isinstance(strategy, LearningStrategy)
for strategy in self.bidding_strategies.values()
):
self.outputs["actions"] = TensorFastSeries(value=0.0, index=self.index)
self.outputs["exploration_noise"] = TensorFastSeries(
value=0.0,
index=self.index,
)
self.outputs["reward"] = FastSeries(value=0.0, index=self.index)

if forecaster:
self.forecaster = forecaster
else:
self.forecaster = defaultdict(lambda: pd.Series(0.0, index=self.index))
# RL data stored as lists to simplify storing to the buffer
self.outputs["rl_observations"] = []
self.outputs["rl_actions"] = []
self.outputs["rl_rewards"] = []

def calculate_bids(
self,
Expand Down Expand Up @@ -128,12 +116,12 @@

return bids

def calculate_marginal_cost(self, start: pd.Timestamp, power: float) -> float:
def calculate_marginal_cost(self, start: datetime, power: float) -> float:
"""
Calculates the marginal cost for the given power.
Calculates the marginal cost for the given power.`

Args:
start (pandas.Timestamp): The start time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
power (float): The power output of the unit.

Returns:
Expand Down Expand Up @@ -192,19 +180,23 @@
start = self.index[0]

product_type_mc = product_type + "_marginal_costs"
product_data = self.outputs[product_type].loc[start:end]

marginal_costs = product_data.index.map(
lambda t: self.calculate_marginal_cost(t, product_data.loc[t])
)
new_values = np.abs(marginal_costs * product_data.values)
# Adjusted code for accessing product data and mapping over the index
product_data = self.outputs[product_type].loc[
start:end
] # Slicing directly without `.loc`

marginal_costs = [
self.calculate_marginal_cost(t, product_data[idx])
for idx, t in enumerate(self.index[start:end])
]
new_values = np.abs(marginal_costs * product_data)
self.outputs[product_type_mc].loc[start:end] = new_values

def execute_current_dispatch(
self,
start: pd.Timestamp,
end: pd.Timestamp,
) -> pd.Series:
start: datetime,
end: datetime,
) -> np.array:
"""
Checks if the total dispatch plan is feasible.

Expand All @@ -218,7 +210,7 @@
Returns:
The volume of the unit within the given time range.
"""
return self.outputs["energy"][start:end]
return self.outputs["energy"].loc[start:end]

def get_output_before(self, dt: datetime, product_type: str = "energy") -> float:
"""
Expand Down Expand Up @@ -267,21 +259,21 @@
end_excl = end - self.index.freq

if isinstance(order["accepted_volume"], dict):
cashflow = [
float(order["accepted_price"][i] * order["accepted_volume"][i])
for i in order["accepted_volume"].keys()
]
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * self.index.freq.n
cashflow = np.array(

Check warning on line 262 in assume/common/base.py

View check run for this annotation

Codecov / codecov/patch

assume/common/base.py#L262

Added line #L262 was not covered by tests
[
float(order["accepted_price"][i] * order["accepted_volume"][i])
for i in order["accepted_volume"].keys()
]
)
else:
cashflow = float(
order.get("accepted_price", 0) * order.get("accepted_volume", 0)
)
elapsed_intervals = (end - start) / pd.Timedelta(self.index.freq)
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * elapsed_intervals
)

elapsed_intervals = (end - start) / self.index.freq
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * elapsed_intervals
)

def get_starting_costs(self, op_time: int) -> float:
"""
Expand Down Expand Up @@ -322,18 +314,18 @@
min_down_time: int = 0

def calculate_min_max_power(
self, start: pd.Timestamp, end: pd.Timestamp, product_type: str = "energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type: str = "energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max power for the given time period.

Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str): The product type of the unit.

Returns:
tuple[pandas.Series, pandas.Series]: The min and max power for the given time period.
tuple[np.array, np.array]: The min and max power for the given time period.
"""

def calculate_ramp(
Expand All @@ -355,7 +347,6 @@
Returns:
float: The corrected possible power to offer according to ramping restrictions.
"""

# was off before, but should be on now and min_down_time is not reached
if power > 0 and op_time < 0 and op_time > -self.min_down_time:
power = 0
Expand Down Expand Up @@ -383,20 +374,6 @@
)
return power

def get_clean_spread(self, prices: pd.DataFrame) -> float:
"""
Returns the clean spread for the given prices.

Args:
prices (pandas.DataFrame): The prices.

Returns:
float: The clean spread for the given prices.
"""
emission_cost = self.emission_factor * prices["co"].mean()
fuel_cost = prices[self.technology.replace("_combined", "")].mean()
return (fuel_cost + emission_cost) / self.efficiency

def get_operation_time(self, start: datetime) -> int:
"""
Returns the time the unit is operating (positive) or shut down (negative).
Expand All @@ -405,24 +382,32 @@
start (datetime.datetime): The start time.

Returns:
int: The operation time.
int: The operation time as a positive integer if operating, or negative if shut down.
"""
before = start - self.index.freq
# Set the time window based on max of min operating/down time
max_time = max(self.min_operating_time, self.min_down_time, 1)
begin = max(start - self.index.freq * max_time, self.index[0])
end = start - self.index.freq

max_time = max(self.min_operating_time, self.min_down_time)
begin = start - self.index.freq * max_time
end = before
arr = self.outputs["energy"][begin:end][::-1] > 0
if len(arr) < 1:
if start <= self.index[0]:
# before start of index
return max_time
is_off = not arr.iloc[0]

# Check energy output in the defined time window, reversed for most recent state
arr = (self.outputs["energy"].loc[begin:end] > 0)[::-1]

# Determine initial state (off if the first period shows zero energy output)
is_off = not arr[0]
run = 0

# Count consecutive periods with the same status, break on change
for val in arr:
if val == is_off:
if val != (not is_off): # Stop if the state changes
break
run += 1
return (-1) ** is_off * run

# Return positive time if operating, negative if shut down
return -run if is_off else run

def get_average_operation_times(self, start: datetime) -> tuple[float, float]:
"""
Expand All @@ -440,14 +425,14 @@
op_series = []

before = start - self.index.freq
arr = self.outputs["energy"][self.index[0] : before][::-1] > 0
arr = self.outputs["energy"].loc[self.index[0] : before][::-1] > 0

if len(arr) < 1:
# before start of index
return max(self.min_operating_time, 1), min(-self.min_down_time, -1)

op_series = []
status = arr.iloc[0]
status = arr[0]
run = 0
for val in arr:
if val == status:
Expand Down Expand Up @@ -537,33 +522,33 @@
efficiency_discharge: float

def calculate_min_max_charge(
self, start: pd.Timestamp, end: pd.Timestamp, product_type="energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type="energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max charging power for the given time period.

Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str, optional): The product type of the unit. Defaults to "energy".

Returns:
tuple[pandas.Series, pandas.Series]: The min and max charging power for the given time period.
tuple[np.array, np.array]: The min and max charging power for the given time period.
"""

def calculate_min_max_discharge(
self, start: pd.Timestamp, end: pd.Timestamp, product_type="energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type="energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max discharging power for the given time period.

Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str, optional): The product type of the unit. Defaults to "energy".

Returns:
tuple[pandas.Series, pandas.Series]: The min and max discharging power for the given time period.
tuple[np.array, np.array]: The min and max discharging power for the given time period.
"""

def get_soc_before(self, dt: datetime) -> float:
Expand All @@ -583,20 +568,6 @@
else:
return self.outputs["soc"].at[dt - self.index.freq]

def get_clean_spread(self, prices: pd.DataFrame) -> float:
"""
Returns the clean spread for the given prices.

Args:
prices (pandas.DataFrame): The prices.

Returns:
float: The clean spread for the given prices.
"""
emission_cost = self.emission_factor * prices["co"].mean()
fuel_cost = prices[self.technology.replace("_combined", "")].mean()
return (fuel_cost + emission_cost) / self.efficiency_charge

def calculate_ramp_discharge(
self,
previous_power: float,
Expand Down
Loading