-
Notifications
You must be signed in to change notification settings - Fork 4
/
electricity_tasks.py
289 lines (239 loc) · 16.1 KB
/
electricity_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from tactis.gluon.dataset import get_dataset
from gluonts.dataset.util import to_pandas
import numpy as np
from ..base import UnivariateCRPSTask
from ..config import DATA_STORAGE_PATH
from ..utils import get_random_window_univar, datetime_to_str
from ..memorization_mitigation import add_realistic_noise
from . import WeightCluster
class ElectricityIncreaseInPredictionTask(UnivariateCRPSTask):
    """
    A task where electricity consumption spikes inside the prediction window,
    attributed in the textual context to a heat wave and heavy air conditioning use.

    The spike is injected into the future part of the series only, so it has to be
    deduced from the context and reflected in the forecast.

    TODO: A multivariate extension of this task, where weather is another time series
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + ["instruction following"]
    __version__ = "0.0.3"  # Modification will trigger re-caching

    def random_instance(self):
        """
        Sample one task instance and store it on ``self``.

        Sets ``past_time``, ``future_time``, ``constraints``, ``background``,
        ``scenario`` and ``region_of_interest``. Every draw from ``self.random``
        happens in a fixed order, so the same generator state yields the same
        instance.

        Raises
        ------
        NotImplementedError
            If the sampled dataset is not ``electricity_hourly``.
        """
        candidate_datasets = ["electricity_hourly"]
        dataset_name = self.random.choice(candidate_datasets)
        dataset = get_dataset(dataset_name, regenerate=False, path=DATA_STORAGE_PATH)

        assert len(dataset.train) == len(
            dataset.test
        ), "Train and test sets must contain the same number of time series"

        horizon = dataset.metadata.prediction_length

        # Pick one series (by position) out of the test split
        series_position = self.random.choice(len(dataset.train))
        full_series = to_pandas(list(dataset.test)[series_position])

        # Cut a random window out of it; the history length factor is itself random
        history_factor = self.random.randint(3, 7)
        window = get_random_window_univar(
            full_series,
            prediction_length=horizon,
            history_factor=history_factor,
            random=self.random,
        )
        history_series = window.iloc[:-horizon]
        future_series = window.iloc[-horizon:]

        if dataset_name != "electricity_hourly":
            raise NotImplementedError(f"Dataset {dataset_name} is not supported.")

        # Both halves get a timestamp index for consistency
        future_series.index = future_series.index.to_timestamp()
        history_series.index = history_series.index.to_timestamp()

        # Arbitrary way to select where the spike starts: ignore the last 4 points
        # of the future, then pick one of the positions of the 5 largest values.
        top_five_positions = np.argsort(future_series.values[:-4])[-5:][::-1]
        spike_start_point = self.random.choice(top_five_positions)
        spike_start_date = future_series.index[spike_start_point]
        # Duration in time steps, arbitrarily picked from 1, 2, 3
        spike_duration = self.random.choice([1, 2, 3])
        # Multiplier applied to the value found at the spike start (3x, 4x or 5x)
        spike_magnitude = self.random.choice([3, 4, 5])

        # Overwrite the spike span with a constant level derived from its first point
        spike_end_point = spike_start_point + spike_duration
        spike_level = spike_magnitude * future_series.iloc[spike_start_point]
        future_series.iloc[spike_start_point:spike_end_point] = spike_level

        # Transform (history first, then future, to keep the draw order stable)
        history_series = add_realistic_noise(history_series, self.random)
        future_series = add_realistic_noise(future_series, self.random)

        # Instantiate the class variables
        self.past_time = history_series.to_frame()
        self.future_time = future_series.to_frame()
        self.constraints = None
        self.background = "This is the electricity consumption recorded in Kilowatt (kW) in city A."
        self.scenario = self.get_scenario(
            spike_start_date, spike_duration, spike_magnitude
        )

        # ROI metric parameters: the positions covered by the injected spike
        self.region_of_interest = slice(spike_start_point, spike_end_point)

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """
        Build the scenario text announcing the heat wave.

        Parameters
        ----------
        spike_start_date:
            Timestamp at which the spike begins; rendered with ``datetime_to_str``.
        spike_duration:
            Length of the spike, reported in hours.
        spike_magnitude:
            Multiplier reported as "times the usual electricity".

        Returns
        -------
        str
            The scenario sentence.
        """
        hour_word = "hour" if spike_duration == 1 else "hours"
        # NOTE(review): "in city A" appears twice in this sentence. It is kept
        # verbatim because changing the text changes the generated task instances.
        return (
            f"Suppose that there is a heat wave in city A from {datetime_to_str(spike_start_date)} "
            f"for {spike_duration} {hour_word} in city A, "
            "leading to excessive use of air conditioning, "
            f"and {spike_magnitude} times the usual electricity being consumed."
        )
class ElectricityIncreaseInPredictionWithDistractorText(
    ElectricityIncreaseInPredictionTask
):
    """
    ElectricityIncreaseInPredictionTask whose scenario also contains one distractor
    sentence, drawn from 3 different kinds. The model would have to retrieve the
    right context to succeed in this task.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + [
        "instruction following",
        "retrieval: context",
    ]
    __version__ = "0.0.2"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """
        Return the heat-wave sentence paired with one random distractor sentence.

        The distractor kind, its numeric detail and the order of the two sentences
        are all drawn from ``self.random``. The two sentences are joined with a
        single space.
        """
        hour_word = "hour" if spike_duration == 1 else "hours"
        heat_wave_sentence = (
            f"Suppose that there is a heat wave in city A from {datetime_to_str(spike_start_date)} "
            f"for {spike_duration} {hour_word}, "
            "leading to excessive use of air conditioning, "
            f"and {spike_magnitude} times the usual electricity being consumed."
        )

        distractor_kind = self.random.choice([1, 2, 3])
        if distractor_kind == 1:
            # Festival in other cities, with a larger multiplier than the real one
            extra_factor = self.random.choice([3, 4, 5, 6, 7, 8])
            inflated_magnitude = spike_magnitude + extra_factor
            distractor_sentence = (
                "There was a festival in neighbouring cities B and C that resulted in "
                f"{inflated_magnitude} times the usual electricity being consumed there. "
                "But this did not affect electricity consumption in city A."
            )
        elif distractor_kind == 2:
            # Historical cold-weather pattern that is said not to apply this year.
            # One concern with this is that both the history and the scenario probably
            # belong to the same month, so this text may not affect the model.
            # NOTE(review): the sentence says "increased ... usage" and then
            # "decreasing ... consumption"; kept verbatim, confirm intended wording.
            spike_month = spike_start_date.month
            extra_factor = self.random.choice([3, 4, 5, 6, 7, 8])
            inflated_magnitude = spike_magnitude + extra_factor
            distractor_sentence = (
                "Historically, over the past 3 years, there have been patterns of increased "
                "electricity usage due to extreme cold weather in city A "
                f"in the month of {spike_month}, "
                f"decreasing electricity consumption by {inflated_magnitude} times "
                "the usual electricity being consumed there. "
                "But this year, the cold wave is not expected to happen."
            )
        elif distractor_kind == 3:
            # Past grid incident, unrelated to the forecast window
            dip_percentage = self.random.choice([75, 85, 95])
            distractor_sentence = (
                "A brief technical issue in the electricity grid caused a major dip of "
                f"{dip_percentage}% in electricity consumption 2 weeks ago. "
                "This issue is not expected to happen again this week."
            )

        # Put relevant context before (2) or after (1) the distractor
        relevant_first = self.random.choice([1, 2]) == 2
        if relevant_first:
            ordered_sentences = (heat_wave_sentence, distractor_sentence)
        else:
            ordered_sentences = (distractor_sentence, heat_wave_sentence)
        return " ".join(ordered_sentences)
