diff --git a/interlocking/interlockingcontroller/pointcontroller.py b/interlocking/interlockingcontroller/pointcontroller.py index 73c3434..6ee4312 100644 --- a/interlocking/interlockingcontroller/pointcontroller.py +++ b/interlocking/interlockingcontroller/pointcontroller.py @@ -14,14 +14,16 @@ def __init__(self, signal_controller: SignalController, infrastructure_providers self.points: dict[str, Point] = {} self.infrastructure_providers = infrastructure_providers self.settings = settings - self.flank_protection_controller = FlankProtectionController(self, signal_controller) + if self.settings.activate_flank_protection is True: + self.flank_protection_controller = FlankProtectionController(self, signal_controller) def reset(self): for point_id in self.points: self.points[point_id].orientation = "undefined" self.points[point_id].state = OccupancyState.FREE self.points[point_id].used_by = set() - self.flank_protection_controller.reset() + if self.settings.activate_flank_protection is True: + self.flank_protection_controller.reset() async def set_route(self, route, train_id: str): tasks = [] @@ -34,7 +36,8 @@ async def set_route(self, route, train_id: str): if orientation == "left" or orientation == "right": self.set_point_reserved(point, train_id) tasks.append(tg.create_task(self.turn_point(point, orientation))) - tasks.append(tg.create_task(self.flank_protection_controller.add_flank_protection_for_point(point, orientation, route, train_id))) + if self.settings.activate_flank_protection is True: + tasks.append(tg.create_task(self.flank_protection_controller.add_flank_protection_for_point(point, orientation, route, train_id))) else: raise ValueError("Turn should happen but is not possible") @@ -89,7 +92,9 @@ def set_point_free(self, point, train_id: str): logging.info(f"--- Set point {point.point_id} to free") point.state = OccupancyState.FREE point.used_by.remove(train_id) - self.flank_protection_controller.free_flank_protection_of_point(point, point.orientation) + + if self.settings.activate_flank_protection is True: + self.flank_protection_controller.free_flank_protection_of_point(point, point.orientation) def reset_route(self, route, train_id: str): for point in route.get_points_of_route(): diff --git a/interlocking/model/helper/settings.py b/interlocking/model/helper/settings.py index a325e32..a5035c6 100644 --- a/interlocking/model/helper/settings.py +++ b/interlocking/model/helper/settings.py @@ -6,9 +6,12 @@ class Settings(object): def __init__(self, max_number_of_points_at_same_time: int = 5, - default_interlocking_provider: Type[InfrastructureProvider] | None = LoggingInfrastructureProvider): + default_interlocking_provider: Type[InfrastructureProvider] | None = LoggingInfrastructureProvider, + activate_flank_protection: bool = True): self.max_number_of_points_at_same_time = max(max_number_of_points_at_same_time, 1) # For all elements that are not covered by an infrastructure provider, an instance of this default provider will # be created. This default provider can be None. self.default_interlocking_provider: Type[InfrastructureProvider] | None = default_interlocking_provider + + self.activate_flank_protection: bool = activate_flank_protection