forked from ibpsa/project1-boptest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
testcase.py
440 lines (348 loc) · 13 KB
/
testcase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# -*- coding: utf-8 -*-
"""
This module defines the API to the test case used by the REST requests to
perform functions such as advancing the simulation, retrieving test case
information, and calculating and reporting results.
"""
from pyfmi import load_fmu
import numpy as np
import copy
import config
import time
from data.data_manager import Data_Manager
from forecast.forecaster import Forecaster
from kpis.kpi_calculator import KPI_Calculator
class TestCase(object):
    '''Class that implements the test case.

    Loads the test case FMU named in the configuration, advances it one
    communication step at a time, stores the resulting measurement and
    input trajectories, and exposes forecast and KPI data through the
    forecaster and KPI calculator helpers.

    '''

    def __init__(self):
        '''Constructor.

        Raises
        ------
        ValueError
            If the loaded FMU is not version 2.0.

        '''

        # Get configuration information
        con = config.get_config()
        # Define simulation model
        self.fmupath = con['fmupath']
        # Load fmu
        self.fmu = load_fmu(self.fmupath, enable_logging=True)
        # Get version and check is 2.0
        self.fmu_version = self.fmu.get_version()
        if self.fmu_version != '2.0':
            raise ValueError('FMU must be version 2.0.')
        # Instantiate a data manager for this test case
        self.data_manager = Data_Manager(testcase=self)
        # Load data and the kpis_json for the test case
        self.data_manager.load_data_and_kpisjson()
        # Instantiate a forecaster for this test case
        self.forecaster = Forecaster(testcase=self)
        # Instantiate a KPI calculator for the test case
        self.cal = KPI_Calculator(testcase=self)
        # Get available control inputs and outputs.  The names are copied
        # into lists so they can safely be iterated more than once.
        input_names = list(self.fmu.get_model_variables(causality=2).keys())
        output_names = list(self.fmu.get_model_variables(causality=3).keys())
        # Get input and output meta-data
        self.inputs_metadata = self._get_var_metadata(self.fmu,
                                                      input_names,
                                                      inputs=True)
        self.outputs_metadata = self._get_var_metadata(self.fmu,
                                                       output_names)
        # Define outputs data
        self.y = {'time': []}
        for key in output_names:
            self.y[key] = []
        self.y_store = copy.deepcopy(self.y)
        # Define inputs data
        self.u = {'time': []}
        for key in input_names:
            self.u[key] = []
        self.u_store = copy.deepcopy(self.u)
        # Set default options
        self.options = self.fmu.simulate_options()
        self.options['CVode_options']['rtol'] = 1e-6
        # Set default communication step
        self.set_step(con['step'])
        # Set default forecast parameters
        self.set_forecast_parameters(con['horizon'], con['interval'])
        # Set initial simulation start
        self.start_time = 0
        self.initialize = True
        self.options['initialize'] = self.initialize
        self.elapsed_control_time = []

    def advance(self, u):
        '''Advances the test case model simulation forward one step.

        Parameters
        ----------
        u : dict
            Defines the control input data to be used for the step.
            {<input_name> : <input_value>}
            Entries whose value is None or an empty string are treated as
            not written.  A 'time' entry is ignored.  May be empty or None
            if no input is to be overwritten.

        Returns
        -------
        y : dict
            Contains the measurement data at the end of the step.
            {<measurement_name> : <measurement_value>}

        '''

        # Calculate and store the elapsed time since the previous call
        # returned.  No 'tic_time' exists before the first call.
        if hasattr(self, 'tic_time'):
            self.tac_time = time.time()
            self.elapsed_control_time.append(self.tac_time - self.tic_time)
        # Set final time
        self.final_time = self.start_time + self.step
        # Set control inputs if they exist and are written
        input_object = self._build_input_object(u)
        # Simulate
        self.options['initialize'] = self.initialize
        res = self.fmu.simulate(start_time=self.start_time,
                                final_time=self.final_time,
                                options=self.options,
                                input=input_object)
        # Get result and store measurement.  The first result point is
        # skipped when storing; the last point becomes the current value.
        for key in self.y:
            self.y[key] = res[key][-1]
            # Extend in place to avoid re-copying the whole history
            self.y_store[key].extend(res[key].tolist()[1:])
        # Store control inputs
        for key in self.u:
            self.u_store[key].extend(res[key].tolist()[1:])
        # Advance start time
        self.start_time = self.final_time
        # Prevent initialize on subsequent steps
        self.initialize = False
        # Raise the flag to compute time lapse
        self.tic_time = time.time()

        return self.y

    def reset(self):
        '''Reset the test.

        Re-runs the constructor, which reloads the FMU and clears all
        stored trajectories and elapsed control times.

        '''

        # Drop the timing mark left by the previous run.  Otherwise the
        # first advance after the reset would record a spurious elapsed
        # control time spanning the reset itself.
        if hasattr(self, 'tic_time'):
            del self.tic_time
        self.__init__()

    def get_step(self):
        '''Returns the current simulation step in seconds.'''

        return self.step

    def set_step(self, step):
        '''Sets the simulation step in seconds.

        Parameters
        ----------
        step : int
            Simulation step in seconds.  Converted to float.

        Returns
        -------
        None

        '''

        self.step = float(step)

        return None

    def get_inputs(self):
        '''Returns a dictionary of control inputs and their meta-data.

        Parameters
        ----------
        None

        Returns
        -------
        inputs : dict
            Dictionary of control inputs and their meta-data.

        '''

        inputs = self.inputs_metadata

        return inputs

    def get_measurements(self):
        '''Returns a dictionary of measurements and their meta-data.

        Parameters
        ----------
        None

        Returns
        -------
        measurements : dict
            Dictionary of measurements and their meta-data.

        '''

        measurements = self.outputs_metadata

        return measurements

    def get_results(self):
        '''Returns measurement and control input trajectories.

        Parameters
        ----------
        None

        Returns
        -------
        Y : dict
            Dictionary of measurement and control input names and their
            trajectories as lists.
            {'y':{<measurement_name>:<measurement_trajectory>},
             'u':{<input_name>:<input_trajectory>}
            }

        '''

        Y = {'y': self.y_store, 'u': self.u_store}

        return Y

    def get_kpis(self):
        '''Returns KPI data.

        Requires standard sensor signals.

        Parameters
        ----------
        None

        Returns
        -------
        kpis : dict
            Dictionary containing KPI names and values.
            {<kpi_name>:<kpi_value>}

        '''

        # Calculate the core kpis
        kpis = self.cal.get_core_kpis()

        return kpis

    def set_forecast_parameters(self, horizon, interval):
        '''Sets the forecast horizon and interval, both in seconds.

        Parameters
        ----------
        horizon : int
            Forecast horizon in seconds.  Converted to float.
        interval : int
            Forecast interval in seconds.  Converted to float.

        Returns
        -------
        None

        '''

        self.horizon = float(horizon)
        self.interval = float(interval)

        return None

    def get_forecast_parameters(self):
        '''Returns the current forecast horizon and interval parameters.

        Returns
        -------
        forecast_parameters : dict
            {'horizon' : <horizon>, 'interval' : <interval>}

        '''

        forecast_parameters = dict()
        forecast_parameters['horizon'] = self.horizon
        forecast_parameters['interval'] = self.interval

        return forecast_parameters

    def get_forecast(self):
        '''Returns the test case data forecast

        Parameters
        ----------
        None

        Returns
        -------
        forecast : dict
            Dictionary with the requested forecast data
            {<variable_name>:<variable_forecast_trajectory>}
            where <variable_name> is a string with the variable
            key and <variable_forecast_trajectory> is a list with
            the forecasted values. 'time' is included as a variable

        '''

        # Get the forecast
        forecast = self.forecaster.get_forecast(horizon=self.horizon,
                                                interval=self.interval)

        return forecast

    def get_name(self):
        '''Returns the name of the test case fmu.

        Parameters
        ----------
        None

        Returns
        -------
        name : str
            Name of test case fmu.

        '''

        # Strips the first 7 and last 4 characters of the fmu path.
        # NOTE(review): presumably a 7-character directory prefix and a
        # '.fmu' extension -- confirm against the configuration.
        name = self.fmupath[7:-4]

        return name

    def get_elapsed_control_time(self):
        '''Returns the elapsed control time vector for the case.

        Parameters
        ----------
        None

        Returns
        -------
        elapsed_control_time : list of floats
            elapsed_control_time for each control step.

        '''

        elapsed_control_time = self.elapsed_control_time

        return elapsed_control_time

    @staticmethod
    def _is_written(value):
        '''Tell whether an input entry carries a value to be applied.

        Parameters
        ----------
        value : object
            Value supplied for an input.

        Returns
        -------
        written : bool
            False if value is None or an empty string, True otherwise.
            Numeric zero and False count as written.

        '''

        if value is None:
            return False
        # An empty string carries no value
        is_text = hasattr(value, 'strip')

        return not (is_text and len(value) == 0)

    def _build_input_object(self, u):
        '''Build the input object passed to the fmu simulation for a step.

        Parameters
        ----------
        u : dict or None
            Control input data for the step.
            {<input_name> : <input_value>}

        Returns
        -------
        input_object : tuple or None
            (<list of input names>, <numpy array of shape (1, n+1)>) where
            the array row holds the step start time followed by the n
            input values, in the same order as the names.  None if no
            input is written.

        '''

        # Nothing available to overwrite
        if not u:
            return None
        u_list = []
        u_values = []
        for key in u.keys():
            # 'time' is not a control input; skip entries without a value
            if key == 'time' or not self._is_written(u[key]):
                continue
            value = float(u[key])
            # Check min/max if not activation input
            if '_activate' not in key:
                checked_value = self._check_value_min_max(key, value)
            else:
                checked_value = value
            u_list.append(key)
            u_values.append(checked_value)
        # No input actually written
        if not u_list:
            return None
        # Single row: start time followed by the value of each input
        u_trajectory = np.array([[self.start_time] + u_values], dtype=float)

        return (u_list, u_trajectory)

    def _get_var_metadata(self, fmu, var_list, inputs=False):
        '''Build a dictionary of variables and their metadata.

        Parameters
        ----------
        fmu : pyfmi fmu object
            FMU from which to get variable metadata
        var_list : list of str
            List of variable names
        inputs : bool, optional
            If True, the minimum and maximum are read from the fmu for
            each variable.  Otherwise they are reported as None.
            Default is False.

        Returns
        -------
        var_metadata : dict
            Dictionary of variable names as keys and metadata as fields.
            {<var_name_str> :
                "Unit" : str,
                "Description" : str,
                "Minimum" : float,
                "Maximum" : float
            }

        '''

        # Initialize
        var_metadata = dict()
        # Get metadata
        for var in var_list:
            # Minimum and maximum stay None unless read from the fmu below
            mini = None
            maxi = None
            if var == 'time':
                unit = 's'
                description = 'Time of simulation'
            elif '_activate' in var:
                # Activation signals are reported without unit or limits
                unit = None
                description = fmu.get_variable_description(var)
            else:
                unit = fmu.get_variable_unit(var)
                description = fmu.get_variable_description(var)
                if inputs:
                    mini = fmu.get_variable_min(var)
                    maxi = fmu.get_variable_max(var)
            var_metadata[var] = {'Unit': unit,
                                 'Description': description,
                                 'Minimum': mini,
                                 'Maximum': maxi}

        return var_metadata

    def _check_value_min_max(self, var, value):
        '''Check that the input value does not violate the min or max.

        Note that if it does, the value is truncated to the minimum or
        maximum and a warning is printed.  A bound of None is treated as
        no bound.

        Parameters
        ----------
        var : str
            Name of variable
        value : numeric
            Specified value of variable

        Return
        ------
        checked_value : float
            Value of variable truncated by min and max.

        Raises
        ------
        KeyError
            If var is not a key of the input meta-data.

        '''

        # Get minimum and maximum for variable
        mini = self.inputs_metadata[var]['Minimum']
        maxi = self.inputs_metadata[var]['Maximum']
        # Check the value and truncate if necessary
        if maxi is not None and value > maxi:
            checked_value = maxi
            print('WARNING: Value of {0} for {1} is above maximum of {2}. Using {2}.'.format(value, var, maxi))
        elif mini is not None and value < mini:
            checked_value = mini
            print('WARNING: Value of {0} for {1} is below minimum of {2}. Using {2}.'.format(value, var, mini))
        else:
            checked_value = value

        return checked_value