Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parameter handling #664

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions ros_gz_bridge/ros_gz_bridge/actions/ros_gz_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@

"""Module for the ros_gz bridge action."""

from typing import Dict, List, Optional, Union
from typing import cast, Dict, List, Optional, Union

from launch.action import Action
from launch.frontend import Entity, expose_action, Parser
from launch.launch_context import LaunchContext
from launch.some_substitutions_type import SomeSubstitutionsType
from launch.substitutions import TextSubstitution
from launch.utilities import ensure_argument_type
from launch.utilities.type_utils import normalize_typed_substitution, perform_typed_substitution
from launch_ros.actions import ComposableNodeContainer, LoadComposableNodes, Node
from launch_ros.descriptions import ComposableNode
from launch_ros.parameters_type import SomeParameters


@expose_action('ros_gz_bridge')
Expand All @@ -41,7 +43,7 @@ def __init__(
use_composition: Union[bool, SomeSubstitutionsType] = False,
use_respawn: Union[bool, SomeSubstitutionsType] = False,
log_level: SomeSubstitutionsType = 'info',
bridge_params: SomeSubstitutionsType = '',
bridge_params: Optional[SomeParameters] = None,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -89,7 +91,13 @@ def __init__(

self.__use_respawn = normalize_typed_substitution(use_respawn, bool)
self.__log_level = log_level
self.__bridge_params = bridge_params
self.__bridge_params = [{'config_file': self.__config_file}]
if bridge_params is not None:
# This handling of bridge_params was copied from launch_ros/actions/node.py
ensure_argument_type(bridge_params, (list), 'bridge_params', 'RosGzBridge')
# All elements in the list are paths to files with parameters (or substitutions that
# evaluate to paths), or dictionaries of parameters (fields can be substitutions).
self.__bridge_params.extend(cast(list, bridge_params))

@classmethod
def parse(cls, entity: Entity, parser: Parser):
Expand Down Expand Up @@ -128,9 +136,7 @@ def parse(cls, entity: Entity, parser: Parser):
'log_level', data_type=str,
optional=True)

bridge_params = entity.get_attr(
'bridge_params', data_type=str,
optional=True)
parameters = entity.get_attr('param', data_type=List[Entity], optional=True)

if isinstance(bridge_name, str):
bridge_name = parser.parse_substitution(bridge_name)
Expand Down Expand Up @@ -165,31 +171,13 @@ def parse(cls, entity: Entity, parser: Parser):
log_level = parser.parse_substitution(log_level)
kwargs['log_level'] = log_level

if isinstance(bridge_params, str):
bridge_params = parser.parse_substitution(bridge_params)
kwargs['bridge_params'] = bridge_params
if parameters is not None:
kwargs['bridge_params'] = Node.parse_nested_parameters(parameters, parser)

return cls, kwargs

def execute(self, context: LaunchContext) -> Optional[List[Action]]:
"""Execute the action."""
if hasattr(self.__bridge_params, 'perform'):
string_bridge_params = self.__bridge_params.perform(context)
elif isinstance(self.__bridge_params, list):
if hasattr(self.__bridge_params[0], 'perform'):
string_bridge_params = self.__bridge_params[0].perform(context)
else:
string_bridge_params = str(self.__bridge_params)
# Remove unnecessary symbols
simplified_bridge_params = string_bridge_params.translate(
{ord(i): None for i in '{} "\''}
)
# Parse to dictionary
parsed_bridge_params = {}
if simplified_bridge_params:
bridge_params_pairs = simplified_bridge_params.split(',')
parsed_bridge_params = dict(pair.split(':') for pair in bridge_params_pairs)

use_composition_eval = perform_typed_substitution(
context, self.__use_composition, bool
)
Expand All @@ -209,7 +197,7 @@ def execute(self, context: LaunchContext) -> Optional[List[Action]]:
output='screen',
respawn=perform_typed_substitution(context, self.__use_respawn, bool),
respawn_delay=2.0,
parameters=[{'config_file': self.__config_file, **parsed_bridge_params}],
parameters=self.__bridge_params,
arguments=['--ros-args', '--log-level', self.__log_level],
))

Expand All @@ -226,7 +214,7 @@ def execute(self, context: LaunchContext) -> Optional[List[Action]]:
plugin='ros_gz_bridge::RosGzBridge',
name=self.__bridge_name,
namespace=self.__namespace,
parameters=[{'config_file': self.__config_file, **parsed_bridge_params}],
parameters=self.__bridge_params,
extra_arguments=[{'use_intra_process_comms': True}],
),
],
Expand All @@ -243,7 +231,7 @@ def execute(self, context: LaunchContext) -> Optional[List[Action]]:
plugin='ros_gz_bridge::RosGzBridge',
name=self.__bridge_name,
namespace=self.__namespace,
parameters=[{'config_file': self.__config_file, **parsed_bridge_params}],
parameters=self.__bridge_params,
extra_arguments=[{'use_intra_process_comms': True}],
),
],
Expand Down
Loading