diff --git a/custom_components/bticino_x8000/__init__.py b/custom_components/bticino_x8000/__init__.py index 69c144c..605533f 100644 --- a/custom_components/bticino_x8000/__init__.py +++ b/custom_components/bticino_x8000/__init__.py @@ -1,4 +1,5 @@ -from datetime import timedelta # noqa: D104 +"""Init.""" +from datetime import timedelta import logging from homeassistant.config_entries import ConfigEntry @@ -23,22 +24,23 @@ async def async_setup_entry( """Set up the Bticino_X8000 component.""" data = dict(config_entry.data) - async def update_token(now: None) -> None: - _LOGGER.debug("Refreshing access token") + async def update_token(now: dt_util.dt.datetime) -> None: + _LOGGER.debug("Refreshing access token: %s", now) ( access_token, refresh_token, access_token_expires_on, ) = await refresh_access_token(data) - data["access_token"] = access_token data["refresh_token"] = refresh_token data["access_token_expires_on"] = dt_util.as_utc(access_token_expires_on) hass.config_entries.async_update_entry(config_entry, data=data) - update_interval = timedelta(minutes=2) + update_interval = timedelta(minutes=60) async_track_time_interval(hass, update_token, update_interval) - await update_token(dt_util.as_timestamp(dt_util.utcnow())) + hass.async_add_job(update_token(None)) + await update_token(None) + for plant_data in data["selected_thermostats"]: plant_data = list(plant_data.values())[0] webhook_id = plant_data.get("webhook_id") @@ -61,7 +63,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> plant_data = list(plant_data.values())[0] webhook_id = plant_data.get("webhook_id") subscription_id = plant_data.get("subscription_id") - response = await bticino_api.delete_subscribe_C2C_notifications( + response = await bticino_api.delete_subscribe_c2c_notifications( plant_id, subscription_id ) if response["status_code"] == 200: diff --git a/custom_components/bticino_x8000/api.py b/custom_components/bticino_x8000/api.py index 301849c..5c7b03c 100644 --- a/custom_components/bticino_x8000/api.py +++ b/custom_components/bticino_x8000/api.py @@ -54,9 +54,9 @@ async def check_api_endpoint_health(self) -> bool: self.header, ) return True - else: + if status_code == 401: _LOGGER.warning( - "The endpoint API is unhealthy. Attempt to update token. HTTP %s, Content: %s, data: %s", + "Attempt to update token. HTTP %s, Content: %s, data: %s", status_code, content, self.data, @@ -67,12 +67,12 @@ async def check_api_endpoint_health(self) -> bool: return await self.check_api_endpoint_health() return False - except Exception as e: + except aiohttp.ClientError as e: _LOGGER.error( "The endpoint API is unhealthy. Attempt to update token. Error: %s", e, ) - return False + return False async def handle_unauthorized_error(self, response: aiohttp.ClientResponse) -> bool: """Head off 401 Unauthorized.""" @@ -82,8 +82,8 @@ async def handle_unauthorized_error(self, response: aiohttp.ClientResponse) -> b _LOGGER.warning("Received 401 Unauthorized error. Attempting token refresh") ( access_token, - refresh_token, - access_token_expires_on, + _, + _, ) = await refresh_access_token(self.data) self.header = { "Authorization": access_token, @@ -91,8 +91,7 @@ async def handle_unauthorized_error(self, response: aiohttp.ClientResponse) -> b "Content-Type": "application/json", } return True - else: - return False + return False async def get_plants(self) -> dict[str, Any]: """Retrieve thermostat plants.""" @@ -108,24 +107,29 @@ async def get_plants(self) -> dict[str, Any]: "status_code": status_code, "data": json.loads(content)["plants"], } - else: + if status_code == 401: # Retry the request on 401 Unauthorized if await self.handle_unauthorized_error(response): # Retry the original request return await self.get_plants() - return { - "status_code": status_code, - "error": f"Errore nella richiesta di get_plants. Content: {content}, URL: {url}, HEADER: {self.header}", - } - except Exception as e: + return { + "status_code": status_code, + "error": ( + f"Failed get_plants. " + f"Content: {content}, " + f"URL: {url}, " + f"HEADER: {self.header}" + ), + } + except aiohttp.ClientError as e: return { "status_code": 500, - "error": f"Errore nella richiesta di get_plants: {e}", + "error": f"Failed get_plants: {e}", } - async def get_topology(self, plantId: str) -> dict[str, Any]: + async def get_topology(self, plant_id: str) -> dict[str, Any]: """Retrieve thermostat topology.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}{PLANTS}/{plantId}{TOPOLOGY}" + url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}{PLANTS}/{plant_id}{TOPOLOGY}" async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=self.header) as response: @@ -137,26 +141,30 @@ async def get_topology(self, plantId: str) -> dict[str, Any]: "status_code": status_code, "data": json.loads(content)["plant"]["modules"], } - else: + if status_code == 401: # Retry the request on 401 Unauthorized if await self.handle_unauthorized_error(response): # Retry the original request - return await self.get_topology(plantId) - return { - "status_code": status_code, - "error": f"Failed to get topology: Content: {content}, HEADEr: {self.header}, URL: {url}", - } - except Exception as e: + return await self.get_topology(plant_id) + return { + "status_code": status_code, + "error": "Failed to get topology.", + } + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Failed to get topology: {e}", } async def set_chronothermostat_status( - self, plantId: str, moduleId: str, data: dict[str, Any] + self, plant_id: str, module_id: str, data: dict[str, Any] ) -> dict[str, Any]: """Set thermostat status.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/addressLocation{PLANTS}/{plantId}/modules/parameter/id/value/{moduleId}" + url = ( + f"{DEFAULT_API_BASE_URL}" + f"{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/" + f"addressLocation{PLANTS}/{plant_id}/modules/parameter/id/value/{module_id}" + ) async with aiohttp.ClientSession() as session: try: async with session.post( @@ -165,97 +173,114 @@ async def set_chronothermostat_status( status_code = response.status content = await response.text() - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.set_chronothermostat_status( - plantId, moduleId, data - ) + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.set_chronothermostat_status( + plant_id, module_id, data + ) return {"status_code": status_code, "text": content} - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, - "error": f"Errore nella richiesta di set_chronothermostat_status: {e}", + "error": ( + f"Errore nella richiesta di set_chronothermostat_status: " + f"{e}" + ), } async def get_chronothermostat_status( - self, plantId: str, moduleId: str + self, plant_id: str, module_id: str ) -> dict[str, Any]: """Get thermostat status.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/addressLocation{PLANTS}/{plantId}/modules/parameter/id/value/{moduleId}" + url = ( + f"{DEFAULT_API_BASE_URL}" + f"{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/" + f"addressLocation{PLANTS}/{plant_id}/modules/parameter/id/value/{module_id}" + ) async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=self.header) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.get_chronothermostat_status(plantId, moduleId) - + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.get_chronothermostat_status( + plant_id, module_id + ) return {"status_code": status_code, "data": json.loads(content)} - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di get_chronothermostat_status: {e}", } async def get_chronothermostat_measures( - self, plantId: str, moduleId: str + self, plant_id: str, module_id: str ) -> dict[str, Any]: """Get thermostat measures.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/addressLocation{PLANTS}/{plantId}/modules/parameter/id/value/{moduleId}/measures" + url = ( + f"{DEFAULT_API_BASE_URL}" + f"{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/" + f"addressLocation{PLANTS}/{plant_id}/modules/parameter/id/value/{module_id}/measures" + ) async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=self.header) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.get_chronothermostat_measures( - plantId, moduleId - ) - + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.get_chronothermostat_measures( + plant_id, module_id + ) return {"status_code": status_code, "data": json.loads(content)} - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di get_chronothermostat_measures: {e}", } async def get_chronothermostat_programlist( - self, plantId: str, moduleId: str + self, plant_id: str, module_id: str ) -> dict[str, Any]: """Get thermostat programlist.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/addressLocation{PLANTS}/{plantId}/modules/parameter/id/value/{moduleId}/programlist" + url = ( + f"{DEFAULT_API_BASE_URL}" + f"{THERMOSTAT_API_ENDPOINT}/chronothermostat/thermoregulation/" + f"addressLocation{PLANTS}/{plant_id}/modules/parameter/id/value/{module_id}/" + f"programlist" + ) async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=self.header) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.get_chronothermostat_programlist( - plantId, moduleId - ) + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.get_chronothermostat_programlist( + plant_id, module_id + ) return { "status_code": status_code, "data": json.loads(content)["chronothermostats"][0]["programs"], } - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di get_chronothermostat_programlist: {e}", } - async def get_subscriptions_C2C_notifications(self) -> dict[str, Any]: + async def get_subscriptions_c2c_notifications(self) -> dict[str, Any]: """Get C2C subscriptions.""" url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}/subscription" async with aiohttp.ClientSession() as session: @@ -263,27 +288,27 @@ async def get_subscriptions_C2C_notifications(self) -> dict[str, Any]: async with session.get(url, headers=self.header) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.get_subscriptions_C2C_notifications() + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.get_subscriptions_c2c_notifications() return { "status_code": status_code, "data": json.loads(content) if status_code == 200 else content, } - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di get_subscriptions_C2C_notifications: {e}", } - async def set_subscribe_C2C_notifications( - self, plantId: str, data: dict[str, Any] + async def set_subscribe_c2c_notifications( + self, plant_id: str, data: dict[str, Any] ) -> dict[str, Any]: """Add C2C subscriptions.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}{PLANTS}/{plantId}/subscription" + url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}{PLANTS}/{plant_id}/subscription" async with aiohttp.ClientSession() as session: try: async with session.post( @@ -291,39 +316,46 @@ async def set_subscribe_C2C_notifications( ) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.set_subscribe_C2C_notifications(plantId, data) + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.set_subscribe_c2c_notifications( + plant_id, data + ) return {"status_code": status_code, "text": json.loads(content)} - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di set_subscribe_C2C_notifications: {e}", } - async def delete_subscribe_C2C_notifications( - self, plantId: str, subscriptionId: str + async def delete_subscribe_c2c_notifications( + self, plant_id: str, subscription_id: str ) -> dict[str, Any]: """Remove C2C subscriptions.""" - url = f"{DEFAULT_API_BASE_URL}{THERMOSTAT_API_ENDPOINT}{PLANTS}/{plantId}/subscription/{subscriptionId}" + url = ( + f"{DEFAULT_API_BASE_URL}" + f"{THERMOSTAT_API_ENDPOINT}" + f"{PLANTS}/{plant_id}/subscription/{subscription_id}" + ) + async with aiohttp.ClientSession() as session: try: async with session.delete(url, headers=self.header) as response: status_code = response.status content = await response.text() - - # Retry the request on 401 Unauthorized - if await self.handle_unauthorized_error(response): - # Retry the original request - return await self.delete_subscribe_C2C_notifications( - plantId, subscriptionId - ) + if status_code == 401: + # Retry the request on 401 Unauthorized + if await self.handle_unauthorized_error(response): + # Retry the original request + return await self.delete_subscribe_c2c_notifications( + plant_id, subscription_id + ) return {"status_code": status_code, "text": content} - except Exception as e: + except aiohttp.ClientError as e: return { "status_code": 500, "error": f"Errore nella richiesta di delete_subscribe_C2C_notifications: {e}", diff --git a/custom_components/bticino_x8000/auth.py b/custom_components/bticino_x8000/auth.py index 178b16f..ecc8c50 100644 --- a/custom_components/bticino_x8000/auth.py +++ b/custom_components/bticino_x8000/auth.py @@ -1,3 +1,4 @@ +"""Auth.""" from datetime import timedelta # noqa: D100 import logging from typing import Any @@ -12,7 +13,7 @@ async def exchange_code_for_tokens( - client_id: str, client_secret: str, redirect_uri: str, code: str + client_id: str, client_secret: str, code: str ) -> tuple[str, str, str]: """Get access token.""" token_url = f"{DEFAULT_AUTH_BASE_URL}{AUTH_REQ_ENDPOINT}" diff --git a/custom_components/bticino_x8000/climate.py b/custom_components/bticino_x8000/climate.py index a75d8fb..3504872 100644 --- a/custom_components/bticino_x8000/climate.py +++ b/custom_components/bticino_x8000/climate.py @@ -1,24 +1,18 @@ """Climate.""" -from datetime import datetime, timedelta +from datetime import timedelta import logging from typing import Any import voluptuous as vol from homeassistant.components.climate import ( - ATTR_PRESET_MODE, - DEFAULT_MIN_TEMP, - PRESET_AWAY, - PRESET_BOOST, - PRESET_HOME, - PRESET_NONE, ClimateEntity, ClimateEntityFeature, HVACAction, HVACMode, ) from homeassistant.config_entries import ConfigEntry -from homeassistant.const import ATTR_TEMPERATURE, PRECISION_HALVES, UnitOfTemperature +from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import config_validation as cv, entity_platform from homeassistant.helpers.dispatcher import async_dispatcher_connect @@ -45,10 +39,12 @@ DEFAULT_MAX_TEMP = 40 DEFAULT_MIN_TEMP = 7 BOOST_MODES = ["heating", "cooling"] +PRECISION_HALVES = 0.1 +# pylint: disable=R0902 class BticinoX8000ClimateEntity(ClimateEntity): # type:ignore - """Representation of a Bticino X8000 Climate entity.""" + """Bticino X8000 Climate entity.""" _attr_supported_features = SUPPORT_FLAGS _attr_temperature_unit = UnitOfTemperature.CELSIUS @@ -56,16 +52,12 @@ class BticinoX8000ClimateEntity(ClimateEntity): # type:ignore _attr_hvac_mode = HVACMode.AUTO _attr_max_temp = DEFAULT_MAX_TEMP _attr_min_temp = DEFAULT_MIN_TEMP - _attr_target_temperature_step = 0.1 _custom_attributes: dict[str, Any] = {} def __init__( self, data: dict[str, Any], - plant_id: str, - topology_id: str, - thermostat_name: str, - programs: list[dict[str, Any]], + config: dict[str, Any], ) -> None: """Init.""" self._attr_hvac_modes = [ @@ -80,19 +72,19 @@ def __init__( HVACAction.OFF, ] self._bticino_api = BticinoX8000Api(data) - self._plant_id = plant_id - self._topology_id = topology_id - self._programs_name = programs + self._plant_id: str = config["plant_id"] + self._topology_id: str = config["topology_id"] + self._programs_name = config["programs"] self._program_number: list[dict[str, Any]] = [] - self._name = thermostat_name + self._name: str = config["thermostat_name"] self._set_point: float | None = None self._temperature: float | None = None self._humidity: float | None = None self._function: str = "" self._mode: str = "" self._program: str = "" - self._loadState: str = "" - self._activationTime: str = "" + self._load_state: str = "" + self._activation_time: str = "" def _update_attrs(self, custom_attrs: dict[str, Any]) -> None: """Update custom attributes.""" @@ -169,7 +161,7 @@ def hvac_mode(self) -> HVACMode | None: @property def hvac_action(self) -> HVACAction | None: """Return current operation action.""" - if self._mode and self._function and self._loadState: + if self._mode and self._function and self._load_state: if ( ( self._mode.lower() == "manual" @@ -177,7 +169,7 @@ def hvac_action(self) -> HVACAction | None: or self._mode.lower() == "automatic" ) and self._function.lower() == "heating" - and self._loadState.lower() == "active" + and self._load_state.lower() == "active" ): return HVACAction.HEATING if ( @@ -187,21 +179,17 @@ def hvac_action(self) -> HVACAction | None: or self._mode.lower() == "automatic" ) and self._function.lower() == "cooling" - and self._loadState.lower() == "active" + and self._load_state.lower() == "active" ): return HVACAction.COOLING - if self._loadState.lower() == "inactive": + if self._load_state.lower() == "inactive": return HVACAction.OFF return None - # @property - # def supported_features(self) -> int: - # """Return the list of supported features.""" - # return SUPPORT_FLAGS - + # pylint: disable=W0239 @property def state_attributes(self) -> dict[str, Any]: - """Return the list of cusom attributes.""" + """Return the list of custom attributes.""" attrs = super().state_attributes or {} attrs.update(self._custom_attributes) return attrs @@ -243,13 +231,13 @@ def handle_webhook_update(self, event: dict[str, Any]) -> None: self._program_number = chronothermostat_data.get("programs", []) self._program = self._get_program_name(self._program_number) if "activationTime" in chronothermostat_data: - self._activationTime = chronothermostat_data.get("activationTime") + self._activation_time = chronothermostat_data.get("activationTime") self._update_attrs( { "programs": self._program, self._mode.lower() + "_time_remainig": self.calculate_remaining_time( - self._activationTime + self._activation_time ), } ) @@ -259,7 +247,7 @@ def handle_webhook_update(self, event: dict[str, Any]) -> None: "programs": self._program, } ) - self._loadState = chronothermostat_data.get("loadState") + self._load_state = chronothermostat_data.get("loadState") self._set_point = float(set_point.get("value")) self._temperature = float(thermometer_data.get("value")) self._humidity = float(hygrometer_data.get("value")) @@ -307,7 +295,8 @@ async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None: async def async_therm_manual( self, target_temperature: str, end_timestamp: int ) -> None: - return None + """Set manual.""" + _LOGGER.debug(target_temperature, end_timestamp) async def _async_service_set_temperature_with_end_datetime( self, **kwargs: Any @@ -389,6 +378,7 @@ async def _async_service_set_temperature_with_time_period( async def _async_service_clear_temperature_setting(self, **kwargs: Any) -> None: _LOGGER.debug("Clearing %s temperature setting", self.entity_id) + kwargs.get(ATTR_TEMPERATURE) await self.async_therm_manual("None", 0) async def async_set_temperature(self, **kwargs: Any) -> None: @@ -418,38 +408,16 @@ async def async_set_temperature(self, **kwargs: Any) -> None: def has_data( self, - ) -> tuple[float | None, float | None, float | None, str, str, str, str] | None: + ) -> bool: """Entity data.""" if ( - self._set_point is not None - and self._temperature is not None - and self._humidity is not None - and not self._function + not self._function and not self._mode and not self._program - and not self._loadState + and not self._load_state ): - return ( - self._set_point, - self._temperature, - self._humidity, - self._function, - self._mode, - self._program, - self._loadState, - ) - else: - return None - - # return ( - # self._set_point is not None - # and self._temperature is not None - # and self._humidity is not None - # and self._function is not None - # and self._mode is not None - # and self._program is not None - # and self._loadState is not None - # ) + return False + return True def calculate_remaining_time(self, date_string: str) -> dict[str, Any]: """Conver string to date object.""" @@ -486,13 +454,13 @@ async def async_sync_manual(self) -> None: self._program_number = chronothermostat_data["programs"] self._program = self._get_program_name(self._program_number) if "activationTime" in chronothermostat_data: - self._activationTime = chronothermostat_data.get("activationTime") + self._activation_time = chronothermostat_data.get("activationTime") self._update_attrs( { "programs": self._program, self._mode.lower() + "_time_remainig": self.calculate_remaining_time( - self._activationTime + self._activation_time ), } ) @@ -502,7 +470,7 @@ async def async_sync_manual(self) -> None: "programs": self._program, } ) - self._loadState = chronothermostat_data["loadState"] + self._load_state = chronothermostat_data["loadState"] set_point_data = chronothermostat_data["setPoint"] self._set_point = float(set_point_data["value"]) thermometer_data = chronothermostat_data["thermometer"]["measures"][0] @@ -531,13 +499,13 @@ async def async_setup_entry( topology_id = plant_data.get("id") thermostat_name = plant_data.get("name") programs = plant_data.get("programs") - my_entity = BticinoX8000ClimateEntity( - data, - plant_id, - topology_id, - thermostat_name, - programs, - ) + config = { + "plant_id": plant_id, + "topology_id": topology_id, + "thermostat_name": thermostat_name, + "programs": programs, + } + my_entity = BticinoX8000ClimateEntity(data, config) async_add_entities([my_entity]) if not my_entity.has_data(): await my_entity.async_sync_manual() diff --git a/custom_components/bticino_x8000/config_flow.py b/custom_components/bticino_x8000/config_flow.py index cbc036f..cf21ad2 100644 --- a/custom_components/bticino_x8000/config_flow.py +++ b/custom_components/bticino_x8000/config_flow.py @@ -29,15 +29,24 @@ class BticinoX8000ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): # type:ignore """Bticino ConfigFlow.""" + def __init__(self) -> None: + """Init.""" + self.data: dict[str, Any] = {} + self._thermostat_options: dict[str, Any] = {} + self.bticino_api: BticinoX8000Api | None = None + async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """User configuration.""" - try: + if self.hass.config.external_url is not None: external_url = self.hass.config.external_url - except: - _LOGGER.warning("No external url available, using default") - external_url = "My HA external url ex: https://pippo.duckdns.com:8123 (specify the port if is not standard 443)" + else: + external_url = ( + "My HA external url ex: " + "https://pippo.duckdns.com:8123 " + "(specify the port if is not standard 443)" + ) if self._async_current_entries(): return self.async_abort(reason="single_instance_allowed") @@ -74,7 +83,8 @@ async def async_step_user( self.data = user_input authorization_url = self.get_authorization_url(user_input) message = ( - f"Click the link below to authorize Bticino X8000. After authorization, paste the browser URL here.\n\n" + f"Click the link below to authorize Bticino X8000. " + f"After authorization, paste the browser URL here.\n\n" f"{authorization_url}" ) return self.async_show_form( @@ -112,15 +122,17 @@ async def async_step_get_authorize_code( _LOGGER.debug("Parsed URL: %s", parsed_url) query_params = parse_qs(parsed_url.query) _LOGGER.debug("Query Parameters: %s", query_params) - code = query_params.get("code", [""])[0] - state = query_params.get("state", [""])[0] - if not code or not state: + if ( + not query_params.get("code", [""])[0] + or not query_params.get("state", [""])[0] + ): raise ValueError( - "Unable to identify the Authorize Code or State. Please make sure to provide a valid URL." + "Unable to identify the Authorize Code or State. " + "Please make sure to provide a valid URL." ) - self.data["code"] = code + self.data["code"] = query_params.get("code", [""])[0] ( access_token, @@ -129,8 +141,7 @@ async def async_step_get_authorize_code( ) = await exchange_code_for_tokens( self.data["client_id"], self.data["client_secret"], - DEFAULT_REDIRECT_URI, - code, + query_params.get("code", [""])[0], ) self.data["access_token"] = access_token @@ -146,7 +157,7 @@ async def async_step_get_authorize_code( plants_data = await self.bticino_api.get_plants() if plants_data["status_code"] == 200: thermostat_options = {} - plant_ids = list(set(plant["id"] for plant in plants_data["data"])) + plant_ids = list({plant["id"] for plant in plants_data["data"]}) for plant_id in plant_ids: topologies = await self.bticino_api.get_topology(plant_id) for thermo in topologies["data"]: @@ -173,13 +184,13 @@ async def async_step_get_authorize_code( "selected_thermostats", description="Select Thermostats", default=[ - thermostat_options[thermo]["name"] - for thermo in thermostat_options + options["name"] + for thermo, options in thermostat_options.items() ], ): cv.multi_select( [ - thermostat_options[thermo]["name"] - for thermo in thermostat_options + options["name"] + for thermo, options in thermostat_options.items() ] ), } @@ -191,32 +202,34 @@ async def async_step_get_authorize_code( return await self.async_step_get_authorize_code() return await self.async_step_user(self.data) - async def add_c2c_subscription(self, plantId: str, webhook_id: str) -> str | None: + async def add_c2c_subscription(self, plant_id: str, webhook_id: str) -> str | None: """Subscribe C2C.""" - webhook_path = "/api/webhook/" - webhook_endpoint = self.data["external_url"] + webhook_path + webhook_id - response = await self.bticino_api.set_subscribe_C2C_notifications( - plantId, {"EndPointUrl": webhook_endpoint} - ) - if response["status_code"] == 201: - _LOGGER.debug("Webhook subscription registrata con successo!") - subscriptionId: str = response["text"]["subscriptionId"] - return subscriptionId - else: - return None + if self.bticino_api is not None: + webhook_path = "/api/webhook/" + webhook_endpoint = self.data["external_url"] + webhook_path + webhook_id + response = await self.bticino_api.set_subscribe_c2c_notifications( + plant_id, {"EndPointUrl": webhook_endpoint} + ) + if response["status_code"] == 201: + _LOGGER.debug("Webhook subscription registrata con successo!") + subscription_id: str = response["text"]["subscriptionId"] + return subscription_id + return None async def get_programs_from_api( self, plant_id: str, topology_id: str - ) -> list[dict[str, Any]]: + ) -> list[dict[str, Any]] | None: """Retreive the program list.""" - programs = await self.bticino_api.get_chronothermostat_programlist( - plant_id, topology_id - ) - filtered_programs = [ - program for program in programs["data"] if program["number"] != 0 - ] + if self.bticino_api is not None: + programs = await self.bticino_api.get_chronothermostat_programlist( + plant_id, topology_id + ) + filtered_programs = [ + program for program in programs["data"] if program["number"] != 0 + ] - return filtered_programs + return filtered_programs + return None async def async_step_select_thermostats( self, user_input: dict[str, Any] diff --git a/custom_components/bticino_x8000/webhook.py b/custom_components/bticino_x8000/webhook.py index 01840fb..00e620b 100644 --- a/custom_components/bticino_x8000/webhook.py +++ b/custom_components/bticino_x8000/webhook.py @@ -1,15 +1,16 @@ +"""Webhook.""" import logging -import aiohttp -from aiohttp.web import Request, HTTPBadRequest, Response -from homeassistant.config_entries import ConfigEntry -from homeassistant.core import HomeAssistant -from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform -from homeassistant.helpers.dispatcher import async_dispatcher_send -from .const import DOMAIN + +from aiohttp.web import Request, Response + from homeassistant.components.webhook import ( async_register as webhook_register, async_unregister as webhook_unregister, ) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.dispatcher import async_dispatcher_send + +from .const import DOMAIN _LOGGER = logging.getLogger(__name__)