class ElectricityIncreaseInPredictionWithDistractorWithDates(
    ElectricityIncreaseInPredictionTask
):
    """
    ElectricityIncreaseInPredictionTask with a distractor that mentions the same
    dates as the relevant context. The model would have to retrieve the right
    context to succeed in this task.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + [
        "instruction following",
        "retrieval: context",
    ]
    # Bumped because the scenario text changed (separator between sentences added)
    __version__ = "0.0.3"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """
        Return a distractor sentence followed by the heat-wave sentence.

        Both sentences refer to the same start date and duration. The distractor
        kind and its numeric detail are drawn from ``self.random``.

        Parameters
        ----------
        spike_start_date:
            Timestamp at which the spike begins; rendered with ``datetime_to_str``.
        spike_duration:
            Length of the spike, reported in hours.
        spike_magnitude:
            Multiplier reported as "times the usual electricity".

        Returns
        -------
        str
            Distractor and relevant context, separated by a single space.
        """
        hour_word = "hour" if spike_duration == 1 else "hours"
        time_span = (
            f"from {datetime_to_str(spike_start_date)} for {spike_duration} {hour_word}"
        )

        distractor_type = self.random.choice([1, 2])
        if distractor_type == 1:
            # Festival elsewhere, with a larger multiplier than the real one
            distractor_factor = self.random.choice([3, 4, 5, 6, 7, 8])
            distractor_text = (
                "There was a festival in neighbouring cities B and C that resulted in "
                f"{spike_magnitude + distractor_factor} times the usual electricity "
                f"being consumed there {time_span}. "
                "But this did not affect electricity consumption in city A."
            )
        elif distractor_type == 2:
            # Grid incident elsewhere over the same time span
            dip_percentage = self.random.choice([75, 85, 95])
            distractor_text = (
                "A brief technical issue in the electricity grid in a nearby city "
                f"caused a major dip of {dip_percentage}% {time_span}. "
                "This issue has affected many nearby cities, but not this city."
            )

        relevant_context = (
            f"Suppose that there is a heat wave in city A {time_span}, "
            "leading to excessive use of air conditioning, "
            f"and {spike_magnitude} times the usual electricity being consumed."
        )
        # Join with a space: plain concatenation glued the two sentences together
        # ("...in city A.Suppose that..."), unlike the sibling distractor task.
        return " ".join([distractor_text, relevant_context])
class ElectricityIncreaseInPredictionWithSplitContext(
    ElectricityIncreaseInPredictionTask
):
    """
    ElectricityIncreaseInPredictionTask with a context that first gives a wrong
    magnitude for the spike and then corrects it later in the same text.
    The model would need to just follow instructions, but it would have to link
    instructions together to succeed.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + ["instruction following"]
    __version__ = "0.0.2"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """
        Return a scenario stating an inflated multiplier, then the actual one.

        The inflated multiplier is ``spike_magnitude`` plus a factor drawn from
        ``self.random``.
        """
        extra_factor = self.random.choice([3, 4, 5, 6, 7, 8])
        typical_magnitude = spike_magnitude + extra_factor
        hour_word = "hour" if spike_duration == 1 else "hours"
        return (
            f"Suppose that there is a heat wave in city A from {datetime_to_str(spike_start_date)} "
            f"for {spike_duration} {hour_word}, "
            "which would typically lead to excessive use of air conditioning, "
            f"and {typical_magnitude} times the usual electricity being consumed. "
            "But in this case, residents sought to conserve energy and used lesser air conditioning, "
            f"resulting in excessive usage of only {spike_magnitude} times the usual electricity."
        )
