Skip to content

Commit

Permalink
Adding novel trends features (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Patricia-IESE authored Oct 11, 2024
2 parents 5d9f107 + 41d1541 commit 14e84db
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 29 deletions.
104 changes: 99 additions & 5 deletions badgers/generators/time_series/trends.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import abc
from typing import Tuple
from typing import Tuple, Union

import numpy as np
from numpy.random import default_rng

from badgers.core.base import GeneratorMixin
from badgers.core.decorators.time_series import preprocess_inputs
from badgers.generators.time_series.utils import generate_random_patterns_indices


class TrendsGenerator(GeneratorMixin):
Expand Down Expand Up @@ -36,13 +37,106 @@ def __init__(self, random_generator=default_rng(seed=0)):
def generate(self, X, y, slope) -> Tuple:
    """
    Add a linear trend spanning the whole signal.

    The added offset grows linearly from 0 (first sample) to slope * len(X) (last sample).

    :param X: the input signal to be transformed
    :param y: not changed (here for API compatibility)
    :param slope: the slope of the trend, either a single value (applied to every column of X)
        or one value per column of X
    :type slope: Union[float, list, np.ndarray]
    :return: the transformed signal Xt (X + linear trend), and y (not changed)
    """
    n_samples = len(X)
    # np.asarray: multiplying a plain python list by an int would repeat the list instead of scaling it
    offset = np.linspace(0, np.asarray(slope) * n_samples, n_samples)
    if offset.ndim == 1 and np.ndim(X) == 2:
        # a scalar slope yields a 1D offset: turn it into a column so that it broadcasts over all columns of X
        offset = offset[:, np.newaxis]
    Xt = X + offset
    return Xt, y


class AdditiveLinearTrendGenerator(TrendsGenerator):
    """
    Add a linear trend to the input time-series data between two time indices
    """

    def __init__(self, random_generator=default_rng(seed=0)):
        """
        :param random_generator: a random number generator, used to draw `start` when it is not given
        """
        super().__init__(random_generator=random_generator)

    @preprocess_inputs
    def generate(self, X, y, slope, start: Union[int, None] = None, end: Union[int, None] = None) -> Tuple:
        """
        Add a linear trend to X between the indices `start` (included) and `end` (excluded).

        The added offset is 0 before `start`, grows linearly from 0 to slope * (end - start)
        between `start` and `end`, and keeps its last value from `end` on.

        :param X: the input signal to be transformed
        :param y: not changed (here for API compatibility)
        :param slope: the slope of the trend, either a single value (applied to every column of X)
            or one value per column of X
        :type slope: Union[float, list, np.ndarray]
        :param start: index where the trend starts; when None, it is chosen randomly in the first half of the signal
        :param end: index where the trend stops; when None, the trend lasts until the end of the signal
        :return: the transformed signal Xt (X + linear trend), and y (not changed)
        :raises ValueError: if `end` is smaller than `start`
        """
        n_samples = len(X)

        if start is None:
            # `integers` (and not `uniform`): a float cannot be used as a slice index
            start = int(self.random_generator.integers(0, max(1, n_samples // 2)))

        if end is None:
            end = n_samples

        width = end - start
        if width < 0:
            raise ValueError(f'end ({end}) must not be smaller than start ({start})')

        # computing offset:
        # - 0s until "start"
        # - from "start" to "end": linear trend with slope "slope",
        # - from "end" on: the last value
        offset = np.zeros(shape=X.shape)
        if width > 0:
            # np.asarray: multiplying a plain python list by an int would repeat the list instead of scaling it
            ramp = np.linspace(0, np.asarray(slope) * width, width)
            # a scalar slope yields a 1D ramp: reshape it into a column so that it broadcasts over all columns
            offset[start:end, :] = ramp.reshape(width, -1)
            offset[end:, :] = offset[end - 1, :]

        Xt = X + offset
        return Xt, y


class RandomlySpacedLinearTrends(TrendsGenerator):
    """
    Generates random time intervals where a linear trend is added to the signal
    Slopes, time intervals locations and widths are chosen randomly
    """

    def __init__(self, random_generator=default_rng(seed=0)):
        """
        :param random_generator: a random number generator, used to draw the time intervals and the slopes
        """
        super().__init__(random_generator=random_generator)

    @preprocess_inputs
    def generate(self, X, y, n_patterns: int = 10, min_width_pattern: int = 5,
                 max_width_patterns: int = 10, slope_min: float = -0.05, slope_max: float = 0.05) -> Tuple:
        """
        Generates random time intervals where a linear trend is added to the signal
        Slopes, time intervals locations and widths are chosen randomly.

        The chosen intervals and slopes are stored in `self.patterns_indices_` and `self.slopes_`.

        :param X: the input signal to be transformed
        :param y: not changed (here for API compatibility)
        :param n_patterns: the total number of time intervals where a linear trend is added
        :param min_width_pattern: the minimum width of the time intervals
        :param max_width_patterns: the maximum width of the time intervals
        :param slope_min: the minimum value of the slope (slope is chosen uniformly at random between slope_min and slope_max for each time interval and each column of X)
        :param slope_max: the maximum value of the slope (slope is chosen uniformly at random between slope_min and slope_max for each time interval and each column of X)
        :return: the transformed signal Xt (X + linear trends), and y (not changed)
        """
        # generate patterns indices: one (start, end) pair per time interval
        self.patterns_indices_ = generate_random_patterns_indices(
            random_generator=self.random_generator,
            n_patterns=n_patterns,
            signal_size=len(X),
            min_width_pattern=min_width_pattern,
            max_width_patterns=max_width_patterns)

        # generate random slopes: one per time interval and per column of X
        self.slopes_ = self.random_generator.uniform(low=slope_min, high=slope_max, size=(n_patterns, X.shape[1]))

        offset = np.zeros(shape=X.shape)

        # NOTE(review): the accumulation below presumably relies on the intervals being
        # non-overlapping and ordered by increasing start - confirm against generate_random_patterns_indices
        for (start, end), slope in zip(self.patterns_indices_, self.slopes_):
            # computing offset:
            # - don't change until "start"
            # - from "start" to "end": add linear trend with slope "slope",
            # - from "end" on: add the last value
            offset[start:end, :] = np.linspace(offset[start, :], offset[start, :] + slope * (end - start), end - start)
            offset[end:, :] = offset[end - 1, :]

        Xt = X + offset
        return Xt, y
173 changes: 154 additions & 19 deletions docs/tutorials/time-series/Trends-Time-Series.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "badgers"
version = "0.0.8"
version = "0.0.9"
keywords = ["data quality", "bad data", "data science"]
authors = [
{ name = "Julien Siebert", email = "[email protected]" },
Expand Down
70 changes: 66 additions & 4 deletions tests/generators/time_series/test_trends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from numpy.random import default_rng
from pandas._testing import assert_frame_equal

from badgers.generators.time_series.trends import GlobalAdditiveLinearTrendGenerator
from badgers.generators.time_series.trends import GlobalAdditiveLinearTrendGenerator, AdditiveLinearTrendGenerator, \
RandomlySpacedLinearTrends


class TestTrendsGenerator(unittest.TestCase):
class TestGlobalAdditiveLinearTrendsGenerator(unittest.TestCase):
def setUp(self):
self.random_generator = default_rng(seed=0)

Expand All @@ -20,13 +21,74 @@ def test_global_additive_linear_trend_generator(self):
y = None

slope = np.array([1, 2, 3, 4])
t = np.linspace(0, 1, len(X))
expected_Xt = pd.DataFrame(data=t[:, np.newaxis] * slope, columns=X.columns, index=X.index)

expected_Xt = pd.DataFrame(
data=np.array([np.linspace(0, len(X) * s, len(X)) for s in slope]).T,
columns=X.columns, index=X.index
)

Xt, _ = global_additive_linear_trend_generator.generate(X, y, slope=slope)

assert_frame_equal(Xt, expected_Xt)


class TestAdditiveLinearTrendsGenerator(unittest.TestCase):
    """Checks AdditiveLinearTrendGenerator on an all-zero signal with explicit start and end."""

    def setUp(self):
        # fixed seed so that every run uses the same random generator state
        self.random_generator = default_rng(seed=0)

    def test_global_additive_linear_trend_generator(self):
        # Test the generate method of AdditiveLinearTrendGenerator
        generator = AdditiveLinearTrendGenerator(random_generator=self.random_generator)

        column_names = [f'col{i}' for i in range(4)]
        X = pd.DataFrame(data=np.zeros(shape=(10, 4)), columns=column_names)
        slope = np.array([0, 0.5, 1, 2])

        # rows 0 to 3: no offset yet (the trend starts with a 0 offset at index 3)
        flat_rows = [[0., 0., 0., 0.] for _ in range(4)]
        # rows 4 and 5: the offset grows linearly towards its final value
        rising_rows = [
            [0., 2. / 3., 4. / 3., 8. / 3.],
            [0., 4. / 3., 8. / 3., 16. / 3.],
        ]
        # rows 6 to 9: the final value is reached at index 6 and kept until the end of the signal
        plateau_rows = [[0., 2., 4., 8.] for _ in range(4)]

        expected_Xt = pd.DataFrame(
            data=np.array(flat_rows + rising_rows + plateau_rows),
            columns=X.columns, index=X.index
        )

        Xt, _ = generator.generate(X, None, slope=slope, start=3, end=7)

        assert_frame_equal(Xt, expected_Xt)


class TestRandomlySpacedLinearTrends(unittest.TestCase):
    """Checks RandomlySpacedLinearTrends on an all-zero signal."""

    def setUp(self):
        # fixed seed so that every run uses the same random generator state
        self.random_generator = default_rng(seed=0)

    def test_global_additive_linear_trend_generator(self):
        # Test the generate method of RandomlySpacedLinearTrends
        generator = RandomlySpacedLinearTrends(random_generator=self.random_generator)
        X = pd.DataFrame(data=np.zeros(shape=(100, 4)), columns=[f'col{i}' for i in range(4)])

        Xt, _ = generator.generate(X, None, n_patterns=5, min_width_pattern=5, max_width_patterns=10)

        # between the end of one time interval and the start of the next one,
        # the summed differences of the signal must be zero
        intervals = generator.patterns_indices_
        for previous, following in zip(intervals[:-1], intervals[1:]):
            gap_start = previous[1]
            gap_end = following[0]
            total_variation = Xt[gap_start:gap_end].diff().dropna().sum().sum()
            self.assertEqual(total_variation, 0)


# Run the tests of this module when the file is executed directly as a script
if __name__ == '__main__':
unittest.main()

0 comments on commit 14e84db

Please sign in to comment.