From ff06d8efc4c9e118b8d79072a5126f3452c56820 Mon Sep 17 00:00:00 2001 From: Jostein Solaas Date: Fri, 22 Nov 2024 13:54:09 +0100 Subject: [PATCH] fix: flow diagram generation The fix simplifies the diagram. Details below fuel and electricity consumers will no longer available. It also assumes no changes over time. --- src/ecalc_cli/commands/run.py | 3 +- src/ecalc_cli/io/output.py | 16 +- .../application/energy/energy_component.py | 31 + .../application/energy/energy_model.py | 4 +- .../application/energy/model_change_event.py | 31 + src/libecalc/dto/component_graph.py | 4 +- src/libecalc/dto/components.py | 148 ++++- .../flow_diagram/EcalcModelMapper.py | 625 +++--------------- .../presentation/flow_diagram/fde_models.py | 3 +- src/libecalc/presentation/yaml/model.py | 2 +- src/libecalc/testing/dto_energy_model.py | 2 +- .../flow_diagram/test_ecalc_model_mapper.py | 13 +- 12 files changed, 322 insertions(+), 560 deletions(-) create mode 100644 src/libecalc/application/energy/model_change_event.py diff --git a/src/ecalc_cli/commands/run.py b/src/ecalc_cli/commands/run.py index e8ba3228aa..7b5dcd5bfd 100644 --- a/src/ecalc_cli/commands/run.py +++ b/src/ecalc_cli/commands/run.py @@ -125,8 +125,7 @@ def run( if flow_diagram: write_flow_diagram( - model_dto=model.dto, - result_options=model.result_options, + energy_model=model, output_folder=output_folder, name_prefix=name_prefix, ) diff --git a/src/ecalc_cli/io/output.py b/src/ecalc_cli/io/output.py index 2a5e24d942..a255dab29f 100644 --- a/src/ecalc_cli/io/output.py +++ b/src/ecalc_cli/io/output.py @@ -7,7 +7,6 @@ from libecalc.application.graph_result import GraphResult from libecalc.common.run_info import RunInfo from libecalc.common.time_utils import resample_periods -from libecalc.dto import Asset, ResultOptions from libecalc.infrastructure.file_utils import OutputFormat, get_result_output from libecalc.presentation.exporter.configs.configs import LTPConfig, STPConfig from libecalc.presentation.exporter.configs.formatter_config import PeriodFormatterConfig @@ -15,8 +14,9 @@ from libecalc.presentation.exporter.formatters.formatter import CSVFormatter from libecalc.presentation.exporter.handlers.handler import MultiFileHandler from libecalc.presentation.exporter.infrastructure import ExportableGraphResult -from libecalc.presentation.flow_diagram.EcalcModelMapper import EcalcModelMapper +from libecalc.presentation.flow_diagram.EcalcModelMapper import EnergyModelFlowDiagram from libecalc.presentation.json_result.result import EcalcModelResult as EcalcModelResultDTO +from libecalc.presentation.yaml.model import YamlModel def write_output(output: str, output_file: Path = None): @@ -168,12 +168,11 @@ def export_tsv( exporter.export(row_based_data) -def write_flow_diagram(model_dto: Asset, result_options: ResultOptions, output_folder: Path, name_prefix: str): +def write_flow_diagram(energy_model: YamlModel, output_folder: Path, name_prefix: str): """Write FDE diagram to file. Args: - model_dto: eCalc model - result_options: Result options specifying start, end and frequency + energy_model: The yaml energy model output_folder: Desired output location of FDE diagram name_prefix: Name of FDE diagram file @@ -183,10 +182,9 @@ def write_flow_diagram(model_dto: Asset, result_options: ResultOptions, output_f EcalcCLIError: If a OSError occurs during the writing of diagram to file. """ - flow_diagram = EcalcModelMapper.from_dto_to_fde( - ecalc_model=model_dto, - result_options=result_options, - ) + flow_diagram = EnergyModelFlowDiagram( + energy_model=energy_model, model_period=energy_model.variables.period + ).get_energy_flow_diagram() flow_diagram_filename = f"{name_prefix}.flow-diagram.json" if name_prefix != "" else "flow-diagram.json" flow_diagram_path = output_folder / flow_diagram_filename try: diff --git a/src/libecalc/application/energy/energy_component.py b/src/libecalc/application/energy/energy_component.py index 6437c22ae2..44bb3ab5f0 100644 --- a/src/libecalc/application/energy/energy_component.py +++ b/src/libecalc/application/energy/energy_component.py @@ -1,6 +1,8 @@ import abc from libecalc.application.energy.component_energy_context import ComponentEnergyContext +from libecalc.application.energy.model_change_event import ModelChangeEvent +from libecalc.common.component_type import ComponentType from libecalc.core.result import EcalcModelResult @@ -15,5 +17,34 @@ class EnergyComponent(abc.ABC): @abc.abstractmethod def id(self) -> str: ... + @abc.abstractmethod + def get_component_process_type(self) -> ComponentType: ... + + @abc.abstractmethod + def get_name(self) -> str: ... + + @abc.abstractmethod + def get_change_events(self) -> list[ModelChangeEvent]: + """ + Get all the change events for this component + """ + ... + + @abc.abstractmethod + def is_provider(self) -> bool: + """ + Whether the energy component provides energy to other energy components. + """ + ... + + @abc.abstractmethod + def is_container(self) -> bool: + """ + Whether the energy component is a container for other energy components. + """ + ... + + +class EvaluatableEnergyComponent(EnergyComponent, abc.ABC): @abc.abstractmethod def evaluate_energy_usage(self, energy_context: ComponentEnergyContext) -> EcalcModelResult: ... diff --git a/src/libecalc/application/energy/energy_model.py b/src/libecalc/application/energy/energy_model.py index 16669a43ed..257cd4e1b5 100644 --- a/src/libecalc/application/energy/energy_model.py +++ b/src/libecalc/application/energy/energy_model.py @@ -18,9 +18,9 @@ def get_regularity(self, component_id: str) -> dict[datetime, Expression]: ... @abc.abstractmethod - def get_consumers(self, provider_id: str) -> list[EnergyComponent]: + def get_consumers(self, provider_id: str = None) -> list[EnergyComponent]: """ - Get consumers of the given provider + Get consumers of the given provider. If no provider is given, assume top-level. """ ... diff --git a/src/libecalc/application/energy/model_change_event.py b/src/libecalc/application/energy/model_change_event.py new file mode 100644 index 0000000000..75fd773bf8 --- /dev/null +++ b/src/libecalc/application/energy/model_change_event.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Self + +from libecalc.common.time_utils import Period + + +@dataclass(eq=True, frozen=True) +class ModelChangeEvent: + """ + An event that (might) change the structure of the diagram. Since dates in the model might be used to set + expressions only, the structure might not change even though there is a change event. + """ + + name: str + period: Period + + @property + def start(self) -> datetime: + return self.period.start + + @property + def end(self) -> datetime: + return self.period.end + + @classmethod + def from_period(cls, period) -> Self: + return cls( + name=str(period.start), + period=period, + ) diff --git a/src/libecalc/dto/component_graph.py b/src/libecalc/dto/component_graph.py index 5d72167583..5a484c41ab 100644 --- a/src/libecalc/dto/component_graph.py +++ b/src/libecalc/dto/component_graph.py @@ -10,7 +10,9 @@ # TODO: Rename to energy graph, use composition instead of inheritance. Alternatively make YamlModel the EnergyGraph/EnergyModel and use Graph directly in YamlModel # Currently it is practical to have the EnergyModel graph related functions here to deal with dto tests. class ComponentGraph(Graph): - def get_consumers(self, provider_id: str) -> list[EnergyComponent]: + def get_consumers(self, provider_id: str = None) -> list[EnergyComponent]: + if provider_id is None: + provider_id = self.root consumer_ids = self.get_successors(provider_id) return [self.get_node(consumer_id) for consumer_id in consumer_ids] diff --git a/src/libecalc/dto/components.py b/src/libecalc/dto/components.py index b5206a16ff..edb590a8e1 100644 --- a/src/libecalc/dto/components.py +++ b/src/libecalc/dto/components.py @@ -9,7 +9,9 @@ from libecalc.application.energy.component_energy_context import ComponentEnergyContext from libecalc.application.energy.emitter import Emitter +from libecalc.application.energy.energy_component import EnergyComponent from libecalc.application.energy.energy_model import EnergyModel +from libecalc.application.energy.model_change_event import ModelChangeEvent from libecalc.common.component_type import ComponentType from libecalc.common.consumption_type import ConsumptionType from libecalc.common.energy_usage_type import EnergyUsageType @@ -131,7 +133,7 @@ def validate_fuel_exist(cls, fuel, info: ValidationInfo): return fuel -class ElectricityConsumer(BaseConsumer): +class ElectricityConsumer(BaseConsumer, EnergyComponent): component_type: Literal[ ComponentType.COMPRESSOR, ComponentType.PUMP, @@ -151,6 +153,21 @@ class ElectricityConsumer(BaseConsumer): lambda data: check_model_energy_usage_type(data, EnergyUsageType.POWER) ) + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return False + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [ModelChangeEvent.from_period(period) for period in self.energy_usage_model.keys()] + @field_validator("energy_usage_model", mode="before") @classmethod def check_energy_usage_model(cls, energy_usage_model): @@ -162,7 +179,7 @@ def check_energy_usage_model(cls, energy_usage_model): return energy_usage_model -class FuelConsumer(BaseConsumer, Emitter): +class FuelConsumer(BaseConsumer, Emitter, EnergyComponent): component_type: Literal[ ComponentType.COMPRESSOR, ComponentType.GENERIC, @@ -177,6 +194,21 @@ class FuelConsumer(BaseConsumer, Emitter): lambda data: check_model_energy_usage_type(data, EnergyUsageType.FUEL) ) + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return False + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [ModelChangeEvent.from_period(period) for period in self.energy_usage_model.keys()] + def evaluate_emissions( self, energy_context: ComponentEnergyContext, @@ -230,15 +262,45 @@ class PumpOperationalSettings(EcalcBaseModel): fluid_density: Expression -class CompressorComponent(BaseConsumer): +class CompressorComponent(BaseConsumer, EnergyComponent): component_type: Literal[ComponentType.COMPRESSOR] = ComponentType.COMPRESSOR energy_usage_model: dict[Period, CompressorModel] + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return False + + def get_component_process_type(self) -> ComponentType: + return self.component_type -class PumpComponent(BaseConsumer): + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [ModelChangeEvent.from_period(period) for period in self.energy_usage_model.keys()] + + +class PumpComponent(BaseConsumer, EnergyComponent): component_type: Literal[ComponentType.PUMP] = ComponentType.PUMP energy_usage_model: dict[Period, PumpModel] + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return False + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [ModelChangeEvent.from_period(period) for period in self.energy_usage_model.keys()] + class Stream(EcalcBaseModel): model_config = ConfigDict(populate_by_name=True) @@ -294,19 +356,35 @@ class SystemComponentConditions(EcalcBaseModel): crossover: list[Crossover] -class ConsumerSystem(BaseConsumer, Emitter): +class ConsumerSystem(BaseConsumer, Emitter, EnergyComponent): component_type: Literal[ComponentType.CONSUMER_SYSTEM_V2] = Field( ComponentType.CONSUMER_SYSTEM_V2, title="TYPE", description="The type of the component", ) + component_conditions: SystemComponentConditions stream_conditions_priorities: Priorities[SystemStreamConditions] consumers: Union[list[CompressorComponent], list[PumpComponent]] + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return True + def is_fuel_consumer(self) -> bool: return self.consumes == ConsumptionType.FUEL + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [] + def evaluate_emissions( self, energy_context: ComponentEnergyContext, @@ -382,7 +460,7 @@ def evaluate_stream_conditions( return dict(parsed_priorities) -class GeneratorSet(BaseEquipment, Emitter): +class GeneratorSet(BaseEquipment, Emitter, EnergyComponent): component_type: Literal[ComponentType.GENERATOR_SET] = ComponentType.GENERATOR_SET fuel: dict[Period, FuelType] generator_set_model: dict[Period, GeneratorSetSampled] @@ -401,6 +479,21 @@ class GeneratorSet(BaseEquipment, Emitter): None, title="MAX_USAGE_FROM_SHORE", description="The peak load/effect that is expected for one hour, per year." ) + def is_provider(self) -> bool: + return True + + def is_container(self) -> bool: + return False + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [ModelChangeEvent.from_period(period) for period in self.generator_set_model.keys()] + def evaluate_emissions( self, energy_context: ComponentEnergyContext, @@ -472,8 +565,9 @@ def get_graph(self) -> ComponentGraph: return graph -class Installation(BaseComponent): +class Installation(BaseComponent, EnergyComponent): component_type: Literal[ComponentType.INSTALLATION] = ComponentType.INSTALLATION + user_defined_category: Optional[InstallationUserDefinedCategoryType] = Field(default=None, validate_default=True) hydrocarbon_export: dict[Period, Expression] fuel_consumers: list[ @@ -484,6 +578,21 @@ class Installation(BaseComponent): ] = Field(default_factory=list) venting_emitters: list[YamlVentingEmitter] = Field(default_factory=list) + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return True + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [] + @property def id(self) -> str: return generate_id(self.name) @@ -496,7 +605,7 @@ def id(self) -> str: @field_validator("user_defined_category", mode="before") def check_user_defined_category(cls, user_defined_category, info: ValidationInfo): - """Provide which value and context to make it easier for user to correct wrt mandatory changes.""" + # Provide which value and context to make it easier for user to correct wrt mandatory changes. if user_defined_category is not None: if user_defined_category not in list(InstallationUserDefinedCategoryType): name_context_str = "" @@ -512,7 +621,7 @@ def check_user_defined_category(cls, user_defined_category, info: ValidationInfo @model_validator(mode="after") def check_fuel_consumers_or_venting_emitters_exist(self): try: - if self.fuel_consumers or self.venting_emitters or self.generator_sets: + if self.fuel_consumers or self.venting_emitters: return self except AttributeError: raise ValueError( @@ -534,15 +643,30 @@ def get_graph(self) -> ComponentGraph: return graph -class Asset(Component): +class Asset(Component, EnergyComponent): @property def id(self): return generate_id(self.name) - component_type: Literal[ComponentType.ASSET] = ComponentType.ASSET - name: ComponentNameStr + installations: list[Installation] = Field(default_factory=list) + component_type: Literal[ComponentType.ASSET] = ComponentType.ASSET + + def is_provider(self) -> bool: + return False + + def is_container(self) -> bool: + return True + + def get_component_process_type(self) -> ComponentType: + return self.component_type + + def get_name(self) -> str: + return self.name + + def get_change_events(self) -> list[ModelChangeEvent]: + return [] @model_validator(mode="after") def validate_unique_names(self): diff --git a/src/libecalc/presentation/flow_diagram/EcalcModelMapper.py b/src/libecalc/presentation/flow_diagram/EcalcModelMapper.py index 369fc2c268..02172f9eef 100644 --- a/src/libecalc/presentation/flow_diagram/EcalcModelMapper.py +++ b/src/libecalc/presentation/flow_diagram/EcalcModelMapper.py @@ -1,34 +1,9 @@ -from collections.abc import Iterable -from datetime import datetime -from typing import Union +from typing import assert_never +from libecalc.application.energy.energy_component import EnergyComponent +from libecalc.application.energy.energy_model import EnergyModel from libecalc.common.component_type import ComponentType -from libecalc.common.consumer_type import ConsumerType -from libecalc.common.time_utils import Period, Periods -from libecalc.dto import ( - CompressorConsumerFunction, - CompressorSystemConsumerFunction, - CompressorTrainSimplifiedWithKnownStages, - ConsumerFunction, - ElectricEnergyUsageModel, - FuelEnergyUsageModel, - GeneratorSet, - Installation, - PumpSystemConsumerFunction, - ResultOptions, - SingleSpeedCompressorTrain, - VariableSpeedCompressorTrain, - VariableSpeedCompressorTrainMultipleStreamsAndPressures, -) -from libecalc.dto.components import ( - Asset, - CompressorComponent, - ConsumerSystem, - ElectricityConsumer, - FuelConsumer, - PumpComponent, -) -from libecalc.dto.models.compressor import CompressorWithTurbine +from libecalc.common.time_utils import Period from libecalc.presentation.flow_diagram.fde_models import ( Edge, Flow, @@ -39,7 +14,6 @@ ) FUEL_NODE = Node(id="fuel-input", title="Fuel", type=NodeType.INPUT_OUTPUT_NODE) -INPUT_NODE = Node(id="input", title="Input", type=NodeType.INPUT_OUTPUT_NODE) EMISSION_NODE = Node(id="emission-output", title="Emission", type=NodeType.INPUT_OUTPUT_NODE) FUEL_FLOW = Flow(id="fuel-flow", label="Fuel", type=FlowType.FUEL) @@ -47,513 +21,110 @@ ELECTRICITY_FLOW = Flow(id="electricity-flow", label="Electricity", type=FlowType.ELECTRICITY) -def _create_generator_set_node(generator_set: GeneratorSet, installation: Installation) -> Node: - return Node( - id=f"{installation.name}-generator-set-{generator_set.name}", - title=generator_set.name, - type=NodeType.GENERATOR, - ) - - -def _create_legacy_pump_system_diagram( - energy_usage_model: dict[Period, PumpSystemConsumerFunction], - consumer_id: str, - consumer_title: str, - global_end_date: datetime, -) -> list[FlowDiagram]: - """Create subchart for energy usage model - :param energy_usage_model: - :return: list of flow diagrams.list of flow diagrams as we always add a default date in dtos. - """ - flow_diagrams = [] - periods = _get_periods({key.start for key in energy_usage_model.keys()}, global_end_date) - for period in periods: - energy_usage_model_step = energy_usage_model[period] - flow_diagrams.append( - FlowDiagram( - id=consumer_id, - title=consumer_title, - start_date=period.start, - end_date=period.end, - nodes=[ - Node( - id=pump.name, - title=pump.name, - type=NodeType.PUMP, +class EnergyModelFlowDiagram: + def __init__(self, energy_model: EnergyModel, model_period: Period): + self._energy_model = energy_model + self._model_period = model_period + + def _get_node_type(self, energy_component: EnergyComponent) -> NodeType: + component_type = energy_component.get_component_process_type() + assert component_type != ComponentType.ASSET, "We don't use the asset node" + match component_type: + case ComponentType.INSTALLATION: + return NodeType.INSTALLATION + case ComponentType.PUMP | ComponentType.PUMP_V2: + return NodeType.PUMP + case ComponentType.COMPRESSOR | ComponentType.COMPRESSOR_V2: + return NodeType.COMPRESSOR + case ComponentType.COMPRESSOR_SYSTEM: + return NodeType.COMPRESSOR_SYSTEM + case ComponentType.PUMP_SYSTEM: + return NodeType.PUMP_SYSTEM + case ComponentType.CONSUMER_SYSTEM_V2: + consumers = self._energy_model.get_consumers(energy_component.id) + if len(consumers) < 1: + # Guess compressor, I don't think this should happen + return NodeType.COMPRESSOR_SYSTEM + consumer = consumers[0] + consumer_type = self._get_node_type(consumer) + return NodeType.COMPRESSOR_SYSTEM if consumer_type == NodeType.COMPRESSOR else NodeType.PUMP_SYSTEM + case ComponentType.GENERATOR_SET: + return NodeType.GENERATOR + case ComponentType.GENERIC: + # TODO: handle tabular? Check model? + return NodeType.DIRECT + case ComponentType.VENTING_EMITTER: + # TODO: Missing appropriate NodeType + return NodeType.DIRECT + case ComponentType.TRAIN_V2: + # TODO: what + return NodeType.DIRECT + case _: + assert_never(component_type) + + def _get_energy_component_fde(self, energy_component: EnergyComponent) -> list[FlowDiagram] | None: + # differentiate between container and provider so we can decide whether to continue rendering consumers or hide them behind a click + # is_container and is_provider on energy component? get_consumers on energy model, but also get_children? + consumers = self._energy_model.get_consumers(energy_component.id) + + if not consumers: + return None + + nodes = [FUEL_NODE, EMISSION_NODE] + edges = [] + for consumer in consumers: + nodes.append(self._get_node(consumer)) + edges.append(self._get_edge(from_node=FUEL_NODE.id, to_node=consumer.id, flow=FUEL_FLOW.id)) + edges.append(self._get_edge(from_node=consumer.id, to_node=EMISSION_NODE.id, flow=EMISSIONS_FLOW.id)) + + if consumer.is_provider(): + # Assuming provider provides electricity, and that consumers should be included in the same fde + el_consumers = self._energy_model.get_consumers(consumer.id) + for el_consumer in el_consumers: + nodes.append(self._get_node(el_consumer)) + edges.append( + self._get_edge(from_node=consumer.id, to_node=el_consumer.id, flow=ELECTRICITY_FLOW.id) ) - for pump in energy_usage_model_step.pumps - ], - edges=[], - flows=[], - ) - ) - return flow_diagrams - -def _create_legacy_compressor_system_diagram( - energy_usage_model: dict[Period, CompressorSystemConsumerFunction], - consumer_id: str, - consumer_title: str, - global_end_date: datetime, -) -> list[FlowDiagram]: - """Create subchart for energy usage model - :param energy_usage_model: - :return: list of flow diagrams.list of flow diagrams as we always add a default date in dtos. - """ - flow_diagrams = [] - periods = _get_periods({key.start for key in energy_usage_model.keys()}, global_end_date) - for period in periods: - energy_usage_model_step = energy_usage_model[period] - - flow_diagrams.append( + return [ FlowDiagram( - id=consumer_id, - title=consumer_title, - start_date=period.start, - end_date=period.end, - nodes=[ - Node( - id=compressor.name, - title=compressor.name, - type=NodeType.COMPRESSOR, - subdiagram=FlowDiagram( - id=compressor.name, - title=compressor.name, - start_date=period.start, - end_date=period.end, - nodes=[ - Node( - id=f"{compressor.name} stage {index}", - title=f"{compressor.name} stage {index}", - type=NodeType.COMPRESSOR, - ) - for index in range(len(compressor.compressor_train.compressor_train.stages)) - ], - edges=[], - flows=[], - ) - if hasattr(compressor.compressor_train, "compressor_train") - else [], - ) - for compressor in energy_usage_model_step.compressors + id=energy_component.id, + title=energy_component.get_name(), + start_date=self._model_period.start, + end_date=self._model_period.end, + nodes=nodes, + edges=edges, + flows=[ + FUEL_FLOW, + EMISSIONS_FLOW, + ELECTRICITY_FLOW, ], - edges=[], - flows=[], ) - ) - return flow_diagrams - - -def _create_system_diagram( - system: ConsumerSystem, - global_end_date: datetime, -) -> list[FlowDiagram]: - timesteps = _get_timesteps(system.consumers) - return [ - FlowDiagram( - id=system.id, - title=system.name, - start_date=min(timesteps), - end_date=max([*timesteps, global_end_date]), - nodes=[ - Node( - id=consumer.name, - title=consumer.name, - type=NodeType.COMPRESSOR if consumer.component_type == ComponentType.COMPRESSOR else NodeType.PUMP, - ) - for consumer in system.consumers - ], - edges=[], - flows=[], - ) - ] - - -_dto_model_type_to_fde_render_type_map = { - ConsumerType.DIRECT: NodeType.DIRECT, - ConsumerType.PUMP_SYSTEM: NodeType.PUMP_SYSTEM, - ConsumerType.COMPRESSOR_SYSTEM: NodeType.COMPRESSOR_SYSTEM, - ConsumerType.COMPRESSOR: NodeType.COMPRESSOR, - ConsumerType.PUMP: NodeType.PUMP, - ConsumerType.TABULATED: NodeType.TABULATED, -} - - -def _create_compressor_train_diagram( - energy_usage_model: dict[Period, CompressorConsumerFunction], - node_id: str, - title: str, - global_end_date: datetime, -): - """Create subchart for energy usage model - :param energy_usage_model: - :return: list of flow diagrams.list of flow diagrams as we always add a default date in dtos. - """ - periods = _get_periods({key.start for key in energy_usage_model.keys()}, global_end_date) - compressor_train_step = list(energy_usage_model.values())[0].model - return [ - FlowDiagram( - id=node_id, - title=title, - start_date=period.start, - end_date=period.end, - nodes=[ - Node( - id=f"{title} stage {index}", - title=f"{title} stage {index}", - type=NodeType.COMPRESSOR, - ) - for index, chart in enumerate(compressor_train_step.stages) - ], - edges=[], - flows=[], - ) - for period in periods - if hasattr(compressor_train_step, "stages") - ] - - -def _create_compressor_with_turbine_stages_diagram( - energy_usage_model: dict[Period, CompressorConsumerFunction], - node_id: str, - title: str, - global_end_date: datetime, -): - """Create subchart for energy usage model - :param energy_usage_model: - :return: list of flow diagrams.list of flow diagrams as we always add a default date in dtos. - """ - flow_diagrams = [] - periods = _get_periods({key.start for key in energy_usage_model.keys()}, global_end_date) - for period in periods: - compressor_train_type = list(energy_usage_model.values())[0].model.compressor_train - - if hasattr(compressor_train_type, "stages"): - flow_diagrams.append( - FlowDiagram( - id=node_id, - title=title, - start_date=period.start, - end_date=period.end, - nodes=[ - Node( - id=f"{title} stage {index}", - title=f"{title} stage {index}", - type=NodeType.COMPRESSOR, - ) - for index, chart in enumerate(compressor_train_type.stages) - ], - edges=[], - flows=[], - ) - ) - return flow_diagrams - - -def _is_compressor_with_turbine( - temporal_energy_usage_model: dict[Period, ConsumerFunction], -) -> bool: - """Checking if compressor type is compressor with turbine. - - Note: this does not handle consumer systems with CompressorWithTurbine - """ - for energy_usage_model in temporal_energy_usage_model.values(): - if isinstance(energy_usage_model, CompressorConsumerFunction): - if isinstance(energy_usage_model.model, CompressorWithTurbine): - return True - return False - - -def _create_consumer_node( - consumer: Union[FuelConsumer, ElectricityConsumer, ConsumerSystem], - installation_name: str, - global_end_date: datetime, -) -> Node: - node_id = f"{installation_name}-consumer-{consumer.name}" - title = consumer.name - if isinstance(consumer, FuelConsumer | ElectricityConsumer): - component_type = list(consumer.energy_usage_model.values())[0].typ - fde_type = _dto_model_type_to_fde_render_type_map.get(component_type, "default") - if component_type == ConsumerType.PUMP_SYSTEM: - subdiagram = _create_legacy_pump_system_diagram( - consumer.energy_usage_model, node_id, title, global_end_date - ) - elif component_type == ConsumerType.COMPRESSOR_SYSTEM: - subdiagram = _create_legacy_compressor_system_diagram( - consumer.energy_usage_model, node_id, title, global_end_date - ) - elif _is_compressor_with_turbine(consumer.energy_usage_model): - fde_type = NodeType.TURBINE - subdiagram = _create_compressor_with_turbine_stages_diagram( - consumer.energy_usage_model, node_id, title, global_end_date - ) - elif _is_compressor_train(consumer.energy_usage_model): - subdiagram = _create_compressor_train_diagram(consumer.energy_usage_model, node_id, title, global_end_date) - else: - subdiagram = None - - return Node( - id=node_id, - title=consumer.name, - type=fde_type, - subdiagram=subdiagram, - ) - elif isinstance(consumer, ConsumerSystem): - return Node( - id=consumer.id, - title=consumer.name, - type=NodeType.PUMP_SYSTEM - if consumer.consumers[0].component_type == ComponentType.PUMP - else NodeType.COMPRESSOR_SYSTEM, - subdiagram=_create_system_diagram(consumer, global_end_date=global_end_date), - ) - else: - raise ValueError( - f"Unknown consumer of type '{getattr(consumer, 'component_type', 'unknown')}' with name '{getattr(consumer, 'name', 'unknown')}'" - ) - - -def _is_compressor_train( - energy_usage_models: dict[ - Period, - Union[ElectricEnergyUsageModel, FuelEnergyUsageModel], - ], -) -> bool: - """Checking if compressor type is compressor train simplified with known stages.""" - for energy_usage_model in energy_usage_models.values(): - if isinstance(energy_usage_model, CompressorConsumerFunction) and isinstance( - energy_usage_model.model, - CompressorTrainSimplifiedWithKnownStages - | VariableSpeedCompressorTrain - | SingleSpeedCompressorTrain - | VariableSpeedCompressorTrainMultipleStreamsAndPressures, - ): - return True - return False - - -def _get_timesteps( - components: list[ - Union[ - Asset, - Installation, - FuelConsumer, - GeneratorSet, - ElectricityConsumer, - PumpComponent, - CompressorComponent, ] - ], - shallow: bool = False, -) -> set[datetime]: - """ - Return a set of all timesteps for a component - :param components: - :param shallow: if we should get timesteps for consumers in a generator set or only for the generator set. - Generator set is the only component that has dates in its own model and consumers with start dates. - :return: - """ - timesteps: set[datetime] = set() - for component in components: - if isinstance(component, Asset): - timesteps = timesteps.union(_get_timesteps(component.installations, shallow=shallow)) - elif isinstance(component, Installation): - timesteps = timesteps.union(_get_timesteps(component.fuel_consumers, shallow=shallow)) - elif isinstance(component, GeneratorSet): - timesteps = timesteps.union({key.start for key in component.generator_set_model.keys()}) - if not shallow: - timesteps = timesteps.union(_get_timesteps(component.consumers, shallow=shallow)) - elif isinstance( - component, - ElectricityConsumer | FuelConsumer | PumpComponent | CompressorComponent, - ): - timesteps = timesteps.union({key.start for key in component.energy_usage_model.keys()}) - elif isinstance(component, ConsumerSystem): - timesteps = timesteps.union(_get_timesteps(component.consumers, shallow=shallow)) - else: - raise ValueError( - f"Unknown consumer of type '{getattr(component, 'component_type', 'unknown')}' with name '{getattr(component, 'name', 'unknown')}'" - ) - return timesteps - - -def _consumer_is_active( - consumer: Union[FuelConsumer, ElectricityConsumer, GeneratorSet], - period: Period, -) -> bool: - """Check whether the consumer is active or not. - :param consumer: - :param period: the current period - :return: - """ - consumer_start_dates = _get_timesteps([consumer], shallow=True) - consumer_start_date = sorted(consumer_start_dates)[0] - return consumer_start_date < period.end - -def _create_installation_flow_diagram( - installation: Installation, - period: Period, - global_end_date: datetime, -) -> FlowDiagram: - generator_sets = [ - generator_set for generator_set in installation.fuel_consumers if isinstance(generator_set, GeneratorSet) - ] - generator_set_nodes = [] - electricity_consumer_nodes = [] - generator_set_to_electricity_consumers = [] - for generator_set_dto in generator_sets: - if _consumer_is_active(generator_set_dto, period): - generator_set_node = _create_generator_set_node( - generator_set_dto, - installation, - ) - generator_set_nodes.append(generator_set_node) - - for consumer in generator_set_dto.consumers: - if _consumer_is_active(consumer, period): - electricity_consumer_node = _create_consumer_node( - consumer, - installation_name=installation.name, - global_end_date=global_end_date, - ) - electricity_consumer_nodes.append(electricity_consumer_node) - generator_set_to_electricity_consumers.append( - Edge( - from_node=generator_set_node.id, - to_node=electricity_consumer_node.id, - flow=ELECTRICITY_FLOW.id, - ) - ) - - fuel_consumers_except_generator_sets = [ - fuel_consumer for fuel_consumer in installation.fuel_consumers if not isinstance(fuel_consumer, GeneratorSet) - ] - - fuel_consumer_except_generator_set_nodes = [ - _create_consumer_node( - fuel_consumer_dto, - installation_name=installation.name, - global_end_date=global_end_date, + def _get_edge(self, from_node: str, to_node: str, flow: str): + return Edge( + from_node=from_node, + to_node=to_node, + flow=flow, ) - for fuel_consumer_dto in fuel_consumers_except_generator_sets - if _consumer_is_active(fuel_consumer_dto, period) - ] - - fuel_consumer_nodes = [ - *generator_set_nodes, - *fuel_consumer_except_generator_set_nodes, - ] - fuel_to_fuel_consumer = [ - Edge( - from_node=FUEL_NODE.id, - to_node=consumer.id, - flow=FUEL_FLOW.id, - ) - for consumer in fuel_consumer_nodes - ] - fuel_consumer_to_emission = [ - Edge( - from_node=consumer.id, - to_node=EMISSION_NODE.id, - flow=EMISSIONS_FLOW.id, - ) - for consumer in fuel_consumer_nodes - ] - - return FlowDiagram( - id=f"installation-{installation.name}", - title=installation.name, - start_date=period.start, - end_date=period.end, - edges=[*fuel_to_fuel_consumer, *generator_set_to_electricity_consumers, *fuel_consumer_to_emission], - nodes=[FUEL_NODE, *fuel_consumer_nodes, *electricity_consumer_nodes, EMISSION_NODE], - flows=[FUEL_FLOW, ELECTRICITY_FLOW, EMISSIONS_FLOW], - ) - - -def _get_periods( - start_dates: Iterable[datetime], - global_end_date: datetime, -) -> Periods: - start_dates = sorted(start_dates) - return Periods.create_periods([*start_dates, global_end_date], include_before=False, include_after=False) - - -def _create_installation_fde(installation: Installation, global_end_date: datetime) -> list[FlowDiagram]: - """Create flow diagrams for each timestep including only the consumers relevant in that timestep - :param installation: - :return: - """ - start_dates = _get_timesteps(installation.fuel_consumers) - installation_periods = _get_periods(start_dates, global_end_date) - installation_flow_diagrams = [ - _create_installation_flow_diagram(installation, installation_period, global_end_date) - for installation_period in installation_periods - ] - - if len(installation_flow_diagrams) <= 1: - return installation_flow_diagrams - - return _filter_duplicate_flow_diagrams(installation_flow_diagrams) - - -def _filter_duplicate_flow_diagrams(installation_flow_diagrams: list[FlowDiagram]) -> list[FlowDiagram]: - """We might create flow diagrams that are equal. When having several consumers with different dates, - the user can change a property that isn't visible in the flow diagram. - """ - filtered_flow_diagrams = [installation_flow_diagrams[0]] - last_not_filtered_flow_diagram = installation_flow_diagrams[0] - for index in range(1, len(installation_flow_diagrams)): - current_fd = installation_flow_diagrams[index] - is_visibly_equal = ( - last_not_filtered_flow_diagram.nodes == current_fd.nodes - and last_not_filtered_flow_diagram.edges == current_fd.edges - and last_not_filtered_flow_diagram.flows == current_fd.flows + def _get_node(self, energy_component: EnergyComponent) -> Node: + return Node( + id=energy_component.id, # Node id and subdiagram id should match + title=energy_component.get_name(), + type=self._get_node_type(energy_component).value, + subdiagram=self._get_energy_component_fde(energy_component), ) - if not is_visibly_equal: - filtered_flow_diagrams.append(current_fd) - last_not_filtered_flow_diagram = current_fd - else: - # Extend the time period for the flow-diagram that is not filtered if the current is a duplicate. - last_not_filtered_flow_diagram.end_date = current_fd.end_date - return filtered_flow_diagrams - - -def _create_installation_node(installation: Installation, global_end_date: datetime) -> Node: - return Node( - id=f"installation-{installation.name}", - title=installation.name, - type=NodeType.INSTALLATION, - subdiagram=_create_installation_fde(installation, global_end_date), - ) - - -def _get_sorted_start_dates(ecalc_model: Asset): - return sorted(_get_timesteps([ecalc_model])) - - -def _get_global_dates(ecalc_model: Asset, result_options: ResultOptions) -> tuple[datetime, datetime]: - user_defined_start_date = result_options.start - user_defined_end_date = result_options.end - - if user_defined_start_date is not None and user_defined_end_date is not None: - return user_defined_start_date, user_defined_end_date - - start_dates = _get_sorted_start_dates(ecalc_model) - start_date = user_defined_start_date or start_dates[0] - end_date = user_defined_end_date or datetime(start_dates[-1].year + 1, 1, 1) - return start_date, end_date - -class EcalcModelMapper: - @staticmethod - def from_dto_to_fde(ecalc_model: Asset, result_options: ResultOptions) -> FlowDiagram: - global_start_date, global_end_date = _get_global_dates(ecalc_model, result_options) - installation_nodes = [ - _create_installation_node(installation, global_end_date) for installation in ecalc_model.installations + def get_energy_flow_diagram(self) -> FlowDiagram: + energy_components = self._energy_model.get_energy_components() + installations = [ + energy_component + for energy_component in energy_components + if energy_component.get_component_process_type() == ComponentType.INSTALLATION ] + installation_nodes = [self._get_node(installation) for installation in installations] edges = [] for installation in installation_nodes: @@ -578,6 +149,6 @@ def from_dto_to_fde(ecalc_model: Asset, result_options: ResultOptions) -> FlowDi nodes=[FUEL_NODE, *installation_nodes, EMISSION_NODE], flows=[FUEL_FLOW, EMISSIONS_FLOW], edges=edges, - start_date=global_start_date, - end_date=global_end_date, + start_date=self._model_period.start, + end_date=self._model_period.end, ) diff --git a/src/libecalc/presentation/flow_diagram/fde_models.py b/src/libecalc/presentation/flow_diagram/fde_models.py index 2a17764a3c..3486da89e7 100644 --- a/src/libecalc/presentation/flow_diagram/fde_models.py +++ b/src/libecalc/presentation/flow_diagram/fde_models.py @@ -1,6 +1,7 @@ from __future__ import annotations from datetime import datetime +from enum import Enum from typing import Optional, Union from pydantic import BaseModel, ConfigDict @@ -8,7 +9,7 @@ from libecalc.common.string.string_utils import to_camel_case -class NodeType: +class NodeType(str, Enum): """Supported Node types in FDE diagram""" GENERATOR = "generator" diff --git a/src/libecalc/presentation/yaml/model.py b/src/libecalc/presentation/yaml/model.py index 23842c60d7..9ab725ade4 100644 --- a/src/libecalc/presentation/yaml/model.py +++ b/src/libecalc/presentation/yaml/model.py @@ -66,7 +66,7 @@ def get_regularity(self, component_id: str) -> dict[datetime, Expression]: installation = graph.get_node(installation_id) return installation.regularity - def get_consumers(self, provider_id: str) -> list[EnergyComponent]: + def get_consumers(self, provider_id: str = None) -> list[EnergyComponent]: return self.get_graph().get_consumers(provider_id) def get_energy_components(self) -> list[EnergyComponent]: diff --git a/src/libecalc/testing/dto_energy_model.py b/src/libecalc/testing/dto_energy_model.py index 50dce80cbe..fdc14b8e79 100644 --- a/src/libecalc/testing/dto_energy_model.py +++ b/src/libecalc/testing/dto_energy_model.py @@ -12,7 +12,7 @@ def get_regularity(self, component_id: str) -> dict[datetime, Expression]: installation = graph.get_node(installation_id) return installation.regularity - def get_consumers(self, provider_id: str) -> list[EnergyComponent]: + def get_consumers(self, provider_id: str = None) -> list[EnergyComponent]: return self.component.get_graph().get_consumers(provider_id) def get_energy_components(self) -> list[EnergyComponent]: diff --git a/tests/libecalc/output/flow_diagram/test_ecalc_model_mapper.py b/tests/libecalc/output/flow_diagram/test_ecalc_model_mapper.py index dccec6baab..d48e4fbbbb 100644 --- a/tests/libecalc/output/flow_diagram/test_ecalc_model_mapper.py +++ b/tests/libecalc/output/flow_diagram/test_ecalc_model_mapper.py @@ -7,7 +7,7 @@ from libecalc import dto from libecalc.fixtures import YamlCase from libecalc.presentation.flow_diagram.EcalcModelMapper import ( - EcalcModelMapper, + EnergyModelFlowDiagram, FlowDiagram, Node, _filter_duplicate_flow_diagrams, @@ -18,7 +18,9 @@ class TestEcalcModelMapper: @pytest.mark.snapshot def test_all_energy_usage_models(self, all_energy_usage_models_yaml: YamlCase, snapshot): model = all_energy_usage_models_yaml.get_yaml_model() - actual_fd = EcalcModelMapper.from_dto_to_fde(model.dto, result_options=dto.ResultOptions()) + actual_fd = EnergyModelFlowDiagram( + energy_model=model, model_period=model.variables.period + ).get_energy_flow_diagram() snapshot_name = "all_energy_usage_models_fde.json" snapshot.assert_match( @@ -32,8 +34,11 @@ def test_all_energy_usage_models(self, all_energy_usage_models_yaml: YamlCase, s assert last_subdiagram.end_date == datetime(2021, 1, 1) @pytest.mark.snapshot - def test_case_with_dates(self, installation_with_dates_dto_fd: dto.Asset, snapshot): - actual_fd = EcalcModelMapper.from_dto_to_fde(installation_with_dates_dto_fd, result_options=dto.ResultOptions()) + def test_case_with_dates(self, installation_with_dates_dto_fd: dto.Asset, snapshot, energy_model_from_dto_factory): + model = energy_model_from_dto_factory(installation_with_dates_dto_fd) + actual_fd = EnergyModelFlowDiagram( + energy_model=model, model_period=model.variables.period + ).get_energy_flow_diagram() snapshot_name = "actual_fde.json" snapshot.assert_match( json.dumps(actual_fd.model_dump(), sort_keys=True, indent=4, default=str), snapshot_name=snapshot_name