From ebbfebeff7f249f62b9d1ce3d2709b4346eb8b24 Mon Sep 17 00:00:00 2001 From: luneei Date: Sat, 4 Jan 2025 20:07:02 +0900 Subject: [PATCH] Major Update --- .../kocom_wallpad/binary_sensor.py | 12 +- custom_components/kocom_wallpad/const.py | 6 +- custom_components/kocom_wallpad/entity.py | 10 +- custom_components/kocom_wallpad/gateway.py | 17 +- custom_components/kocom_wallpad/manifest.json | 2 +- .../kocom_wallpad/pywallpad/client.py | 25 ++- .../kocom_wallpad/pywallpad/const.py | 3 + .../kocom_wallpad/pywallpad/enums.py | 3 + .../kocom_wallpad/pywallpad/packet.py | 155 +++++++++++++++--- custom_components/kocom_wallpad/switch.py | 19 ++- 10 files changed, 205 insertions(+), 47 deletions(-) diff --git a/custom_components/kocom_wallpad/binary_sensor.py b/custom_components/kocom_wallpad/binary_sensor.py index cdb0a3b..44c54ec 100644 --- a/custom_components/kocom_wallpad/binary_sensor.py +++ b/custom_components/kocom_wallpad/binary_sensor.py @@ -20,6 +20,8 @@ ThermostatPacket, FanPacket, MotionPacket, + PrivatePacket, + PublicPacket, ) from .gateway import KocomGateway @@ -38,7 +40,10 @@ async def async_setup_entry( @callback def async_add_binary_sensor(packet: KocomPacket) -> None: """Add new binary sensor entity.""" - if isinstance(packet, (ThermostatPacket, FanPacket, MotionPacket)): + if isinstance( + packet, + (ThermostatPacket, FanPacket, MotionPacket, PrivatePacket, PublicPacket) + ): async_add_entities([KocomBinarySensorEntity(gateway, packet)]) for entity in gateway.get_entities(Platform.BINARY_SENSOR): @@ -66,10 +71,13 @@ def __init__( DEVICE_TYPE: self.packet._device.device_type, ROOM_ID: self.packet._device.room_id, SUB_ID: self.packet._device.sub_id, - ERROR_CODE: self.packet._device.state[ERROR_CODE], + ERROR_CODE: self.packet._device.state.get(ERROR_CODE), } if self.packet.device_type == DeviceType.MOTION: self._attr_device_class = BinarySensorDeviceClass.MOTION del self._attr_extra_state_attributes[ERROR_CODE] self._attr_extra_state_attributes[TIME] = self.packet._device.state[TIME] + if self.packet.device_type in {DeviceType.PRIVATE, DeviceType.PUBLIC}: + self._attr_device_class = None + del self._attr_extra_state_attributes[ERROR_CODE] diff --git a/custom_components/kocom_wallpad/const.py b/custom_components/kocom_wallpad/const.py index f7205f9..623ac94 100644 --- a/custom_components/kocom_wallpad/const.py +++ b/custom_components/kocom_wallpad/const.py @@ -12,6 +12,8 @@ GasPacket, MotionPacket, EVPacket, + PrivatePacket, + PublicPacket, ) from homeassistant.const import Platform @@ -25,7 +27,7 @@ BRAND_NAME = "Kocom" MANUFACTURER = "KOCOM Co., Ltd" MODEL = "Smart Wallpad" -SW_VERSION = "1.0.6" +SW_VERSION = "1.0.7" DEVICE_TYPE = "device_type" ROOM_ID = "room_id" @@ -44,4 +46,6 @@ GasPacket: Platform.SWITCH, MotionPacket: Platform.BINARY_SENSOR, EVPacket: Platform.SWITCH, + PrivatePacket: Platform.SWITCH, + PublicPacket: Platform.SWITCH, } diff --git a/custom_components/kocom_wallpad/entity.py b/custom_components/kocom_wallpad/entity.py index 535a3a1..d1faa4b 100644 --- a/custom_components/kocom_wallpad/entity.py +++ b/custom_components/kocom_wallpad/entity.py @@ -1,6 +1,7 @@ """Entity classes for Kocom Wallpad.""" from __future__ import annotations +import asyncio from homeassistant.core import callback from homeassistant.helpers.entity import DeviceInfo @@ -110,4 +111,11 @@ def extra_restore_state_data(self) -> RestoredExtraData: async def send_packet(self, packet: bytes) -> None: """Send a packet to the gateway.""" - await self.gateway.client.send_packet(packet) + if isinstance(packet, list): + for item in packet: + if isinstance(item, float): + await asyncio.sleep(item) + elif isinstance(item, bytearray): + await self.gateway.connection.send(item) + else: + await self.gateway.client.send_packet(packet) diff --git a/custom_components/kocom_wallpad/gateway.py b/custom_components/kocom_wallpad/gateway.py index 116ee74..db7128a 100644 --- a/custom_components/kocom_wallpad/gateway.py +++ b/custom_components/kocom_wallpad/gateway.py @@ -11,12 +11,21 @@ from dataclasses import dataclass from .pywallpad.client import KocomClient -from .pywallpad.const import ERROR, CO2, TEMPERATURE, DIRECTION, FLOOR +from .pywallpad.const import ( + ERROR, + CO2, + TEMPERATURE, + DIRECTION, + FLOOR, + BELL, +) from .pywallpad.packet import ( KocomPacket, ThermostatPacket, FanPacket, EVPacket, + PrivatePacket, + PublicPacket, PacketParser, ) @@ -126,7 +135,9 @@ def parse_platform(self, packet: KocomPacket) -> Platform | None: LOGGER.warning(f"Unrecognized platform type: {type(packet).__name__}") return None - platform_packet_types = (ThermostatPacket, FanPacket, EVPacket) + platform_packet_types = ( + ThermostatPacket, FanPacket, EVPacket, PrivatePacket, PublicPacket + ) if isinstance(packet, platform_packet_types) and (sub_id := packet._device.sub_id): if ERROR in sub_id: platform = Platform.BINARY_SENSOR @@ -136,6 +147,8 @@ def parse_platform(self, packet: KocomPacket) -> Platform | None: platform = Platform.SENSOR elif sub_id in {DIRECTION, FLOOR}: # EV platform = Platform.SENSOR + elif sub_id in BELL: # Intercom + platform = Platform.BINARY_SENSOR return platform \ No newline at end of file diff --git a/custom_components/kocom_wallpad/manifest.json b/custom_components/kocom_wallpad/manifest.json index afb6cd8..d26eb98 100644 --- a/custom_components/kocom_wallpad/manifest.json +++ b/custom_components/kocom_wallpad/manifest.json @@ -9,5 +9,5 @@ "iot_class": "local_push", "issue_tracker": "https://github.com/lunDreame/kocom-wallpad/issues", "requirements": [], - "version": "1.0.6" + "version": "1.0.7" } \ No newline at end of file diff --git a/custom_components/kocom_wallpad/pywallpad/client.py b/custom_components/kocom_wallpad/pywallpad/client.py index f19e07a..04ff958 100644 --- a/custom_components/kocom_wallpad/pywallpad/client.py +++ b/custom_components/kocom_wallpad/pywallpad/client.py @@ -8,7 +8,7 @@ from ..connection import Connection -from .crc import verify_checksum, calculate_checksum +from .crc import verify_checksum, verify_crc, calculate_checksum from .packet import PacketParser from .const import _LOGGER, PREFIX_HEADER, SUFFIX_HEADER @@ -100,18 +100,17 @@ async def _listen(self) -> None: packet_list = self.extract_packets(receive_data) for packet in packet_list: - if not verify_checksum(packet): - _LOGGER.debug("Checksum verification failed for packet: %s", packet.hex()) - continue - - parsed_packets = PacketParser.parse_state(packet) - for parsed_packet in parsed_packets: - _LOGGER.debug( - "Received packet: %s, %s, %s", - parsed_packet, parsed_packet._device, parsed_packet._last_data - ) - for callback in self.device_callbacks: - await callback(parsed_packet) + if verify_checksum(packet) or verify_crc(packet): + parsed_packets = PacketParser.parse_state(packet) + for parsed_packet in parsed_packets: + _LOGGER.debug( + "Received packet: %s, %s, %s", + parsed_packet, parsed_packet._device, parsed_packet._last_data + ) + for callback in self.device_callbacks: + await callback(parsed_packet) + else: + _LOGGER.debug("Received invalid packet: %s", packet.hex()) except Exception as e: _LOGGER.error(f"Error receiving data: {e}", exc_info=True) diff --git a/custom_components/kocom_wallpad/pywallpad/const.py b/custom_components/kocom_wallpad/pywallpad/const.py index 36205ee..a200e3f 100644 --- a/custom_components/kocom_wallpad/pywallpad/const.py +++ b/custom_components/kocom_wallpad/pywallpad/const.py @@ -42,3 +42,6 @@ DIRECTION = "direction" FLOOR = "floor" + +SHUTDOWN = "shutdown" +BELL = "bell" diff --git a/custom_components/kocom_wallpad/pywallpad/enums.py b/custom_components/kocom_wallpad/pywallpad/enums.py index ec89837..6ac10b0 100644 --- a/custom_components/kocom_wallpad/pywallpad/enums.py +++ b/custom_components/kocom_wallpad/pywallpad/enums.py @@ -18,9 +18,11 @@ class DeviceType(IntEnum): MOTION = 0x60 IGNORE = 0x86 IAQ = 0x98 + NONE = 0xFF class PacketType(IntEnum): """Packet types for Kocom devices.""" + CALL = 0x09 SEND = 0x0B RECV = 0x0D @@ -36,6 +38,7 @@ class Command(IntEnum): OFF = 0x02 DETECT = 0x04 SCAN = 0x3A + NONE = 0xFF class OpMode(IntEnum): """Operating modes for AC devices.""" diff --git a/custom_components/kocom_wallpad/pywallpad/packet.py b/custom_components/kocom_wallpad/pywallpad/packet.py index 93e39f1..66dff20 100644 --- a/custom_components/kocom_wallpad/pywallpad/packet.py +++ b/custom_components/kocom_wallpad/pywallpad/packet.py @@ -37,6 +37,8 @@ TIME, DIRECTION, FLOOR, + SHUTDOWN, + BELL, ) from .enums import ( DeviceType, @@ -80,7 +82,7 @@ def __init__(self, packet: bytes) -> None: self.value = packet[10:18] self.checksum = packet[18] - self._main_address = self.src + self._address = self.src self._last_recv_time = time.time() self._device: Device = None self._last_data: dict[str, Any] = {} @@ -92,25 +94,23 @@ def __repr__(self) -> str: @property def device_type(self) -> DeviceType: """Return the device type.""" - if self.dest[0] == DeviceType.WALLPAD.value: - self._main_address = self.src - return DeviceType(self.src[0]) - self._main_address = self.dest - return DeviceType(self.dest[0]) + if self._address[0] in {DeviceType.WALLPAD.value, DeviceType.NONE.value}: + self._address = self.dest + return DeviceType(self._address[0]) @property def room_id(self) -> str: """Return the room ID.""" - return str(self._main_address[1]) + return str(self._address[1]) @property def device_id(self) -> str: """Return the device ID.""" return f"{self.device_name()}_{self.room_id}" - def device_name(self, upper=False) -> str: + def device_name(self, capital=False) -> str: """Return the device name.""" - if upper: + if capital: return self.device_type.name.upper() return self.device_type.name.lower() @@ -127,9 +127,9 @@ def make_packet(self, command: Command, value_packet: bytearray) -> bytearray: header = bytearray([0x30, 0xBC, 0x00]) if self.device_type == DeviceType.EV: - address = bytearray([0x01, 0x00]) + bytearray(self._main_address) + address = bytearray([0x01, 0x00]) + bytearray(self._address) else: - address = bytearray(self._main_address) + bytearray([0x01, 0x00]) + address = bytearray(self._address) + bytearray([0x01, 0x00]) command_byte = bytearray([command.value]) return header + address + command_byte + value_packet @@ -399,7 +399,7 @@ def parse_data(self) -> list[Device]: target_temp = self.value[5] device = Device( - device_type=self.device_name(upper=True), + device_type=self.device_name(capital=True), room_id=self.room_id, device_id=self.device_id, state={ @@ -554,7 +554,7 @@ def parse_data(self) -> list[Device]: if state > 0: _LOGGER.debug(f"{sensor_id}: {state}") device = Device( - device_type=self.device_name(upper=True), + device_type=self.device_name(capital=True), device_id=self.device_id, state={STATE: state}, sub_id=sensor_id, @@ -631,7 +631,7 @@ def parse_data(self) -> list[Device]: devices.append( Device( - device_type=self.device_name(upper=True), + device_type=self.device_name(capital=True), room_id=self.room_id, device_id=self.device_id, state={POWER: power_state}, @@ -639,7 +639,7 @@ def parse_data(self) -> list[Device]: ) devices.append( Device( - device_type=self.device_name(upper=True), + device_type=self.device_name(capital=True), room_id=self.room_id, device_id=self.device_id, state={STATE: direction_state.name}, @@ -653,7 +653,7 @@ def parse_data(self) -> list[Device]: devices.append( Device( - device_type=self.device_name(upper=True), + device_type=self.device_name(capital=True), room_id=self.room_id, device_id=self.device_id, state={STATE: floor_state}, @@ -671,16 +671,120 @@ def make_power_status(self, power: bool) -> bytearray: return super().make_packet(Command.ON, bytearray(self.value)) +class PrivatePacket(KocomPacket): + """Handles packets for intercom private devices.""" + + def parse_data(self) -> list[Device]: + """Parse intercom private-specific data.""" + devices: list[Device] = [] + power_state = False + bell_state = ( + self.value[2] == 0x31 and (self.value[5] == 0x01 and self.value[6] == 0x01) + ) + devices.append( + Device( + device_type=f"{self.endpoint.name.lower()}", + device_id=f"{self.endpoint.name.lower()}_private", + room_id="private", + state={POWER: power_state}, + ) + ) + devices.append( + Device( + device_type=f"{self.endpoint.name.lower()}", + device_id=f"{self.endpoint.name.lower()}_private", + room_id="private", + state={POWER: power_state}, + sub_id=SHUTDOWN, + ) + ) + devices.append( + Device( + device_type=f"{self.endpoint.name.lower()}", + device_id=f"{self.endpoint.name.lower()}_private", + room_id="private", + state={STATE: bell_state}, + sub_id=BELL, + ) + ) + return devices + + def make_power_status(self, power: bool) -> list[bytearray | float]: + """Make a power status packet.""" + if not power: + _LOGGER.debug(f"Intercom private device is off. Ignoring power status.") + return + return [ + bytearray.fromhex('AA5579BC02020031FFFFFF61FFFFFF030008D30D0D'), + 1.0, + bytearray.fromhex('AA5579BC02020031FFFFFF61FFFFFF240097A20D0D'), + 0.1, + bytearray.fromhex('AA5579BC02020031FFFFFF61FFFFFF040091440D0D'), + ] + + def make_shutdown_status(self, shutdown: bool) -> list[bytearray | float]: + """Make a shutdown status packet.""" + if not shutdown: + _LOGGER.debug(f"Intercom private device is off. Ignoring shutdown status.") + return + return [ + bytearray.fromhex('AA5579BC02020031FFFFFF61FFFFFF030008D30D0D'), + 1.0, + bytearray.fromhex('AA5579BC02020031FFFFFF61FFFFFF040091440D0D'), + ] + + +class PublicPacket(KocomPacket): + """Handles packets for intercom public devices.""" + + def parse_data(self) -> list[Device]: + """Parse intercom public-specific data.""" + devices: list[Device] = [] + power_state = False + bell_state = self.value[5] == 0x01 and self.value[6] == 0x01 + devices.append( + Device( + device_type=f"{self.endpoint.name.lower()}", + device_id=f"{self.endpoint.name.lower()}_public", + room_id="public", + state={POWER: power_state}, + ) + ) + devices.append( + Device( + device_type=f"{self.endpoint.name.lower()}", + device_id=f"{self.endpoint.name.lower()}_public", + room_id="public", + state={STATE: bell_state}, + sub_id=BELL, + ) + ) + return devices + + def make_power_status(self, power: bool) -> list[bytearray | float]: + """Make a power status packet.""" + if not power: + _LOGGER.debug(f"Intercom public device is off. Ignoring power status.") + return + return [ + bytearray.fromhex('AA5579BC080200FFFFFFFF61FFFFFF030026950D0D'), + 0.5, + bytearray.fromhex('AA5579BC080200FFFFFFFF61FFFFFF2400B9E40D0D'), + ] + + class PacketParser: """Parses raw Kocom packets into specific device classes.""" @staticmethod def parse(packet_data: bytes) -> KocomPacket: """Parse a raw packet into a specific packet class.""" - if packet_data[5] == DeviceType.WALLPAD.value: - device_type = packet_data[7] - else: + if (packet_data[4] == Endpoint.INTERCOM.value + or packet_data[7] == DeviceType.WALLPAD.value + ): device_type = packet_data[5] + else: + device_type = packet_data[7] return PacketParser._get_packet_instance(device_type, packet_data) @staticmethod @@ -688,10 +792,13 @@ def parse_state(packet: bytes, last_data: dict[str, Any] = None) -> list[KocomPa """Parse device states from a packet.""" base_packet = PacketParser.parse(packet) - # Skip unsupported packet types - #if base_packet.device_type == DeviceType.WALLPAD: - # return [] - if base_packet.packet_type == PacketType.RECV and base_packet.command == Command.SCAN: + if (base_packet.endpoint == Endpoint.INTERCOM + and base_packet.packet_type != PacketType.CALL + ): + [] + if (base_packet.packet_type == PacketType.RECV + and base_packet.command == Command.SCAN + ): return [] # Update base packet's last_data if provided @@ -721,6 +828,8 @@ def _get_packet_instance(device_type: int, packet_data: bytes) -> KocomPacket: DeviceType.MOTION.value: MotionPacket, DeviceType.EV.value: EVPacket, DeviceType.WALLPAD.value: KocomPacket, + DeviceType.PRIVATE.value: PrivatePacket, # intercom private + DeviceType.PUBLIC.value: PublicPacket, # intercom public } packet_class = device_class_map.get(device_type) diff --git a/custom_components/kocom_wallpad/switch.py b/custom_components/kocom_wallpad/switch.py index 9b50889..9f1c7ea 100644 --- a/custom_components/kocom_wallpad/switch.py +++ b/custom_components/kocom_wallpad/switch.py @@ -12,13 +12,15 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.dispatcher import async_dispatcher_connect -from .pywallpad.const import POWER +from .pywallpad.const import POWER, SHUTDOWN from .pywallpad.enums import DeviceType from .pywallpad.packet import ( KocomPacket, OutletPacket, GasPacket, EVPacket, + PrivatePacket, + PublicPacket, ) from .gateway import KocomGateway @@ -37,7 +39,10 @@ async def async_setup_entry( @callback def async_add_switch(packet: KocomPacket) -> None: """Add new switch entity.""" - if isinstance(packet, (OutletPacket, GasPacket, EVPacket)): + if isinstance( + packet, + (OutletPacket, GasPacket, EVPacket, PrivatePacket, PublicPacket) + ): async_add_entities([KocomSwitchEntity(gateway, packet)]) for entity in gateway.get_entities(Platform.SWITCH): @@ -71,10 +76,16 @@ def is_on(self) -> bool: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on switch.""" - make_packet = self.packet.make_power_status(True) + if self.packet._device.sub_id == SHUTDOWN: + make_packet = self.packet.make_shutdown_status(True) + else: + make_packet = self.packet.make_power_status(True) await self.send_packet(make_packet) async def async_turn_off(self, **kwargs: Any) -> None: """Turn off switch.""" - make_packet = self.packet.make_power_status(False) + if self.packet._device.sub_id == SHUTDOWN: + make_packet = self.packet.make_shutdown_status(False) + else: + make_packet = self.packet.make_power_status(False) await self.send_packet(make_packet)