Skip to content

Commit

Permalink
chore: clean up TimeSeriesRate and remove reindex from consumer (#668)
Browse files Browse the repository at this point in the history
  • Loading branch information
olelod authored Nov 1, 2024
1 parent adb5bf5 commit c435999
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 271 deletions.
219 changes: 93 additions & 126 deletions src/libecalc/common/utils/rates.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import math
from abc import ABC, abstractmethod
from abc import ABC
from collections import defaultdict
from collections.abc import Iterable, Iterator
from datetime import datetime
Expand Down Expand Up @@ -175,45 +175,96 @@ def __lt__(self, other) -> bool:

return all(self_value < other_value for self_value, other_value in zip(self.values, other.values))

@property
def period(self):
return Period(start=self.periods.periods[0].start, end=self.periods.periods[-1].end)

@property
def first_date(self):
return self.period().start
return self.period.start

@property
def last_date(self):
return self.period().end
return self.period.end

@property
def all_dates(self) -> list[datetime]:
return self.start_dates() + [self.last_date()]
return self.start_dates + [self.last_date]

@property
def start_dates(self):
return [period.start for period in self.periods]

@property
def end_dates(self):
return [period.end for period in self.periods]

@property
def max(self):
return max(self.values)

@abstractmethod
def resample(self, freq: Frequency, include_start_date: bool, include_end_date: bool) -> Self: ...
def resample(self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True) -> Self:
"""
Resample using forward-fill This means that a value is assumed to be the same until the next observation,
e.g. covering the whole period interval.
Args:
freq: The frequency the time series should be resampled to
include_start_date: Whether to include the start date if it is not part of the requested reporting frequency
include_end_date: Whether to include the end date if it is not part of the requested reporting frequency
Returns:
TimeSeries resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.periods, self.values

ds = pd.Series(index=self.start_dates, data=self.values)
new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)
new_time_steps = new_periods.start_dates
ds_resampled = ds.reindex(new_time_steps).ffill()

return self.__class__(
periods=new_periods,
values=ds_resampled.values.tolist(),
unit=self.unit,
)

def extend(self, other: TimeSeries) -> Self:
"""Extend the time series with another time series.
The two time series needs to follow immediately after each other in time.
Args:
other: The time series to extend with
Returns:
The extended time series
"""
if self.unit != other.unit:
raise ValueError(f"Mismatching units: '{self.unit}' != `{other.unit}`")

if len(self) > 0 and len(other) > 0:
if self.last_date != other.first_date and self.first_date != other.last_date:
raise ValueError("Can not extend two TimeSeries when there is a gap in time between them.")
if self.last_date == other.first_date:
return self.__class__(
periods=self.periods + other.periods,
values=self.values + other.values,
unit=self.unit,
)
return self.__class__(
periods=self.periods + other.periods,
values=self.values + other.values,
periods=other.periods + self.periods,
values=other.values + self.values,
unit=self.unit,
)

def merge(self, other: TimeSeries) -> Self:
"""
Merge two TimeSeries with differing periods
The periods need to be non-overlapping and follow right after eavh other
The periods need to be non-overlapping and follow right after each other
Args:
other:
Expand All @@ -227,13 +278,13 @@ def merge(self, other: TimeSeries) -> Self:
if self.unit != other.unit:
raise ValueError(f"Mismatching units: '{self.unit}' != '{other.unit}'")

if Period.intersects(self.period(), other.period()) != 0:
if Period.intersects(self.period, other.period) != 0:
raise ValueError("Can not merge two TimeSeries with overlapping periods.")

if self.first_date() != other.last_date() and self.last_date() != other.first_date():
if self.first_date != other.last_date and self.last_date != other.first_date:
raise ValueError("Can not merge two TimeSeries when there is a gap in time between them.")

if self.first_date() < other.last_date():
if self.first_date < other.last_date:
first = self
second = other
else:
Expand Down Expand Up @@ -352,26 +403,26 @@ def __setitem__(
f"Could not update timeseries, Combination of indices of type '{type(indices)}' and values of type '{type(values)}' is not supported"
)

def reindex_periods(
def fill_values_for_new_periods(
self,
new_periods: Iterable[Period],
fillna: Union[float, str] = 0.0,
) -> np.ndarray:
fillna: Union[float, str, bool, int],
) -> TimeSeries:
"""Based on a consumer time function result (EnergyFunctionResult), the corresponding time vector and
the consumer time vector, we calculate the actual consumer (consumption) rate.
"""
for period in self.periods:
if period not in new_periods:
raise ValueError(
f"You can not alter the existing periods. This is not resampling. Period {period} is not part of the new periods."
)
new_values: defaultdict[Period, Union[float, str]] = defaultdict(float)
new_values.update({t: fillna for t in new_periods})
for t, v in zip(self.periods, self.values):
if t in new_values:
new_values[t] = v
else:
logger.warning(
"Reindexing consumer time vector and losing data. This should not happen."
" Please contact eCalc support."
)

return np.array([rate_sum for time, rate_sum in sorted(new_values.items())])
return self.__class__(periods=new_periods, values=new_values.values(), unit=self.unit)

def __eq__(self, other: object) -> bool:
if not isinstance(other, TimeSeries):
Expand All @@ -384,64 +435,10 @@ def __eq__(self, other: object) -> bool:
)