class ShortNewsElectricityIncreaseInPredictionTask(ElectricityIncreaseInPredictionTask):
    """
    A version of the ElectricityIncreaseInPredictionTask where the relevant
    information must be retrieved from within a short news article provided in context.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + [
        "instruction following",
        "retrieval: context",
    ]
    __version__ = "0.0.2"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """Return the short news article carrying the spike date, duration and magnitude."""
        # This news article was generated with the assistance of Claude
        hour_word = "hour" if spike_duration == 1 else "hours"
        return (
            f"A heatwave struck the city, which began on {datetime_to_str(spike_start_date)} "
            f"and lasted for approximately {spike_duration} {hour_word}, "
            "saw temperatures soar to unprecedented levels. "
            "According to the city's electricity provider, power consumption during the peak "
            f"of the heatwave reached approximately {spike_magnitude} times "
            "the typical usage for this time of year."
        )
class MediumNewsElectricityIncreaseInPredictionTask(
    ElectricityIncreaseInPredictionTask
):
    """
    A version of the ElectricityIncreaseInPredictionTask where the relevant
    information must be retrieved from within a medium length news article provided in context.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + [
        "instruction following",
        "retrieval: context",
    ]
    __version__ = "0.0.2"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """Return the medium-length news article carrying the spike date, duration and magnitude."""
        # This news article was generated with the assistance of Claude
        hour_word = "hour" if spike_duration == 1 else "hours"
        return (
            "A sudden and intense heatwave struck the city, causing a dramatic surge in "
            "electricity consumption as residents sought refuge from the scorching temperatures. "
            f"The extreme weather event, which began on {datetime_to_str(spike_start_date)} "
            f"and lasted for approximately {spike_duration} {hour_word}, "
            "saw temperatures soar to unprecedented levels. "
            "In response, citizens across the metropolitan area turned to their air conditioning "
            "units en masse, leading to a significant strain on the local power grid. "
            "According to the city's electricity provider, power consumption during the peak "
            f"of the heatwave reached approximately {spike_magnitude} times "
            "the typical usage for this time of year. "
            "\nFor now, citizens are encouraged to stay hydrated, check on vulnerable neighbors, "
            "and use air conditioning responsibly as the community works together to beat the heat."
        )
