Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate steps based on type of task #492

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@
"sphinx",
]
},
python_requires=">=3.8",
python_requires=">=3.10",
)
114 changes: 75 additions & 39 deletions src/isar/apis/models/start_mission_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from isar.mission_planner.mission_planner_interface import MissionPlannerError
from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.step import (
STEPS,
DockingProcedure,
DriveToPose,
Localize,
RecordAudio,
STEPS,
TakeImage,
TakeThermalImage,
TakeThermalVideo,
Expand All @@ -29,6 +31,13 @@ class InspectionTypes(str, Enum):
audio: str = "Audio"


class TaskType(str, Enum):
Inspection: str = "inspection"
DriveTo: str = "drive_to"
Localization: str = "localization"
Dock: str = "dock"


class StartMissionInspectionDefinition(BaseModel):
type: InspectionTypes = Field(default=InspectionTypes.image)
inspection_target: InputPosition
Expand All @@ -39,6 +48,7 @@ class StartMissionInspectionDefinition(BaseModel):


class StartMissionTaskDefinition(BaseModel):
type: TaskType = Field(default=TaskType.Inspection)
pose: InputPose
inspections: List[StartMissionInspectionDefinition]
tag: Optional[str] = None
Expand All @@ -53,53 +63,22 @@ class StartMissionDefinition(BaseModel):

def to_isar_mission(mission_definition: StartMissionDefinition) -> Mission:
isar_tasks: List[Task] = []
all_inspection_steps: List[STEPS] = []
duplicate_ids: List[str] = []
all_steps_in_mission: List[STEPS] = []

for task in mission_definition.tasks:
try:
tag_id: Optional[str] = task.tag
drive_step: DriveToPose = DriveToPose(pose=task.pose.to_alitra_pose())
inspection_steps: List[STEPS] = [
create_inspection_step(
inspection_type=inspection.type,
duration=inspection.duration,
target=inspection.inspection_target.to_alitra_position(),
tag_id=tag_id,
metadata=inspection.metadata,
id=inspection.id,
)
for inspection in task.inspections
]
except ValueError as e:
raise MissionPlannerError(f"Failed to create task: {str(e)}")

duplicate_ids = get_duplicate_ids(items=inspection_steps)
if len(duplicate_ids) > 0:
raise MissionPlannerError(
f"Failed to create task: Duplicate step IDs are not allowed ({duplicate_ids})"
)
all_inspection_steps.extend(inspection_steps)

isar_task: Task = Task(steps=[drive_step, *inspection_steps], tag_id=tag_id)
steps: List[STEPS] = generate_steps(task)
all_steps_in_mission.extend(steps)

isar_task: Task = Task(steps=steps, tag_id=task.tag)
if task.id:
isar_task.id = task.id
isar_tasks.append(isar_task)

if not isar_tasks:
raise MissionPlannerError("Mission does not contain any valid tasks")

duplicate_ids = get_duplicate_ids(items=isar_tasks)
if len(duplicate_ids) > 0:
raise MissionPlannerError(
f"Failed to create mission: Duplicate task IDs are not allowed ({duplicate_ids})"
)

duplicate_ids = get_duplicate_ids(items=all_inspection_steps)
if len(duplicate_ids) > 0:
raise MissionPlannerError(
f"Failed to create task: Duplicate step IDs are not allowed ({duplicate_ids})"
)
check_for_duplicate_ids(isar_tasks)
check_for_duplicate_ids(all_steps_in_mission)

isar_mission: Mission = Mission(tasks=isar_tasks)

Expand All @@ -114,6 +93,63 @@ def to_isar_mission(mission_definition: StartMissionDefinition) -> Mission:
return isar_mission


def check_for_duplicate_ids(items: Union[List[Task], List[STEPS]]):
duplicate_ids = get_duplicate_ids(items=items)
if len(duplicate_ids) > 0:
raise MissionPlannerError(
f"Failed to create as there were duplicate IDs which is not allowed "
f"({duplicate_ids})"
)


def generate_steps(task) -> List[STEPS]:
steps: List[STEPS] = []
try:
match task.type:
case TaskType.Inspection:
steps.extend(generate_steps_for_inspection_task(task=task))
case TaskType.DriveTo:
steps.append(generate_steps_for_drive_to_task(task=task))
case TaskType.Localization:
steps.append(generate_steps_for_localization_task(task=task))
case TaskType.Dock:
steps.append(generate_steps_for_dock_task())
except ValueError as e:
raise MissionPlannerError(f"Failed to create task: {str(e)}")

return steps


def generate_steps_for_inspection_task(task: StartMissionTaskDefinition) -> List[STEPS]:
drive_step: DriveToPose = DriveToPose(pose=task.pose.to_alitra_pose())

inspection_steps: List[STEPS] = [
create_inspection_step(
inspection_type=inspection.type,
duration=inspection.duration,
target=inspection.inspection_target.to_alitra_position(),
tag_id=task.tag,
metadata=inspection.metadata,
id=inspection.id,
)
for inspection in task.inspections
]

return [drive_step, *inspection_steps]


def generate_steps_for_drive_to_task(task: StartMissionTaskDefinition) -> DriveToPose:
return DriveToPose(pose=task.pose.to_alitra_pose())


def generate_steps_for_localization_task(task: StartMissionTaskDefinition) -> Localize:
return Localize(localization_pose=task.pose.to_alitra_pose())


def generate_steps_for_dock_task() -> DockingProcedure:
return DockingProcedure(behavior="dock")


def create_inspection_step(
inspection_type: InspectionTypes,
duration: float,
Expand Down
2 changes: 1 addition & 1 deletion src/robot_interface/models/mission/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DockingProcedure(MotionStep):
Step which causes the robot to dock or undock
"""

behavior: Literal["dock, undock"]
behavior: Literal["dock", "undock"]
type: Literal["docking_procedure"] = "docking_procedure"


Expand Down