class TimeSeriesString(TimeSeries[str]):
def resample(self, freq: Frequency, include_start_date: bool, include_end_date: bool) -> Self:
"""
Resample using forward-fill This means that a value is assumed to be the same until the next observation,
e.g. covering the whole period interval.
Args:
freq: The frequency the time series should be resampled to
include_start_date:
include_end_date
Returns:
TimeSeriesString resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=[period.start for period in self.periods], data=self.values)

# New resampled pd.Series
ds_resampled = ds.resample(freq).ffill()

return TimeSeriesString(
periods=ds_resampled.index.to_pydatetime().tolist(),
values=ds_resampled.values.tolist(),
unit=self.unit,
)


class TimeSeriesInt(TimeSeries[int]):
def resample(self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True) -> Self:
"""
Resample using forward-fill This means that a value is assumed to be the same until the next observation,
e.g. covering the whole period interval.
class TimeSeriesString(TimeSeries[str]): ...

Args:
freq: The frequency the time series should be resampled to

Returns:
TimeSeriesInt resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=self.start_dates(), data=self.values)

new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)
new_time_steps = new_periods.start_dates
# New resampled pd.Series
ds_resampled = ds.reindex(new_time_steps).ffill()

return TimeSeriesInt(
periods=new_periods,
values=list(ds_resampled.values.tolist()),
unit=self.unit,
)
class TimeSeriesInt(TimeSeries[int]): ...