class LongNewsElectricityIncreaseInPredictionTask(ElectricityIncreaseInPredictionTask):
    """
    A version of the ElectricityIncreaseInPredictionTask where the relevant
    information must be retrieved from within a long news article provided in context.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov", "c_f"]
    _skills = UnivariateCRPSTask._skills + [
        "instruction following",
        "retrieval: context",
    ]
    # Bumped because the article text changed (missing space after a period fixed)
    __version__ = "0.0.3"  # Modification will trigger re-caching

    def get_scenario(self, spike_start_date, spike_duration, spike_magnitude):
        """
        Return the long news article carrying the spike date, duration and magnitude.

        Parameters
        ----------
        spike_start_date:
            Timestamp at which the spike begins; rendered with ``datetime_to_str``.
        spike_duration:
            Length of the spike, reported in hours.
        spike_magnitude:
            Multiplier reported as "times the typical usage".

        Returns
        -------
        str
            The article text.
        """
        # This news article was generated with the assistance of Claude
        hour_word = "hour" if spike_duration == 1 else "hours"
        scenario = (
            "A sudden and intense heatwave struck the city, causing a dramatic surge in "
            "electricity consumption as residents sought refuge from the scorching temperatures. "
            f"The extreme weather event, which began on {datetime_to_str(spike_start_date)} "
            f"and lasted for approximately {spike_duration} {hour_word}, "
            "saw temperatures soar to unprecedented levels. "
            "In response, citizens across the metropolitan area turned to their air conditioning "
            # The original text ran "power grid.According" together; a space is now
            # present, matching the medium-length article.
            "units en masse, leading to a significant strain on the local power grid. "
            "According to the city's electricity provider, power consumption during the peak "
            f"of the heatwave reached approximately {spike_magnitude} times "
            "the typical usage for this time of year. "
            "\"We've never seen anything quite like this,\" said Jane Smith, "
            "spokesperson for PowerCity Utilities. "
            "\"The sudden spike in demand pushed our systems to their limits.\" "
            "\nAs the city recovers from this unprecedented power surge, experts are already "
            "discussing long-term solutions to manage similar situations in the future. "
            "These may include upgrades to the power grid, incentives for energy-efficient "
            "appliances, and the development of more robust emergency response protocols. "
            "\nFor now, citizens are encouraged to stay hydrated, check on vulnerable neighbors, "
            "and use air conditioning responsibly as the community works together to beat the heat."
        )
        return scenario
# Every task class defined in this module.
__TASKS__ = [
    ElectricityIncreaseInPredictionTask,
    ElectricityIncreaseInPredictionWithDistractorText,
    ElectricityIncreaseInPredictionWithDistractorWithDates,
    ElectricityIncreaseInPredictionWithSplitContext,
    ShortNewsElectricityIncreaseInPredictionTask,
    MediumNewsElectricityIncreaseInPredictionTask,
    LongNewsElectricityIncreaseInPredictionTask,
]
# The same tasks as in __TASKS__, grouped into three clusters of weight 1 each:
# the plain task, the distractor / split-context variants, and the news-article variants.
# NOTE(review): presumably the weight is shared by the tasks of a cluster when
# results are aggregated - confirm against WeightCluster.
__CLUSTERS__ = [
    WeightCluster(
        tasks=[ElectricityIncreaseInPredictionTask],
        weight=1,
    ),
    WeightCluster(
        tasks=[
            ElectricityIncreaseInPredictionWithDistractorText,
            ElectricityIncreaseInPredictionWithDistractorWithDates,
            ElectricityIncreaseInPredictionWithSplitContext,
        ],
        weight=1,
    ),
    WeightCluster(
        tasks=[
            ShortNewsElectricityIncreaseInPredictionTask,
            MediumNewsElectricityIncreaseInPredictionTask,
            LongNewsElectricityIncreaseInPredictionTask,
        ],
        weight=1,
    ),
]