Skip to content

Commit

Permalink
Merge branch 'main' of github.com:LBNL-ETA/frads
Browse files Browse the repository at this point in the history
  • Loading branch information
TammieYu committed Oct 20, 2023
2 parents 00a611b + e98ec41 commit 06899ee
Showing 1 changed file with 190 additions and 27 deletions.
217 changes: 190 additions & 27 deletions frads/eplus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

from datetime import datetime, timedelta
import json
import os
from pathlib import Path
from typing import List, Optional, Callable, Union
from tempfile import TemporaryDirectory
import inspect
import ast

from epmodel import epmodel as epm
import epmodel
Expand Down Expand Up @@ -313,7 +313,7 @@ def __init__(self, epmodel: EnergyPlusModel, weather_file: Optional[str] = None)
)

self.api.runtime.callback_begin_new_environment(self.state, self._get_handles())
self.actuators = None
self.actuators = []
self._get_list_of_actuators()

def __enter__(self):
Expand All @@ -323,14 +323,14 @@ def __exit__(self, exc_type, exc_value, exc_tb):
self.api.state_manager.delete_state(self.state)

def _actuator_func(self, state):
actuators_list = []
if self.actuators is None:
list = self.api.api.listAllAPIDataCSV(state).decode("utf-8")
for line in list.split("\n"):
if len(self.actuators) == 0:
api_data: List[str] = (
self.api.api.listAllAPIDataCSV(state).decode("utf-8").splitlines()
)
for line in api_data:
if line.startswith("Actuator"):
line = line.replace(";", "")
actuators_list.append(line.split(",", 1)[1])
self.actuators = actuators_list
self.actuators.append(line.split(",", 1)[1].split(","))
else:
self.api.api.stopSimulation(state)

Expand All @@ -340,10 +340,9 @@ def _get_list_of_actuators(self):

