diff --git a/src/envs/pybullet_blocks.py b/src/envs/pybullet_blocks.py index b45758bc58..9f37822745 100644 --- a/src/envs/pybullet_blocks.py +++ b/src/envs/pybullet_blocks.py @@ -11,7 +11,7 @@ from predicators.src.envs.blocks import BlocksEnv from predicators.src.envs.pybullet_env import PyBulletEnv, \ create_pybullet_block -from predicators.src.envs.pybullet_robots import \ +from predicators.src.pybullet_helpers.controllers import \ create_change_fingers_option, create_move_end_effector_to_pose_option from predicators.src.pybullet_helpers.robots import SingleArmPyBulletRobot, \ create_single_arm_pybullet_robot diff --git a/src/envs/pybullet_cover.py b/src/envs/pybullet_cover.py index 2054d1202f..4c51fe000c 100644 --- a/src/envs/pybullet_cover.py +++ b/src/envs/pybullet_cover.py @@ -10,7 +10,7 @@ from predicators.src.envs.cover import CoverEnv from predicators.src.envs.pybullet_env import PyBulletEnv, \ create_pybullet_block -from predicators.src.envs.pybullet_robots import \ +from predicators.src.pybullet_helpers.controllers import \ create_change_fingers_option, create_move_end_effector_to_pose_option from predicators.src.pybullet_helpers.robots import SingleArmPyBulletRobot, \ create_single_arm_pybullet_robot diff --git a/src/envs/pybullet_robots.py b/src/envs/pybullet_robots.py index 34738eeb52..5f70af1e0a 100644 --- a/src/envs/pybullet_robots.py +++ b/src/envs/pybullet_robots.py @@ -1,154 +1,19 @@ """Interfaces to PyBullet robots.""" from __future__ import annotations -from typing import TYPE_CHECKING, Callable, Collection, Dict, Iterator, List, \ - Optional, Sequence, Tuple, cast +from typing import TYPE_CHECKING, Collection, Iterator, List, Optional, \ + Sequence import numpy as np import pybullet as p -from gym.spaces import Box from predicators.src import utils from predicators.src.settings import CFG -from predicators.src.structs import Action, Array, JointsState, Object, \ - ParameterizedOption, Pose3D, State, Type +from predicators.src.structs import JointsState if TYPE_CHECKING: from predicators.src.pybullet_helpers.robots import SingleArmPyBulletRobot -################################# Controllers ################################# - - -def create_move_end_effector_to_pose_option( - robot: SingleArmPyBulletRobot, - name: str, - types: Sequence[Type], - params_space: Box, - get_current_and_target_pose_and_finger_status: Callable[ - [State, Sequence[Object], Array], Tuple[Pose3D, Pose3D, str]], - move_to_pose_tol: float, - max_vel_norm: float, - finger_action_nudge_magnitude: float, -) -> ParameterizedOption: - """A generic utility that creates a ParameterizedOption for moving the end - effector to a target pose, given a function that takes in the current - state, objects, and parameters, and returns the current pose and target - pose of the end effector, and the finger status.""" - - assert robot.get_name() == "fetch", "Move end effector to pose option " + \ - f"not implemented for robot {robot.get_name()}." - - def _policy(state: State, memory: Dict, objects: Sequence[Object], - params: Array) -> Action: - del memory # unused - # First handle the main arm joints. - current, target, finger_status = \ - get_current_and_target_pose_and_finger_status( - state, objects, params) - # Run IK to determine the target joint positions. - ee_delta = np.subtract(target, current) - # Reduce the target to conform to the max velocity constraint. - ee_norm = np.linalg.norm(ee_delta) - if ee_norm > max_vel_norm: - ee_delta = ee_delta * max_vel_norm / ee_norm - ee_action = np.add(current, ee_delta) - # Keep validate as False because validate=True would update the - # state of the robot during simulation, which overrides physics. - joints_state = robot.inverse_kinematics( - (ee_action[0], ee_action[1], ee_action[2]), validate=False) - # Handle the fingers. Fingers drift if left alone. - # When the fingers are not explicitly being opened or closed, we - # nudge the fingers toward being open or closed according to the - # finger status. - if finger_status == "open": - finger_delta = finger_action_nudge_magnitude - else: - assert finger_status == "closed" - finger_delta = -finger_action_nudge_magnitude - # Extract the current finger state. - state = cast(utils.PyBulletState, state) - finger_state = state.joints_state[robot.left_finger_joint_idx] - # The finger action is an absolute joint position for the fingers. - f_action = finger_state + finger_delta - # Override the meaningless finger values in joint_action. - joints_state[robot.left_finger_joint_idx] = f_action - joints_state[robot.right_finger_joint_idx] = f_action - action_arr = np.array(joints_state, dtype=np.float32) - # This clipping is needed sometimes for the joint limits. - action_arr = np.clip(action_arr, robot.action_space.low, - robot.action_space.high) - assert robot.action_space.contains(action_arr) - return Action(action_arr) - - def _terminal(state: State, memory: Dict, objects: Sequence[Object], - params: Array) -> bool: - del memory # unused - current, target, _ = \ - get_current_and_target_pose_and_finger_status( - state, objects, params) - squared_dist = np.sum(np.square(np.subtract(current, target))) - return squared_dist < move_to_pose_tol - - return ParameterizedOption(name, - types=types, - params_space=params_space, - policy=_policy, - initiable=lambda _1, _2, _3, _4: True, - terminal=_terminal) - - -def create_change_fingers_option( - robot: SingleArmPyBulletRobot, - name: str, - types: Sequence[Type], - params_space: Box, - get_current_and_target_val: Callable[[State, Sequence[Object], Array], - Tuple[float, float]], - max_vel_norm: float, - grasp_tol: float, -) -> ParameterizedOption: - """A generic utility that creates a ParameterizedOption for changing the - robot fingers, given a function that takes in the current state, objects, - and parameters, and returns the current and target finger joint values.""" - - assert robot.get_name() == "fetch", "Change fingers option not " + \ - f"implemented for robot {robot.get_name()}." - - def _policy(state: State, memory: Dict, objects: Sequence[Object], - params: Array) -> Action: - del memory # unused - current_val, target_val = get_current_and_target_val( - state, objects, params) - f_delta = target_val - current_val - f_delta = np.clip(f_delta, -max_vel_norm, max_vel_norm) - f_action = current_val + f_delta - # Don't change the rest of the joints. - state = cast(utils.PyBulletState, state) - target = np.array(state.joints_state, dtype=np.float32) - target[robot.left_finger_joint_idx] = f_action - target[robot.right_finger_joint_idx] = f_action - # This clipping is needed sometimes for the joint limits. - target = np.clip(target, robot.action_space.low, - robot.action_space.high) - assert robot.action_space.contains(target) - return Action(target) - - def _terminal(state: State, memory: Dict, objects: Sequence[Object], - params: Array) -> bool: - del memory # unused - current_val, target_val = get_current_and_target_val( - state, objects, params) - squared_dist = (target_val - current_val)**2 - return squared_dist < grasp_tol - - return ParameterizedOption(name, - types=types, - params_space=params_space, - policy=_policy, - initiable=lambda _1, _2, _3, _4: True, - terminal=_terminal) - - ########################### Other utility functions ########################### @@ -169,89 +34,6 @@ def get_kinematic_chain(robot: int, end_effector: int, return kinematic_chain -def pybullet_inverse_kinematics( - robot: int, - end_effector: int, - target_position: Pose3D, - target_orientation: Sequence[float], - joints: Sequence[int], - physics_client_id: int, - validate: bool = True, -) -> JointsState: - """Runs IK and returns a joints state for the given (free) joints. - - If validate is True, the PyBullet IK solver is called multiple - times, resetting the robot state each time, until the target - position is reached. If the target position is not reached after a - maximum number of iters, an exception is raised. - """ - # Figure out which joint each dimension of the return of IK corresponds to. - free_joints = [] - num_joints = p.getNumJoints(robot, physicsClientId=physics_client_id) - for idx in range(num_joints): - joint_info = p.getJointInfo(robot, - idx, - physicsClientId=physics_client_id) - if joint_info[3] > -1: - free_joints.append(idx) - assert set(joints).issubset(set(free_joints)) - - # Record the initial state of the joints so that we can reset them after. - if validate: - initial_joints_states = p.getJointStates( - robot, free_joints, physicsClientId=physics_client_id) - assert len(initial_joints_states) == len(free_joints) - - # Running IK once is often insufficient, so we run it multiple times until - # convergence. If it does not converge, an error is raised. - convergence_tol = CFG.pybullet_ik_tol - for _ in range(CFG.pybullet_max_ik_iters): - free_joint_vals = p.calculateInverseKinematics( - robot, - end_effector, - target_position, - targetOrientation=target_orientation, - physicsClientId=physics_client_id) - assert len(free_joints) == len(free_joint_vals) - if not validate: - break - # Update the robot state and check if the desired position and - # orientation are reached. - for joint, joint_val in zip(free_joints, free_joint_vals): - p.resetJointState(robot, - joint, - targetValue=joint_val, - physicsClientId=physics_client_id) - ee_link_state = p.getLinkState(robot, - end_effector, - computeForwardKinematics=True, - physicsClientId=physics_client_id) - position = ee_link_state[4] - # Note: we are checking positions only for convergence. - if np.allclose(position, target_position, atol=convergence_tol): - break - else: - raise Exception("Inverse kinematics failed to converge.") - - # Reset the joint states to their initial values to avoid modifying the - # PyBullet internal state. - if validate: - for joint, (pos, vel, _, _) in zip(free_joints, initial_joints_states): - p.resetJointState(robot, - joint, - targetValue=pos, - targetVelocity=vel, - physicsClientId=physics_client_id) - # Order the found free_joint_vals based on the requested joints. - joint_vals = [] - for joint in joints: - free_joint_idx = free_joints.index(joint) - joint_val = free_joint_vals[free_joint_idx] - joint_vals.append(joint_val) - - return joint_vals - - def run_motion_planning( robot: SingleArmPyBulletRobot, initial_state: JointsState, target_state: JointsState, collision_bodies: Collection[int], diff --git a/src/pybullet_helpers/controllers.py b/src/pybullet_helpers/controllers.py new file mode 100644 index 0000000000..d0b271ab34 --- /dev/null +++ b/src/pybullet_helpers/controllers.py @@ -0,0 +1,142 @@ +"""Generic controllers for the robots.""" +from __future__ import annotations + +from typing import Callable, Dict, Sequence, Tuple, cast + +import numpy as np +from gym.spaces import Box + +from predicators.src import utils +from predicators.src.pybullet_helpers.robots import SingleArmPyBulletRobot +from predicators.src.structs import Action, Array, Object, \ + ParameterizedOption, Pose3D, State, Type + + +def create_move_end_effector_to_pose_option( + robot: SingleArmPyBulletRobot, + name: str, + types: Sequence[Type], + params_space: Box, + get_current_and_target_pose_and_finger_status: Callable[ + [State, Sequence[Object], Array], Tuple[Pose3D, Pose3D, str]], + move_to_pose_tol: float, + max_vel_norm: float, + finger_action_nudge_magnitude: float, +) -> ParameterizedOption: + """A generic utility that creates a ParameterizedOption for moving the end + effector to a target pose, given a function that takes in the current + state, objects, and parameters, and returns the current pose and target + pose of the end effector, and the finger status.""" + + assert robot.get_name() == "fetch", "Move end effector to pose option " + \ + f"not implemented for robot {robot.get_name()}." + + def _policy(state: State, memory: Dict, objects: Sequence[Object], + params: Array) -> Action: + del memory # unused + # First handle the main arm joints. + current, target, finger_status = \ + get_current_and_target_pose_and_finger_status( + state, objects, params) + # Run IK to determine the target joint positions. + ee_delta = np.subtract(target, current) + # Reduce the target to conform to the max velocity constraint. + ee_norm = np.linalg.norm(ee_delta) + if ee_norm > max_vel_norm: + ee_delta = ee_delta * max_vel_norm / ee_norm + ee_action = np.add(current, ee_delta) + # Keep validate as False because validate=True would update the + # state of the robot during simulation, which overrides physics. + joints_state = robot.inverse_kinematics( + (ee_action[0], ee_action[1], ee_action[2]), validate=False) + # Handle the fingers. Fingers drift if left alone. + # When the fingers are not explicitly being opened or closed, we + # nudge the fingers toward being open or closed according to the + # finger status. + if finger_status == "open": + finger_delta = finger_action_nudge_magnitude + else: + assert finger_status == "closed" + finger_delta = -finger_action_nudge_magnitude + # Extract the current finger state. + state = cast(utils.PyBulletState, state) + finger_state = state.joints_state[robot.left_finger_joint_idx] + # The finger action is an absolute joint position for the fingers. + f_action = finger_state + finger_delta + # Override the meaningless finger values in joint_action. + joints_state[robot.left_finger_joint_idx] = f_action + joints_state[robot.right_finger_joint_idx] = f_action + action_arr = np.array(joints_state, dtype=np.float32) + # This clipping is needed sometimes for the joint limits. + action_arr = np.clip(action_arr, robot.action_space.low, + robot.action_space.high) + assert robot.action_space.contains(action_arr) + return Action(action_arr) + + def _terminal(state: State, memory: Dict, objects: Sequence[Object], + params: Array) -> bool: + del memory # unused + current, target, _ = \ + get_current_and_target_pose_and_finger_status( + state, objects, params) + squared_dist = np.sum(np.square(np.subtract(current, target))) + return squared_dist < move_to_pose_tol + + return ParameterizedOption(name, + types=types, + params_space=params_space, + policy=_policy, + initiable=lambda _1, _2, _3, _4: True, + terminal=_terminal) + + +def create_change_fingers_option( + robot: SingleArmPyBulletRobot, + name: str, + types: Sequence[Type], + params_space: Box, + get_current_and_target_val: Callable[[State, Sequence[Object], Array], + Tuple[float, float]], + max_vel_norm: float, + grasp_tol: float, +) -> ParameterizedOption: + """A generic utility that creates a ParameterizedOption for changing the + robot fingers, given a function that takes in the current state, objects, + and parameters, and returns the current and target finger joint values.""" + + assert robot.get_name() == "fetch", "Change fingers option not " + \ + f"implemented for robot {robot.get_name()}." + + def _policy(state: State, memory: Dict, objects: Sequence[Object], + params: Array) -> Action: + del memory # unused + current_val, target_val = get_current_and_target_val( + state, objects, params) + f_delta = target_val - current_val + f_delta = np.clip(f_delta, -max_vel_norm, max_vel_norm) + f_action = current_val + f_delta + # Don't change the rest of the joints. + state = cast(utils.PyBulletState, state) + target = np.array(state.joints_state, dtype=np.float32) + target[robot.left_finger_joint_idx] = f_action + target[robot.right_finger_joint_idx] = f_action + # This clipping is needed sometimes for the joint limits. + target = np.clip(target, robot.action_space.low, + robot.action_space.high) + assert robot.action_space.contains(target) + return Action(target) + + def _terminal(state: State, memory: Dict, objects: Sequence[Object], + params: Array) -> bool: + del memory # unused + current_val, target_val = get_current_and_target_val( + state, objects, params) + squared_dist = (target_val - current_val)**2 + return squared_dist < grasp_tol + + return ParameterizedOption(name, + types=types, + params_space=params_space, + policy=_policy, + initiable=lambda _1, _2, _3, _4: True, + terminal=_terminal) diff --git a/src/pybullet_helpers/inverse_kinematics.py b/src/pybullet_helpers/inverse_kinematics.py new file mode 100644 index 0000000000..1ad17db8e9 --- /dev/null +++ b/src/pybullet_helpers/inverse_kinematics.py @@ -0,0 +1,97 @@ +"""Vanilla Pybullet Inverse Kinematics. + +The IKFast solver is preferred over Pybullet IK, if available for the +given robot. +""" +from __future__ import annotations + +from typing import Sequence + +import numpy as np +import pybullet as p + +from predicators.src.settings import CFG +from predicators.src.structs import JointsState, Pose3D + + +def pybullet_inverse_kinematics( + robot: int, + end_effector: int, + target_position: Pose3D, + target_orientation: Sequence[float], + joints: Sequence[int], + physics_client_id: int, + validate: bool = True, +) -> JointsState: + """Runs IK and returns a joints state for the given (free) joints. + + If validate is True, the PyBullet IK solver is called multiple + times, resetting the robot state each time, until the target + position is reached. If the target position is not reached after a + maximum number of iters, an exception is raised. + """ + # Figure out which joint each dimension of the return of IK corresponds to. + free_joints = [] + num_joints = p.getNumJoints(robot, physicsClientId=physics_client_id) + for idx in range(num_joints): + joint_info = p.getJointInfo(robot, + idx, + physicsClientId=physics_client_id) + if joint_info[3] > -1: + free_joints.append(idx) + assert set(joints).issubset(set(free_joints)) + + # Record the initial state of the joints so that we can reset them after. + if validate: + initial_joints_states = p.getJointStates( + robot, free_joints, physicsClientId=physics_client_id) + assert len(initial_joints_states) == len(free_joints) + + # Running IK once is often insufficient, so we run it multiple times until + # convergence. If it does not converge, an error is raised. + convergence_tol = CFG.pybullet_ik_tol + for _ in range(CFG.pybullet_max_ik_iters): + free_joint_vals = p.calculateInverseKinematics( + robot, + end_effector, + target_position, + targetOrientation=target_orientation, + physicsClientId=physics_client_id) + assert len(free_joints) == len(free_joint_vals) + if not validate: + break + # Update the robot state and check if the desired position and + # orientation are reached. + for joint, joint_val in zip(free_joints, free_joint_vals): + p.resetJointState(robot, + joint, + targetValue=joint_val, + physicsClientId=physics_client_id) + ee_link_state = p.getLinkState(robot, + end_effector, + computeForwardKinematics=True, + physicsClientId=physics_client_id) + position = ee_link_state[4] + # Note: we are checking positions only for convergence. + if np.allclose(position, target_position, atol=convergence_tol): + break + else: + raise Exception("Inverse kinematics failed to converge.") + + # Reset the joint states to their initial values to avoid modifying the + # PyBullet internal state. + if validate: + for joint, (pos, vel, _, _) in zip(free_joints, initial_joints_states): + p.resetJointState(robot, + joint, + targetValue=pos, + targetVelocity=vel, + physicsClientId=physics_client_id) + # Order the found free_joint_vals based on the requested joints. + joint_vals = [] + for joint in joints: + free_joint_idx = free_joints.index(joint) + joint_val = free_joint_vals[free_joint_idx] + joint_vals.append(joint_val) + + return joint_vals diff --git a/src/pybullet_helpers/robots/single_arm.py b/src/pybullet_helpers/robots/single_arm.py index 4afd24f312..d76caf947c 100644 --- a/src/pybullet_helpers/robots/single_arm.py +++ b/src/pybullet_helpers/robots/single_arm.py @@ -8,7 +8,8 @@ from gym.spaces import Box from predicators.src import utils -from predicators.src.envs.pybullet_robots import get_kinematic_chain, \ +from predicators.src.envs.pybullet_robots import get_kinematic_chain +from predicators.src.pybullet_helpers.inverse_kinematics import \ pybullet_inverse_kinematics from predicators.src.settings import CFG from predicators.src.structs import Array, JointsState, Pose3D diff --git a/tests/envs/test_pybullet_robots.py b/tests/envs/test_pybullet_robots.py index 89919f6656..69e8fdd46e 100644 --- a/tests/envs/test_pybullet_robots.py +++ b/tests/envs/test_pybullet_robots.py @@ -7,7 +7,9 @@ from predicators.src import utils from predicators.src.envs.pybullet_env import create_pybullet_block from predicators.src.envs.pybullet_robots import get_kinematic_chain, \ - pybullet_inverse_kinematics, run_motion_planning + run_motion_planning +from predicators.src.pybullet_helpers.inverse_kinematics import \ + pybullet_inverse_kinematics from predicators.src.pybullet_helpers.robots import FetchPyBulletRobot, \ create_single_arm_pybullet_robot from predicators.src.settings import CFG