Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eplus): get_cfs_construction_name #56

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 90 additions & 28 deletions frads/eplus.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ def ep_datetime_parser(inp: str):
month, day = [int(i) for i in date.split("/")]
hr, mi, sc = [int(i) for i in time.split(":")]
if hr == 24 and mi == 0 and sc == 0:
return datetime.datetime(1900, month, day, 0, mi, sc) + datetime.timedelta(
days=1
)
return datetime.datetime(
1900, month, day, 0, mi, sc
) + datetime.timedelta(days=1)
else:
return datetime.datetime(1900, month, day, hr, mi, sc)


class EnergyPlusResult:
def __init__(self):
...
def __init__(self): ...


class EnergyPlusSetup:
Expand Down Expand Up @@ -87,7 +86,9 @@ def __init__(
k: WorkflowConfig.from_dict(v) for k, v in self.rmodels.items()
}
# Default to Three-Phase Method
self.rworkflows = {k: ThreePhaseMethod(v) for k, v in self.rconfigs.items()}
self.rworkflows = {
k: ThreePhaseMethod(v) for k, v in self.rconfigs.items()
}
for v in self.rworkflows.values():
v.config.settings.save_matrices = True
v.generate_matrices(view_matrices=False)
Expand All @@ -98,8 +99,11 @@ def __init__(
self.variable_handles = {}
self.actuator_handles = {}
self.construction_handles = {}
self.construction_names = {}
self.enable_radiance = enable_radiance
self.api.runtime.callback_begin_new_environment(self.state, self._get_handles())
self.api.runtime.callback_begin_new_environment(
self.state, self._get_handles()
)
self.actuators = []
self._get_list_of_actuators()

Expand All @@ -115,7 +119,9 @@ def __exit__(self, exc_type, exc_value, exc_tb):
def _actuator_func(self, state):
if len(self.actuators) == 0:
api_data: List[str] = (
self.api.api.listAllAPIDataCSV(state).decode("utf-8").splitlines()
self.api.api.listAllAPIDataCSV(state)
.decode("utf-8")
.splitlines()
)
for line in api_data:
if line.startswith("Actuator"):
Expand All @@ -134,14 +140,18 @@ def _get_list_of_actuators(self):
with tempfile.TemporaryDirectory() as tmpdir:
inp = Path(tmpdir) / "in.json"
with open(inp, "w") as wtr:
wtr.write(self.model.model_dump_json(by_alias=True, exclude_none=True))
wtr.write(
self.model.model_dump_json(by_alias=True, exclude_none=True)
)

if self.epw is not None:
self.api.runtime.run_energyplus(
actuator_state, ["-p", "actuator", "-w", self.epw, str(inp)]
)
elif self.model.sizing_period_design_day is not None:
self.api.runtime.run_energyplus(actuator_state, ["-D", str(inp)])
self.api.runtime.run_energyplus(
actuator_state, ["-D", str(inp)]
)
else:
raise ValueError(
"Specify weather file in EnergyPlusSetup "
Expand All @@ -167,7 +177,9 @@ def actuate(self, component_type: str, name: str, key: str, value: float):
Examples:
>>> epsetup.actuate("Weather Data", "Outdoor Dew Point", "Environment", 10)
"""
if key not in self.actuator_handles: # check if key exists in actuator handles
if (
key not in self.actuator_handles
): # check if key exists in actuator handles
self.actuator_handles[key] = {}
if (
name not in self.actuator_handles[key]
Expand Down Expand Up @@ -280,6 +292,23 @@ def get_variable_value(self, name: str, key: str) -> float:
self.state, self.variable_handles[key][name]
)

def get_cfs_state(self, window: str) -> str:
"""Return the current complex fenestration state with input window name

Args:
window: name of the window

Returns:
name of the cfs state
"""

cfs_handle = self.api.exchange.get_actuator_value(
self.state, self.actuator_handles[window]["Construction State"]
)

cfs_name = self.construction_names[cfs_handle]
return cfs_name

def request_variable(self, name: str, key: str) -> None:
"""Request a variable from the EnergyPlus model for access during runtime.

Expand All @@ -306,24 +335,32 @@ def callback_function(state):
for key in self.variable_handles:
try:
for name in self.variable_handles[key]:
handle = self.api.exchange.get_variable_handle(state, name, key)
handle = self.api.exchange.get_variable_handle(
state, name, key
)
if handle == -1:
raise ValueError(
"Variable handle not found: "
f"name = {name}, key = {key}"
)
self.variable_handles[key][name] = handle
except TypeError:
print("No variables requested for", self.variable_handles, key)
print(
"No variables requested for", self.variable_handles, key
)

if self.model.construction_complex_fenestration_state is not None:
for cfs in self.model.construction_complex_fenestration_state:
handle = self.api.api.getConstructionHandle(state, cfs.encode())
handle = self.api.api.getConstructionHandle(
state, cfs.encode()
)
if handle == -1:
raise ValueError(
"Construction handle not found: " f"Construction = {cfs}"
"Construction handle not found: "
f"Construction = {cfs}"
)
self.construction_handles[cfs] = handle
self.construction_names[handle] = cfs

return callback_function

Expand Down Expand Up @@ -420,10 +457,14 @@ def run(
}

with open(f"{output_prefix}.json", "w") as wtr:
wtr.write(self.model.model_dump_json(by_alias=True, exclude_none=True))
wtr.write(
self.model.model_dump_json(by_alias=True, exclude_none=True)
)

self.api.runtime.set_console_output_status(self.state, not silent)
self.api.runtime.run_energyplus(self.state, [*opt, f"{output_prefix}.json"])
self.api.runtime.run_energyplus(
self.state, [*opt, f"{output_prefix}.json"]
)

def set_callback(self, method_name: str, func: Callable):
"""Set callback function for EnergyPlus runtime API.
Expand All @@ -450,7 +491,9 @@ def set_callback(self, method_name: str, func: Callable):
# method(self.state, partial(func, self))
method(self.state, func)

def _request_variables_from_callback(self, callable_nodes: List[ast.Call]) -> None:
def _request_variables_from_callback(
self, callable_nodes: List[ast.Call]
) -> None:
key_value_pairs = set()
for node in callable_nodes:
key_value_dict = {}
Expand All @@ -470,29 +513,38 @@ def _request_variables_from_callback(self, callable_nodes: List[ast.Call]) -> No
self.request_variable(**key_value_dict)
elif node.func.attr == "get_diffuse_horizontal_irradiance":
self.request_variable(
name="Site Diffuse Solar Radiation Rate per Area", key="Environment"
name="Site Diffuse Solar Radiation Rate per Area",
key="Environment",
)
elif node.func.attr == "get_direct_normal_irradiance":
self.request_variable(
name="Site Direct Solar Radiation Rate per Area", key="Environment"
name="Site Direct Solar Radiation Rate per Area",
key="Environment",
)
elif node.func.attr in ("calculate_wpi", "calculate_edgps"):
self.request_variable(
name="Site Diffuse Solar Radiation Rate per Area", key="Environment"
name="Site Diffuse Solar Radiation Rate per Area",
key="Environment",
)
self.request_variable(
name="Site Direct Solar Radiation Rate per Area", key="Environment"
name="Site Direct Solar Radiation Rate per Area",
key="Environment",
)

def _check_actuators_from_callback(self, callable_nodes: List[ast.Call]) -> None:
def _check_actuators_from_callback(
self, callable_nodes: List[ast.Call]
) -> None:
def get_zone_from_pair_arg(node: ast.Call) -> str:
if len(node.args) == 2:
zone = ast.literal_eval(node.args[0])
elif len(node.keywords) == 2:
key_value_dict = {
node.keywords[i].arg: node.keywords[i].value.value for i in range(2)
node.keywords[i].arg: node.keywords[i].value.value
for i in range(2)
}
zone = key_value_dict.get("zone", key_value_dict.get("surface", None))
zone = key_value_dict.get(
"zone", key_value_dict.get("surface", None)
)
else:
raise ValueError(f"Invalid number of arguments in {node}.")
return zone
Expand All @@ -501,7 +553,9 @@ def get_zone_from_pair_arg(node: ast.Call) -> str:
key_value = None
if node.func.attr == "actuate":
if len(node.args) == 4:
key_value = [ast.literal_eval(node.args[i]) for i in range(3)]
key_value = [
ast.literal_eval(node.args[i]) for i in range(3)
]
elif len(node.keywords) == 4:
key_value_dict = {
node.keywords[i].arg: node.keywords[i].value.value
Expand All @@ -519,10 +573,18 @@ def get_zone_from_pair_arg(node: ast.Call) -> str:
key_value = ["Surface", "Construction State", zone]
elif node.func.attr == "actuate_cooling_setpoint":
zone = get_zone_from_pair_arg(node)
key_value = ["Zone Temperature Control", "Cooling Setpoint", zone]
key_value = [
"Zone Temperature Control",
"Cooling Setpoint",
zone,
]
elif node.func.attr == "actuate_heating_setpoint":
zone = get_zone_from_pair_arg(node)
key_value = ["Zone Temperature Control", "Heating Setpoint", zone]
key_value = [
"Zone Temperature Control",
"Heating Setpoint",
zone,
]
elif node.func.attr == "actuate_lighting_power":
zone = get_zone_from_pair_arg(node)
key_value = ["Lights", "Electricity Rate", zone]
Expand Down
Loading