Skip to content

Commit

Permalink
feat: Capella model to .msg files
Browse files Browse the repository at this point in the history
  • Loading branch information
huyenngn committed Nov 13, 2023
1 parent 064a097 commit b7afcfb
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 111 deletions.
7 changes: 5 additions & 2 deletions rosidl2capella/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import click

import rosidl2capella
from rosidl2capella.msg2capella import msg2capella
from rosidl2capella.capella2msg import capella2msg

# from rosidl2capella.msg2capella import msg2capella


@click.command()
Expand All @@ -18,4 +20,5 @@ def main():


if __name__ == "__main__":
msg2capella()
# msg2capella()
capella2msg()
113 changes: 113 additions & 0 deletions rosidl2capella/capella2msg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright DB Netz AG and contributors
# SPDX-License-Identifier: Apache-2.0
"""CLI for importing .msg to capella model."""
import sys
from pathlib import Path

import click

import rosidl2capella
from rosidl2capella.modules import BASIC_TYPES, ROS_INTERFACES
from rosidl2capella.modules.parse_capella import ParseCapella
from rosidl2capella.modules.serialize_message import (
SerializeMessageDef,
SerializeMessagesPkg,
)


class Capella2Msg:
"""Class for converting capella model to .msg files."""

def __init__(self, path_to_capella_model, layer, overlap) -> None:
self.parser = ParseCapella(path_to_capella_model, layer)
self.overlap = overlap

def add_package(self, current_root):
"""Add package to message package."""
out = SerializeMessagesPkg({}, {})

messages = self.parser.get_classes(current_root)
types = self.parser.get_types(current_root)

out.messages = {
msg_name: SerializeMessageDef(msg_name, desc, props)
for msg_name, (desc, props) in (messages | types).items()
}
out.packages = {
pkg_name: self.add_package(current_root.packages.by_name(pkg_name))
for pkg_name in self.parser.get_packages(current_root)
}
return out


@click.command()
@click.version_option(
version=rosidl2capella.__version__,
prog_name="rosidl2capella",
message="%(prog)s %(version)s",
)
@click.argument(
"path-to-msgs-root",
type=click.Path(
file_okay=False,
readable=True,
resolve_path=True,
path_type=Path,
),
required=True,
)
@click.argument(
"path-to-capella-model",
type=click.Path(
exists=True,
readable=True,
resolve_path=True,
path_type=str,
),
required=True,
)
@click.argument(
"layer",
type=click.Choice(["oa", "sa", "la", "pa"], case_sensitive=False),
required=True,
)
@click.option(
"-o",
"--overlap",
type=click.Choice(
["keep", "overwrite", "ask", "abort"], case_sensitive=False
),
default="ask" if sys.stdin.isatty() else "abort",
)
@click.option("--debug", is_flag=True)
def capella2msg(
path_to_msgs_root, path_to_capella_model, layer, overlap, debug
):
"""Convert capella model to .msg files."""
converter = Capella2Msg(path_to_capella_model, layer, overlap)
current_root = converter.parser.data

messages = converter.parser.get_classes(current_root)
types = converter.parser.get_types(current_root)

packages = converter.parser.get_packages(current_root)
packages.discard(BASIC_TYPES)
packages.discard(ROS_INTERFACES)

root = SerializeMessagesPkg({}, {})
root.messages = {
msg_name: SerializeMessageDef(msg_name, desc, props)
for msg_name, (desc, props) in (messages | types).items()
}

root.packages = {
pkg_name: converter.add_package(
current_root.packages.by_name(pkg_name)
)
for pkg_name in packages
}

