From a0c04de1c0479222ad21f07a4640e3b79f5e6c6c Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Tue, 10 Dec 2024 11:05:43 -0600 Subject: [PATCH] Improve argument parsing in Actions The `RosGzBridge` and `GzServer` now support different spellings for boolean arguments (`True`, `true`). This also simplifies how conditionals are used to create composable nodes by evaluating the conditionals and using them as regular Python booleans instead of relying on `PythonExpression`. It was actually the `PythonExpression` that was preventing support of boolean arguments spelled `true`/`false`. Signed-off-by: Addisu Z. Taddese --- .../ros_gz_bridge/actions/ros_gz_bridge.py | 168 +++++++++--------- ros_gz_sim/ros_gz_sim/actions/gzserver.py | 155 ++++++++-------- 2 files changed, 156 insertions(+), 167 deletions(-) diff --git a/ros_gz_bridge/ros_gz_bridge/actions/ros_gz_bridge.py b/ros_gz_bridge/ros_gz_bridge/actions/ros_gz_bridge.py index bcebee0c..3149250b 100644 --- a/ros_gz_bridge/ros_gz_bridge/actions/ros_gz_bridge.py +++ b/ros_gz_bridge/ros_gz_bridge/actions/ros_gz_bridge.py @@ -14,16 +14,14 @@ """Module for the ros_gz bridge action.""" -from typing import List -from typing import Optional +from typing import Dict, List, Optional, Union from launch.action import Action -from launch.actions import GroupAction -from launch.conditions import IfCondition from launch.frontend import Entity, expose_action, Parser from launch.launch_context import LaunchContext from launch.some_substitutions_type import SomeSubstitutionsType -from launch.substitutions import PythonExpression +from launch.substitutions import TextSubstitution +from launch.utilities.type_utils import normalize_typed_substitution, perform_typed_substitution from launch_ros.actions import ComposableNodeContainer, LoadComposableNodes, Node from launch_ros.descriptions import ComposableNode @@ -37,13 +35,13 @@ def __init__( *, bridge_name: SomeSubstitutionsType, config_file: SomeSubstitutionsType, - container_name: Optional[SomeSubstitutionsType] = 'ros_gz_container', - create_own_container: Optional[SomeSubstitutionsType] = 'False', - namespace: Optional[SomeSubstitutionsType] = '', - use_composition: Optional[SomeSubstitutionsType] = 'False', - use_respawn: Optional[SomeSubstitutionsType] = 'False', - log_level: Optional[SomeSubstitutionsType] = 'info', - bridge_params: Optional[SomeSubstitutionsType] = '', + container_name: SomeSubstitutionsType = 'ros_gz_container', + create_own_container: Union[bool, SomeSubstitutionsType] = False, + namespace: SomeSubstitutionsType = '', + use_composition: Union[bool, SomeSubstitutionsType] = False, + use_respawn: Union[bool, SomeSubstitutionsType] = False, + log_level: SomeSubstitutionsType = 'info', + bridge_params: SomeSubstitutionsType = '', **kwargs ) -> None: """ @@ -60,20 +58,36 @@ def __init__( :param: bridge_params Extra parameters to pass to the bridge. """ super().__init__(**kwargs) + self.__bridge_name = bridge_name self.__config_file = config_file self.__container_name = container_name - self.__create_own_container = create_own_container self.__namespace = namespace - self.__use_composition = use_composition - self.__use_respawn = use_respawn + + # This is here to allow using strings or booleans as values for boolean variables when the Action is used from Python + # i.e., this allows users to do: + # RosGzBridge(bridge_name='bridge1', use_composition='true', create_own_container=True) + # Note that use_composition is set to a string while create_own_container is set to a boolean. The reverse would also work. + # At some point, we might want to deprecate this and only allow setting booleans since that's + # what users would expect when calling this from Python + if isinstance(create_own_container, str): + self.__create_own_container = normalize_typed_substitution(TextSubstitution(text=create_own_container), bool) + else: + self.__create_own_container = normalize_typed_substitution(create_own_container, bool) + + if isinstance(use_composition, str): + self.__use_composition = normalize_typed_substitution(TextSubstitution(text=use_composition), bool) + else: + self.__use_composition = normalize_typed_substitution(use_composition, bool) + + self.__use_respawn = normalize_typed_substitution(use_respawn, bool) self.__log_level = log_level self.__bridge_params = bridge_params @classmethod def parse(cls, entity: Entity, parser: Parser): """Parse ros_gz_bridge.""" - _, kwargs = super().parse(entity, parser) + kwargs:Dict = super().parse(entity, parser)[1] bridge_name = entity.get_attr( 'bridge_name', data_type=str, @@ -169,77 +183,59 @@ def execute(self, context: LaunchContext) -> Optional[List[Action]]: bridge_params_pairs = simplified_bridge_params.split(',') parsed_bridge_params = dict(pair.split(':') for pair in bridge_params_pairs) - if isinstance(self.__use_composition, list): - self.__use_composition = self.__use_composition[0] - - if isinstance(self.__create_own_container, list): - self.__create_own_container = self.__create_own_container[0] - - if isinstance(self.__use_respawn, list): - self.__use_respawn = self.__use_respawn[0] - - # Standard node configuration - load_nodes = GroupAction( - condition=IfCondition(PythonExpression(['not ', self.__use_composition])), - actions=[ - Node( - package='ros_gz_bridge', - executable='bridge_node', - name=self.__bridge_name, - namespace=self.__namespace, - output='screen', - respawn=bool(self.__use_respawn), - respawn_delay=2.0, - parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], - arguments=['--ros-args', '--log-level', self.__log_level], - ), - ], - ) + use_composition_eval = perform_typed_substitution(context, self.__use_composition, bool) + create_own_container_eval = perform_typed_substitution(context, self.__create_own_container, bool) + + launch_descriptions: List[Action] = [] + + if not use_composition_eval: + # Standard node configuration + launch_descriptions.append(Node( + package='ros_gz_bridge', + executable='bridge_node', + name=self.__bridge_name, + namespace=self.__namespace, + output='screen', + respawn=perform_typed_substitution(context, self.__use_respawn, bool), + respawn_delay=2.0, + parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], + arguments=['--ros-args', '--log-level', self.__log_level], + )) # Composable node with container configuration - load_composable_nodes_with_container = ComposableNodeContainer( - condition=IfCondition( - PythonExpression([self.__use_composition, ' and ', self.__create_own_container]) - ), - name=self.__container_name, - namespace='', - package='rclcpp_components', - executable='component_container', - composable_node_descriptions=[ - ComposableNode( - package='ros_gz_bridge', - plugin='ros_gz_bridge::RosGzBridge', - name=self.__bridge_name, - namespace=self.__namespace, - parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], - extra_arguments=[{'use_intra_process_comms': True}], - ), - ], - output='screen', - ) + if use_composition_eval and create_own_container_eval: + launch_descriptions.append(ComposableNodeContainer( + name=self.__container_name, + namespace='', + package='rclcpp_components', + executable='component_container', + composable_node_descriptions=[ + ComposableNode( + package='ros_gz_bridge', + plugin='ros_gz_bridge::RosGzBridge', + name=self.__bridge_name, + namespace=self.__namespace, + parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], + extra_arguments=[{'use_intra_process_comms': True}], + ), + ], + output='screen', + )) # Composable node without container configuration - load_composable_nodes_without_container = LoadComposableNodes( - condition=IfCondition( - PythonExpression( - [self.__use_composition, ' and not ', self.__create_own_container] - ) - ), - target_container=self.__container_name, - composable_node_descriptions=[ - ComposableNode( - package='ros_gz_bridge', - plugin='ros_gz_bridge::RosGzBridge', - name=self.__bridge_name, - namespace=self.__namespace, - parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], - extra_arguments=[{'use_intra_process_comms': True}], - ), - ], - ) - - return [ - load_nodes, - load_composable_nodes_with_container, - load_composable_nodes_without_container - ] + if use_composition_eval and not create_own_container_eval: + launch_descriptions.append(LoadComposableNodes( + target_container=self.__container_name, + composable_node_descriptions=[ + ComposableNode( + package='ros_gz_bridge', + plugin='ros_gz_bridge::RosGzBridge', + name=self.__bridge_name, + namespace=self.__namespace, + parameters=[{'config_file': self.__config_file, **parsed_bridge_params}], + extra_arguments=[{'use_intra_process_comms': True}], + ), + ], + )) + + return launch_descriptions diff --git a/ros_gz_sim/ros_gz_sim/actions/gzserver.py b/ros_gz_sim/ros_gz_sim/actions/gzserver.py index c8b33c3a..975a7883 100644 --- a/ros_gz_sim/ros_gz_sim/actions/gzserver.py +++ b/ros_gz_sim/ros_gz_sim/actions/gzserver.py @@ -15,18 +15,17 @@ """Module for the GzServer action.""" import os -from typing import List -from typing import Optional +from typing import Dict, List, Optional, Union from ament_index_python.packages import get_package_share_directory from catkin_pkg.package import InvalidPackage, PACKAGE_MANIFEST_FILENAME, parse_package from launch.action import Action -from launch.actions import GroupAction, SetEnvironmentVariable -from launch.conditions import IfCondition +from launch.actions import SetEnvironmentVariable from launch.frontend import Entity, expose_action, Parser from launch.launch_context import LaunchContext from launch.some_substitutions_type import SomeSubstitutionsType -from launch.substitutions import PythonExpression +from launch.substitutions import TextSubstitution +from launch.utilities.type_utils import normalize_typed_substitution, perform_typed_substitution from launch_ros.actions import ComposableNodeContainer, LoadComposableNodes, Node from launch_ros.descriptions import ComposableNode from ros2pkg.api import get_package_names @@ -91,11 +90,11 @@ class GzServer(Action): def __init__( self, *, - world_sdf_file: Optional[SomeSubstitutionsType] = '', - world_sdf_string: Optional[SomeSubstitutionsType] = '', - container_name: Optional[SomeSubstitutionsType] = 'ros_gz_container', - create_own_container: Optional[SomeSubstitutionsType] = 'False', - use_composition: Optional[SomeSubstitutionsType] = 'False', + world_sdf_file: SomeSubstitutionsType = '', + world_sdf_string: SomeSubstitutionsType = '', + container_name: SomeSubstitutionsType = 'ros_gz_container', + create_own_container: Union[bool, SomeSubstitutionsType] = False, + use_composition: Union[bool, SomeSubstitutionsType] = False, **kwargs ) -> None: """ @@ -114,13 +113,24 @@ def __init__( self.__world_sdf_file = world_sdf_file self.__world_sdf_string = world_sdf_string self.__container_name = container_name - self.__create_own_container = create_own_container - self.__use_composition = use_composition + + + # This is here to allow using strings or booleans as values for boolean variables when the Action is used from Python + # See the RosGzBridge.__init__ function for more details. + if isinstance(create_own_container, str): + self.__create_own_container = normalize_typed_substitution(TextSubstitution(text=create_own_container), bool) + else: + self.__create_own_container = normalize_typed_substitution(create_own_container, bool) + + if isinstance(use_composition, str): + self.__use_composition = normalize_typed_substitution(TextSubstitution(text=use_composition), bool) + else: + self.__use_composition = normalize_typed_substitution(use_composition, bool) @classmethod def parse(cls, entity: Entity, parser: Parser): """Parse gz_server.""" - _, kwargs = super().parse(entity, parser) + kwargs:Dict = super().parse(entity, parser)[1] world_sdf_file = entity.get_attr( 'world_sdf_file', data_type=str, @@ -167,87 +177,70 @@ def parse(cls, entity: Entity, parser: Parser): def execute(self, context: LaunchContext) -> Optional[List[Action]]: """Execute the action.""" - if isinstance(self.__use_composition, list): - self.__use_composition = self.__use_composition[0] - if isinstance(self.__create_own_container, list): - self.__create_own_container = self.__create_own_container[0] + launch_descriptions: List[Action] = [] model_paths, plugin_paths = GazeboRosPaths.get_paths() - system_plugin_path_env = SetEnvironmentVariable( + launch_descriptions.append(SetEnvironmentVariable( 'GZ_SIM_SYSTEM_PLUGIN_PATH', os.pathsep.join([ os.environ.get('GZ_SIM_SYSTEM_PLUGIN_PATH', default=''), os.environ.get('LD_LIBRARY_PATH', default=''), plugin_paths, - ])) - resource_path_env = SetEnvironmentVariable( + ]))) + launch_descriptions.append(SetEnvironmentVariable( 'GZ_SIM_RESOURCE_PATH', os.pathsep.join([ os.environ.get('GZ_SIM_RESOURCE_PATH', default=''), model_paths, - ])) - - # Standard node configuration - load_nodes = GroupAction( - condition=IfCondition(PythonExpression(['not ', self.__use_composition])), - actions=[ - Node( - package='ros_gz_sim', - executable='gzserver', - output='screen', - parameters=[{'world_sdf_file': self.__world_sdf_file, - 'world_sdf_string': self.__world_sdf_string}], - ), - ], - ) + ]))) + + use_composition_eval = perform_typed_substitution(context, self.__use_composition, bool) + create_own_container_eval = perform_typed_substitution(context, self.__create_own_container, bool) + if not use_composition_eval: + # Standard node configuration + launch_descriptions.append(Node( + package='ros_gz_sim', + executable='gzserver', + output='screen', + parameters=[{'world_sdf_file': self.__world_sdf_file, + 'world_sdf_string': self.__world_sdf_string}], + )) # Composable node with container configuration - load_composable_nodes_with_container = ComposableNodeContainer( - condition=IfCondition( - PythonExpression([self.__use_composition, ' and ', self.__create_own_container]) - ), - name=self.__container_name, - namespace='', - package='rclcpp_components', - executable='component_container', - composable_node_descriptions=[ - ComposableNode( - package='ros_gz_sim', - plugin='ros_gz_sim::GzServer', - name='gz_server', - parameters=[{'world_sdf_file': self.__world_sdf_file, - 'world_sdf_string': self.__world_sdf_string}], - extra_arguments=[{'use_intra_process_comms': True}], - ), - ], - output='screen', - ) + if use_composition_eval and create_own_container_eval: + launch_descriptions.append(ComposableNodeContainer( + name=self.__container_name, + namespace='', + package='rclcpp_components', + executable='component_container', + composable_node_descriptions=[ + ComposableNode( + package='ros_gz_sim', + plugin='ros_gz_sim::GzServer', + name='gz_server', + parameters=[{'world_sdf_file': self.__world_sdf_file, + 'world_sdf_string': self.__world_sdf_string}], + extra_arguments=[{'use_intra_process_comms': True}], + ), + ], + output='screen', + )) # Composable node without container configuration - load_composable_nodes_without_container = LoadComposableNodes( - condition=IfCondition( - PythonExpression( - [self.__use_composition, ' and not ', self.__create_own_container] - ) - ), - target_container=self.__container_name, - composable_node_descriptions=[ - ComposableNode( - package='ros_gz_sim', - plugin='ros_gz_sim::GzServer', - name='gz_server', - parameters=[{'world_sdf_file': self.__world_sdf_file, - 'world_sdf_string': self.__world_sdf_string}], - extra_arguments=[{'use_intra_process_comms': True}], - ), - ], - ) - - return [ - system_plugin_path_env, - resource_path_env, - load_nodes, - load_composable_nodes_with_container, - load_composable_nodes_without_container - ] + if use_composition_eval and not create_own_container_eval: + launch_descriptions.append(LoadComposableNodes( + target_container=self.__container_name, + composable_node_descriptions=[ + ComposableNode( + package='ros_gz_sim', + plugin='ros_gz_sim::GzServer', + name='gz_server', + parameters=[{'world_sdf_file': self.__world_sdf_file, + 'world_sdf_string': self.__world_sdf_string}], + extra_arguments=[{'use_intra_process_comms': True}], + ), + ], + )) + + return launch_descriptions