From 01ee02bdd275175988f53aa3d5df8a2e2ec8cbad Mon Sep 17 00:00:00 2001 From: aestene Date: Fri, 13 Oct 2023 16:12:30 +0200 Subject: [PATCH] Generate steps based on type of task --- .github/workflows/pythonpackage.yml | 2 +- setup.py | 2 +- .../apis/models/start_mission_definition.py | 115 ++++++++++++------ src/robot_interface/models/mission/step.py | 2 +- 4 files changed, 79 insertions(+), 42 deletions(-) diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 59e5f1ee..f7cd8a91 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10"] + python-version: ["3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/setup.py b/setup.py index 1405094d..f8aac5bd 100644 --- a/setup.py +++ b/setup.py @@ -66,5 +66,5 @@ "sphinx", ] }, - python_requires=">=3.8", + python_requires=">=3.10", ) diff --git a/src/isar/apis/models/start_mission_definition.py b/src/isar/apis/models/start_mission_definition.py index b5dcd1ef..45715ac9 100644 --- a/src/isar/apis/models/start_mission_definition.py +++ b/src/isar/apis/models/start_mission_definition.py @@ -10,9 +10,11 @@ from isar.mission_planner.mission_planner_interface import MissionPlannerError from robot_interface.models.mission.mission import Mission from robot_interface.models.mission.step import ( - STEPS, + DockingProcedure, DriveToPose, + Localize, RecordAudio, + STEPS, TakeImage, TakeThermalImage, TakeThermalVideo, @@ -29,6 +31,13 @@ class InspectionTypes(str, Enum): audio: str = "Audio" +class TaskType(str, Enum): + Inspection: str = "inspection" + DriveTo: str = "drive_to" + Localization: str = "localization" + Dock: str = "dock" + + class StartMissionInspectionDefinition(BaseModel): type: InspectionTypes = Field(default=InspectionTypes.image) inspection_target: InputPosition @@ -38,6 +47,7 @@ class StartMissionInspectionDefinition(BaseModel): class StartMissionTaskDefinition(BaseModel): + type: TaskType = Field(default=TaskType.Inspection) pose: InputPose inspections: List[StartMissionInspectionDefinition] tag: Optional[str] = None @@ -52,35 +62,13 @@ class StartMissionDefinition(BaseModel): def to_isar_mission(mission_definition: StartMissionDefinition) -> Mission: isar_tasks: List[Task] = [] - all_inspection_steps: List[STEPS] = [] - duplicate_ids: List[str] = [] + all_steps_in_mission: List[STEPS] = [] for task in mission_definition.tasks: - try: - tag_id: Optional[str] = task.tag - drive_step: DriveToPose = DriveToPose(pose=task.pose.to_alitra_pose()) - inspection_steps: List[STEPS] = [ - create_inspection_step( - inspection_type=inspection.type, - duration=inspection.duration, - target=inspection.inspection_target.to_alitra_position(), - tag_id=tag_id, - metadata=inspection.metadata, - id=inspection.id, - ) - for inspection in task.inspections - ] - except ValueError as e: - raise MissionPlannerError(f"Failed to create task: {str(e)}") - - duplicate_ids = get_duplicate_ids(items=inspection_steps) - if len(duplicate_ids) > 0: - raise MissionPlannerError( - f"Failed to create task: Duplicate step IDs are not allowed ({duplicate_ids})" - ) - all_inspection_steps.extend(inspection_steps) - - isar_task: Task = Task(steps=[drive_step, *inspection_steps], tag_id=tag_id) + steps: List[STEPS] = generate_steps(task) + all_steps_in_mission.extend(steps) + + isar_task: Task = Task(steps=steps, tag_id=task.tag) if task.id: isar_task.id = task.id isar_tasks.append(isar_task) @@ -88,17 +76,8 @@ def to_isar_mission(mission_definition: StartMissionDefinition) -> Mission: if not isar_tasks: raise MissionPlannerError("Mission does not contain any valid tasks") - duplicate_ids = get_duplicate_ids(items=isar_tasks) - if len(duplicate_ids) > 0: - raise MissionPlannerError( - f"Failed to create mission: Duplicate task IDs are not allowed ({duplicate_ids})" - ) - - duplicate_ids = get_duplicate_ids(items=all_inspection_steps) - if len(duplicate_ids) > 0: - raise MissionPlannerError( - f"Failed to create task: Duplicate step IDs are not allowed ({duplicate_ids})" - ) + check_for_duplicate_ids(isar_tasks) + check_for_duplicate_ids(all_steps_in_mission) isar_mission: Mission = Mission(tasks=isar_tasks) @@ -113,6 +92,64 @@ def to_isar_mission(mission_definition: StartMissionDefinition) -> Mission: return isar_mission +def check_for_duplicate_ids(items: Union[List[Task], List[STEPS]]): + duplicate_ids = get_duplicate_ids(items=items) + if len(duplicate_ids) > 0: + raise MissionPlannerError( + f"Failed to create as there were duplicate IDs which is not allowed " + f"({duplicate_ids})" + ) + + +def generate_steps(task) -> List[STEPS]: + try: + steps: List[STEPS] = [] + + match task.type: + case TaskType.Inspection: + steps.append(generate_steps_for_inspection_task(task=task)) # type: ignore + case TaskType.DriveTo: + steps.append(generate_steps_for_drive_to_task(task=task)) + case TaskType.Localization: + steps.append(generate_steps_for_localization_task(task=task)) + case TaskType.Dock: + steps.append(generate_steps_for_dock_task()) + except ValueError as e: + raise MissionPlannerError(f"Failed to create task: {str(e)}") + + return steps + + +def generate_steps_for_inspection_task(task: StartMissionTaskDefinition) -> List[STEPS]: + drive_step: DriveToPose = DriveToPose(pose=task.pose.to_alitra_pose()) + + inspection_steps: List[STEPS] = [ + create_inspection_step( + inspection_type=inspection.type, + duration=inspection.duration, + target=inspection.inspection_target.to_alitra_position(), + tag_id=task.tag, + metadata=inspection.metadata, + id=inspection.id, + ) + for inspection in task.inspections + ] + + return [drive_step, *inspection_steps] + + +def generate_steps_for_drive_to_task(task: StartMissionTaskDefinition) -> DriveToPose: + return DriveToPose(pose=task.pose.to_alitra_pose()) + + +def generate_steps_for_localization_task(task: StartMissionTaskDefinition) -> Localize: + return Localize(localization_pose=task.pose.to_alitra_pose()) + + +def generate_steps_for_dock_task() -> DockingProcedure: + return DockingProcedure(behavior="dock") + + def create_inspection_step( inspection_type: InspectionTypes, duration: float, diff --git a/src/robot_interface/models/mission/step.py b/src/robot_interface/models/mission/step.py index ae94b162..55dc2257 100644 --- a/src/robot_interface/models/mission/step.py +++ b/src/robot_interface/models/mission/step.py @@ -107,7 +107,7 @@ class DockingProcedure(MotionStep): Step which causes the robot to dock or undock """ - behavior: Literal["dock, undock"] + behavior: Literal["dock", "undock"] type: Literal["docking_procedure"] = "docking_procedure"