-
-
Notifications
You must be signed in to change notification settings - Fork 91
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Sander van Kasteel <[email protected]>
- Loading branch information
1 parent
7795fca
commit 4f9296a
Showing
9 changed files
with
233 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,27 @@ | ||
"""Xml commands.""" | ||
"""Xml commands module.""" | ||
from deebot_client.command import Command, CommandMqttP2P | ||
|
||
from .common import XmlCommand | ||
from .error import GetError | ||
|
||
__all__ = [ | ||
"GetError", | ||
] | ||
|
||
# fmt: off | ||
# ordered by file asc | ||
_COMMANDS: list[type[XmlCommand]] = [ | ||
GetError, | ||
] | ||
# fmt: on | ||
|
||
COMMANDS: dict[str, type[Command]] = { | ||
cmd.name: cmd # type: ignore[misc] | ||
for cmd in _COMMANDS | ||
} | ||
|
||
COMMANDS_WITH_MQTT_P2P_HANDLING: dict[str, type[CommandMqttP2P]] = { | ||
cmd_name: cmd | ||
for (cmd_name, cmd) in COMMANDS.items() | ||
if issubclass(cmd, CommandMqttP2P) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,62 @@ | ||
"""Common xml based commands.""" | ||
from abc import ABC, abstractmethod | ||
from typing import cast | ||
from xml.etree.ElementTree import Element, SubElement | ||
|
||
from defusedxml import ElementTree # type: ignore[import-untyped] | ||
|
||
from xml.etree import ElementTree | ||
|
||
from deebot_client.command import Command | ||
from deebot_client.command import Command, CommandWithMessageHandling | ||
from deebot_client.const import DataType | ||
from deebot_client.event_bus import EventBus | ||
from deebot_client.logging_filter import get_logger | ||
from deebot_client.message import HandlingResult, MessageStr | ||
|
||
_LOGGER = get_logger(__name__) | ||
|
||
|
||
class XmlCommand(Command): | ||
"""Xml command.""" | ||
|
||
data_type: DataType = DataType.XML | ||
|
||
@property | ||
def has_sub_element(self) -> bool: | ||
@property # type: ignore[misc] | ||
@classmethod | ||
def has_sub_element(cls) -> bool: | ||
"""Return True if command has inner element.""" | ||
return False | ||
|
||
def _get_payload(self) -> str: | ||
element = ctl_element = ElementTree.Element("ctl") | ||
element = ctl_element = Element("ctl") | ||
|
||
if len(self._args) > 0: | ||
if self.has_sub_element: | ||
element = ElementTree.SubElement(element, self.name.lower()) | ||
element = SubElement(element, self.name.lower()) | ||
|
||
if isinstance(self._args, dict): | ||
for key, value in self._args.items(): | ||
element.set(key, value) | ||
|
||
return ElementTree.tostring(ctl_element, "unicode") | ||
return cast(str, ElementTree.tostring(ctl_element, "unicode")) | ||
|
||
|
||
class XmlCommandWithMessageHandling( | ||
XmlCommand, CommandWithMessageHandling, MessageStr, ABC | ||
): | ||
"""Xml command, which handle response by itself.""" | ||
|
||
@classmethod | ||
@abstractmethod | ||
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: | ||
"""Handle xml message and notify the correct event subscribers. | ||
:return: A message response | ||
""" | ||
|
||
@classmethod | ||
def _handle_str(cls, event_bus: EventBus, message: str) -> HandlingResult: | ||
"""Handle string message and notify the correct event subscribers. | ||
:return: A message response | ||
""" | ||
xml = ElementTree.fromstring(message) | ||
return cls._handle_xml(event_bus, xml) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
"""Error commands.""" | ||
from xml.etree.ElementTree import Element | ||
|
||
from deebot_client.const import ERROR_CODES | ||
from deebot_client.event_bus import EventBus | ||
from deebot_client.events import ErrorEvent, StateEvent | ||
from deebot_client.message import HandlingResult | ||
from deebot_client.models import State | ||
|
||
from .common import XmlCommandWithMessageHandling | ||
|
||
|
||
class GetError(XmlCommandWithMessageHandling): | ||
"""Get error command.""" | ||
|
||
name = "GetError" | ||
|
||
@classmethod | ||
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: | ||
"""Handle xml message and notify the correct event subscribers. | ||
:return: A message response | ||
""" | ||
error_code = int(errs) if (errs := xml.attrib["errs"]) else 0 | ||
|
||
if error_code != 0: | ||
event_bus.notify(StateEvent(State.ERROR)) | ||
|
||
description = ERROR_CODES.get(error_code) | ||
event_bus.notify(ErrorEvent(error_code, description)) | ||
return HandlingResult.success() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from collections.abc import Sequence | ||
|
||
import pytest | ||
|
||
from deebot_client.commands.json import GetError | ||
from deebot_client.events import ErrorEvent, StateEvent | ||
from deebot_client.events.base import Event | ||
from deebot_client.models import State | ||
from tests.helpers import get_request_json, get_success_body | ||
|
||
from . import assert_command | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("code", "expected_events"), | ||
[ | ||
(0, ErrorEvent(0, "NoError: Robot is operational")), | ||
(105, [StateEvent(State.ERROR), ErrorEvent(105, "Stuck: Robot is stuck")]), | ||
], | ||
) | ||
async def test_getErrors(code: int, expected_events: Event | Sequence[Event]) -> None: | ||
json = get_request_json(get_success_body({"code": [code]})) | ||
await assert_command(GetError(), json, expected_events) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from functools import partial | ||
from typing import Any | ||
|
||
from deebot_client.hardware.deebot import get_static_device_info | ||
from tests.commands import assert_command as assert_command_base | ||
|
||
assert_command = partial( | ||
assert_command_base, static_device_info=get_static_device_info("ls1ok3") | ||
) | ||
|
||
|
||
def get_request_xml(data: str | None) -> dict[str, Any]: | ||
return {"id": "ALZf", "ret": "ok", "resp": data, "payloadType": "x"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from collections.abc import Sequence | ||
|
||
import pytest | ||
|
||
from deebot_client.commands.xml import GetError | ||
from deebot_client.events import ErrorEvent, StateEvent | ||
from deebot_client.events.base import Event | ||
from deebot_client.models import State | ||
|
||
from . import assert_command, get_request_xml | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("errs", "expected_events"), | ||
[ | ||
("", ErrorEvent(0, "NoError: Robot is operational")), | ||
("105", [StateEvent(State.ERROR), ErrorEvent(105, "Stuck: Robot is stuck")]), | ||
], | ||
) | ||
async def test_getErrors(errs: str, expected_events: Event | Sequence[Event]) -> None: | ||
json = get_request_xml(f"<ctl ret='ok' errs='{errs}'/>") | ||
await assert_command(GetError(), json, expected_events) |