From b7afcfb787f24c253a2a8a04bd10d8ba21e05f51 Mon Sep 17 00:00:00 2001 From: huyenngn Date: Mon, 13 Nov 2023 15:19:07 +0100 Subject: [PATCH] feat: Capella model to .msg files --- rosidl2capella/__main__.py | 7 +- rosidl2capella/capella2msg.py | 113 +++++++++++++++ rosidl2capella/modules/parse_capella.py | 12 +- rosidl2capella/modules/parse_message.py | 10 +- rosidl2capella/modules/serialize_capella.py | 24 ++-- rosidl2capella/modules/serialize_message.py | 51 +++++++ rosidl2capella/msg2capella.py | 43 +++--- tests/data/model/ros_msgs.capella | 147 +++++++++++--------- 8 files changed, 296 insertions(+), 111 deletions(-) create mode 100644 rosidl2capella/capella2msg.py create mode 100644 rosidl2capella/modules/serialize_message.py diff --git a/rosidl2capella/__main__.py b/rosidl2capella/__main__.py index 9202995..20331d1 100644 --- a/rosidl2capella/__main__.py +++ b/rosidl2capella/__main__.py @@ -4,7 +4,9 @@ import click import rosidl2capella -from rosidl2capella.msg2capella import msg2capella +from rosidl2capella.capella2msg import capella2msg + +# from rosidl2capella.msg2capella import msg2capella @click.command() @@ -18,4 +20,5 @@ def main(): if __name__ == "__main__": - msg2capella() + # msg2capella() + capella2msg() diff --git a/rosidl2capella/capella2msg.py b/rosidl2capella/capella2msg.py new file mode 100644 index 0000000..57b0547 --- /dev/null +++ b/rosidl2capella/capella2msg.py @@ -0,0 +1,113 @@ +# Copyright DB Netz AG and contributors +# SPDX-License-Identifier: Apache-2.0 +"""CLI for importing .msg to capella model.""" +import sys +from pathlib import Path + +import click + +import rosidl2capella +from rosidl2capella.modules import BASIC_TYPES, ROS_INTERFACES +from rosidl2capella.modules.parse_capella import ParseCapella +from rosidl2capella.modules.serialize_message import ( + SerializeMessageDef, + SerializeMessagesPkg, +) + + +class Capella2Msg: + """Class for converting capella model to .msg files.""" + + def __init__(self, path_to_capella_model, layer, overlap) -> None: + self.parser = ParseCapella(path_to_capella_model, layer) + self.overlap = overlap + + def add_package(self, current_root): + """Add package to message package.""" + out = SerializeMessagesPkg({}, {}) + + messages = self.parser.get_classes(current_root) + types = self.parser.get_types(current_root) + + out.messages = { + msg_name: SerializeMessageDef(msg_name, desc, props) + for msg_name, (desc, props) in (messages | types).items() + } + out.packages = { + pkg_name: self.add_package(current_root.packages.by_name(pkg_name)) + for pkg_name in self.parser.get_packages(current_root) + } + return out + + +@click.command() +@click.version_option( + version=rosidl2capella.__version__, + prog_name="rosidl2capella", + message="%(prog)s %(version)s", +) +@click.argument( + "path-to-msgs-root", + type=click.Path( + file_okay=False, + readable=True, + resolve_path=True, + path_type=Path, + ), + required=True, +) +@click.argument( + "path-to-capella-model", + type=click.Path( + exists=True, + readable=True, + resolve_path=True, + path_type=str, + ), + required=True, +) +@click.argument( + "layer", + type=click.Choice(["oa", "sa", "la", "pa"], case_sensitive=False), + required=True, +) +@click.option( + "-o", + "--overlap", + type=click.Choice( + ["keep", "overwrite", "ask", "abort"], case_sensitive=False + ), + default="ask" if sys.stdin.isatty() else "abort", +) +@click.option("--debug", is_flag=True) +def capella2msg( + path_to_msgs_root, path_to_capella_model, layer, overlap, debug +): + """Convert capella model to .msg files.""" + converter = Capella2Msg(path_to_capella_model, layer, overlap) + current_root = converter.parser.data + + messages = converter.parser.get_classes(current_root) + types = converter.parser.get_types(current_root) + + packages = converter.parser.get_packages(current_root) + packages.discard(BASIC_TYPES) + packages.discard(ROS_INTERFACES) + + root = SerializeMessagesPkg({}, {}) + root.messages = { + msg_name: SerializeMessageDef(msg_name, desc, props) + for msg_name, (desc, props) in (messages | types).items() + } + + root.packages = { + pkg_name: converter.add_package( + current_root.packages.by_name(pkg_name) + ) + for pkg_name in packages + } + + if debug: + click.echo(root) + else: + root.to_msg_folder(path_to_msgs_root) diff --git a/rosidl2capella/modules/parse_capella.py b/rosidl2capella/modules/parse_capella.py index aab1ee4..54351c8 100644 --- a/rosidl2capella/modules/parse_capella.py +++ b/rosidl2capella/modules/parse_capella.py @@ -3,7 +3,7 @@ """Class definition for Capella model parser.""" import typing as t -from . import BaseCapella, EnumValue, MsgProp +from . import ROS_INTERFACES, BaseCapella, EnumValue, MsgProp class ParseCapella(BaseCapella): @@ -20,8 +20,12 @@ def get_classes(self, package: t.Any) -> dict[str, t.Any]: props = [ MsgProp( prop.name, - prop.type.name, - "", + prop.type.name + if prop.type.__class__.__name__ != "Enumeration" + else "uint8", + prop.type.parent.name + if prop.type.parent.parent.name == ROS_INTERFACES + else "", prop.min_card.value, prop.max_card.value, prop.description, @@ -38,7 +42,7 @@ def get_types(self, package: t.Any) -> dict[str, t.Any]: props = [ EnumValue( prop.name, - prop.type.name, + prop.value.type.name if prop.value.type else "uint8", prop.value.value, prop.description, ) diff --git a/rosidl2capella/modules/parse_message.py b/rosidl2capella/modules/parse_message.py index 5de0778..d4c6b26 100644 --- a/rosidl2capella/modules/parse_message.py +++ b/rosidl2capella/modules/parse_message.py @@ -7,13 +7,9 @@ from . import EnumValue, MessageDef, MessagesPkg, MsgProp -RE_TNC = re.compile( - r"^([A-Za-z0-9\[\]\/_]+).*?([A-Za-z0-9_]+)(?:.*?# ([^\n]+))?" -) -RE_ENUM = re.compile( - r"^([A-Za-z0-9]+).*?([A-Za-z0-9_]+).*?= ([0-9]+)(?:.*?# ([^\n]+))?" -) -RE_COMMENT = re.compile(r"cf. ([a-zA-Z0-9_]+)(?:, ([a-zA-Z0-9_]+))?") +RE_TNC = re.compile(r"^([A-Za-z0-9\[\]\/_]+)\s+(\w+)(?:\s+#\s*(.+))?$") +RE_ENUM = re.compile(r"^(\w+)\s+(\w+)\s*=\s*(\d+)\s*(?:#\s*(.+))?$") +RE_COMMENT = re.compile(r"cf.\s+(\w+)(?:,\s+(\w+))?") class ParseMessageDef(MessageDef): diff --git a/rosidl2capella/modules/serialize_capella.py b/rosidl2capella/modules/serialize_capella.py index e8619e3..c6d430f 100644 --- a/rosidl2capella/modules/serialize_capella.py +++ b/rosidl2capella/modules/serialize_capella.py @@ -11,6 +11,11 @@ class SerializeCapella(BaseCapella): """Serializer for Capella model.""" + def __init__(self, path_to_capella_model: str, layer: str) -> None: + super().__init__(path_to_capella_model, layer) + self.create_packages({BASIC_TYPES}, self.data) + self.basic_types = self.data.packages.by_name(BASIC_TYPES) + def create_packages(self, packages: set[str], package: t.Any) -> None: """Create packages in Capella model.""" for package_name in packages: @@ -61,10 +66,13 @@ def create_types( property = type.owned_literals.create( "EnumerationLiteral", name=prop.name, - description=description, + description=prop.comment, ) + self.create_basic_types({prop.type}) property.value = capellambse.new_object( - "LiteralNumericValue", value=float(prop.value) + "LiteralNumericValue", + value=float(prop.value), + type=self.basic_types.datatypes.by_name(prop.type), ) return overlap @@ -76,14 +84,12 @@ def delete_types(self, types: list, package: t.Any) -> None: except KeyError: pass - def create_basic_types( - self, basic_types: set[str], package: t.Any - ) -> list: + def create_basic_types(self, basic_types: set[str]) -> list: """Create basic types in Capella model.""" overlap = [] for basic_type in basic_types: try: - overlap.append(package.datatypes.by_name(basic_type)) + overlap.append(self.basic_types.datatypes.by_name(basic_type)) except KeyError: if basic_type in ["string", "char"]: type = "StringType" @@ -91,7 +97,7 @@ def create_basic_types( type = "BooleanType" else: type = "NumericType" - package.datatypes.create(type, name=basic_type) + self.basic_types.datatypes.create(type, name=basic_type) return overlap def create_composition( @@ -154,9 +160,7 @@ def _find_type(self, type_name: str, package: t.Any) -> t.Any: except KeyError: pass try: - return self.data.packages.by_name(BASIC_TYPES).datatypes.by_name( - type_name - ) + return self.basic_types.datatypes.by_name(type_name) except KeyError: return None diff --git a/rosidl2capella/modules/serialize_message.py b/rosidl2capella/modules/serialize_message.py new file mode 100644 index 0000000..177106e --- /dev/null +++ b/rosidl2capella/modules/serialize_message.py @@ -0,0 +1,51 @@ +# Copyright DB Netz AG and contributors +# SPDX-License-Identifier: Apache-2.0 +"""Serializer for ROS messages.""" + +from pathlib import Path + +from . import MessageDef, MessagesPkg + + +class SerializeMessageDef(MessageDef): + """Serializer for message files.""" + + def to_msg_file(self, file: Path) -> None: + """Write message definition to message file.""" + description = "# " + self.description.replace("\n", "\n# ") + "\n" + props = "\n".join( + f"{p.typedir+'/' if p.typedir else ''}{p.type}" + f"{'[]' if p.min != p.max else ''} {p.name}\t" + f"{'# ' if p.comment else ''}{p.comment}" + for p in self.props + ) + file.write_text(description + "\n" + props) + + def to_type_file(self, file: Path) -> None: + """Write message definition to message file.""" + description = "# " + self.description.replace("\n", "\n# ") + "\n" + props = "\n".join( + f"{p.type} {file.stem + '_' + p.name}\t= {p.value}\t" + f"{'# ' if p.comment else ''}{p.comment}" + for p in self.props + ) + file.write_text(description + "\n" + props) + + +class SerializeMessagesPkg(MessagesPkg): + """Serializer for message packages.""" + + def to_msg_folder(self, path_to_pkg_root: Path) -> None: + """Write message package to message package.""" + path_to_pkg_root.mkdir(parents=True, exist_ok=True) + for msg_name, msg in self.messages.items(): + msg_file = path_to_pkg_root.joinpath(msg_name + ".msg") + if path_to_pkg_root.name == "types": + msg.to_type_file(msg_file) + else: + msg.to_msg_file(msg_file) + + for pkg_name, pkg in self.packages.items(): + new_path = path_to_pkg_root.joinpath(pkg_name) + new_path.mkdir(parents=True, exist_ok=True) + pkg.to_msg_folder(new_path) diff --git a/rosidl2capella/msg2capella.py b/rosidl2capella/msg2capella.py index f1932d2..a0937f0 100644 --- a/rosidl2capella/msg2capella.py +++ b/rosidl2capella/msg2capella.py @@ -7,7 +7,7 @@ import click import rosidl2capella -from rosidl2capella.modules import BASIC_TYPES, ROS_INTERFACES +from rosidl2capella.modules import ROS_INTERFACES from rosidl2capella.modules.parse_message import ParseMessagesPkg from rosidl2capella.modules.serialize_capella import SerializeCapella @@ -32,7 +32,10 @@ def add_objects(self, messages, packages, current_root): ) ) - overlap = func[0](messages, current_root) + overlap = func[0]( + {msg_name: msg.as_struct for msg_name, msg in messages.items()}, + current_root, + ) if overlap and self.overlap == "abort": click.echo( @@ -47,32 +50,29 @@ def add_objects(self, messages, packages, current_root): ) ): func[1]([cls], current_root) - func[0]({cls.name: messages[cls.name]}, current_root) + func[0]({cls.name: messages[cls.name].as_struct}, current_root) - for pkg_name, (new_messages, new_packages) in packages.items(): - new_root = self.serializer.data.packages.by_name(pkg_name) - self.add_objects(new_messages, new_packages, new_root) + for pkg_name, pkg in packages.items(): + new_root = current_root.packages.by_name(pkg_name) + self.add_objects(pkg.messages, pkg.packages, new_root) def add_relations(self, messages, packages, current_root): """Add relations to capella model.""" if current_root.name == "types": return - for class_name, (_, props) in messages.items(): - for prop in props: + for class_name, cls in messages.items(): + for prop in cls.props: if not self.serializer.create_composition( class_name, prop, current_root ): while not self.serializer.create_attribute( class_name, prop, current_root ): - self.serializer.create_basic_types( - {prop.type}, - self.serializer.data.packages.by_name(BASIC_TYPES), - ) + self.serializer.create_basic_types({prop.type}) - for pkg_name, (new_messages, new_packages) in packages.items(): - new_root = self.serializer.data.packages.by_name(pkg_name) - self.add_relations(new_messages, new_packages, new_root) + for pkg_name, pkg in packages.items(): + new_root = current_root.packages.by_name(pkg_name) + self.add_relations(pkg.messages, pkg.packages, new_root) @click.command() @@ -96,7 +96,6 @@ def add_relations(self, messages, packages, current_root): "path-to-capella-model", type=click.Path( exists=True, - file_okay=False, readable=True, resolve_path=True, path_type=str, @@ -125,18 +124,16 @@ def msg2capella( ros_interfaces = ParseMessagesPkg.from_pkg_folders( Path(__file__).joinpath("ros_interfaces") - ).as_structs + ) - messages, packages = ParseMessagesPkg.from_msg_folder( - path_to_msgs_root - ).as_structs - packages |= {ROS_INTERFACES: ros_interfaces} | {BASIC_TYPES: ({}, {})} + msg = ParseMessagesPkg.from_msg_folder(path_to_msgs_root) + msg.packages |= {ROS_INTERFACES: ros_interfaces} current_root = converter.serializer.data - converter.add_objects(messages, packages, current_root) + converter.add_objects(msg.messages, msg.packages, current_root) - converter.add_relations(messages, packages, current_root) + converter.add_relations(msg.messages, msg.packages, current_root) if debug: click.echo(converter.serializer.data) diff --git a/tests/data/model/ros_msgs.capella b/tests/data/model/ros_msgs.capella index 2967327..db4693b 100644 --- a/tests/data/model/ros_msgs.capella +++ b/tests/data/model/ros_msgs.capella @@ -202,111 +202,128 @@ id="bd10fade-2ac9-41bf-a42b-05cbb751d274" name="Interfaces"/> - - + + + + + + + + + + + + - - + + - - - - + + - - - - + + - - - + + - - - - + + - - - + + - - - - - - - - - - - - - - - - - - - + - - - - + - - - + - - + - - + + + + + + + + + + + + + + + + + +