From dbcf2cab91c57054d7bffac9025387d664a18abb Mon Sep 17 00:00:00 2001 From: luneei Date: Sun, 29 Dec 2024 10:47:12 +0900 Subject: [PATCH] Major Update --- README.md | 4 +- .../kocom_wallpad/binary_sensor.py | 44 +- custom_components/kocom_wallpad/climate.py | 222 ++++----- .../kocom_wallpad/config_flow.py | 8 +- custom_components/kocom_wallpad/connection.py | 39 +- custom_components/kocom_wallpad/const.py | 5 +- custom_components/kocom_wallpad/entity.py | 10 +- custom_components/kocom_wallpad/fan.py | 24 +- custom_components/kocom_wallpad/gateway.py | 22 +- custom_components/kocom_wallpad/light.py | 18 +- custom_components/kocom_wallpad/manifest.json | 2 +- .../kocom_wallpad/pywallpad/client.py | 52 ++- .../kocom_wallpad/pywallpad/const.py | 6 + .../kocom_wallpad/pywallpad/crc.py | 8 +- .../kocom_wallpad/pywallpad/enums.py | 106 ++--- .../kocom_wallpad/pywallpad/packet.py | 428 +++++++++--------- custom_components/kocom_wallpad/sensor.py | 33 +- custom_components/kocom_wallpad/switch.py | 10 +- .../kocom_wallpad/translations/en.json | 2 +- custom_components/kocom_wallpad/util.py | 11 + hacs.json | 2 +- 21 files changed, 504 insertions(+), 552 deletions(-) diff --git a/README.md b/README.md index cd68fe1..620ef0b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![hacs_badge](https://img.shields.io/badge/HACS-Custom-41BDF5.svg?style=for-the-badge)](https://github.com/hacs/integration) -# Kocom WallPad Integration for Home Assistant -Home Assistant를 위한 Kocom WallPad 통합구성요소 +# Kocom Wallpad Integration for Home Assistant +Home Assistant를 위한 Kocom Wallpad 통합구성요소 ## 기여 문제가 있나요? [Issues](https://github.com/lunDreame/kocom-wallpad/issues) 탭에 작성해 주세요. diff --git a/custom_components/kocom_wallpad/binary_sensor.py b/custom_components/kocom_wallpad/binary_sensor.py index 90f8b0f..8e57ac4 100644 --- a/custom_components/kocom_wallpad/binary_sensor.py +++ b/custom_components/kocom_wallpad/binary_sensor.py @@ -14,11 +14,12 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect from .pywallpad.const import STATE, ERROR_CODE, TIME +from .pywallpad.enums import DeviceType from .pywallpad.packet import ( KocomPacket, ThermostatPacket, FanPacket, - MotionPacket + MotionPacket, ) from .gateway import KocomGateway @@ -37,12 +38,8 @@ async def async_setup_entry( @callback def async_add_binary_sensor(packet: KocomPacket) -> None: """Add new binary sensor entity.""" - if isinstance(packet, (ThermostatPacket, FanPacket)): + if isinstance(packet, (ThermostatPacket, FanPacket, MotionPacket)): async_add_entities([KocomBinarySensorEntity(gateway, packet)]) - elif isinstance(packet, MotionPacket): - async_add_entities([KocomMotionEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.BINARY_SENSOR): async_add_binary_sensor(entity) @@ -64,6 +61,7 @@ def __init__( ) -> None: """Initialize the binary sensor.""" super().__init__(gateway, packet) + self._attr_is_on = self.device.state[STATE] self._attr_extra_state_attributes = { DEVICE_TYPE: self.device.device_type, ROOM_ID: self.device.room_id, @@ -71,33 +69,7 @@ def __init__( ERROR_CODE: self.device.state[ERROR_CODE], } - @property - def is_on(self) -> bool: - """Return true if the binary sensor is on.""" - return self.device.state[STATE] - - -class KocomMotionEntity(KocomEntity, BinarySensorEntity): - """Representation of a Kocom binary sensor.""" - - _attr_device_class = BinarySensorDeviceClass.MOTION - - def __init__( - self, - gateway: KocomGateway, - packet: KocomPacket, - ) -> None: - """Initialize the binary sensor.""" - super().__init__(gateway, packet) - self._attr_extra_state_attributes = { - DEVICE_TYPE: self.device.device_type, - ROOM_ID: self.device.room_id, - SUB_ID: self.device.sub_id, - TIME: self.device.state[TIME], - } - - @property - def is_on(self) -> bool: - """Return true if the binary sensor is on.""" - return self.device.state[STATE] - \ No newline at end of file + if self.packet.device_type == DeviceType.MOTION: + self._attr_device_class = BinarySensorDeviceClass.MOTION + del self._attr_extra_state_attributes[ERROR_CODE] + self._attr_extra_state_attributes[TIME] = self.device.state[TIME] diff --git a/custom_components/kocom_wallpad/climate.py b/custom_components/kocom_wallpad/climate.py index aa391e2..ea27f6f 100644 --- a/custom_components/kocom_wallpad/climate.py +++ b/custom_components/kocom_wallpad/climate.py @@ -50,8 +50,6 @@ def async_add_climate(packet: KocomPacket) -> None: async_add_entities([KocomThermostatEntity(gateway, packet)]) elif isinstance(packet, AcPacket): async_add_entities([KocomAcEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.CLIMATE): async_add_climate(entity) @@ -62,45 +60,35 @@ def async_add_climate(packet: KocomPacket) -> None: class KocomThermostatEntity(KocomEntity, ClimateEntity): - """Representation of a Kocom climate.""" + """Representation of a Kocom thermostat.""" _enable_turn_on_off_backwards_compatibility = False _attr_hvac_modes = [HVACMode.OFF, HVACMode.HEAT] _attr_preset_modes = [PRESET_AWAY, PRESET_NONE] _attr_supported_features = ( - ClimateEntityFeature.TARGET_TEMPERATURE | - ClimateEntityFeature.TURN_OFF | - ClimateEntityFeature.TURN_ON | - ClimateEntityFeature.PRESET_MODE + ClimateEntityFeature.TARGET_TEMPERATURE + | ClimateEntityFeature.TURN_OFF + | ClimateEntityFeature.TURN_ON + | ClimateEntityFeature.PRESET_MODE ) _attr_temperature_unit = UnitOfTemperature.CELSIUS _attr_max_temp = 40 _attr_min_temp = 5 _attr_target_temperature_step = 1 - - def __init__( - self, - gateway: KocomGateway, - packet: KocomPacket, - ) -> None: - """Initialize the climate.""" + + def __init__(self, gateway: KocomGateway, packet: KocomPacket) -> None: + """Initialize the thermostat.""" super().__init__(gateway, packet) @property def hvac_mode(self) -> HVACMode: - """Return hvac operation ie. heat, cool mode.""" - if self.device.state[POWER]: - return HVACMode.HEAT - else: - return HVACMode.OFF - + """Return the current HVAC mode.""" + return HVACMode.HEAT if self.device.state[POWER] else HVACMode.OFF + @property def preset_mode(self) -> str: - """Return the current preset mode, e.g., home, away, temp.""" - if self.device.state[AWAY_MODE]: - return PRESET_AWAY - else: - return PRESET_NONE + """Return the current preset mode.""" + return PRESET_AWAY if self.device.state[AWAY_MODE] else PRESET_NONE @property def current_temperature(self) -> int: @@ -109,42 +97,43 @@ def current_temperature(self) -> int: @property def target_temperature(self) -> int: - """Return the temperature we try to reach.""" + """Return the target temperature.""" return self.device.state[TARGET_TEMP] - + async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None: - """Set new target hvac mode.""" - if hvac_mode == HVACMode.OFF: - power = False - elif hvac_mode == HVACMode.HEAT: - power = True - else: + """Set the HVAC mode.""" + hvac_to_power = { + HVACMode.OFF: False, + HVACMode.HEAT: True, + } + power = hvac_to_power.get(hvac_mode) + if power is None: raise ValueError(f"Unknown HVAC mode: {hvac_mode}") - - packet = self.packet.make_status(power=power) - await self.send(packet) + + make_packet = self.packet.make_power_status(power) + await self.send_packet(make_packet) async def async_set_preset_mode(self, preset_mode: str) -> None: - """Set new target preset mode.""" - if preset_mode == PRESET_AWAY: - away_mode = True - elif preset_mode == PRESET_NONE: - away_mode = False - else: + """Set the preset mode.""" + preset_to_away = { + PRESET_AWAY: True, + PRESET_NONE: False, + } + away_mode = preset_to_away.get(preset_mode) + if away_mode is None: raise ValueError(f"Unknown preset mode: {preset_mode}") - - packet = self.packet.make_status(away_mode=away_mode) - await self.send(packet) + + make_packet = self.packet.make_away_status(away_mode) + await self.send_packet(make_packet) async def async_set_temperature(self, **kwargs) -> None: - """Set new target temperature.""" - target_temp = kwargs.get(ATTR_TEMPERATURE) - if target_temp is None: - LOGGER.warning("Missing temperature") - return + """Set the target temperature.""" + if ATTR_TEMPERATURE not in kwargs: + raise ValueError("Missing temperature") - packet = self.packet.make_status(target_temp=target_temp) - await self.send(packet) + target_temp = int(kwargs[ATTR_TEMPERATURE]) + make_packet = self.packet.make_target_temp(target_temp) + await self.send_packet(make_packet) class KocomAcEntity(KocomEntity, ClimateEntity): @@ -156,54 +145,46 @@ class KocomAcEntity(KocomEntity, ClimateEntity): HVACMode.COOL, HVACMode.FAN_ONLY, HVACMode.DRY, - HVACMode.AUTO + HVACMode.AUTO, ] _attr_fan_modes = [FAN_LOW, FAN_MEDIUM, FAN_HIGH] _attr_supported_features = ( - ClimateEntityFeature.TARGET_TEMPERATURE | - ClimateEntityFeature.TURN_OFF | - ClimateEntityFeature.TURN_ON | - ClimateEntityFeature.FAN_MODE + ClimateEntityFeature.TARGET_TEMPERATURE + | ClimateEntityFeature.TURN_OFF + | ClimateEntityFeature.TURN_ON + | ClimateEntityFeature.FAN_MODE ) _attr_temperature_unit = UnitOfTemperature.CELSIUS _attr_max_temp = 30 _attr_min_temp = 18 _attr_target_temperature_step = 1 - - def __init__( - self, - gateway: KocomGateway, - packet: KocomPacket, - ) -> None: + + def __init__(self, gateway: KocomGateway, packet: KocomPacket) -> None: """Initialize the climate.""" super().__init__(gateway, packet) - + @property def hvac_mode(self) -> HVACMode: - """Return hvac operation ie. heat, cool mode.""" - if self.device.state[POWER] and (op_mode := self.device.state[OP_MODE]): - if op_mode == OpMode.COOL: - return HVACMode.COOL - elif op_mode == OpMode.FAN_ONLY: - return HVACMode.FAN_ONLY - elif op_mode == OpMode.DRY: - return HVACMode.DRY - elif op_mode == OpMode.AUTO: - return HVACMode.AUTO - else: - return HVACMode.OFF - + """Return current HVAC mode.""" + if self.device.state[POWER]: + op_mode = self.device.state[OP_MODE] + return { + OpMode.COOL: HVACMode.COOL, + OpMode.FAN_ONLY: HVACMode.FAN_ONLY, + OpMode.DRY: HVACMode.DRY, + OpMode.AUTO: HVACMode.AUTO, + }.get(op_mode, HVACMode.OFF) + return HVACMode.OFF + @property def fan_mode(self) -> str: - """Return the fan setting.""" - fan_mode = self.device.state[FAN_MODE] - if fan_mode == FanMode.LOW: - return FAN_LOW - elif fan_mode == FanMode.MEDIUM: - return FAN_MEDIUM - elif fan_mode == FanMode.HIGH: - return FAN_HIGH - + """Return current fan mode.""" + return { + FanMode.LOW: FAN_LOW, + FanMode.MEDIUM: FAN_MEDIUM, + FanMode.HIGH: FAN_HIGH, + }.get(self.device.state[FAN_MODE]) + @property def current_temperature(self) -> int: """Return the current temperature.""" @@ -211,50 +192,45 @@ def current_temperature(self) -> int: @property def target_temperature(self) -> int: - """Return the temperature we try to reach.""" + """Return the target temperature.""" return self.device.state[TARGET_TEMP] - + async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None: - """Set new target hvac mode.""" + """Set a new target HVAC mode.""" if hvac_mode == HVACMode.OFF: - op_mode = OpMode.OFF - elif hvac_mode == HVACMode.COOL: - op_mode = OpMode.COOL - elif hvac_mode == HVACMode.FAN_ONLY: - op_mode = OpMode.FAN_ONLY - elif hvac_mode == HVACMode.DRY: - op_mode = OpMode.DRY - elif hvac_mode == HVACMode.AUTO: - op_mode = OpMode.AUTO + make_packet = self.packet.make_power_status(False) else: - raise ValueError(f"Unknown HVAC mode: {hvac_mode}") - - packet = self.packet.make_status(op_mode=op_mode) - await self.send(packet) + hvac_to_op_mode = { + HVACMode.COOL: OpMode.COOL, + HVACMode.FAN_ONLY: OpMode.FAN_ONLY, + HVACMode.DRY: OpMode.DRY, + HVACMode.AUTO: OpMode.AUTO, + } + op_mode = hvac_to_op_mode.get(hvac_mode) + if op_mode is None: + raise ValueError(f"Unknown HVAC mode: {hvac_mode}") + make_packet = self.packet.make_op_mode(op_mode) + + await self.send_packet(make_packet) async def async_set_fan_mode(self, fan_mode: str) -> None: - """Set new target fan mode.""" - if fan_mode == FAN_LOW: - fan_speed = FanMode.LOW - elif fan_mode == FAN_MEDIUM: - fan_speed = FanMode.MEDIUM - elif fan_mode == FAN_HIGH: - fan_speed = FanMode.HIGH - else: + """Set a new target fan mode.""" + fan_speed = { + FAN_LOW: FanMode.LOW, + FAN_MEDIUM: FanMode.MEDIUM, + FAN_HIGH: FanMode.HIGH, + }.get(fan_mode) + if fan_speed is None: raise ValueError(f"Unknown fan mode: {fan_mode}") - - packet = self.packet.make_status(fan_mode=fan_speed) - await self.send(packet) - - async def async_set_preset_mode(self, preset_mode: str) -> None: - """Set new target preset mode.""" + + make_packet = self.packet.make_fan_mode(fan_speed) + await self.send_packet(make_packet) async def async_set_temperature(self, **kwargs) -> None: - """Set new target temperature.""" - target_temp = kwargs.get(ATTR_TEMPERATURE) - if target_temp is None: - LOGGER.warning("Missing temperature") - return + """Set a new target temperature.""" + if ATTR_TEMPERATURE not in kwargs: + raise ValueError("Missing temperature") - packet = self.packet.make_status(target_temp=target_temp) - await self.send(packet) + target_temp = int(kwargs[ATTR_TEMPERATURE]) + make_packet = self.packet.make_target_temp(target_temp) + await self.send_packet(make_packet) diff --git a/custom_components/kocom_wallpad/config_flow.py b/custom_components/kocom_wallpad/config_flow.py index 8f2b60c..ff8a334 100644 --- a/custom_components/kocom_wallpad/config_flow.py +++ b/custom_components/kocom_wallpad/config_flow.py @@ -3,13 +3,13 @@ from __future__ import annotations from typing import Any -import socket import voluptuous as vol from homeassistant.const import CONF_HOST, CONF_PORT from homeassistant.config_entries import ConfigFlow, ConfigFlowResult import homeassistant.helpers.config_validation as cv +from .connection import test_connection from .const import DOMAIN, LOGGER, DEFAULT_PORT @@ -29,11 +29,7 @@ async def async_step_user( host = user_input[CONF_HOST] port = user_input[CONF_PORT] - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5) - sock.connect((host, port)) - except socket.timeout: + if not await test_connection(host, port): errors["base"] = "cannnot_connect" else: await self.async_set_unique_id(host) diff --git a/custom_components/kocom_wallpad/connection.py b/custom_components/kocom_wallpad/connection.py index 22f5a32..7cd98a9 100644 --- a/custom_components/kocom_wallpad/connection.py +++ b/custom_components/kocom_wallpad/connection.py @@ -8,11 +8,9 @@ from .const import LOGGER -MAX_READ_BYTES = 256 - class Connection: - """Handles gateway connections.""" + """Connection class.""" def __init__(self, host: str, port: int) -> None: """Initialize the Connection.""" @@ -25,7 +23,7 @@ def __init__(self, host: str, port: int) -> None: self.last_reconnect_attempt: Optional[float] = None self.next_attempt_time: Optional[float] = None - async def connect(self): + async def connect(self) -> None: """Establish a connection.""" try: self.reader, self.writer = await asyncio.open_connection(self.host, self.port) @@ -39,7 +37,7 @@ def is_connected(self) -> bool: """Check if the connection is active.""" return self.writer is not None and not self.writer.is_closing() - async def reconnect(self): + async def reconnect(self) -> None: """Attempt to reconnect with exponential backoff.""" if self.writer is not None: self.writer.close() @@ -63,20 +61,19 @@ async def reconnect(self): self.reconnect_attempts = 0 self.next_attempt_time = None - async def send(self, packet: bytearray): + async def send(self, packet: bytearray) -> None: """Send a packet.""" try: self.writer.write(packet) await self.writer.drain() - await asyncio.sleep(0.1) except Exception as e: LOGGER.error(f"Failed to send packet data: {e}") await self.reconnect() - async def receive(self) -> Optional[bytes]: + async def receive(self, read_byte: int = 2048) -> Optional[bytes]: """Receive data.""" try: - return await self.reader.read(MAX_READ_BYTES) + return await self.reader.read(read_byte) except asyncio.TimeoutError: pass except Exception as e: @@ -84,10 +81,32 @@ async def receive(self) -> Optional[bytes]: await self.reconnect() return None - async def close(self): + async def close(self) -> None: """Close the connection.""" if self.writer: LOGGER.info("Connection closed.") self.writer.close() await self.writer.wait_closed() self.writer = None + + +async def test_connection(host: str, port: int, timeout: int = 5) -> bool: + """Test the connection with a timeout.""" + connection = Connection(host, port) + try: + await asyncio.wait_for(connection.connect(), timeout=timeout) + + if connection.is_connected(): + LOGGER.info("Connection test successful.") + return True + else: + LOGGER.error("Connection test failed.") + return False + except asyncio.TimeoutError: + LOGGER.error("Connection test timed out.") + return False + except Exception as e: + LOGGER.error(f"Connection test failed with error: {e}") + return False + finally: + await connection.close() diff --git a/custom_components/kocom_wallpad/const.py b/custom_components/kocom_wallpad/const.py index d0dc1a5..62be6e9 100644 --- a/custom_components/kocom_wallpad/const.py +++ b/custom_components/kocom_wallpad/const.py @@ -25,12 +25,13 @@ BRAND_NAME = "kocom" MANUFACTURER = "KOCOM Co., Ltd" MODEL = "Smart Wallpad" -SW_VERSION = "1.0.2" +SW_VERSION = "1.0.3" DEVICE_TYPE = "device_type" ROOM_ID = "room_id" SUB_ID = "sub_id" -PACKET = "packet" + +PACKET_DATA = "packet_data" PLATFORM_MAPPING: dict[type[KocomPacket], Platform] = { # type: ignore LightPacket: Platform.LIGHT, diff --git a/custom_components/kocom_wallpad/entity.py b/custom_components/kocom_wallpad/entity.py index 47ad1df..1d0baa9 100644 --- a/custom_components/kocom_wallpad/entity.py +++ b/custom_components/kocom_wallpad/entity.py @@ -10,7 +10,7 @@ from .pywallpad.packet import Device, KocomPacket from .gateway import KocomGateway -from .util import create_dev_id +from .util import create_dev_id, encode_bytes_to_base64 from .const import ( DOMAIN, BRAND_NAME, @@ -20,7 +20,7 @@ DEVICE_TYPE, ROOM_ID, SUB_ID, - PACKET, + PACKET_DATA, ) @@ -95,8 +95,8 @@ async def async_added_to_hass(self) -> None: @property def extra_restore_state_data(self) -> RestoredExtraData: """Return extra state data to be restored.""" - return RestoredExtraData({PACKET: ''.join(self.packet.packet)}) + return RestoredExtraData({PACKET_DATA: encode_bytes_to_base64(self.packet.packet)}) - async def send(self, packet: bytes) -> None: + async def send_packet(self, packet: bytes) -> None: """Send a packet to the gateway.""" - await self.gateway.client.send(packet) + await self.gateway.client.send_packet(packet) diff --git a/custom_components/kocom_wallpad/fan.py b/custom_components/kocom_wallpad/fan.py index 08414c5..c460540 100644 --- a/custom_components/kocom_wallpad/fan.py +++ b/custom_components/kocom_wallpad/fan.py @@ -38,8 +38,6 @@ def async_add_fan(packet: KocomPacket) -> None: """Add new fan entity.""" if isinstance(packet, FanPacket): async_add_entities([KocomFanEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.FAN): async_add_fan(entity) @@ -59,11 +57,7 @@ class KocomFanEntity(KocomEntity, FanEntity): FanEntityFeature.PRESET_MODE ) _attr_speed_count = 3 - _attr_preset_modes = [ - VentMode.VENTILATION.name, - VentMode.AUTO.name, - VentMode.AIR_PURIFIER.name, - ] + _attr_preset_modes = list(VentMode.__members__.keys()) _attr_speed_list = [ FanSpeed.LOW.value, FanSpeed.MEDIUM.value, @@ -105,14 +99,14 @@ async def async_set_percentage(self, percentage: int) -> None: speed_item = percentage_to_ordered_list_item(self._attr_speed_list, percentage) fan_speed = FanSpeed(speed_item) - packet = self.packet.make_status(fan_speed=fan_speed) - await self.send(packet) + make_packet = self.packet.make_fan_speed(fan_speed) + await self.send_packet(make_packet) async def async_set_preset_mode(self, preset_mode: str) -> None: """Set the preset mode of the fan.""" vent_mode = VentMode[preset_mode] - packet = self.packet.make_status(vent_mode=vent_mode) - await self.send(packet) + make_packet = self.packet.make_vent_mode(vent_mode) + await self.send_packet(make_packet) async def async_turn_on( self, @@ -122,10 +116,10 @@ async def async_turn_on( **kwargs: Any, ) -> None: """Turn on the fan.""" - packet = self.packet.make_status(power=True) - await self.send(packet) + make_packet = self.packet.make_power_status(True) + await self.send_packet(make_packet) async def async_turn_off(self, **kwargs: Any) -> None: """Turn off the fan.""" - packet = self.packet.make_status(power=False) - await self.send(packet) + make_packet = self.packet.make_power_status(False) + await self.send_packet(make_packet) diff --git a/custom_components/kocom_wallpad/gateway.py b/custom_components/kocom_wallpad/gateway.py index 9fd5193..559cb37 100644 --- a/custom_components/kocom_wallpad/gateway.py +++ b/custom_components/kocom_wallpad/gateway.py @@ -11,6 +11,7 @@ from dataclasses import dataclass from .pywallpad.client import KocomClient +from .pywallpad.const import TEMPERATURE, CO2 from .pywallpad.packet import ( KocomPacket, ThermostatPacket, @@ -20,8 +21,8 @@ ) from .connection import Connection -from .util import create_dev_id -from .const import LOGGER, DOMAIN, PACKET, PLATFORM_MAPPING +from .util import create_dev_id, decode_base64_to_bytes +from .const import LOGGER, DOMAIN, PACKET_DATA, PLATFORM_MAPPING class KocomGateway: @@ -70,8 +71,8 @@ async def _async_fetch_last_packets(self, entity_id: str) -> list[KocomPacket]: if not state or not state.extra_data: return [] - packet_data = state.extra_data.as_dict().get(PACKET) - return PacketParser.parse_state(packet_data) if packet_data else [] + packet_data = state.extra_data.as_dict().get(PACKET_DATA) + return PacketParser.parse_state(decode_base64_to_bytes(packet_data)) if packet_data else [] async def async_update_entity_registry(self) -> None: """Update the entity registry.""" @@ -113,15 +114,14 @@ def parse_platform(self, packet: KocomPacket) -> Platform | None: LOGGER.warning(f"Unrecognized platform type: {type(packet).__name__}") return None - if isinstance( - packet, (ThermostatPacket, FanPacket, EvPacket) - ) and (sub_id := packet._device.sub_id): - if "error" in sub_id: - platform = Platform.BINARY_SENSOR - elif "temperature" in sub_id: + platform_packet_types = (ThermostatPacket, FanPacket, EvPacket) + if isinstance(packet, platform_packet_types) and (sub_id := packet._device.sub_id): + if TEMPERATURE in sub_id: platform = Platform.SENSOR - elif "co2" in sub_id: + elif CO2 in sub_id: platform = Platform.SENSOR + elif "error" in sub_id: + platform = Platform.BINARY_SENSOR elif "direction" in sub_id: platform = Platform.SENSOR diff --git a/custom_components/kocom_wallpad/light.py b/custom_components/kocom_wallpad/light.py index f71d9e9..efae567 100644 --- a/custom_components/kocom_wallpad/light.py +++ b/custom_components/kocom_wallpad/light.py @@ -33,8 +33,6 @@ def async_add_light(packet: KocomPacket) -> None: """Add new light entity.""" if isinstance(packet, LightPacket): async_add_entities([KocomLightEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.LIGHT): async_add_light(entity) @@ -81,21 +79,17 @@ def brightness(self) -> int: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on light.""" if self.has_brightness: - brightness = kwargs.get(ATTR_BRIGHTNESS) - if brightness is None: - LOGGER.warning("Brightness not set") - return - + brightness = int(kwargs.get(ATTR_BRIGHTNESS, 255)) brightness = ((brightness * 3) // 225) + 1 if brightness not in self.device.state[LEVEL]: brightness = 255 - packet = self.packet.make_status(brightness=brightness) + make_packet = self.packet.make_brightness_status(brightness) else: - packet = self.packet.make_status(power=True) + make_packet = self.packet.make_power_status(True) - await self.send(packet) + await self.send_packet(make_packet) async def async_turn_off(self, **kwargs: Any) -> None: """Turn off light.""" - packet = self.packet.make_status(power=False) - await self.send(packet) + make_packet = self.packet.make_power_status(False) + await self.send_packet(make_packet) diff --git a/custom_components/kocom_wallpad/manifest.json b/custom_components/kocom_wallpad/manifest.json index ad58b93..aab9fb8 100644 --- a/custom_components/kocom_wallpad/manifest.json +++ b/custom_components/kocom_wallpad/manifest.json @@ -9,5 +9,5 @@ "iot_class": "local_push", "issue_tracker": "https://github.com/lunDreame/kocom-wallpad/issues", "requirements": [], - "version": "1.0.2" + "version": "1.0.3" } \ No newline at end of file diff --git a/custom_components/kocom_wallpad/pywallpad/client.py b/custom_components/kocom_wallpad/pywallpad/client.py index 91bda63..0c24d03 100644 --- a/custom_components/kocom_wallpad/pywallpad/client.py +++ b/custom_components/kocom_wallpad/pywallpad/client.py @@ -4,13 +4,13 @@ import asyncio from queue import Queue -from typing import Callable, Optional +from typing import Optional, Callable, Awaitable from ..connection import Connection -from .crc import verify_checksum +from .crc import verify_checksum, calculate_checksum from .packet import PacketParser -from .const import _LOGGER +from .const import _LOGGER, PREFIX_HEADER, SUFFIX_HEADER class PacketQueue: @@ -57,31 +57,36 @@ async def wait_for_resume(self): class KocomClient: """Client for the Kocom Wallpad.""" - def __init__(self, connection: Connection, timeout = 1.2, max_retries = 3) -> None: + def __init__( + self, + connection: Connection, + timeout: float = 0.25, + max_retries = 3 + ) -> None: """Initialize the KocomClient.""" + self.connection = connection self.timeout = timeout self.max_retries = max_retries - self.connection = connection self.tasks: list[asyncio.Task] = [] - self.device_callbacks: list[Callable] = [] + self.device_callbacks: list[Callable[[dict], Awaitable[None]]] = [] self.packet_queue = PacketQueue() async def start(self) -> None: """Start the client.""" - _LOGGER.debug("Starting client...") + _LOGGER.debug("Starting Kocom Client...") self.tasks.append(asyncio.create_task(self._listen())) self.tasks.append(asyncio.create_task(self._process_queue())) async def stop(self) -> None: """Stop the client.""" - _LOGGER.debug("Stopping client...") + _LOGGER.debug("Stopping Kocom Client...") for task in self.tasks: task.cancel() self.tasks.clear() self.device_callbacks.clear() - def add_device_callback(self, callback: Callable) -> None: + def add_device_callback(self, callback: Callable[[dict], Awaitable[None]]) -> None: """Add callback for device updates.""" self.device_callbacks.append(callback) @@ -93,16 +98,16 @@ async def _listen(self) -> None: if receive_data is None: continue - packets = self.extract_packets(receive_data) - for packet in packets: + packet_list = self.extract_packets(receive_data) + for packet in packet_list: if not verify_checksum(packet): _LOGGER.debug("Checksum verification failed for packet: %s", packet.hex()) continue - parsed_packet = PacketParser.parse_state(packet.hex()) - for parse_packet in parsed_packet: + parsed_packets = PacketParser.parse_state(packet) + for parsed_packet in parsed_packets: for callback in self.device_callbacks: - await callback(parse_packet) + await callback(parsed_packet) except Exception as e: _LOGGER.error(f"Error receiving data: {e}", exc_info=True) @@ -112,18 +117,18 @@ def extract_packets(self, data: bytes) -> list[bytes]: start = 0 while start < len(data): - start_pos = data.find(b'\xaa\x55', start) + start_pos = data.find(PREFIX_HEADER, start) if start_pos == -1: break - end_pos = data.find(b'\x0d\x0d', start_pos + len(b'\xaa\x55')) + end_pos = data.find(SUFFIX_HEADER, start_pos + len(PREFIX_HEADER)) if end_pos == -1: break - packet = data[start_pos:end_pos + len(b'\x0d\x0d')] + packet = data[start_pos:end_pos + len(SUFFIX_HEADER)] packets.append(packet) - start = end_pos + len(b'\x0d\x0d') + start = end_pos + len(SUFFIX_HEADER) return packets @@ -150,6 +155,15 @@ async def _process_queue(self) -> None: else: _LOGGER.error(f"Max retries reached for packet: {packet.hex()}") - async def send(self, packet: bytes) -> None: + async def send_packet(self, packet: bytearray) -> None: """Send a packet to the device.""" + packet[:0] = PREFIX_HEADER + if (checksum := calculate_checksum(packet)) is None: + _LOGGER.error("Checksum calculation failed for packet: %s", packet.hex()) + return + packet.append(checksum) + packet.extend(SUFFIX_HEADER) + if not verify_checksum(packet): + _LOGGER.error("Checksum verification failed for packet: %s", packet.hex()) + return self.packet_queue.add_packet(packet) diff --git a/custom_components/kocom_wallpad/pywallpad/const.py b/custom_components/kocom_wallpad/pywallpad/const.py index fef8722..f989fdb 100644 --- a/custom_components/kocom_wallpad/pywallpad/const.py +++ b/custom_components/kocom_wallpad/pywallpad/const.py @@ -4,6 +4,9 @@ _LOGGER = logging.getLogger(__package__) +PREFIX_HEADER = b"\xaaU" +SUFFIX_HEADER = b"\r\r" + POWER = "power" BRIGHTNESS = "brightness" LEVEL = "level" @@ -21,6 +24,8 @@ VENT_MODE = "vent_mode" FAN_SPEED = "fan_speed" +PRESET_LIST = "preset_list" +SPEED_LIST = "speed_list" PM10 = "pm10" PM25 = "pm25" @@ -30,3 +35,4 @@ HUMIDITY = "humidity" TIME = "time" +DATE = "date" diff --git a/custom_components/kocom_wallpad/pywallpad/crc.py b/custom_components/kocom_wallpad/pywallpad/crc.py index 8c9f6ab..81fd979 100644 --- a/custom_components/kocom_wallpad/pywallpad/crc.py +++ b/custom_components/kocom_wallpad/pywallpad/crc.py @@ -1,7 +1,5 @@ """CRC calculation for py wallpad.""" -from typing import Optional - def crc_ccitt_xmodem(data: bytes) -> int: """Calculate CRC-CCITT (XMODEM) checksum.""" crc = 0x0000 @@ -28,7 +26,7 @@ def verify_crc(packet: bytes) -> bool: calculated_checksum = crc_ccitt_xmodem(data) return calculated_checksum == provided_checksum -def calculate_crc(packet: bytes) -> Optional[tuple[int, int]]: +def calculate_crc(packet: bytes) -> list[int, int] | None: """Calculate CRC for a packet.""" if len(packet) < 17: return None @@ -39,7 +37,7 @@ def calculate_crc(packet: bytes) -> Optional[tuple[int, int]]: # Append the 16-bit checksum (split into two bytes) checksum_high = (checksum >> 8) & 0xFF checksum_low = checksum & 0xFF - return checksum_high, checksum_low + return list(checksum_high, checksum_low) def verify_checksum(packet: bytes) -> bool: """Verify checksum for a packet.""" @@ -50,7 +48,7 @@ def verify_checksum(packet: bytes) -> bool: calculated_checksum = (data_sum + 1) % 256 return calculated_checksum == packet[18] -def calculate_checksum(packet: bytes) -> Optional[int]: +def calculate_checksum(packet: bytes) -> int | None: """Calculate checksum for a packet.""" if len(packet) < 17: return None diff --git a/custom_components/kocom_wallpad/pywallpad/enums.py b/custom_components/kocom_wallpad/pywallpad/enums.py index 661b039..5706c6a 100644 --- a/custom_components/kocom_wallpad/pywallpad/enums.py +++ b/custom_components/kocom_wallpad/pywallpad/enums.py @@ -1,66 +1,68 @@ """Enums for py wallpad.""" -from enum import Enum +from enum import IntEnum -class DeviceType(Enum): +class DeviceType(IntEnum): """Device types for Kocom devices.""" - WALLPAD = "01" - LIGHT = "0e" - GAS = "2c" - DOORLOCK = "33" - THERMOSTAT = "36" - AC = "39" - OUTLET = "3b" - EV = "44" - FAN = "48" - MOTION = "60" - IGNORE = "86" - IAQ = "98" + WALLPAD = 0x01 + LIGHT = 0x0E + GAS = 0x2C + DOORLOCK = 0x33 + THERMOSTAT = 0x36 + AC = 0x39 + OUTLET = 0x3B + EV = 0x44 + FAN = 0x48 + MOTION = 0x60 + IGNORE = 0x86 + IAQ = 0x98 -class Command(Enum): +class Command(IntEnum): """Commands for Kocom devices.""" - STATUS = "00" - ON = "01" - OFF = "02" - DETECT = "04" - SCAN = "3a" - -class PacketType(Enum): + STATUS = 0x00 + ON = 0x01 + OFF = 0x02 + DETECT = 0x04 + SCAN = 0x3A + +class PacketType(IntEnum): """Packet types for Kocom devices.""" - SEND = "b" - RECV = "d" - -class OpMode(Enum): + SEND = 0x0B + RECV = 0x0D + +class OpMode(IntEnum): """Operating modes for AC devices.""" - COOL = "00" - FAN_ONLY = "01" - DRY = "02" - AUTO = "03" + COOL = 0x00 + FAN_ONLY = 0x01 + DRY = 0x02 + AUTO = 0x03 -class FanMode(Enum): +class FanMode(IntEnum): """Fan modes for fans.""" - OFF = "00" - LOW = "01" - MEDIUM = "02" - HIGH = "03" + OFF = 0x00 + LOW = 0x01 + MEDIUM = 0x02 + HIGH = 0x03 -class VentMode(Enum): +class VentMode(IntEnum): """Ventilation modes for fans.""" - OFF = "00" - VENTILATION = "01" - AUTO = "02" - AIR_PURIFIER = "08" + NONE = 0x00 + VENTILATION = 0x01 + AUTO = 0x02 + BYPASS = 0x03 + NIGHT = 0x05 + AIR_PURIFIER = 0x08 -class FanSpeed(Enum): +class FanSpeed(IntEnum): """Fan speeds for fans.""" - OFF = "00" - LOW = "40" - MEDIUM = "80" - HIGH = "c0" - -class EvDirection(Enum): - """EV directions.""" - IDLE = "00" - DOWN = "01" - UP = "02" - ARRIVAL = "03" + OFF = 0x00 + LOW = 0x40 + MEDIUM = 0x80 + HIGH = 0xC0 + +class Direction(IntEnum): + """Direction for EV devices.""" + IDLE = 0x00 + DOWN = 0x01 + UP = 0x02 + ARRIVAL = 0x03 diff --git a/custom_components/kocom_wallpad/pywallpad/packet.py b/custom_components/kocom_wallpad/pywallpad/packet.py index 3f1593d..dc18fed 100644 --- a/custom_components/kocom_wallpad/pywallpad/packet.py +++ b/custom_components/kocom_wallpad/pywallpad/packet.py @@ -23,6 +23,8 @@ FAN_MODE, VENT_MODE, FAN_SPEED, + PRESET_LIST, + SPEED_LIST, PM10, PM25, CO2, @@ -31,7 +33,6 @@ HUMIDITY, TIME, ) -from .crc import verify_checksum, calculate_checksum from .enums import ( DeviceType, PacketType, @@ -40,7 +41,7 @@ FanMode, # AC VentMode, # Fan FanSpeed, # Fan - EvDirection, + Direction, # EV ) @@ -61,21 +62,22 @@ def __repr__(self) -> str: class KocomPacket: """Base class for Kocom packets.""" - def __init__(self, packet: list[str]) -> None: + def __init__(self, packet: bytes) -> None: """Initialize the packet.""" self.packet = packet - self.packet_type = PacketType(packet[3][0]) - self.sequence = packet[3][1] + self.packet_type = PacketType(packet[3] >> 4) + self.sequence = packet[3] & 0x0F self.dest = packet[5:7] self.src = packet[7:9] self.command = Command(packet[9]) self.value = packet[10:18] self.checksum = packet[18] + self._last_recv_time = time.time() self._device: Device = None def __repr__(self) -> str: """Return a string representation of the packet.""" - return f"KocomPacket(packet={self.packet}, packet_type={self.packet_type}, sequence={self.sequence}, dest={self.dest}, src={self.src}, command={self.command}, value={self.value}, checksum={self.checksum}, device={self._device})" + return f"KocomPacket(packet_type={self.packet_type}, sequence={self.sequence}, dest={self.dest}, src={self.src}, command={self.command}, value={self.value}, checksum={self.checksum})" @property def device_type(self) -> DeviceType: @@ -102,37 +104,18 @@ def parse_data(self) -> list[Device]: _LOGGER.warning("Parsing not implemented for %s", self.name) return [] - def make_packet(self, command: Command, values: dict[int, int] = None) -> bytearray: - """Generate a packet.""" - packet = bytearray([0xaa, 0x55, 0x30, 0xbc, 0x00]) - + def make_packet(self, command: Command, value_packet: bytearray) -> bytearray: + """Make a packet.""" + if self.value != bytes(value_packet): + _LOGGER.debug("Packet value changed from %s to %s", self.value.hex(), value_packet.hex()) + self.value = bytes(value_packet) + header = bytearray([0x30, 0xBC, 0x00]) if self.device_type == DeviceType.EV: - packet.extend(int(x, 16) for x in self.dest) - packet.extend(int(x, 16) for x in self.src) + address = self.dest + self.src else: - packet.extend(int(x, 16) for x in self.src) - packet.extend(int(x, 16) for x in self.dest) - - packet.append(int(command.value, 16)) - packet.extend([0x00] * 8) - - if values: - for index, val in values.items(): - packet[10 + index] = val - - checksum = calculate_checksum(packet) - if checksum is None: - _LOGGER.error(f"Checksum calculation failed: {packet.hex()}") - return bytearray() - - packet.append(checksum) - packet.extend([0x0d, 0x0d]) - - if not verify_checksum(packet): - _LOGGER.error(f"Packet checksum verification failed: {packet.hex()}") - return bytearray() - - return packet + address = self.src + self.dest + command_byte = bytearray([command.value]) + return header + bytearray(address) + command_byte + value_packet class LightPacket(KocomPacket): @@ -148,8 +131,8 @@ def parse_data(self) -> list[Device]: self.valid_light_indices[self.room_id] = {} for i, value in enumerate(self.value[:8]): - power_state = value == "ff" - brightness = int(value, 16) if value != "ff" else 0 + power_state = value == 0xFF + brightness = value if not power_state else 0 if power_state and i not in self.valid_light_indices[self.room_id]: self.valid_light_indices[self.room_id][i] = [] @@ -170,7 +153,7 @@ def parse_data(self) -> list[Device]: has_brightness = bool(self.valid_light_indices[self.room_id][i]) if has_brightness: - state[POWER] = int(value, 16) > 0 + state[POWER] = value > 0 state[BRIGHTNESS] = brightness state[LEVEL] = self.valid_light_indices[self.room_id][i] @@ -185,23 +168,23 @@ def parse_data(self) -> list[Device]: return devices - def make_scan(self) -> None: + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, **kwargs) -> None: - """Make a status packet.""" - values = {} - control_mode, control = next(iter(kwargs.items())) - sub_id = self._device.sub_id - if control_mode == POWER and sub_id: - values[int(sub_id)] = 0xff if control else 0x00 - elif control_mode == BRIGHTNESS and sub_id: - values[int(sub_id)] = control - else: - raise ValueError(f"Invalid control mode: {control_mode}") - - return super().make_packet(Command.STATUS, values) + return super().make_packet(Command.SCAN, bytearray(self.value)) + + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + sub_id = int(self._device.sub_id) + value = bytearray(self.value) + value[sub_id] = 0xFF if power else 0x00 + return super().make_packet(Command.STATUS, value) + + def make_brightness_status(self, brightness: int) -> bytearray: + """Make a brightness status packet.""" + sub_id = int(self._device.sub_id) + value = bytearray(self.value) + value[sub_id] = brightness + return super().make_packet(Command.STATUS, value) class OutletPacket(KocomPacket): @@ -217,7 +200,7 @@ def parse_data(self) -> list[Device]: self.valid_outlet_indices[self.room_id] = [] for i, value in enumerate(self.value[:8]): - power_state = value == "ff" + power_state = value == 0xFF if power_state and i not in self.valid_outlet_indices[self.room_id]: self.valid_outlet_indices[self.room_id].append(i) @@ -237,21 +220,17 @@ def parse_data(self) -> list[Device]: devices.append(device) return devices - - def make_scan(self) -> None: + + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, **kwargs) -> None: - """Make a status packet.""" - values = {} - control_mode, control = next(iter(kwargs.items())) - if control_mode == POWER and (sub_id := self._device.sub_id): - values[int(sub_id)] = 0xff if control else 0x00 - else: - raise ValueError(f"Invalid control mode: {control_mode}") - - return super().make_packet(Command.STATUS, values) + return super().make_packet(Command.SCAN, bytearray(self.value)) + + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + sub_id = int(self._device.sub_id) + value = bytearray(self.value) + value[sub_id] = 0xFF if power else 0x00 + return super().make_packet(Command.STATUS, value) class ThermostatPacket(KocomPacket): @@ -261,15 +240,15 @@ def parse_data(self) -> list[Device]: """Parse thermostat-specific data.""" devices: list[Device] = [] - power_state = self.value[0][0] == "1" - hotwater_state = self.value[0][1] == "2" - away_mode = self.value[1][1] == "1" - target_temp = int(self.value[2], 16) - current_temp = int(self.value[4], 16) - hotwater_temp = int(self.value[3], 16) - heatwater_temp = int(self.value[5], 16) - error_code = int(self.value[6], 16) - boiler_error = int(self.value[7], 16) + power_state = self.value[0] >> 4 == 1 + hotwater_state = self.value[0] & 0x0F == 2 + away_mode = self.value[1] & 0x0F == 1 + target_temp = self.value[2] + current_temp = self.value[4] + hotwater_temp = self.value[3] + heatwater_temp = self.value[5] + error_code = self.value[6] + boiler_error = self.value[7] devices.append( Device( @@ -285,7 +264,7 @@ def parse_data(self) -> list[Device]: ) ) - if self.room_id == "00": + if self.room_id == '0': devices.append( Device( device_type=self.name, @@ -303,7 +282,7 @@ def parse_data(self) -> list[Device]: ) ) - if hotwater_state and self.room_id == "00": + if hotwater_state and self.room_id == '0': _LOGGER.debug(f"Supports hot water in thermostat.") if hotwater_temp > 0: @@ -331,26 +310,29 @@ def parse_data(self) -> list[Device]: return devices - def make_scan(self) -> None: + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, **kwargs) -> None: - """Make a status packet.""" - values = {} - control_mode, control = next(iter(kwargs.items())) - if control_mode == POWER: - values[0] = 0x11 if control else 0x01 - values[1] = 0x00 - elif control_mode == AWAY_MODE: - values[0] = 0x11 - values[1] = 0x01 if control else 0x00 - elif control_mode == TARGET_TEMP: - values[2] = int(control) - else: - raise ValueError(f"Unsupported control mode: {control_mode}") - - return super().make_packet(Command.STATUS, values) + return super().make_packet(Command.SCAN, bytearray(self.value)) + + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + value = bytearray(self.value) + value[0] = 0x11 if power else 0x01 + value[1] = 0x00 + return super().make_packet(Command.STATUS, value) + + def make_away_status(self, away_mode: bool) -> bytearray: + """Make an away status packet.""" + value = bytearray(self.value) + value[0] = 0x11 + value[1] = 0x01 if away_mode else 0x00 + return super().make_packet(Command.STATUS, value) + + def make_target_temp(self, target_temp: int) -> bytearray: + """Make a target temperature packet.""" + value = bytearray(self.value) + value[2] = target_temp + return super().make_packet(Command.STATUS, value) class AcPacket(KocomPacket): @@ -358,11 +340,11 @@ class AcPacket(KocomPacket): def parse_data(self) -> list[Device]: """Parse AC-specific data.""" - power_state = self.value[0] == "10" + power_state = self.value[0] == 0x10 op_mode = OpMode(self.value[1]) fan_mode = FanMode(self.value[2]) - current_temp = int(self.value[4], 16) - target_temp = int(self.value[5], 16) + current_temp = self.value[4] + target_temp = self.value[5] device = Device( device_type=self.name, @@ -378,49 +360,66 @@ def parse_data(self) -> list[Device]: ) return [device] - def make_scan(self) -> None: + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, **kwargs) -> None: - """Make a status packet.""" - values = {} - control_mode, control = next(iter(kwargs.items())) - if control_mode == POWER: - values[0] = 0x10 if control else 0x00 - elif control_mode == OP_MODE: - values[1] = int(control.value, 16) - elif control_mode == FAN_MODE: - values[2] = int(control.value, 16) - elif control_mode == TARGET_TEMP: - values[5] = int(control) - else: - raise ValueError(f"Unsupported control mode: {control_mode}") - - return super().make_packet(Command.STATUS, values) - + return super().make_packet(Command.SCAN, bytearray(self.value)) + + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + value = bytearray(self.value) + value[0] = 0x10 if power else 0x00 + return super().make_packet(Command.STATUS, value) + + def make_op_mode(self, op_mode: OpMode) -> bytearray: + """Make an operation mode packet.""" + value = bytearray(self.value) + value[0] = 0x10 + value[1] = op_mode.value + return super().make_packet(Command.STATUS, value) + + def make_fan_mode(self, fan_mode: FanMode) -> bytearray: + """Make a fan mode packet.""" + value = bytearray(self.value) + value[2] = fan_mode.value + return super().make_packet(Command.STATUS, value) + + def make_target_temp(self, target_temp: int) -> bytearray: + """Make a target temperature packet.""" + value = bytearray(self.value) + value[5] = target_temp + return super().make_packet(Command.STATUS, value) + class FanPacket(KocomPacket): """Handles packets for fan devices.""" - co2_supported = False - + co2_sensor = False + def parse_data(self) -> list[Device]: """Parse fan-specific data.""" devices: list[Device] = [] - power_state = self.value[0][0] == "1" - co2_sensor = self.value[0][1] == "1" + power_state = self.value[0] >> 4 == 1 + self.co2_sensor = self.value[0] & 0x0F == 1 vent_mode = VentMode(self.value[1]) fan_speed = FanSpeed(self.value[2]) - co2_state = int(''.join(self.value[4:6]), 16) - error_code = int(self.value[6], 16) + co2_state = (self.value[4] * 100) + self.value[5] + error_code = self.value[6] + preset_list = list(VentMode.__members__.keys()) + speed_list = list(FanSpeed.__members__.keys()) + devices.append( Device( device_type=self.name, room_id=self.room_id, device_id=self.device_id, - state={POWER: power_state, VENT_MODE: vent_mode, FAN_SPEED: fan_speed} + state={ + POWER: power_state, + VENT_MODE: vent_mode, + FAN_SPEED: fan_speed, + PRESET_LIST: preset_list, + SPEED_LIST: speed_list, + } ) ) devices.append( @@ -432,9 +431,8 @@ def parse_data(self) -> list[Device]: ) ) - if co2_sensor or self.co2_supported: + if self.co2_sensor: _LOGGER.debug(f"Supports CO2 sensor in fan.") - self.co2_supported = True devices.append( Device( device_type=self.name, @@ -446,29 +444,31 @@ def parse_data(self) -> list[Device]: ) return devices - - def make_scan(self) -> None: + + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, **kwargs) -> None: - """Make a status packet.""" - values = {} - control_mode, control = next(iter(kwargs.items())) - if control_mode == POWER: - values[0] = 0x11 if control else 0x00 - elif control_mode == VENT_MODE: - values[0] = 0x11 - values[1] = int(control.value, 16) - elif control_mode == FAN_SPEED: - values[0] = 0x00 if control == FanSpeed.OFF else 0x11 - values[1] = 0x01 - values[2] = int(control.value, 16) - else: - raise ValueError(f"Unsupported control mode: {control_mode}") - - return super().make_packet(Command.STATUS, values) + return super().make_packet(Command.SCAN, bytearray(self.value)) + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + value = bytearray(self.value) + value[0] = 0x11 if power else 0x00 + return super().make_packet(Command.STATUS, value) + + def make_vent_mode(self, vent_mode: VentMode) -> bytearray: + """Make a vent mode packet.""" + value = bytearray(self.value) + value[0] = 0x00 if vent_mode == VentMode.NONE else 0x11 + value[1] = vent_mode.value + return super().make_packet(Command.STATUS, value) + + def make_fan_speed(self, fan_speed: FanSpeed) -> bytearray: + """Make a fan speed packet.""" + value = bytearray(self.value) + value[0] = 0x00 if fan_speed == FanSpeed.OFF else 0x11 + value[2] = fan_speed.value + return super().make_packet(Command.STATUS, value) + class IAQPacket(KocomPacket): """Handles packets for IAQ devices.""" @@ -478,12 +478,12 @@ def parse_data(self) -> list[Device]: devices: list[Device] = [] sensor_mapping = { - PM10: int(self.value[0], 16), - PM25: int(self.value[1], 16), - CO2: int(''.join(self.value[2:4]), 16), - VOC: int(''.join(self.value[4:6]), 16), - TEMPERATURE: int(self.value[6], 16), - HUMIDITY: int(self.value[7], 16), + PM10: self.value[0], + PM25: self.value[1], + CO2: int.from_bytes(self.value[2:4], 'big'), + VOC: int.from_bytes(self.value[4:6], 'big'), + TEMPERATURE: self.value[6], + HUMIDITY: self.value[7], } for sensor_id, state in sensor_mapping.items(): @@ -498,6 +498,10 @@ def parse_data(self) -> list[Device]: devices.append(device) return devices + + def make_scan(self) -> bytearray: + """Make a scan packet.""" + return super().make_packet(Command.SCAN, bytearray(self.value)) class GasPacket(KocomPacket): @@ -505,25 +509,24 @@ class GasPacket(KocomPacket): def parse_data(self) -> list[Device]: """Parse gas-specific data.""" - power_state = self.command == Command.ON device = Device( device_type=self.name, room_id=self.room_id, device_id=self.device_id, - state={POWER: power_state}, + state={POWER: self.command == Command.ON}, ) return [device] - def make_scan(self) -> None: + def make_scan(self) -> bytearray: """Make a scan packet.""" - return super().make_packet(Command.SCAN) + return super().make_packet(Command.SCAN, bytearray(self.value)) - def make_status(self, power: bool) -> None: - """Make a status packet.""" - if not power: - return super().make_packet(Command.OFF) - else: - _LOGGER.debug("Supports gas valve lock only.") + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + if power: + _LOGGER.debug(f"Gas device is on. Ignoring power status.") + return + return super().make_packet(Command.OFF, bytearray(self.value)) class MotionPacket(KocomPacket): @@ -531,35 +534,28 @@ class MotionPacket(KocomPacket): def parse_data(self) -> list[Device]: """Parse motion-specific data.""" - detect_state = self.command == Command.DETECT - detect_time = time.time() device = Device( device_type=self.name, room_id=self.room_id, device_id=self.device_id, - state={STATE: detect_state, TIME: detect_time}, + state={STATE: self.command == Command.DETECT, TIME: time.time()}, ) return [device] class EvPacket(KocomPacket): """Handles packets for EV devices.""" - floor_supported = False def parse_data(self) -> list[Device]: """Parse EV-specific data.""" devices: list[Device] = [] - power_state = False - ev_direction = EvDirection(self.value[0]) - ev_floor = "" - devices.append( Device( device_type=self.name, room_id=self.room_id, device_id=self.device_id, - state={POWER: power_state}, + state={POWER: False}, ) ) devices.append( @@ -567,55 +563,53 @@ def parse_data(self) -> list[Device]: device_type=self.name, room_id=self.room_id, device_id=self.device_id, - state={STATE: ev_direction.name}, + state={STATE: Direction(self.value[0]).name}, sub_id="direction", ) ) return devices + + def make_power_status(self, power: bool) -> bytearray: + """Make a power status packet.""" + if not power: + _LOGGER.debug(f"EV device is off. Ignoring power status.") + return + return super().make_packet(Command.ON, bytearray(self.value)) - def make_scan(self) -> None: - """Make a scan packet.""" - return super().make_packet(Command.SCAN) - - def make_status(self, power: bool) -> None: - """Make a status packet.""" - if power: - return super().make_packet(Command.ON) - else: - _LOGGER.debug("Supports EV call only.") - class PacketParser: - """Parses raw Kocom packets into specific classes.""" + """Parses raw Kocom packets into specific device classes.""" @staticmethod - def parse(packet: str) -> KocomPacket: - """Parse a raw packet.""" - packet_list = [packet[i:i + 2] for i in range(0, len(packet), 2)] - device_type = packet_list[7] - return PacketParser.get_packet_class(device_type, packet_list) - + def parse(packet_data: bytes) -> KocomPacket: + """Parse a raw packet into a specific packet class.""" + device_type = packet_data[7] + return PacketParser._get_packet_instance(device_type, packet_data) + @staticmethod - def parse_state(packet: str) -> list[KocomPacket]: - """Parse the state from a packet.""" - parsed_packet = PacketParser.parse(packet) - if parsed_packet.device_type == DeviceType.WALLPAD: + def parse_state(packet: bytes) -> list[KocomPacket]: + """Parse device states from a packet.""" + base_packet = PacketParser.parse(packet) + + # Skip unsupported packet types + if base_packet.device_type == DeviceType.WALLPAD: return [] - if parsed_packet.packet_type == PacketType.RECV and parsed_packet.command == Command.SCAN: + if base_packet.packet_type == PacketType.RECV and base_packet.command == Command.SCAN: return [] + + # Generate packets for each device + parsed_packets: list[KocomPacket] = [] + for device_data in base_packet.parse_data(): + device_packet = deepcopy(base_packet) + device_packet._device = device_data + parsed_packets.append(device_packet) - packets: list[KocomPacket] = [] - for device in parsed_packet.parse_data(): - new_packet = deepcopy(parsed_packet) - new_packet._device = device - packets.append(new_packet) - - return packets - + return parsed_packets + @staticmethod - def get_packet_class(device_type: str, packet_list: list[str]) -> KocomPacket: - """Return the appropriate packet class based on device type.""" - packet_class_map = { + def _get_packet_instance(device_type: int, packet_data: bytes) -> KocomPacket: + """Retrieve the appropriate packet class based on device type.""" + device_class_map = { DeviceType.LIGHT.value: LightPacket, DeviceType.OUTLET.value: OutletPacket, DeviceType.THERMOSTAT.value: ThermostatPacket, @@ -627,10 +621,10 @@ def get_packet_class(device_type: str, packet_list: list[str]) -> KocomPacket: DeviceType.EV.value: EvPacket, DeviceType.WALLPAD.value: KocomPacket, } - packet_class = packet_class_map.get(device_type) + + packet_class = device_class_map.get(device_type) if packet_class is None: - _LOGGER.error("Unknown device type: %s, %s", device_type, packet_list) - return KocomPacket(packet_list) + _LOGGER.error("Unknown device type: %s, data: %s", format(device_type, 'x'), packet_data.hex()) + return KocomPacket(packet_data) - return packet_class(packet_list) - \ No newline at end of file + return packet_class(packet_data) diff --git a/custom_components/kocom_wallpad/sensor.py b/custom_components/kocom_wallpad/sensor.py index daedded..7170f39 100644 --- a/custom_components/kocom_wallpad/sensor.py +++ b/custom_components/kocom_wallpad/sensor.py @@ -30,6 +30,7 @@ TEMPERATURE, HUMIDITY, ) +from .pywallpad.enums import DeviceType from .pywallpad.packet import ( KocomPacket, FanPacket, @@ -53,12 +54,8 @@ async def async_setup_entry( @callback def async_add_sensor(packet: KocomPacket) -> None: """Add new sensor entity.""" - if isinstance(packet, (FanPacket, IAQPacket)): + if isinstance(packet, (FanPacket, IAQPacket, EvPacket)): async_add_entities([KocomSensorEntity(gateway, packet)]) - elif isinstance(packet, EvPacket): - async_add_entities([KocomEvEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.SENSOR): async_add_sensor(entity) @@ -79,6 +76,9 @@ def __init__( """Initialize the sensor.""" super().__init__(gateway, packet) + if self.packet.device_type != DeviceType.EV: + self._attr_state_class = SensorStateClass.MEASUREMENT + @property def native_value(self) -> int: """Return the state of the sensor.""" @@ -117,26 +117,3 @@ def native_unit_of_measurement(self) -> str | None: elif self.device.sub_id == HUMIDITY: return PERCENTAGE return None - - @property - def state_class(self) -> SensorStateClass: - """Return the state class of the sensor.""" - return SensorStateClass.MEASUREMENT - - -class KocomEvEntity(KocomEntity, SensorEntity): - """Representation of a Kocom EV sensor.""" - - def __init__( - self, - gateway: KocomGateway, - packet: KocomPacket, - ) -> None: - """Initialize the sensor.""" - super().__init__(gateway, packet) - - @property - def native_value(self) -> int: - """Return the state of the sensor.""" - return self.device.state[STATE] - \ No newline at end of file diff --git a/custom_components/kocom_wallpad/switch.py b/custom_components/kocom_wallpad/switch.py index 8f28810..f1109a6 100644 --- a/custom_components/kocom_wallpad/switch.py +++ b/custom_components/kocom_wallpad/switch.py @@ -39,8 +39,6 @@ def async_add_switch(packet: KocomPacket) -> None: """Add new switch entity.""" if isinstance(packet, (OutletPacket, GasPacket, EvPacket)): async_add_entities([KocomSwitchEntity(gateway, packet)]) - else: - LOGGER.warning(f"Unsupported packet type: {packet}") for entity in gateway.get_entities(Platform.SWITCH): async_add_switch(entity) @@ -73,10 +71,10 @@ def is_on(self) -> bool: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on switch.""" - packet = self.packet.make_status(power=True) - await self.send(packet) + make_packet = self.packet.make_power_status(True) + await self.send_packet(make_packet) async def async_turn_off(self, **kwargs: Any) -> None: """Turn off switch.""" - packet = self.packet.make_status(power=False) - await self.send(packet) + make_packet = self.packet.make_power_status(False) + await self.send_packet(make_packet) diff --git a/custom_components/kocom_wallpad/translations/en.json b/custom_components/kocom_wallpad/translations/en.json index 5aeb21f..ae2894c 100644 --- a/custom_components/kocom_wallpad/translations/en.json +++ b/custom_components/kocom_wallpad/translations/en.json @@ -1,5 +1,5 @@ { - "title": "Kocom WallPad", + "title": "Kocom Wallpad", "config": { "step": { "user": { diff --git a/custom_components/kocom_wallpad/util.py b/custom_components/kocom_wallpad/util.py index 00aaddf..9adae1b 100644 --- a/custom_components/kocom_wallpad/util.py +++ b/custom_components/kocom_wallpad/util.py @@ -2,8 +2,19 @@ from __future__ import annotations +import base64 +import json + def create_dev_id( device_type: str, room_id: str | None, sub_id: str | None ) -> str: """Create a device ID.""" return "_".join(filter(None, [device_type, room_id, sub_id])) + +def encode_bytes_to_base64(data: bytes) -> str: + """Encode bytes to Base64 string.""" + return base64.b64encode(data).decode("utf-8") + +def decode_base64_to_bytes(data: str) -> bytes: + """Decode Base64 string to bytes.""" + return base64.b64decode(data) diff --git a/hacs.json b/hacs.json index 4175409..4dd3367 100644 --- a/hacs.json +++ b/hacs.json @@ -1,5 +1,5 @@ { - "name": "Kocom WallPad", + "name": "Kocom Wallpad", "render_readme": true, "homeassistant": "2024.8.1" }