if debug:
click.echo(root)
else:
root.to_msg_folder(path_to_msgs_root)
12 changes: 8 additions & 4 deletions rosidl2capella/modules/parse_capella.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""Class definition for Capella model parser."""
import typing as t

from . import BaseCapella, EnumValue, MsgProp
from . import ROS_INTERFACES, BaseCapella, EnumValue, MsgProp


class ParseCapella(BaseCapella):
Expand All @@ -20,8 +20,12 @@ def get_classes(self, package: t.Any) -> dict[str, t.Any]:
props = [
MsgProp(
prop.name,
prop.type.name,
"",
prop.type.name
if prop.type.__class__.__name__ != "Enumeration"
else "uint8",
prop.type.parent.name
if prop.type.parent.parent.name == ROS_INTERFACES
else "",
prop.min_card.value,
prop.max_card.value,
prop.description,
Expand All @@ -38,7 +42,7 @@ def get_types(self, package: t.Any) -> dict[str, t.Any]:
props = [
EnumValue(
prop.name,
prop.type.name,
prop.value.type.name if prop.value.type else "uint8",
prop.value.value,
prop.description,
)
Expand Down
10 changes: 3 additions & 7 deletions rosidl2capella/modules/parse_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@

from . import EnumValue, MessageDef, MessagesPkg, MsgProp

RE_TNC = re.compile(
r"^([A-Za-z0-9\[\]\/_]+).*?([A-Za-z0-9_]+)(?:.*?# ([^\n]+))?"
)
RE_ENUM = re.compile(
r"^([A-Za-z0-9]+).*?([A-Za-z0-9_]+).*?= ([0-9]+)(?:.*?# ([^\n]+))?"
)
RE_COMMENT = re.compile(r"cf. ([a-zA-Z0-9_]+)(?:, ([a-zA-Z0-9_]+))?")
RE_TNC = re.compile(r"^([A-Za-z0-9\[\]\/_]+)\s+(\w+)(?:\s+#\s*(.+))?$")
RE_ENUM = re.compile(r"^(\w+)\s+(\w+)\s*=\s*(\d+)\s*(?:#\s*(.+))?$")
RE_COMMENT = re.compile(r"cf.\s+(\w+)(?:,\s+(\w+))?")


class ParseMessageDef(MessageDef):
Expand Down
24 changes: 14 additions & 10 deletions rosidl2capella/modules/serialize_capella.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
class SerializeCapella(BaseCapella):
"""Serializer for Capella model."""

def __init__(self, path_to_capella_model: str, layer: str) -> None:
super().__init__(path_to_capella_model, layer)
self.create_packages({BASIC_TYPES}, self.data)
self.basic_types = self.data.packages.by_name(BASIC_TYPES)

def create_packages(self, packages: set[str], package: t.Any) -> None:
"""Create packages in Capella model."""
for package_name in packages:
Expand Down Expand Up @@ -61,10 +66,13 @@ def create_types(
property = type.owned_literals.create(
"EnumerationLiteral",
name=prop.name,
description=description,
description=prop.comment,
)
self.create_basic_types({prop.type})
property.value = capellambse.new_object(
"LiteralNumericValue", value=float(prop.value)
"LiteralNumericValue",
value=float(prop.value),
type=self.basic_types.datatypes.by_name(prop.type),
)
return overlap

Expand All @@ -76,22 +84,20 @@ def delete_types(self, types: list, package: t.Any) -> None:
except KeyError:
pass

def create_basic_types(
self, basic_types: set[str], package: t.Any
) -> list:
def create_basic_types(self, basic_types: set[str]) -> list:
"""Create basic types in Capella model."""
overlap = []
for basic_type in basic_types:
try:
overlap.append(package.datatypes.by_name(basic_type))
overlap.append(self.basic_types.datatypes.by_name(basic_type))
except KeyError:
if basic_type in ["string", "char"]:
type = "StringType"
elif basic_type == "bool":
type = "BooleanType"
else:
type = "NumericType"
package.datatypes.create(type, name=basic_type)
self.basic_types.datatypes.create(type, name=basic_type)
return overlap

def create_composition(
Expand Down Expand Up @@ -154,9 +160,7 @@ def _find_type(self, type_name: str, package: t.Any) -> t.Any:
except KeyError:
pass
try:
return self.data.packages.by_name(BASIC_TYPES).datatypes.by_name(
type_name
)
return self.basic_types.datatypes.by_name(type_name)
except KeyError:
return None

Expand Down
51 changes: 51 additions & 0 deletions rosidl2capella/modules/serialize_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright DB Netz AG and contributors
# SPDX-License-Identifier: Apache-2.0
"""Serializer for ROS messages."""

from pathlib import Path

from . import MessageDef, MessagesPkg


class SerializeMessageDef(MessageDef):
"""Serializer for message files."""

def to_msg_file(self, file: Path) -> None:
"""Write message definition to message file."""
description = "# " + self.description.replace("\n", "\n# ") + "\n"
props = "\n".join(
f"{p.typedir+'/' if p.typedir else ''}{p.type}"
f"{'[]' if p.min != p.max else ''} {p.name}\t"
f"{'# ' if p.comment else ''}{p.comment}"
for p in self.props
)
file.write_text(description + "\n" + props)

def to_type_file(self, file: Path) -> None:
"""Write message definition to message file."""
description = "# " + self.description.replace("\n", "\n# ") + "\n"
props = "\n".join(
f"{p.type} {file.stem + '_' + p.name}\t= {p.value}\t"
f"{'# ' if p.comment else ''}{p.comment}"
for p in self.props
)
file.write_text(description + "\n" + props)


class SerializeMessagesPkg(MessagesPkg):
"""Serializer for message packages."""

def to_msg_folder(self, path_to_pkg_root: Path) -> None:
"""Write message package to message package."""
path_to_pkg_root.mkdir(parents=True, exist_ok=True)
for msg_name, msg in self.messages.items():
msg_file = path_to_pkg_root.joinpath(msg_name + ".msg")
if path_to_pkg_root.name == "types":
msg.to_type_file(msg_file)
else:
msg.to_msg_file(msg_file)

for pkg_name, pkg in self.packages.items():
new_path = path_to_pkg_root.joinpath(pkg_name)
new_path.mkdir(parents=True, exist_ok=True)
pkg.to_msg_folder(new_path)
43 changes: 20 additions & 23 deletions rosidl2capella/msg2capella.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import click

import rosidl2capella
from rosidl2capella.modules import BASIC_TYPES, ROS_INTERFACES
from rosidl2capella.modules import ROS_INTERFACES
from rosidl2capella.modules.parse_message import ParseMessagesPkg
from rosidl2capella.modules.serialize_capella import SerializeCapella

Expand All @@ -32,7 +32,10 @@ def add_objects(self, messages, packages, current_root):
)
)

overlap = func[0](messages, current_root)
overlap = func[0](
{msg_name: msg.as_struct for msg_name, msg in messages.items()},
current_root,
)

if overlap and self.overlap == "abort":
click.echo(
Expand All @@ -47,32 +50,29 @@ def add_objects(self, messages, packages, current_root):
)
):
func[1]([cls], current_root)
func[0]({cls.name: messages[cls.name]}, current_root)
func[0]({cls.name: messages[cls.name].as_struct}, current_root)

for pkg_name, (new_messages, new_packages) in packages.items():
new_root = self.serializer.data.packages.by_name(pkg_name)
self.add_objects(new_messages, new_packages, new_root)
for pkg_name, pkg in packages.items():
new_root = current_root.packages.by_name(pkg_name)
self.add_objects(pkg.messages, pkg.packages, new_root)

def add_relations(self, messages, packages, current_root):
"""Add relations to capella model."""
if current_root.name == "types":
return
for class_name, (_, props) in messages.items():
for prop in props:
for class_name, cls in messages.items():
for prop in cls.props:
if not self.serializer.create_composition(
class_name, prop, current_root
):
while not self.serializer.create_attribute(
class_name, prop, current_root
):
self.serializer.create_basic_types(
{prop.type},
self.serializer.data.packages.by_name(BASIC_TYPES),
)
self.serializer.create_basic_types({prop.type})

for pkg_name, (new_messages, new_packages) in packages.items():
new_root = self.serializer.data.packages.by_name(pkg_name)
self.add_relations(new_messages, new_packages, new_root)
for pkg_name, pkg in packages.items():
new_root = current_root.packages.by_name(pkg_name)
self.add_relations(pkg.messages, pkg.packages, new_root)


@click.command()
Expand All @@ -96,7 +96,6 @@ def add_relations(self, messages, packages, current_root):
"path-to-capella-model",
type=click.Path(
exists=True,
file_okay=False,
readable=True,
resolve_path=True,
path_type=str,
Expand Down Expand Up @@ -125,18 +124,16 @@ def msg2capella(

ros_interfaces = ParseMessagesPkg.from_pkg_folders(
Path(__file__).joinpath("ros_interfaces")
).as_structs
)

messages, packages = ParseMessagesPkg.from_msg_folder(
path_to_msgs_root
).as_structs
packages |= {ROS_INTERFACES: ros_interfaces} | {BASIC_TYPES: ({}, {})}
msg = ParseMessagesPkg.from_msg_folder(path_to_msgs_root)
msg.packages |= {ROS_INTERFACES: ros_interfaces}

current_root = converter.serializer.data

converter.add_objects(messages, packages, current_root)
converter.add_objects(msg.messages, msg.packages, current_root)

converter.add_relations(messages, packages, current_root)
converter.add_relations(msg.messages, msg.packages, current_root)

if debug:
click.echo(converter.serializer.data)
Expand Down
Loading

0 comments on commit b7afcfb

Please sign in to comment.