Skip to content

Commit

Permalink
feat: Convert capella to messages
Browse files Browse the repository at this point in the history
  • Loading branch information
huyenngn committed Jan 10, 2024
1 parent 759c54e commit 247d0f9
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 36 deletions.
49 changes: 23 additions & 26 deletions capella_ros_tools/modules/capella/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,6 @@ def get_classes(self, package: t.Any) -> list[ClassDef]:
for cls in package.classes:
props = []
for prop in cls.owned_properties:
type: t.Any
if prop.type.__class__.__name__ == "Enumeration":
type = EnumDef(
prop.type.name,
[
EnumValue(
literal.value.type.name,
literal.name,
literal.value.value,
literal.description,
)
for literal in prop.type.owned_literals
],
prop.type.description,
)
elif prop.type.__class__.__name__ == "Class":
type = ClassDef(
prop.type.name,
[],
prop.type.description,
)
else:
type = prop.type.name

type_pkg_name = prop.type.parent.name
if type_pkg_name in [
"Predefined Types",
Expand All @@ -52,9 +28,9 @@ def get_classes(self, package: t.Any) -> list[ClassDef]:

props.append(
ClassProperty(
prop.name,
type,
prop.type.name,
type_pkg_name,
prop.name,
prop.min_card.value,
prop.max_card.value,
prop.description,
Expand All @@ -68,3 +44,24 @@ def get_classes(self, package: t.Any) -> list[ClassDef]:
)
)
return classes

def get_enums(self, package: t.Any) -> list[EnumDef]:
"""Get enums in Capella model."""
enums = []
for enum in package.enumerations:
enums.append(
EnumDef(
enum.name,
[
EnumValue(
literal.value.type.name,
literal.name,
literal.value.value,
literal.description,
)
for literal in enum.owned_literals
],
enum.description,
)
)
return enums
44 changes: 37 additions & 7 deletions capella_ros_tools/modules/messages/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,35 @@ def to_msg_file(self, msg_file: Path):

def to_msg_string(self) -> str:
"""Convert message to string."""
msg_string = "\n".join(self.annotations) + "\n"
msg_string += (
"\n".join([f"{f.type} {f.name}" for f in self.fields]) + "\n\n"
)
msg_string += (
"\n\n".join([f"{e.name}\n{e.values}" for e in self.enums]) + "\n\n"
)
msg_string = "\n".join(self.annotations) + "\n\n"
for enum in self.enums:
msg_string += (
f"# name: {enum.name}" + "\n".join(enum.annotations) + "\n"
)
msg_string += (
"\n".join(
[
f"{value.type.name} {value.name} = {value.value}"
+ "\n".join(self.annotations)
for value in enum.values
]
)
+ "\n\n"
)

for field in self.fields:
msg_string += (
f"{(field.type.pkg_name + '/') if field.type.pkg_name else ''}"
)
msg_string += f"{field.type.name}"
if field.type.array_size == float("inf"):
msg_string += "[]"
elif field.type.array_size:
msg_string += f"[{field.type.array_size}]"
msg_string += (
f" {field.name}" + "\n".join(self.annotations) + "\n\n"
)

return msg_string


Expand All @@ -36,3 +58,11 @@ def to_msg_folder(self, msg_pkg_dir: Path):
msg_pkg_dir.mkdir(parents=True, exist_ok=True)
for msg in self.messages:
msg.to_msg_file(msg_pkg_dir / f"{msg.name}.msg")
for pkg in self.packages:
pkg.to_msg_folder(msg_pkg_dir / pkg.name)

def to_pkg_folder(self, pkg_dir: Path):
"""Write packages to folder."""
pkg_dir.mkdir(parents=True, exist_ok=True)
for pkg in self.packages:
pkg.to_msg_folder(pkg_dir / pkg.name / "msg")
88 changes: 85 additions & 3 deletions capella_ros_tools/scripts/capella2msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
import typing as t

from capella_ros_tools.modules.capella.parser import CapellaModel
from capella_ros_tools.modules.messages import (
BaseTypeDef,
ConstantDef,
EnumDef,
FieldDef,
)
from capella_ros_tools.modules.messages.serializer import (
MessageDef,
MessagePkgDef,
)

CAPELLA_TYPE_TO_MSG = {
"Boolean": "bool",
Expand All @@ -26,10 +36,82 @@
class Converter:
"""Convert Capella data to ROS messages."""

def __init__(self, capella_path: t.Any, layer: str, merge: str) -> None:
def __init__(
self,
msg_path: t.Any,
capella_path: t.Any,
layer: str,
action: str,
no_deps: bool,
) -> None:
self.msg_path = msg_path
self.msgs = MessagePkgDef(msg_path.stem, [], [])
self.model = CapellaModel(capella_path, layer)
self.merge = merge
self.action = action
self.no_deps = no_deps

def _add_package(self, current_root: t.Any) -> MessagePkgDef:
"""Add package to message package definition."""
current_pkg_def = MessagePkgDef(current_root.name, [], [])

for cls in self.model.get_classes(current_root):
current_pkg_def.messages.append(
MessageDef(
cls.name,
[
FieldDef(
BaseTypeDef(
CAPELLA_TYPE_TO_MSG[prop.type_name]
if prop.type_name in CAPELLA_TYPE_TO_MSG
else prop.type_name,
None if prop.max_card == 1 else prop.max_card,
None
if prop.type_pkg_name == current_pkg_def.name
else prop.type_pkg_name,
),
prop.name,
prop.description.split("\n"),
)
for prop in cls.properties
],
[],
cls.description.split("\n"),
)
)

for enum in self.model.get_enums(current_root):
current_pkg_def.messages.append(
MessageDef(
enum.name,
[],
[
EnumDef(
enum.name,
[
ConstantDef(
BaseTypeDef(
CAPELLA_TYPE_TO_MSG[value.type]
),
value.name,
str(value.value),
value.description.split("\n"),
)
for value in enum.values
],
[],
)
],
enum.description.split("\n"),
)
)

for pkg_name in self.model.get_packages(current_root):
new_root = current_root.packages.by_name(pkg_name)
current_pkg_def.packages.append(self._add_package(new_root))

return current_pkg_def

def convert(self) -> None:
"""Convert Capella data to ROS messages."""
return
self.msgs.packages.append(self._add_package(self.model.data))
self.msgs.to_pkg_folder(self.msg_path)

0 comments on commit 247d0f9

Please sign in to comment.