class TimeSeriesBoolean(TimeSeries[bool]):
Expand All @@ -453,6 +450,8 @@ def resample(self, freq: Frequency, include_start_date: bool = True, include_end
Args:
freq: The frequency the time series should be resampled to
include_start_date: Whether to include the start date if it is not part of the requested reporting frequency
include_end_date: Whether to include the end date if it is not part of the requested reporting frequency
Returns:
TimeSeriesBoolean resampled to the given frequency
Expand All @@ -467,8 +466,8 @@ def resample(self, freq: Frequency, include_start_date: bool = True, include_end

# Iterate over all pairs of subsequent dates in the new time vector
for period in new_periods:
start_index = self.all_dates().index(max([date for date in self.all_dates() if date <= period.start]))
end_index = self.all_dates().index(max([date for date in self.all_dates() if date < period.end]))
start_index = self.all_dates.index(max([date for date in self.all_dates if date <= period.start]))
end_index = self.all_dates.index(max([date for date in self.all_dates if date < period.end]))
resampled.append(all(self.values[start_index : end_index + 1]))

return TimeSeriesBoolean(
Expand All @@ -493,54 +492,19 @@ def __mul__(self, other: object) -> Self:
class TimeSeriesFloat(TimeSeries[float]):
@field_validator("values", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any, info: ValidationInfo) -> list[TimeSeriesValue]:
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
return v

def resample(self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True) -> Self:
"""
Resample using forward-fill This means that a value is assumed to be the same until the next observation,
e.g. covering the whole period interval.
Args:
freq: The frequency the time series should be resampled to
Returns:
TimeSeriesFloat resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=self.start_dates(), data=self.values)

new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)
new_start_dates = new_periods.start_dates
ds_resampled = ds.reindex(new_start_dates).ffill()

return self.__class__(
periods=new_periods,
values=[float(x) for x in ds_resampled.values.tolist()],
unit=self.unit,
)

def reindex(self, new_periods: Periods) -> Self:
"""
Ensure to map correct value to correct period in the final resulting time vector.
"""
reindex_values = self.reindex_periods(new_periods)
return self.__class__(periods=new_periods, values=reindex_values.tolist(), unit=self.unit)


class TimeSeriesVolumesCumulative(TimeSeries[float]):
"""This will represent the sum of the volumes in all periods up to and including each individual period."""

@field_validator("values", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any, info: ValidationInfo) -> list[TimeSeriesValue]:
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
Expand All @@ -560,14 +524,16 @@ def resample(
Args:
freq: The frequency the time series should be resampled to or the Periods to resample to
include_start_date: Whether to include the start date if it is not part of the requested reporting frequency
include_end_date: Whether to include the end date if it is not part of the requested reporting frequency
Returns:
TimeSeriesVolumesCumulative resampled to the given frequency or given Periods
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=self.all_dates(), data=[0] + self.values) # cumulative volume always zero at start date
ds = pd.Series(index=self.all_dates, data=[0] + self.values) # cumulative volume always zero at start date
new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)
Expand Down Expand Up @@ -634,7 +600,7 @@ def to_volumes(self) -> TimeSeriesVolumes:
class TimeSeriesVolumes(TimeSeries[float]):
@field_validator("values", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any, info: ValidationInfo) -> list[TimeSeriesValue]:
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
Expand Down Expand Up @@ -684,8 +650,8 @@ def to_rate(self, regularity: Optional[list[float]] = None) -> TimeSeriesRate:
Returns:
Average production rate
"""
if len(self.all_dates()) > 1:
delta_days = calculate_delta_days(np.asarray(self.all_dates())).tolist()
if len(self.all_dates) > 1:
delta_days = calculate_delta_days(np.asarray(self.all_dates)).tolist()
average_rates = [volume / days for volume, days in zip(self.values, delta_days)]
else:
average_rates = self.values
Expand Down Expand Up @@ -738,7 +704,7 @@ def __add__(self, other: TimeSeriesStreamDayRate) -> TimeSeriesStreamDayRate:
class TimeSeriesCalendarDayRate(TimeSeriesFloat):
"""
Application layer only - only calendar day rate/used for reporting
Probably not needed, as we want to provide info on regularity etc for the fixed calendar rate data too
Probably not needed, as we want to provide info on regularity etc. for the fixed calendar rate data too
"""

...
Expand All @@ -761,13 +727,14 @@ class TimeSeriesRate(TimeSeries[float]):

@field_validator("values", "regularity", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any, info: ValidationInfo) -> list[TimeSeriesValue]:
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
return v

@field_validator("regularity")
@classmethod
def check_regularity_length(cls, regularity: list[float], info: ValidationInfo) -> list[float]:
regularity_length = len(regularity)
periods_length = len(info.data["periods"].periods)
Expand Down Expand Up @@ -858,13 +825,13 @@ def merge(self, other: TimeSeries) -> TimeSeriesRate:
"Mismatching rate type. Currently you can not merge stream/calendar day rates with calendar/stream day rates."
)

if Period.intersects(self.period(), other.period()) != 0:
if Period.intersects(self.period, other.period) != 0:
raise ValueError("Can not merge two TimeSeries with overlapping periods")

if self.first_date() != other.last_date() and self.last_date() != other.first_date():
if self.first_date != other.last_date and self.last_date != other.first_date:
raise ValueError("Can not merge two TimeSeries when there is a gap in time between them")

if self.first_date() < other.last_date():
if self.first_date < other.last_date:
first = self
second = other
else:
Expand Down Expand Up @@ -893,7 +860,7 @@ def for_period(self, period: Period) -> Self:
Returns:
"""
if not Period.intersects(self.period(), period):
if not Period.intersects(self.period, period):
return self.__class__(
periods=Periods([]),
values=[],
Expand Down
Loading

0 comments on commit c435999

Please sign in to comment.