actuator_state = self.api.state_manager.new_state()
self.api.runtime.set_console_output_status(actuator_state, False)
method = getattr(
self.api.runtime, "callback_begin_system_timestep_before_predictor"
self.api.runtime.callback_end_zone_timestep_after_zone_reporting(
actuator_state, self._actuator_func
)
method(actuator_state, self._actuator_func)

if self.epw is not None:
self.api.runtime.run_energyplus(
Expand Down Expand Up @@ -397,6 +396,130 @@ def actuate(self, component_type: str, name: str, key: str, value: float):
self.state, self.actuator_handles[key][name], value
)

def actuate_cooling_setpoint(self, zone: str, value: float):
"""Set cooling setpoint for a zone.
Args:
zone: The name of the zone to set the cooling setpoint for.
value: The value to set the cooling setpoint to.
Raises:
ValueError: If the zone is not found.
ValueError: If the zone does not have a thermostat.
Example:
>>> epsetup.actuate_cooling_setpoint("zone1", 24)
"""
zone_stat = set(
[
self.model.zone_control_thermostat[stat].zone_or_zonelist_name
for stat in self.model.zone_control_thermostat
]
)
if zone not in self.model.zone:
raise ValueError(f"Zone {zone} not found in model.")
elif zone not in zone_stat:
raise ValueError(f"Zone {zone} does not have a thermostat.")

self.actuate(
component_type="Zone Temperature Control",
name="Cooling Setpoint",
key=zone,
value=value,
)

def actuate_heating_setpoint(self, zone: str, value: float):
"""Set heating setpoint for a zone.
Args:
zone: The name of the zone to set the heating setpoint for.
value: The value to set the heating setpoint to.
Raises:
ValueError: If the zone is not found.
ValueError: If the zone does not have a thermostat.
Example:
>>> epsetup.actuate_cooling_setpoint("zone1", 20)
"""
zone_stat = set(
[
self.model.zone_control_thermostat[stat].zone_or_zonelist_name
for stat in self.model.zone_control_thermostat
]
)
if zone not in self.model.zone:
raise ValueError(f"Zone {zone} not found in model.")
elif zone not in zone_stat:
raise ValueError(f"Zone {zone} does not have a thermostat.")

self.actuate(
component_type="Zone Temperature Control",
name="Heating Setpoint",
key=zone,
value=value,
)

def actuate_lighting_power(self, zone: str, value: float):
"""Set lighting power for a zone.
Args:
zone: The name of the zone to set the lighting power for.
value: The value to set the lighting power to.
Raises:
ValueError: If the zone is not found.
ValueError: If the zone does not have lighting.
Example:
>>> epsetup.actuate_lighting_power("zone1", 1000)
"""
zone_lgt = set(
[
self.model.lights[lgt].zone_or_zonelist_or_space_or_spacelist_name
for lgt in self.model.lights
]
)
if zone not in self.model.zone:
raise ValueError(f"Zone {zone} not found in model.")
elif zone not in zone_lgt:
raise ValueError(f"Zone {zone} does not have lighting.")

self.actuate(
component_type="Lights",
name="Lighting Level",
key=zone,
value=value,
)

def actuate_cfs_state(self, surface: str, construction_state: str):
"""Set construction state for a surface.
Args:
surface: The name of the surface to set the cfs state for.
construction_state: The name of the cfs state to set the surface to.
Raises:
ValueError: If the surface is not found.
ValueError: If the cfs state is not found.
Example:
>>> epsetup.actuate_construction_state("window1", "cfs1")
"""
if surface not in self.model.fenestration_surface_detailed:
raise ValueError(f"Surface {surface} not found in model.")
elif (
construction_state not in self.model.construction_complex_fenestration_state
):
raise ValueError(f"CFS state {construction_state} not found in model.")

self.actuate(
component_type="Surface",
name="Construction State",
key=surface,
value=self.construction_handles[construction_state],
)

def get_variable_value(self, name: str, key: str) -> float:
"""Get the value of a variable in the EnergyPlus model during runtime.
Expand Down Expand Up @@ -496,7 +619,7 @@ def run(
silent: bool = False,
annual: bool = False,
design_day: bool = False,
) -> EnergyPlusResult:
) -> None:
"""Run EnergyPlus simulation.
Args:
Expand Down Expand Up @@ -560,20 +683,6 @@ def run(
self.api.runtime.set_console_output_status(self.state, not silent)
self.api.runtime.run_energyplus(self.state, [*opt, f"{output_prefix}.json"])

# TODO: Setup temp directory for EnergyPlus output
# with TemporaryDirectory() as temp_dir:
# with open(os.path.join(temp_dir, "input.json"), "w") as wtr:
# wtr.write(self.model.model_dump_json(by_alias=True, exclude_none=True))
# opt.extend(["-d", temp_dir, "input.json"])
# self.api.runtime.set_console_output_status(self.state, not silent)
# self.api.runtime.run_energyplus(self.state, opt)
# # load everything except input.json and sqlite.err
# for ofile in os.listdir(temp_dir):
# if ofile not in ["input.json", "sqlite.err"]:
# with open(os.path.join(temp_dir, ofile)) as f:
# setattr(self.result, ofile, f.read())
# return self.result

def set_callback(self, method_name: str, func: Callable):
"""Set callback function for EnergyPlus runtime API.
Expand All @@ -593,9 +702,63 @@ def set_callback(self, method_name: str, func: Callable):
raise AttributeError(
f"Method {method_name} not found in EnergyPlus runtime API."
)

self._analyze_callback(func)

# method(self.state, partial(func, self))
method(self.state, func)

def _analyze_callback(self, func: Callable) -> None:
"""Request variables from callback function.
Args:
func: Callback function.
Example:
>>> epsetup._analyze_callback(func)
"""
source_code = inspect.getsource(func)
tree = ast.parse(source_code)
key_value_pairs = []

for node in ast.walk(tree):
if isinstance(node, ast.Call) and hasattr(node.func, "attr"):
if node.func.attr == "get_variable_value":
if len(node.args) == 2:
key_value_dict = {
"name": ast.literal_eval(node.args[0]),
"key": ast.literal_eval(node.args[1]),
}
elif len(node.keywords) == 2:
key_value_dict = {
node.keywords[0].arg: node.keywords[0].value.value,
node.keywords[1].arg: node.keywords[1].value.value,
}
else:
raise ValueError(f"Invalid number of arguments in {func}.")
key_value_pairs.append(key_value_dict)

elif node.func.attr == "actuate":
if len(node.args) == 4:
key_value = [ast.literal_eval(node.args[i]) for i in range(3)]
elif len(node.keywords) == 4:
key_value_dict = {
node.keywords[i].arg: node.keywords[i].value.value
for i in range(4)
}
key_value = [
key_value_dict["component_type"],
key_value_dict["name"],
key_value_dict["key"],
]
else:
raise ValueError(f"Invalid number of arguments in {func}.")
if key_value not in self.actuators:
raise ValueError(f"Actuator {key_value} not found in model.")

for key_value_dict in key_value_pairs:
self.request_variable(**key_value_dict)


def load_idf(fpath: Union[str, Path]) -> dict:
"""Load IDF file as JSON object.
Expand Down

0 comments on commit 06899ee

Please sign in to comment.