From b4ac631123756335e47578f4a6fc68a530353c02 Mon Sep 17 00:00:00 2001 From: Xannor <2722473+xannor@users.noreply.github.com> Date: Wed, 27 Jul 2022 14:16:04 +0000 Subject: [PATCH] bugfixes and start of PTZ support --- custom_components/reolink_rest/__init__.py | 96 +- .../reolink_rest/binary_sensor.py | 157 +++- custom_components/reolink_rest/camera.py | 150 +++- custom_components/reolink_rest/config_flow.py | 8 +- custom_components/reolink_rest/const.py | 2 + custom_components/reolink_rest/discovery.py | 56 ++ custom_components/reolink_rest/entity.py | 832 ++++++++++-------- custom_components/reolink_rest/manifest.json | 2 +- custom_components/reolink_rest/models.py | 135 ++- custom_components/reolink_rest/push.py | 401 +++++---- custom_components/reolink_rest/services.yaml | 6 + custom_components/reolink_rest/typing.py | 51 +- custom_components/reolink_rest/webhook.py | 141 +-- 13 files changed, 1288 insertions(+), 749 deletions(-) diff --git a/custom_components/reolink_rest/__init__.py b/custom_components/reolink_rest/__init__.py index cb02707..f4445e2 100644 --- a/custom_components/reolink_rest/__init__.py +++ b/custom_components/reolink_rest/__init__.py @@ -5,30 +5,27 @@ import logging from typing import Final +import async_timeout -from homeassistant.core import HomeAssistant -from homeassistant.config_entries import ConfigEntry, SOURCE_INTEGRATION_DISCOVERY +from homeassistant.core import HomeAssistant, ServiceCall +from homeassistant.config_entries import ConfigEntry from homeassistant.helpers.typing import ConfigType -from homeassistant.helpers.discovery import async_listen -from homeassistant.helpers.discovery_flow import async_create_flow from homeassistant.helpers.update_coordinator import DataUpdateCoordinator -from homeassistant.helpers.typing import DiscoveryInfoType +from homeassistant.helpers.service import async_extract_config_entry_ids from homeassistant.const import Platform +from .discovery import async_discovery_handler + from .entity import ( - async_get_motion_poll_interval, async_get_poll_interval, - create_channel_motion_data_update_method, - create_device_data_update_method, + ReolinkEntityData, ) -from .typing import ReolinkDomainData, ReolinkEntryData +from .typing import ReolinkDomainData from .const import ( DATA_COORDINATOR, - DISCOVERY_EVENT, DOMAIN, - OPT_DISCOVERY, ) _LOGGER = logging.getLogger(__name__) @@ -39,38 +36,19 @@ async def async_setup(hass: HomeAssistant, _config: ConfigType) -> bool: """Setup ReoLink Component""" - async def _discovery(service: str, info: DiscoveryInfoType): - if service == DISCOVERY_EVENT: - for entry in hass.config_entries.async_entries(DOMAIN): - if OPT_DISCOVERY in entry.options: - discovery: dict = entry.options[OPT_DISCOVERY] - key = "uuid" - if not key in discovery or not key in info: - key = "mac" - if key in discovery and key in info and discovery[key] == info[key]: - if next( - ( - True - for k in info - if k not in discovery or discovery[k] != info[k] - ), - False, - ): - options = entry.options.copy() - options[OPT_DISCOVERY] = discovery = discovery.copy() - discovery.update(info) - - if not hass.config_entries.async_update_entry( - entry, options=options - ): - _LOGGER.warning("Could not update options") - return - - async_create_flow( - hass, DOMAIN, {"source": SOURCE_INTEGRATION_DISCOVERY}, info - ) - - async_listen(hass, DISCOVERY_EVENT, _discovery) + await async_discovery_handler(hass) + + domain_data: ReolinkDomainData = hass.data.setdefault(DOMAIN, {}) + + async def _reboot_handler(call: ServiceCall): + _LOGGER.debug("Reboot called.") + entries: set[str] = await async_extract_config_entry_ids(hass, call) + for entry_id in entries: + entry_data = domain_data[entry_id] + await entry_data["coordinator"].data.client.reboot() + hass.create_task(entry_data["coordinator"].async_request_refresh()) + + hass.services.async_register(DOMAIN, "reboot", _reboot_handler) return True @@ -85,28 +63,28 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: domain_data: ReolinkDomainData = hass.data.setdefault(DOMAIN, {}) entry_data = domain_data.setdefault(entry.entry_id, {}) - coordinator = entry_data.get("coordinator", None) - # if setup fails we do not want to recreate the coordinators + coordinator = entry_data.get(DATA_COORDINATOR, None) if coordinator is None: + first_attempt = True + + async def _update_data(): + nonlocal first_attempt + + if first_attempt: + first_attempt = False + async with async_timeout.Timeout(10, asyncio.get_event_loop()): + return await entity_data.async_update() + return await entity_data.async_update() + + entity_data = ReolinkEntityData(hass, entry) coordinator = DataUpdateCoordinator( hass, _LOGGER, - name=f"{DOMAIN}-DataUpdateCoordinator-{entry.entry_id}", + name=f"{DOMAIN}-{entity_data.name}", + update_method=_update_data, update_interval=async_get_poll_interval(entry), - update_method=create_device_data_update_method(entry_data), - ) - entry_data["coordinator"] = coordinator - motion_coordinator = entry_data.get("motion_coordinator", None) - if motion_coordinator is None: - motion_coordinator = DataUpdateCoordinator( - hass, - _LOGGER, - name=f"{DOMAIN}-Motion-DataUpdateCooridator-{entry.entry_id}", - update_interval=async_get_motion_poll_interval(entry), - update_method=create_channel_motion_data_update_method(entry_data), ) - entry_data["motion_coordinator"] = motion_coordinator - entry_data["motion_data_request"] = set() + entry_data[DATA_COORDINATOR] = coordinator await coordinator.async_config_entry_first_refresh() diff --git a/custom_components/reolink_rest/binary_sensor.py b/custom_components/reolink_rest/binary_sensor.py index 82683bd..dbab26b 100644 --- a/custom_components/reolink_rest/binary_sensor.py +++ b/custom_components/reolink_rest/binary_sensor.py @@ -5,7 +5,7 @@ from dataclasses import asdict, dataclass from datetime import timedelta import logging -from typing import Final +from typing import Final, cast from aiohttp.web import Request @@ -13,6 +13,7 @@ from homeassistant.config_entries import ConfigEntry from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import async_track_point_in_utc_time +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.util import dt from homeassistant.components.binary_sensor import ( @@ -28,14 +29,16 @@ from .webhook import async_get_webhook_manager from .entity import ( + ReolinkEntity, + ReolinkEntityData, ReolinkEntityDataUpdateCoordinator, ReolinkEntityDescription, - ReolinkMotionEntity, + async_get_motion_poll_interval, ) from .typing import ReolinkDomainData -from .const import DATA_COORDINATOR, DOMAIN +from .const import DATA_COORDINATOR, DATA_MOTION_COORDINATORS, DOMAIN _LOGGER = logging.getLogger(__name__) @@ -114,9 +117,24 @@ async def _handle_onvif_notify(hass: HomeAssistant, request: Request): # ideally we would get better notices from onvif, but since we only know # motion is/was happening we have to poll for any detail + async def _refresh(): + coordinator = entry_data[DATA_COORDINATOR] + if not coordinator.last_update_success: + return + data: ReolinkEntityData = coordinator.data + for channel in entry_data[DATA_MOTION_COORDINATORS].keys(): + data.async_request_motion_update(channel) + try: + await data.async_update_motion_data() + except Exception: # pylint: disable=broad-except + # since we are updating outside a coordinator, we need to handle errors + await coordinator.async_request_refresh() + for _coordinator in entry_data[DATA_MOTION_COORDINATORS].values(): + _coordinator.async_set_updated_data(data) + async def _try_again(*_): _ed.pop(DATA_MOTION_DEBOUNCE, None) - await entry_data["motion_coordinator"].async_request_refresh() + await _refresh() if motion != "false": _ed[DATA_MOTION_DEBOUNCE] = async_track_point_in_utc_time( @@ -124,7 +142,7 @@ async def _try_again(*_): ) # hand off refresh to task so we dont hold the hook too long - hass.create_task(entry_data["motion_coordinator"].async_request_refresh()) + hass.create_task(_refresh()) return None @@ -134,7 +152,35 @@ async def async_setup_entry( config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ): - """Setup camera platform""" + """Setup binary_sensor platform""" + + def _setup_hooks( + channel: int, motion_coordinator: ReolinkEntityDataUpdateCoordinator + ): + add_listener = motion_coordinator.async_add_listener + + def _coord_update(): + if channel in coordinator.data.updated_motion: + motion_coordinator.async_set_updated_data(coordinator.data) + + coord_cleanup = None + + def _add_listener(update_callback: CALLBACK_TYPE, context: any = None): + nonlocal coord_cleanup + # pylint: disable = protected-access + if len(motion_coordinator._listeners) == 0: + coord_cleanup = coordinator.async_add_listener(_coord_update) + + cleanup = add_listener(update_callback, context) + + def _cleanup(): + cleanup() + if len(motion_coordinator._listeners) == 0: + coord_cleanup() + + return _cleanup + + motion_coordinator.async_add_listener = _add_listener _LOGGER.debug("Setting up motion") domain_data: ReolinkDomainData = hass.data[DOMAIN] @@ -143,21 +189,34 @@ async def async_setup_entry( entities = [] data = coordinator.data - push_setup = not data.abilities.onvif + push_setup = True for channel in data.channels.keys(): ability = coordinator.data.abilities.channels[channel] if not ability.alarm.motion: continue - if not push_setup: - push_setup = True - webhook = async_get_webhook_manager(hass, _LOGGER, config_entry) - if webhook: - config_entry.async_on_unload( - webhook.async_add_handler(_handle_onvif_notify) - ) - push = async_get_push_manager(hass, _LOGGER, config_entry, webhook) + push_setup = False + coordinators = entry_data.setdefault(DATA_MOTION_COORDINATORS, {}) + if channel not in coordinators: + motion_coordinator = DataUpdateCoordinator( + hass, + _LOGGER, + name=f"{coordinator.name}-motion", + update_interval=async_get_motion_poll_interval(config_entry), + update_method=cast( + ReolinkEntityData, coordinator.data + ).async_update_motion_data, + ) + coordinators[channel] = motion_coordinator + motion_coordinator.data = data + + _setup_hooks(channel, motion_coordinator) + + else: + motion_coordinator: ReolinkEntityDataUpdateCoordinator = coordinators[ + channel + ] ai_types = [] # if ability.support.ai: <- in my tests this ability was not set @@ -177,13 +236,59 @@ async def async_setup_entry( continue description = ReolinkMotionSensorEntityDescription(**asdict(description)) description.channel = channel - entities.append(ReolinkMotionSensor(coordinator, description)) + entities.append(ReolinkMotionSensor(motion_coordinator, description)) if entities: async_add_entities(entities) - -class ReolinkMotionSensor(ReolinkMotionEntity, BinarySensorEntity): + if not push_setup and data.abilities.onvif: + webhooks = async_get_webhook_manager(hass) + if webhooks is not None: + webhook = webhooks.async_register(hass, config_entry) + config_entry.async_on_unload( + webhook.async_add_handler(_handle_onvif_notify) + ) + push = async_get_push_manager(hass) + subscription = None + + async def _async_sub(): + nonlocal subscription + subscription = await push.async_subscribe(webhook.url, config_entry) + if subscription is not None: + for coordinator in coordinators.values(): + coordinator.update_interval = None + + def _sub_failure(entry_id: str): + nonlocal subscription + if entry_id != config_entry.entry_id: + return + if subscription is not None: + for _coordinator in coordinators.values(): + _coordinator.update_interval = async_get_motion_poll_interval( + config_entry + ) + hass.create_task(_coordinator.async_request_refresh()) + subscription = None + + def _sub_resub(): + cleanup() + hass.create_task(_async_sub()) + + cleanup = coordinator.async_add_listener(_sub_resub) + + cleanup = push.async_on_subscription_failure(_sub_failure) + + await _async_sub() + + def _unsubscribe(): + cleanup() + if subscription is not None: + hass.create_task(push.async_unsubscribe(subscription)) + + config_entry.async_on_unload(_unsubscribe) + + +class ReolinkMotionSensor(ReolinkEntity, BinarySensorEntity): """Reolink Motion Sensor Entity""" entity_description: ReolinkMotionSensorEntityDescription @@ -195,21 +300,23 @@ def __init__( context: any = None, ) -> None: BinarySensorEntity.__init__(self) - ReolinkMotionEntity.__init__(self, coordinator, description, context) + ReolinkEntity.__init__(self, coordinator, description, context) - def _handle_coordinator_motion_update(self) -> None: - data = self.motion_coordinator.data.channel[self.entity_description.channel] + def _handle_coordinator_update(self) -> None: + data = self.coordinator.data.motion[self.entity_description.channel] if self.entity_description.ai_type is None: - self._attr_is_on = data.motion + self._attr_is_on = data.detected else: - self._attr_is_on = data.detected.get(self.entity_description.ai_type, False) + self._attr_is_on = data.get(self.entity_description.ai_type, False) + return super()._handle_coordinator_update() - return super()._handle_coordinator_motion_update() + async def async_update(self) -> None: + return await super().async_update() @property def extra_state_attributes(self): return { "update_method": "push" - if self.motion_coordinator.update_interval is None + if self.coordinator.update_interval is None else "poll" } diff --git a/custom_components/reolink_rest/camera.py b/custom_components/reolink_rest/camera.py index 70af90f..d43710e 100644 --- a/custom_components/reolink_rest/camera.py +++ b/custom_components/reolink_rest/camera.py @@ -3,13 +3,19 @@ from __future__ import annotations from asyncio import Task from dataclasses import asdict, dataclass -from enum import IntEnum, auto +from enum import IntEnum, IntFlag, auto import logging from typing import Final -from homeassistant.core import HomeAssistant -from homeassistant.config_entries import ConfigEntry -from homeassistant.helpers.entity_platform import AddEntitiesCallback +import voluptuous as vol + +from homeassistant.core import HomeAssistant, CALLBACK_TYPE +from homeassistant.config_entries import ConfigEntry, DiscoveryInfoType +from homeassistant.helpers.entity_platform import ( + AddEntitiesCallback, + async_get_current_platform, +) +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.components.camera import ( Camera, @@ -26,6 +32,7 @@ from async_reolink.api.const import IntStreamTypes as StreamTypes from .entity import ( + ReolinkEntityData, ReolinkEntityDataUpdateCoordinator, ReolinkEntity, ReolinkEntityDescription, @@ -128,17 +135,54 @@ class ReolinkCameraEntityDescription(ReolinkEntityDescription, CameraEntityDescr ] +class ReolinkCameraEntityFeature(IntFlag): + """Reolink Camera Entity Features""" + + PAN_TILT = 1 << 8 + ZOOM = 2 << 8 + FOCUS = 3 << 8 + + +async def async_setup_platform( + _hass: HomeAssistant, + _config_entry: ConfigEntry, + _async_add_entities: AddEntitiesCallback, + _discovery_info: DiscoveryInfoType | None = None, +): + """Setup camera platform""" + + platform = async_get_current_platform() + + platform.async_register_entity_service( + "set_zoom", + vol.Schema({"position": int}), + "async_set_zoom", + [ReolinkCameraEntityFeature.ZOOM.value], + ) + + platform.async_register_entity_service( + "set_focus", + vol.Schema({"position": int}), + "async_set_focus", + [ReolinkCameraEntityFeature.FOCUS.value], + ) + + # PTZ services: pan tilt (4 or 8 direction) zoom and focus + + async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ): - """Setup camera platform""" + """Setup camera entities""" _LOGGER.debug("Setting up camera") domain_data: ReolinkDomainData = hass.data[DOMAIN] + entry_data = domain_data[config_entry.entry_id] + _entry_data: dict = entry_data - coordinator = domain_data[config_entry.entry_id][DATA_COORDINATOR] + coordinator = entry_data[DATA_COORDINATOR] stream = "stream" in hass.config.components @@ -147,6 +191,23 @@ async def async_setup_entry( for channel in data.channels.keys(): ability = coordinator.data.abilities.channels[channel] + features: int = 0 + + has_ptz = False + if ability.ptz.type == abilities.channel.PTZTypeValues.AF: + features = ( + ReolinkCameraEntityFeature.ZOOM.value | ReolinkCameraEntityFeature.FOCUS + ) + has_ptz = True + elif ability.ptz.type: + has_ptz = True + features = ReolinkCameraEntityFeature.PAN_TILT.value + if ability.ptz.type in ( + abilities.channel.PTZTypeValues.PTZ, + abilities.channel.PTZTypeValues.PTZ_NO_SPEED, + ): + features |= ReolinkCameraEntityFeature.ZOOM.value + otypes: list[OutputStreamTypes] = [] if ability.snap: otypes.append(OutputStreamTypes.JPEG) @@ -169,6 +230,63 @@ async def async_setup_entry( if not otypes or not stypes: continue + ptz_coordinator = coordinator + if has_ptz: + coordinators: dict[ + int, ReolinkEntityDataUpdateCoordinator + ] = _entry_data.setdefault("ptz_coordinators", {}) + if channel not in coordinators: + + def _create_coordinator(channel: int): + entity_data: ReolinkEntityData = coordinator.data + + async def _update_data(): + entity_data.async_request_ptz_update(channel) + return await entity_data.async_update_ptz_data() + + _coordinator = DataUpdateCoordinator( + hass, + _LOGGER, + name=f"{coordinator.name}-ptz", + update_method=_update_data, + ) + _coordinator.data = data + + add_listener = _coordinator.async_add_listener + + def _coord_update(): + if channel in coordinator.data.updated_motion: + _coordinator.async_set_updated_data(coordinator.data) + + coord_cleanup = None + + def _add_listener( + update_callback: CALLBACK_TYPE, context: any = None + ): + nonlocal coord_cleanup + # pylint: disable = protected-access + if len(_coordinator._listeners) == 0: + coord_cleanup = coordinator.async_add_listener( + _coord_update + ) + + cleanup = add_listener(update_callback, context) + + def _cleanup(): + cleanup() + if len(_coordinator._listeners) == 0: + coord_cleanup() + + return _cleanup + + _coordinator.async_add_listener = _add_listener + + return _coordinator + + coordinators[channel] = ptz_coordinator = _create_coordinator(channel) + else: + ptz_coordinator = coordinators[channel] + main: OutputStreamTypes = None first: OutputStreamTypes = None for camera_info in CAMERAS: @@ -199,7 +317,11 @@ async def async_setup_entry( if description.output_type != first: description.entity_registry_enabled_default = False - entities.append(ReolinkCamera(coordinator, camera_info[0], description)) + entities.append( + ReolinkCamera( + ptz_coordinator, camera_info[0] | features, description + ) + ) if entities: async_add_entities(entities) @@ -213,7 +335,7 @@ class ReolinkCamera(ReolinkEntity, Camera): def __init__( self, coordinator: ReolinkEntityDataUpdateCoordinator, - supported_features: CameraEntityFeature, + supported_features: int, description: ReolinkCameraEntityDescription, context: any = None, ) -> None: @@ -224,7 +346,9 @@ def __init__( async def stream_source(self) -> str | None: domain_data: ReolinkDomainData = self.hass.data[DOMAIN] - client = domain_data[self.coordinator.config_entry.entry_id]["client"] + client = domain_data[self.coordinator.config_entry.entry_id][ + DATA_COORDINATOR + ].data.client if self.entity_description.output_type == OutputStreamTypes.RTSP: try: @@ -252,7 +376,9 @@ async def stream_source(self) -> str | None: async def _async_camera_image(self): domain_data: ReolinkDomainData = self.hass.data[DOMAIN] - client = domain_data[self.coordinator.config_entry.entry_id]["client"] + client = domain_data[self.coordinator.config_entry.entry_id][ + DATA_COORDINATOR + ].data.client try: image = await client.get_snap(self.entity_description.channel) except ReolinkResponseError as resperr: @@ -284,3 +410,7 @@ async def async_camera_image( ) return await self._snapshot_task + + async def async_set_zoom(self, position: int): + client = self.coordinator.data.client + await client.set_ptz_zoom(position, self.entity_description.channel) diff --git a/custom_components/reolink_rest/config_flow.py b/custom_components/reolink_rest/config_flow.py index 85d2ed6..e45d01f 100644 --- a/custom_components/reolink_rest/config_flow.py +++ b/custom_components/reolink_rest/config_flow.py @@ -25,6 +25,7 @@ from async_reolink.api.network import ChannelStatusType from async_reolink.rest import Client as RestClient from async_reolink.rest.connection import Encryption +from async_reolink.rest.errors import AUTH_ERRORCODES from .const import ( DEFAULT_PREFIX_CHANNEL, @@ -249,15 +250,12 @@ async def async_step_user( errors = {"base": "timeout"} return await self.async_step_connection(data, errors) except reo_errors.ReolinkResponseError as resp_error: - if resp_error.code in ( - reo_errors.ErrorCodes.AUTH_REQUIRED, - reo_errors.ErrorCodes.LOGIN_FAILED, - ): + if resp_error.code in AUTH_ERRORCODES: errors = {"base": "invalid_auth"} return await self.async_step_auth(data, errors) _LOGGER.exception( "An internal device error occurred on %s, configuration aborting", - self.data[CONF_HOST], + data[CONF_HOST], ) return self.async_abort(reason="device_error") except Exception: # pylint: disable=broad-except diff --git a/custom_components/reolink_rest/const.py b/custom_components/reolink_rest/const.py index 5e56faa..2422537 100644 --- a/custom_components/reolink_rest/const.py +++ b/custom_components/reolink_rest/const.py @@ -17,8 +17,10 @@ OPT_CHANNELS: Final = "channels" OPT_PREFIX_CHANNEL: Final = "prefix_channel" OPT_MOTION_INTERVAL: Final = "motion_interval" +OPT_BATCH_ABILITY: Final = "batch_abilitiy" DATA_COORDINATOR: Final = "coordinator" +DATA_MOTION_COORDINATORS: Final = "motion_coordinators" DATA_ONVIF: Final = "onvif" # keep? ---\/ diff --git a/custom_components/reolink_rest/discovery.py b/custom_components/reolink_rest/discovery.py index 4c76986..48df199 100644 --- a/custom_components/reolink_rest/discovery.py +++ b/custom_components/reolink_rest/discovery.py @@ -1 +1,57 @@ """Reolink Discovery support""" + +import logging +from homeassistant.core import HomeAssistant +from homeassistant.loader import bind_hass +from homeassistant.config_entries import SOURCE_INTEGRATION_DISCOVERY +from homeassistant.helpers.singleton import singleton +from homeassistant.helpers.typing import DiscoveryInfoType +from homeassistant.helpers.discovery import async_listen +from homeassistant.helpers.discovery_flow import async_create_flow + +from .const import DISCOVERY_EVENT, DOMAIN, OPT_DISCOVERY + +_LOGGER = logging.getLogger(__name__) + + +@bind_hass +@singleton(f"{DOMAIN}_discovery_data") +async def async_discovery_handler(hass: HomeAssistant): + """Discovery Handler""" + + data = {} + + async def _discovery(service: str, info: DiscoveryInfoType): + if service == DISCOVERY_EVENT: + for entry in hass.config_entries.async_entries(DOMAIN): + if OPT_DISCOVERY in entry.options: + discovery: dict = entry.options[OPT_DISCOVERY] + key = "uuid" + if not key in discovery or not key in info: + key = "mac" + if key in discovery and key in info and discovery[key] == info[key]: + if next( + ( + True + for k in info + if k not in discovery or discovery[k] != info[k] + ), + False, + ): + options = entry.options.copy() + options[OPT_DISCOVERY] = discovery = discovery.copy() + discovery.update(info) + + if not hass.config_entries.async_update_entry( + entry, options=options + ): + _LOGGER.warning("Discovery: Could not update options") + return + + async_create_flow( + hass, DOMAIN, {"source": SOURCE_INTEGRATION_DISCOVERY}, info + ) + + async_listen(hass, DISCOVERY_EVENT, _discovery) + + return data diff --git a/custom_components/reolink_rest/entity.py b/custom_components/reolink_rest/entity.py index 5bd67e7..5ec61dc 100644 --- a/custom_components/reolink_rest/entity.py +++ b/custom_components/reolink_rest/entity.py @@ -1,12 +1,9 @@ """Reolink Entities""" -from __future__ import annotations -import asyncio from datetime import timedelta -from typing import TYPE_CHECKING, Final, Mapping, cast -import async_timeout -from homeassistant.core import callback +from typing import Mapping, Sequence +from homeassistant.core import HomeAssistant from homeassistant import config_entries from homeassistant.helpers.update_coordinator import ( DataUpdateCoordinator, @@ -28,17 +25,24 @@ ) from async_reolink.api.errors import ReolinkResponseError, ErrorCodes -from async_reolink.api import system, network, ai, alarm -from async_reolink.api.commands import CommandRequest +from async_reolink.api import system, network, ai, alarm, ptz from async_reolink.api.const import DEFAULT_USERNAME, DEFAULT_PASSWORD, DEFAULT_TIMEOUT from async_reolink.rest import Client as ReolinkClient from async_reolink.rest.connection import Encryption -from async_reolink.rest.errors import CONNECTION_ERRORS, RESPONSE_ERRORS +from async_reolink.rest.errors import ( + CONNECTION_ERRORS, + AUTH_ERRORCODES, +) -from .typing import ReolinkDomainData, ReolinkEntryData +from .typing import EntityData -from .models import DeviceData, ChannelMotionData, MotionData, ReolinkEntityDescription +from .models import ( + MutableMotionData, + MutablePTZDisabled, + MutablePTZPosition, + ReolinkEntityDescription, +) from .const import ( CONF_USE_HTTPS, @@ -90,129 +94,154 @@ def _get_channels( return channels -def _add_motion_commands( - commands: list[CommandRequest], - abilities: system.abilities.Abilities, - *, - options: Mapping[str, any] | None = None, - channels: frozenset[int] | None = None, -): - md_index: dict[int, int] = {} - if len(abilities.channels) == 1: - channels = set({0}) - elif channels is None or len(channels) == 0: - channels = _get_channels(abilities, options) - - for i in channels: - # the MD command does not return the channel it replies to - md_index[len(commands)] = i - commands.append(alarm.GetMotionStateCommand(i)) - ability = abilities.channels[i] - if ( - ability.support.ai.animal - or ability.support.ai.face - or ability.support.ai.people - or ability.support.ai.pet - or ability.support.ai.vehicle +class _MutablePTZData: + def __init__(self) -> None: + self.pan = MutablePTZPosition() + self.tilt = MutablePTZPosition() + self.zoom = MutablePTZPosition() + self.focus = MutablePTZPosition() + self.autofocus = MutablePTZDisabled() + self.presets: dict[ptz.PTZPresetId, ptz.PTZPreset] = {} + self.patrol: dict[ptz.PTZPatrolId, ptz.PTZPatrol] = {} + self.tattern: dict[ptz.PTZTrackId, ptz.PTZTrack] = {} + + +class ReolinkEntityData: + """Reolink Entity Data and API""" + + def __init__(self, hass: HomeAssistant, config_entry: config_entries.ConfigEntry): + self.hass = hass + self.config_entry = config_entry + self.client = ReolinkClient() + self.device: device_registry.DeviceEntry = None + self.time_difference = timedelta() + self.abilities: system.abilities.Abilities = None + self.device_info: system.DeviceInfoType = None + self.channels: dict[int, DeviceInfo] = {} + self.ports: network.NetworkPortsType = None + self._batch_ability = True + self._connection_id = 0 + self._authentication_id = 0 + self.updated_motion: set[int] = set() + self._update_motion: set[int] = set() + self.motion: dict[int, MutableMotionData] = {} + self.updated_ptz: set[int] = set() + self._update_ptz: set[int] = set() + self.ptz: dict[int, _MutablePTZData] = {} + discovery: dict = config_entry.options.get(OPT_DISCOVERY, None) + if discovery is not None and ( + "name" in discovery or "uuid" in discovery or "mac" in discovery ): - commands.append(ai.GetAiStateCommand(i)) - - return (commands, md_index) - - -def create_channel_motion_data_update_method(entry_data: ReolinkEntryData): - """ "ChannelMotionData Updater Method""" - - async def _data_updater(): - client = entry_data["client"] - data = entry_data["motion_coordinator"].data - abilities = entry_data["coordinator"].data.abilities - - channels = entry_data.setdefault("motion_data_request", set()) - updated = data.updated if data else set() - motion = data.channel if data else {} - if TYPE_CHECKING: - updated = cast(set[int], updated) - motion = cast(dict[int, ChannelMotionData], motion) - (commands, md_index) = _add_motion_commands( - [], - abilities, - options=entry_data["coordinator"].config_entry.options, - channels=channels, - ) + self._name: str = discovery.get( + "name", discovery.get("uuid", discovery["mac"]) + ) + else: + self._name: str = config_entry[CONF_HOST] + + @property + def name(self): + """short name""" + return self._name + + def _processes_responses(self, response): + if system.GetAbilitiesCommand.is_response(response): + self.abilities = system.abilities.Abilities( + system.GetAbilitiesCommand.get_value(response) + ) + return True + if system.GetTimeCommand.is_response(response): + result = system.GetTimeCommand.get_value(response) + # pylint: disable=unsubscriptable-object + time = system.as_dateime(result["Time"], tzinfo=system.get_tzinfo(result)) + self.time_difference = dt.utcnow() - dt.as_utc(time) + return True + if network.GetNetworkPortsCommand.is_response(response): + self.ports = network.GetNetworkPortsCommand.get_value(response) + return True + if system.GetDeviceInfoCommand.is_response(response): + self.device_info = system.GetDeviceInfoCommand.get_value(response) + return True + return False + + async def _execute_commands( + self, commands: list, /, command_channel: dict[int, int] = None + ): idx = 0 - updated.clear() - channels.clear() + channels = None + mac = None + uuid = None try: - async for response in client.batch(commands): - if alarm.GetMotionStateCommand.is_response(response): - state = alarm.GetMotionStateCommand.get_value(response) - channel = md_index[idx] - updated.add(channel) - motion.setdefault(channel, ChannelMotionData()).motion = bool(state) - if ai.GetAiStateCommand.is_response(response): - state = ai.GetAiStateCommand.get_value(response) - channel = state["channel"] # pylint: disable=unsubscriptable-object - updated.add(channel) - if ai.AITypes.is_ai_response_values(state): - for (_type, value) in state.items(): - if ( - isinstance(value, dict) - and value["support"] - and _type in (e.value for e in ai.AITypes) - ): - motion.setdefault( - channel, ChannelMotionData() - ).detected[ai.AITypes(_type)] = bool( - value["alarm_state"] - ) + async for response in self.client.batch(commands): + if network.GetChannelStatusCommand.is_response(response): + channels = network.GetChannelStatusCommand.get_value(response) + elif network.GetLocalLinkCommand.is_response(response): + _mac = network.GetLocalLinkCommand.get_value(response)["mac"] + if not mac: + mac = _mac + elif mac.lower() != _mac.lower(): + raise UpdateFailed( + "Found different mac so possible wrong device" + ) + elif network.GetP2PCommand.is_response(response): + _uuid = network.GetP2PCommand.get_value(response)["uid"] + if not uuid: + uuid = _uuid + elif uuid.lower() != _uuid.lower(): + raise UpdateFailed( + "Did not find the same device as last time at this address!" + ) + else: + _ = ( + self._processes_responses(response) + or self._process_motion_responses( + response, command_index=idx, command_channel=command_channel + ) + or self._process_ptz_responses( + response, command_index=idx, command_channel=command_channel + ) + ) idx += 1 + except CONNECTION_ERRORS: + self._connection_id = 0 + raise + # except RESPONSE_ERRORS: + # raise except ReolinkResponseError as reoresp: - if reoresp.code == ErrorCodes.AUTH_REQUIRED: - await client.disconnect() - raise ConfigEntryAuthFailed() + # do not trap auth errors, instead we will just fail as usual + # auth errors at this point could be expired tokens + # so we do not want to assume password issues + if reoresp.code in AUTH_ERRORCODES: + await self.client.disconnect() + return False + if reoresp.code == ErrorCodes.READ_FAILED and True in ( + True + for command in commands + if isinstance(command, system.GetAbilitiesCommand) + ): + # some cameras do not like to batch in the ability command + # we will note this and no do that anymore + # TODO : update options to prevent it completely + self._batch_ability = False + return False raise reoresp + return (channels, mac, uuid) - return MotionData(updated, motion) - - return _data_updater - - -def create_device_data_update_method(entry_data: ReolinkEntryData): - """DeviceData Updater Method""" - - first_run = True - conn_id = 0 - auth_id = 0 - if "client" in entry_data: - client = entry_data["client"] - first_run = not client.is_connected - conn_id = client.connection_id - auth_id = client.authentication_id - else: - entry_data.setdefault("client", ReolinkClient()) - - async def _data_updater(): - nonlocal first_run, conn_id, auth_id - - client = entry_data["client"] - coordinator = entry_data["coordinator"] - config = coordinator.config_entry + async def async_update(self): + """update""" if ( - not client.is_connected - or not coordinator.last_update_success - or conn_id != client.connection_id + not self.client.is_connected + or self._connection_id != self.client.connection_id ): - host: str = config.data.get(CONF_HOST, None) + host: str = self.config_entry.data.get(CONF_HOST, None) discovery: dict = None if ( host is None - and (discovery := config.options.get(OPT_DISCOVERY, None)) + and (discovery := self.config_entry.options.get(OPT_DISCOVERY, None)) and "ip" in discovery ): host = discovery["ip"] - if config.data.get(CONF_USE_HTTPS, False): + if self.config_entry.data.get(CONF_USE_HTTPS, False): encryption = Encryption.HTTPS else: encryption = Encryption.NONE @@ -222,286 +251,386 @@ async def _data_updater(): "No host configured, and none discovered (was device lost?)" ) - await client.connect( + await self.client.connect( host, - config.data.get(CONF_PORT, DEFAULT_PORT), - config.data.get(CONF_TIMEOUT, DEFAULT_TIMEOUT), + self.config_entry.data.get(CONF_PORT, DEFAULT_PORT), + self.config_entry.data.get(CONF_TIMEOUT, DEFAULT_TIMEOUT), encryption=encryption, ) - if conn_id != client.connection_id: - conn_id = client.connection_id - auth_id = 0 + if self._connection_id != self.client.connection_id: + self._connection_id = self.client.connection_id + self._authentication_id = 0 if ( - not client.is_authenticated - or auth_id != client.authentication_id - or coordinator.last_exception is ConfigEntryAuthFailed + not self.client.is_authenticated + or self._authentication_id != self.client.authentication_id ): - - async def _auth(): - nonlocal auth_id - try: - if not await client.login( - config.data.get(CONF_USERNAME, DEFAULT_USERNAME), - config.data.get(CONF_PASSWORD, DEFAULT_PASSWORD), - ): - auth_id = 0 - await client.disconnect() - raise ConfigEntryAuthFailed() - except ReolinkResponseError as reoresp: - if reoresp.code in ( - ErrorCodes.AUTH_REQUIRED, - ErrorCodes.LOGIN_FAILED, - ): - await client.disconnect() - raise ConfigEntryAuthFailed() - raise reoresp - auth_id = client.authentication_id - if ( - coordinator.update_interval - and coordinator.update_interval.total_seconds() - >= client.authentication_timeout + try: + if not await self.client.login( + self.config_entry.data.get(CONF_USERNAME, DEFAULT_USERNAME), + self.config_entry.data.get(CONF_PASSWORD, DEFAULT_PASSWORD), ): - # TODO : should we drop the interval to below the timeout? - pass - - if first_run: - try: - async with async_timeout.timeout(5): - await _auth() - except asyncio.TimeoutError: - coordinator.logger.info( - "Camera is not responding quickly on first load so delaying" - ) - await client.disconnect() - raise - else: - await _auth() + self._authentication_id = 0 + await self.client.disconnect() + raise ConfigEntryAuthFailed() + except ReolinkResponseError as reoresp: + if reoresp.code in AUTH_ERRORCODES: + await self.client.disconnect() + raise ConfigEntryAuthFailed() + raise reoresp + self._authentication_id = self.client.authentication_id commands = [] - data = coordinator.data - if TYPE_CHECKING: - data = cast(DeviceData, data) - abilities = data.abilities if data else None - if abilities is None: + if self.abilities is None or not self._batch_ability: try: - abilities = await client.get_ability( - config.data.get(CONF_USERNAME, None) + self.abilities = await self.client.get_ability( + self.config_entry.data.get(CONF_USERNAME, None) ) except ReolinkResponseError as reoresp: - if reoresp.code in ( - ErrorCodes.AUTH_REQUIRED, - ErrorCodes.LOGIN_FAILED, - ): - await client.disconnect() - raise ConfigEntryAuthFailed() + if reoresp.code in AUTH_ERRORCODES: + # this could be because of a reboot or token expiration + await self.async_update() + return self if reoresp.code == ErrorCodes.PROTOCOL_ERROR: # possible weird encryption bug or other invalid response so we will force a reconnect - conn_id = 0 - auth_id = 0 + self._connection_id = 0 + self._authentication_id = 0 raise reoresp else: commands.append( - system.GetAbilitiesCommand(config.data.get(CONF_USERNAME, None)) + system.GetAbilitiesCommand( + self.config_entry.data.get(CONF_USERNAME, None) + ) ) - time = data.time if data else None - drift = data.drift if data else None commands.append(system.GetTimeCommand()) - device_info = data.device_info if data else None + channels = None - channel_info = data.channels if data else {} - ports = data.ports if data else None commands.append(network.GetNetworkPortsCommand()) mac = None uuid = None - updated_motion = None - if abilities.devInfo: + if self.abilities.devInfo: commands.append(system.GetDeviceInfoCommand()) - if device_info and device_info.get("channelNum", 1) > 1: + if self.device_info and self.device_info.get("channelNum", 1) > 1: commands.append(network.GetChannelStatusCommand()) - if "device" not in entry_data: - discovery: dict = config.options.get(OPT_DISCOVERY, None) + if self.device is None: + discovery: dict = self.config_entry.options.get(OPT_DISCOVERY, None) mac = discovery["mac"] if discovery and "mac" in discovery else None - if abilities.localLink: + if self.abilities.localLink: commands.append(network.GetLocalLinkCommand()) uuid = discovery["uuid"] if discovery and "uuid" in discovery else None - if abilities.p2p: + if self.abilities.p2p: commands.append(network.GetP2PCommand()) - if ( - entry_data["motion_coordinator"].update_interval is None - or not coordinator.last_update_success - or data is None - ): - updated_motion: set[int] = set() - motion: dict[int, ChannelMotionData] = {} - (_, md_index) = _add_motion_commands( - commands, abilities, options=config.options - ) - else: - motion = None + (_, command_channel) = self._create_motion_commands(commands) + (_, command_channel) = self._create_ptz_commands( + commands, command_channel=command_channel + ) - idx = 0 - try: - async for response in client.batch(commands): - if system.GetAbilitiesCommand.is_response(response): - abilities = system.abilities.Abilities( - system.GetAbilitiesCommand.get_value(response) - ) - if system.GetTimeCommand.is_response(response): - result = system.GetTimeCommand.get_value(response) - # pylint: disable=unsubscriptable-object - time = system.as_dateime( - result["Time"], tzinfo=system.get_tzinfo(result) - ) - drift = dt.utcnow() - dt.as_utc(time) - if network.GetNetworkPortsCommand.is_response(response): - ports = network.GetNetworkPortsCommand.get_value(response) - if system.GetDeviceInfoCommand.is_response(response): - device_info = system.GetDeviceInfoCommand.get_value(response) - if network.GetChannelStatusCommand.is_response(response): - channels = network.GetChannelStatusCommand.get_value(response) - if network.GetLocalLinkCommand.is_response(response): - _mac = network.GetLocalLinkCommand.get_value(response)["mac"] - if not mac: - mac = _mac - elif mac.lower() != _mac.lower(): - raise UpdateFailed( - "Found different mac so possible wrong device" - ) - if network.GetP2PCommand.is_response(response): - _uuid = network.GetP2PCommand.get_value(response)["uid"] - if not uuid: - uuid = _uuid - elif uuid.lower() != _uuid.lower(): - raise UpdateFailed( - "Did not find the same device as last time at this address!" - ) - if alarm.GetMotionStateCommand.is_response(response): - state = alarm.GetMotionStateCommand.get_value(response) - channel = md_index[idx] - updated_motion.add(channel) - motion.setdefault(channel, ChannelMotionData()).motion = bool(state) - if ai.GetAiStateCommand.is_response(response): - state = ai.GetAiStateCommand.get_value(response) - channel = state["channel"] # pylint: disable=unsubscriptable-object - updated_motion.add(channel) - if ai.AITypes.is_ai_response_values(state): - for (_type, value) in state.items(): - if ( - isinstance(value, dict) - and value["support"] - and _type in (e.value for e in ai.AITypes) - ): - motion.setdefault( - channel, ChannelMotionData() - ).detected[ai.AITypes(_type)] = bool( - value["alarm_state"] - ) - idx += 1 - except CONNECTION_ERRORS: - conn_id = 0 - raise - # except RESPONSE_ERRORS: - # raise - except ReolinkResponseError as reoresp: - if reoresp.code in ( - ErrorCodes.AUTH_REQUIRED, - ErrorCodes.LOGIN_FAILED, - ): - await client.disconnect() - raise ConfigEntryAuthFailed() from reoresp - raise reoresp + self._update_motion.clear() + self.updated_motion.clear() + self._update_ptz.clear() + self.updated_ptz.clear() + result = await self._execute_commands(commands, command_channel=command_channel) + if not result: + await self.async_update() + return self - if device_info and device_info.get("channelNum", 0) > 1 and channels is None: - channels = await client.get_channel_status() + channels, mac, uuid = result + + if ( + self.device_info + and self.device_info.get("channelNum", 0) > 1 + and channels is None + ): + channels = await self.client.get_channel_status() # pylint: disable=unsubscriptable-object - if "device" not in entry_data: - registry = device_registry.async_get(coordinator.hass) - entry_data["device"] = registry.async_get_or_create( - config_entry_id=config.entry_id, + if self.device is None: + registry = device_registry.async_get(self.hass) + self.device = registry.async_get_or_create( + config_entry_id=self.config_entry.entry_id, default_manufacturer="Reolink", - default_name=device_info["name"], + default_name=self.device_info["name"], identifiers={(DOMAIN, uuid)} if uuid else None, connections={(device_registry.CONNECTION_NETWORK_MAC, mac)} if mac else None, - sw_version=device_info["firmVer"], - hw_version=device_info["hardVer"], - default_model=device_info["model"], - configuration_url=client.base_url, + sw_version=self.device_info["firmVer"], + hw_version=self.device_info["hardVer"], + default_model=self.device_info["model"], + configuration_url=self.client.base_url, ) - if len(abilities.channels) < 2: - channel_info[0] = _dev_to_info(entry_data["device"]) + if len(self.abilities.channels) < 2: + self.channels[0] = _dev_to_info(self.device) else: - registry = device_registry.async_get(coordinator.hass) + registry = device_registry.async_get(self.hass) updated_device = registry.async_update_device( - entry_data["device"].id, - name=device_info["name"], - sw_version=device_info["firmVer"], - hw_version=device_info["hardVer"], + self.device.id, + name=self.device_info["name"], + sw_version=self.device_info["firmVer"], + hw_version=self.device_info["hardVer"], ) - if updated_device and updated_device != entry_data["device"]: - entry_data["device"] = updated_device - if len(abilities.channels) < 2: - channel_info[0] = _dev_to_info(updated_device) - - if len(abilities.channels) > 1 and channels: - for i in config.options.get( - OPT_CHANNELS, list(range(len(abilities.channels))) + if updated_device and updated_device != self.device: + self.device = updated_device + if len(self.abilities.channels) < 2: + self.channels[0] = _dev_to_info(updated_device) + + if len(self.abilities.channels) > 1 and channels: + for i in self.config_entry.options.get( + OPT_CHANNELS, list(range(len(self.abilities.channels))) ): status = next(c for c in channels if c["channel"] == i) name = status.get("name", f"Channel {status['channel']}") - if config.options.get(OPT_PREFIX_CHANNEL, False): - name = f"{entry_data['device'].name} {name}" - if not status["channel"] in channel_info: + if self.config_entry.options.get(OPT_PREFIX_CHANNEL, False): + name = f"{self.device.name} {name}" + if not status["channel"] in self.channels: if not registry: - registry = device_registry.async_get(coordinator.hass) + registry = device_registry.async_get(self.hass) channel_device = registry.async_get_or_create( - config_entry_id=config.entry_id, - via_device=entry_data["device"].identifiers.copy().pop(), + config_entry_id=self.config_entry.entry_id, + via_device=self.device.identifiers.copy().pop(), default_model=f"{status.get('typeInfo', '')} Channel {status['channel']}", default_name=name, - identifiers={ - (DOMAIN, f"{entry_data['device'].id}-{status['channel']}") - }, - default_manufacturer=entry_data["device"].manufacturer, + identifiers={(DOMAIN, f"{self.device.id}-{status['channel']}")}, + default_manufacturer=self.device.manufacturer, ) - channel_info[status["channel"]] = _dev_to_info(channel_device) + self.channels[status["channel"]] = _dev_to_info(channel_device) else: if not registry: - registry = device_registry.async_get(coordinator.hass) + registry = device_registry.async_get(self.hass) channel_device = registry.async_get_device( - channel_info[status["channel"]]["identifiers"] + self.channels[status["channel"]]["identifiers"] ) updated_device = registry.async_update_device( channel_device.id, name=name ) if updated_device and updated_device != channel_device: - channel_info[status["channel"]] = _dev_to_info(updated_device) + self.channels[status["channel"]] = _dev_to_info(updated_device) - if (uuid or mac) and OPT_DISCOVERY not in config.options: - options = config.options.copy() + if (uuid or mac) and OPT_DISCOVERY not in self.config_entry.options: + options = self.config_entry.options.copy() options[OPT_DISCOVERY] = {} if mac: options[OPT_DISCOVERY]["mac"] = mac if uuid: options[OPT_DISCOVERY]["uuid"] = uuid - coordinator.hass.config_entries.async_update_entry( - coordinator.config_entry, options=options + self.hass.config_entries.async_update_entry( + self.config_entry, options=options ) - if motion and updated_motion: - entry_data["motion_coordinator"].async_set_updated_data( - MotionData(updated_motion, motion) - ) - return DeviceData(time, drift, abilities, device_info, channel_info, ports) + return self + + def _create_motion_commands( + self, + /, + commands: list = None, + command_channel: dict[int, int] = None, + channels: Sequence[int] = None, + ): + if commands is None: + commands = [] + if command_channel is None: + command_channel = {} + if len(self.abilities.channels) == 1: + channels = set({0}) + elif channels is None or len(channels) == 0: + channels = _get_channels(self.abilities, self.config_entry.options) + + for i in channels: + # the MD command does not return the channel it replies to + command_channel[len(commands)] = i + commands.append(alarm.GetMotionStateCommand(i)) + ability = self.abilities.channels[i] + if ( + ability.support.ai.animal + or ability.support.ai.face + or ability.support.ai.people + or ability.support.ai.pet + or ability.support.ai.vehicle + ): + commands.append(ai.GetAiStateCommand(i)) + + return (commands, command_channel) + + def _process_motion_responses( + self, response, /, command_index: int, command_channel: dict[int, int] + ): + if alarm.GetMotionStateCommand.is_response(response): + state = alarm.GetMotionStateCommand.get_value(response) + channel = command_channel[command_index] + self.updated_motion.add(channel) + if channel not in self.motion: + self.motion.setdefault(channel, MutableMotionData()) + self.motion[channel].motion = bool(state) + return True + if ai.GetAiStateCommand.is_response(response): + state = ai.GetAiStateCommand.get_value(response) + channel = state["channel"] # pylint: disable=unsubscriptable-object + self.updated_motion.add(channel) + if ai.AITypes.is_ai_response_values(state): + for (_type, value) in state.items(): + if ( + isinstance(value, dict) + and value["support"] + and _type in (e.value for e in ai.AITypes) + ): + if channel not in self.motion: + self.motion.setdefault(channel, MutableMotionData()) + self.motion[channel].detected[ai.AITypes(_type)] = bool( + value["alarm_state"] + ) + return True + return False + + def async_request_motion_update(self, channel: int = 0): + """Request update of PTZ data for channel""" + self._update_motion.add(channel) + + async def async_update_motion_data(self): + """update motion only""" + + (commands, command_channel) = self._create_motion_commands( + channels=self._update_motion, + ) + self.updated_motion.clear() + self._update_motion.clear() + await self._execute_commands(commands, command_channel=command_channel) + + return self + + def _create_ptz_commands( + self, + /, + commands: list = None, + command_channel: dict[int, int] = None, + channels: set[int] = None, + ): + if commands is None: + commands = [] + if command_channel is None: + command_channel = {} + if len(self.abilities.channels) == 1: + channels = set({0}) + elif channels is None or len(channels) == 0: + channels = _get_channels(self.abilities, self.config_entry.options) + + for i in channels: + ability = self.abilities.channels[i] + if ability.ptz.control in ( + system.abilities.channel.PTZControlValues.ZOOM, + system.abilities.channel.PTZControlValues.ZOOM_FOCUS, + ): + commands.append(ptz.GetPTZZoomFocusCommand(i)) + if ability.ptz.type == system.abilities.channel.PTZTypeValues.AF: + command_channel[len(commands)] = i + commands.append(ptz.GetPTZAutoFocusCommand(i)) + if ability.ptz.patrol: + commands.append(ptz.GetPTZPatrolCommand(i)) + if ability.ptz.tattern: + commands.append(ptz.GetPTZTatternCommand(i)) + return (commands, command_channel) + + def _process_ptz_responses( + self, response, /, command_index: int, command_channel: dict[int, int] + ): + if ptz.GetPTZAutoFocusCommand.is_response(response): + value = ptz.GetPTZAutoFocusCommand.get_value(response) + channel = command_channel[command_index] + self.updated_ptz.add(channel) + if channel not in self.ptz: + data = self.ptz.setdefault(channel, _MutablePTZData()) + else: + data = self.ptz[channel] + data.autofocus.disabled = value["disable"] + return True + if ptz.GetPTZZoomFocusCommand.is_response(response): + value = ptz.GetPTZZoomFocusCommand.get_value(response) + channel = value["channel"] + self.updated_ptz.add(channel) + if channel not in self.ptz: + data = self.ptz.setdefault(channel, _MutablePTZData()) + else: + data = self.ptz[channel] + if "zoom" in value: + data.zoom.value = value["zoom"].get("pos", 0) + else: + data.zoom.value = 0 + if "focus" in value: + data.focus.value = value["focus"].get("pos", 0) + else: + data.focus.value = 0 + return True + if ptz.GetPTZPresetCommand.is_response(response): + for preset in ptz.GetPTZPresetCommand.get_value(response): + channel = preset["channel"] + self.updated_ptz.add(channel) + if channel not in self.ptz: + data = self.ptz.setdefault(channel, _MutablePTZData()) + else: + data = self.ptz[channel] + if data.presets is None: + data.presets = {} + if preset["id"] in data.presets: + data.presets[preset["id"]].update(**preset) + else: + data.presets[preset["id"]] = ptz.PTZPreset(**preset) + + return True + if ptz.GetPTZPatrolCommand.is_response(response): + for track in ptz.GetPTZPatrolCommand.get_value(response): + channel = track["channel"] + self.updated_ptz.add(channel) + if channel not in self.ptz: + data = self.ptz.setdefault(channel, _MutablePTZData()) + else: + data = self.ptz[channel] + if data.patrol is None: + data.patrol = {} + if track["id"] in data.patrol: + data.patrol[track["id"]].update(**track) + else: + data.patrol[track["id"]] = ptz.PTZPatrol(**track) + return True + if ptz.GetPTZTatternCommand.is_response(response): + for track in ptz.GetPTZTatternCommand.get_value(response): + channel = track["channel"] + self.updated_ptz.add(channel) + if channel not in self.ptz: + data = self.ptz.setdefault(channel, _MutablePTZData()) + else: + data = self.ptz[channel] + if data.tattern is None: + data.tattern = {} + if track["id"] in data.tattern: + data.tattern[track["id"]].update(**track) + else: + data.tattern[track["id"]] = ptz.PTZPatrol(**track) + return True + return False + + def async_request_ptz_update(self, channel: int = 0): + """Request update of PTZ data for channel""" + self._update_ptz.add(channel) + + async def async_update_ptz_data(self): + """update ptz only""" + (commands, command_channel) = self._create_ptz_commands( + channels=self._update_ptz, + ) + self.updated_ptz.clear() + self._update_ptz.clear() + await self._execute_commands(commands, command_channel=command_channel) - return _data_updater + return self + async def async_close(self): + """close""" + if self.client is not None: + await self.client.disconnect() + self.client = None -ReolinkEntityDataUpdateCoordinator: Final = DataUpdateCoordinator[DeviceData] + +ReolinkEntityDataUpdateCoordinator = DataUpdateCoordinator[EntityData] class ReolinkEntity(CoordinatorEntity[ReolinkEntityDataUpdateCoordinator]): @@ -529,52 +658,3 @@ def _handle_coordinator_update(self) -> None: self.entity_description.channel ] return super()._handle_coordinator_update() - - -ReolinkMotionEntityDataUpdateCooridnator: Final = DataUpdateCoordinator[MotionData] - - -class ReolinkMotionEntity(ReolinkEntity): - """Reolink Motion Entity""" - - def __init__( - self, - coordinator: ReolinkEntityDataUpdateCoordinator, - description: ReolinkEntityDescription, - context: any = None, - ) -> None: - super().__init__(coordinator, description, context) - domain_data: ReolinkDomainData = self.coordinator.hass.data[DOMAIN] - self.motion_coordinator = domain_data[self.coordinator.config_entry.entry_id][ - "motion_coordinator" - ] - - @property - def available(self) -> bool: - return super().available and self.motion_coordinator.last_update_success - - async def async_added_to_hass(self) -> None: - await super().async_added_to_hass() - - def _filter_coordinator_update(): - if self.entity_description.channel in self.motion_coordinator.data.updated: - self._handle_coordinator_motion_update() - - self.async_on_remove( - self.motion_coordinator.async_add_listener( - _filter_coordinator_update, self.coordinator_context - ) - ) - - @callback - def _handle_coordinator_motion_update(self) -> None: - """Handle updated motion data from the coordinator.""" - self.async_write_ha_state() - - async def async_update(self) -> None: - if not self.enabled: - return - - await super().async_update() - - await self.motion_coordinator.async_request_refresh() diff --git a/custom_components/reolink_rest/manifest.json b/custom_components/reolink_rest/manifest.json index 78e0953..bcd7f20 100644 --- a/custom_components/reolink_rest/manifest.json +++ b/custom_components/reolink_rest/manifest.json @@ -5,7 +5,7 @@ "issue_tracker": "https://github.com/xannor/ha_reolink_rest/issues", "version": "0.5.2", "iot_class": "local_polling", - "requirements": ["async-reolink.rest==0.5.5"], + "requirements": ["async-reolink.rest==0.5.7"], "dependencies": ["camera"], "after_dependencies": ["stream", "webhook", "reolink_discovery"], "codeowners": ["@xannor"], diff --git a/custom_components/reolink_rest/models.py b/custom_components/reolink_rest/models.py index 315a7ad..34b719e 100644 --- a/custom_components/reolink_rest/models.py +++ b/custom_components/reolink_rest/models.py @@ -1,13 +1,14 @@ """Common Models""" -from dataclasses import dataclass, field +from abc import ABC, abstractmethod +from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Mapping, Sequence +from typing import Mapping, MutableMapping -from homeassistant.helpers.entity import DeviceInfo, EntityDescription +from homeassistant.helpers.entity import EntityDescription from homeassistant.util import dt -from async_reolink.api import system, network, ai +from async_reolink.api import ai @dataclass @@ -17,44 +18,110 @@ class ReolinkEntityDescription(EntityDescription): channel: int = 0 -@dataclass -class ChannelMotionData: - """Reolink Motion Data""" +class MotionData(Mapping[ai.AITypes, bool], ABC): + """ + Motion data - motion: bool = field(default=False) - detected: Mapping[ai.AITypes, bool] = field(default_factory=dict) + is true if motion detected, otherwise false, also provides flags for if the motion was ai triggered + """ + @property + @abstractmethod + def detected(self) -> bool: + """detected""" -@dataclass -class DeviceData: - """Reolink Base Entity Data""" - time: datetime - drift: timedelta - abilities: system.abilities.Abilities - device_info: system.DeviceInfoType - channels: dict[int, DeviceInfo] - ports: network.NetworkPortsType +class MutableMotionData(MotionData, MutableMapping[ai.AITypes, bool]): + """Motion Data""" + def __init__(self) -> None: + self._detected: bool = False + self._ai: dict[ai.AITypes, bool] = {} -@dataclass -class MotionData: - """Reolink Base Motion Data""" + def __bool__(self): + return self._detected + + def __len__(self): + return self._ai.__len__() + + def __getitem__(self, __k: ai.AITypes): + return self._ai.__getitem__(__k) + + def __setitem__(self, __k: ai.AITypes, __v: bool): + return self._ai.__setitem__(__k, __v) + + def __delitem__(self, __v: ai.AITypes): + return self._ai.__delitem__(__v) + + def __iter__(self): + return self._ai.__iter__() + + def __contains__(self, __o: object): + return self._ai.__contains__(__o) + + @property + def detected(self): + """Detected""" + return self._detected + + @detected.setter + def detected(self, value: bool): + self._detected = value + + +class PTZPosition(ABC): + """ + PTZ Position data + + also can be used directly as an int + """ + + @property + @abstractmethod + def value(self) -> int: + """value""" + + def __index__(self): + return self.value + + +class MutablePTZPosition(PTZPosition): + """PTZ Position""" + + def __init__(self) -> None: + self._value = 0 + + @property + def value(self): + return self._value + + @value.setter + def value(self, value: int): + self._value = value + + +class PTZDisabled(ABC): + """PTZ Disabled""" + + @property + @abstractmethod + def disabled(self) -> bool: + """disabled""" + + def __bool__(self): + return not self.disabled - updated: frozenset[int] - channel: Mapping[int, ChannelMotionData] +class MutablePTZDisabled(PTZDisabled): + """PTZ Disabled""" -@dataclass(frozen=True) -class PushSubscription: - """Push Subscription Token""" + def __init__(self) -> None: + self._value = True - manager_url: str - timestamp: datetime - expires: timedelta | None + @property + def disabled(self): + return self._value - def __post_init__(self): - if self.timestamp and not isinstance(self.timestamp, datetime): - object.__setattr__(self, "timestamp", dt.parse_datetime(self.timestamp)) - if self.expires and not isinstance(self.expires, timedelta): - object.__setattr__(self, "expires", dt.parse_duration(self.expires)) + @disabled.setter + def disabled(self, value: bool) -> bool: + self._value = value diff --git a/custom_components/reolink_rest/push.py b/custom_components/reolink_rest/push.py index 2fdd90d..634520d 100644 --- a/custom_components/reolink_rest/push.py +++ b/custom_components/reolink_rest/push.py @@ -4,12 +4,12 @@ import asyncio import base64 -from dataclasses import asdict -from datetime import timedelta +from dataclasses import asdict, dataclass +from datetime import timedelta, datetime import hashlib import logging -from typing import Final, TypeVar, overload +from typing import Callable, Final, TypeVar, overload import secrets @@ -21,9 +21,8 @@ from homeassistant.core import HomeAssistant, callback from homeassistant.config_entries import ConfigEntry -from homeassistant.loader import bind_hass -from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.helpers.storage import Store +from homeassistant.helpers.singleton import singleton from homeassistant.util import dt from homeassistant.backports.enum import StrEnum @@ -33,16 +32,17 @@ from async_reolink.api.const import DEFAULT_USERNAME, DEFAULT_PASSWORD -from .const import DOMAIN, OPT_DISCOVERY +from .const import DATA_MOTION_COORDINATORS, DOMAIN, OPT_DISCOVERY -from .models import DeviceData, PushSubscription -from .typing import ReolinkDomainData, ReolinkEntryData, WebhookManager +from .typing import EntityData, ReolinkDomainData DATA_MANAGER: Final = "push_manager" DATA_STORE: Final = "push_store" STORE_VERSION: Final = 1 +_LOGGER = logging.getLogger(__name__) + class _Namespaces(StrEnum): @@ -200,89 +200,79 @@ def _process_error_response(response: et.Element): return (_text(code), _text(reason)) +@dataclass(frozen=True) +class PushSubscription: + """Push Subscription Token""" + + manager_url: str + timestamp: datetime + expires: timedelta | None + + def __post_init__(self): + if self.timestamp and not isinstance(self.timestamp, datetime): + object.__setattr__(self, "timestamp", dt.parse_datetime(self.timestamp)) + if self.expires and not isinstance(self.expires, timedelta): + object.__setattr__(self, "expires", dt.parse_duration(self.expires)) + + class PushManager: """Push Manager""" def __init__( self, - logger: logging.Logger, - url: str, storage: Store, - entry_id: str, ) -> None: - self._logger = logger - self._url = url self._storage = storage - self._subscription: PushSubscription = None + self._subscriptions: dict[str, PushSubscription] = None + self._renew_id = None + self._next_renewal = None self._renew_task = None - self._entry_id = entry_id - self._motion_interval: timedelta = None + self._on_failure: list[Callable[[str], None]] = [] - async def async_start(self): - """start up manager""" - data = await self._storage.async_load() - if isinstance(data, dict) and self._entry_id in data: - self._subscription = PushSubscription(**data[self._entry_id]) - # if we retrieve a sub we must have crashed so we will - # "renew" it incase the camera was reset inbetween - - domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] - entry_data = domain_data[self._entry_id] - await self._subscribe(entry_data) - - async def async_stop(self): - """shutdown manager""" - domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] - entry_data = domain_data[self._entry_id] - await self._unsubscribe(entry_data) - - async def async_reftresh(self): - """force refresh of manage incase device state changed""" - domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] - entry_data = domain_data[self._entry_id] - self._renew(entry_data) + async def _ensure_subscriptions(self): + if self._subscriptions is not None: + return - async def _store_subscription(self): data = await self._storage.async_load() - if self._subscription: - sub = asdict(self._subscription) + if isinstance(data, dict): + self._subscriptions = { + _k: PushSubscription(**_v) for _k, _v, in data.items() + } + else: + self._subscriptions = {} + + async def _save_subscriptions(self): + def _fix_expires(sub: dict): if "expires" in sub: sub["expires"] = isodate.duration_isoformat(sub["expires"]) - else: - sub = None + return sub - if isinstance(data, dict): - if not sub and self._entry_id in data: - data.pop(self._entry_id) - else: - data[self._entry_id] = sub - elif sub: - data = {self._entry_id: sub} - if data is not None: - await self._storage.async_save(data) + data = {_k: _fix_expires(asdict(_v)) for _k, _v in self._subscriptions.items()} + await self._storage.async_save(data) async def _send(self, url: str, headers, data): async with ClientSession(connector=TCPConnector(verify_ssl=False)) as client: - self._logger.debug("sending data") + _LOGGER.debug("%s->%r", url, data) headers.setdefault("content-type", "application/soap+xml;charset=UTF-8") async with client.post( url, data=data, headers=headers, allow_redirects=False ) as response: if "xml" not in response.content_type: - self._logger.warning("bad response") + _LOGGER.warning("bad response") return None text = await response.text() + _LOGGER.debug("%s<-%r, %r", url, response.status, text) return (response.status, et.fromstring(text)) - def _get_onvif_base(self, config_entry: ConfigEntry, device_data: DeviceData): + def _get_onvif_base(self, config_entry: ConfigEntry, device_data: EntityData): discovery: dict = config_entry.options.get(OPT_DISCOVERY, {}) host = config_entry.data.get(CONF_HOST, discovery.get("ip", None)) return f"http://{host}:{device_data.ports['onvifPort']}" - def _get_service_url(self, config_entry: ConfigEntry, device_data: DeviceData): + def _get_service_url(self, config_entry: ConfigEntry, device_data: EntityData): return self._get_onvif_base(config_entry, device_data) + EVENT_SERVICE def _get_wsse(self, config_entry: ConfigEntry): @@ -293,59 +283,67 @@ def _get_wsse(self, config_entry: ConfigEntry): def _handle_failed_subscription( self, - coordinator: DataUpdateCoordinator, - entry_data: ReolinkEntryData, + url: str, + entry_id: str, save: bool, ): - def _retry(): - cleanup() - self._storage.hass.create_task(self._subscribe(entry_data, save)) - - cleanup = coordinator.async_add_listener(_retry) + # def _retry(): + # cleanup() + # self._storage.hass.create_task(self._subscribe(url, entry_id, save)) - if self._motion_interval: - entry_data["motion_coordinator"].update_interval = self._motion_interval - self._motion_interval = None + # domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] + # entry_data = domain_data[entry_id] + # coordinator = entry_data["coordinator"] + # attempt resubscribe on next coordinator retrieval success + # cleanup = coordinator.async_add_listener(_retry) - async def _subscribe(self, entry_data: ReolinkEntryData, save: bool = True): - await self._unsubscribe(entry_data, False) + for handler in self._on_failure: + handler(entry_id) - coordinator = entry_data["coordinator"] - - wsse = self._get_wsse(coordinator.config_entry) - message = _create_subscribe(self._url) + def _cancel_renew(self): + if self._renew_task and not self._renew_task.cancelled(): + self._renew_task.cancel() + self._renew_task = None - headers = {"action": message[0]} + def _schedule_next_renew( + self, loop: asyncio.AbstractEventLoop, entry_id: str | None = None + ): + sub = None + if entry_id is not None: + sub = self._subscriptions[entry_id] + else: + expires = None + for _id, _sub in self._subscriptions.items(): + if _sub.expires is not None and ( + expires is None or _sub.expires < expires + ): + expires = _sub.expires + sub = _sub + entry_id = _id + break + + if sub is None or sub.expires is None: + return + domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] + entry_data = domain_data[entry_id] + expires = ( + sub.timestamp + sub.expires + entry_data["coordinator"].data.time_difference + ) - data = _create_envelope(message[2], wsse, *message[1]) - response = None - try: - response = await self._send( - self._get_service_url(coordinator.config_entry, coordinator.data), - headers, - et.tostring(data), - ) - except client_exceptions.ServerDisconnectedError: - raise - if response is None: + if self._next_renewal is not None and expires > self._next_renewal: return + self._cancel_renew() - status, response = response - if status != 200: - (code, reason) = _process_error_response(response) + delay = max((expires - dt.utcnow()).total_seconds(), 0) - # error respons is kinda useless so we just assume - self._logger.warning( - f"Camera ({coordinator.data.device_info['name']}) refused subscription request, probably needs a reboot." - ) - self._handle_failed_subscription(coordinator, entry_data, save) - return + def _task(): + loop.create_task(self._renew(entry_id)) - response = response.find(f".//{_Namespaces.WSNT.tag('SubscribeResponse')}") - await self._process_subscription(response, entry_data, save) + self._renew_id = entry_id + self._renew_task = loop.call_later(delay, _task) async def _process_subscription( - self, response: et.Element, entry_data: ReolinkEntryData, save: bool = True + self, response: et.Element, entry_id: str, save: bool = True ): reference = _find(_Namespaces.WSNT.tag("SubscriptionReference"), response) reference = _text(_find(_Namespaces.WSA.tag("Address"), reference), reference) @@ -368,41 +366,89 @@ async def _process_subscription( expires = dt.parse_datetime(expires) if expires else None expires = expires - time if expires else None - self._subscription = PushSubscription(reference, time, expires) - if self._motion_interval is None: - coordinator = entry_data["motion_coordinator"] - self._motion_interval = coordinator.update_interval - coordinator.update_interval = None + sub = PushSubscription(reference, time, expires) + self._subscriptions[entry_id] = sub - self._schedule_renew(entry_data, asyncio.get_event_loop()) + self._schedule_next_renew(asyncio.get_event_loop(), entry_id) if save: - await self._store_subscription() + await self._save_subscriptions() + return sub - async def _renew(self, entry_data: ReolinkEntryData, save: bool = True): - sub = self._subscription - if sub and not sub.expires: - return + async def _subscribe( + self, + url: str, + entry_id: str, + save: bool = True, + ): + config_entry = self._storage.hass.config_entries.async_get_entry(entry_id) - coordinator = entry_data["coordinator"] + wsse = self._get_wsse(config_entry) + message = _create_subscribe(url) - if sub and sub.expires: - data = coordinator.data - camera_now = dt.utcnow() + data.drift - expires = sub.timestamp + sub.expires - if (expires - camera_now).total_seconds() < 1: - return await self._subscribe(entry_data, save) + headers = {"action": message[0]} + + data = _create_envelope(message[2], wsse, *message[1]) + domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] + entry_data = domain_data[entry_id] + entity_data = entry_data["coordinator"].data + response = None + try: + response = await self._send( + self._get_service_url(config_entry, entity_data), + headers, + et.tostring(data), + ) + except client_exceptions.ServerDisconnectedError: + raise + if response is None: + return None + + status, response = response + if status != 200: + (code, reason) = _process_error_response(response) - if not sub: - return await self._subscribe(entry_data, save) + # error respons is kinda useless so we just assume + _LOGGER.warning( + "Camera (%s) refused subscription request, probably needs a reboot.", + entity_data.device_info["name"], + ) + self._handle_failed_subscription(url, entry_id, save) + return + + response = response.find(f".//{_Namespaces.WSNT.tag('SubscribeResponse')}") + return await self._process_subscription(response, entry_id, save) + + async def _renew( + self, + entry_id: str, + url: str | None = None, + save: bool = True, + ): + sub = self._subscriptions.get(entry_id, None) + if sub is None: + return - url = ( + domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] + entry_data = domain_data[entry_id] + coordinator = entry_data["coordinator"] + manager_url = ( self._get_onvif_base(coordinator.config_entry, coordinator.data) + sub.manager_url ) + if sub.expires: + if url is not None: + data = coordinator.data + camera_now = dt.utcnow() + data.time_difference + expires = sub.timestamp + sub.expires + if (expires - camera_now).total_seconds() < 2: + return await self._subscribe(url, entry_id, save) + return await self._unsubscribe(entry_id, save) + return None + wsse = self._get_wsse(coordinator.config_entry) - message = _create_renew(url) + message = _create_renew(manager_url, sub.expires) headers = {"action": message[0]} data = _create_envelope(message[2], wsse, *message[1]) @@ -419,30 +465,37 @@ async def _renew(self, entry_data: ReolinkEntryData, save: bool = True): (code, reason) = _process_error_response(response) # error respons is kinda useless so we just assume - self._logger.warning( - f"Camera ({coordinator.data.device_info['name']}) refused subscription renewal, probably was rebooted." + _LOGGER.warning( + "Camera (%s) refused subscription renewal, probably was rebooted.", + coordinator.data.device_info["name"], ) - self._handle_failed_subscription(coordinator, entry_data, save) - return + if url is not None: + return await self._subscribe(url, entry_id, save) + self._handle_failed_subscription(url, entry_id, save) + return None response = response.find(f".//{_Namespaces.WSNT.tag('SubscribeResponse')}") - await self._process_subscription(response, entry_data, save) + return await self._process_subscription(response, entry_id, save) - async def _unsubscribe(self, entry_data: ReolinkEntryData, save: bool = True): - sub = self._subscription - if not sub: - return + async def _unsubscribe(self, entry_id: str, save: bool = True): + if entry_id == self._renew_id: + self._cancel_renew() + self._schedule_next_renew(asyncio.get_event_loop()) - self._cancel_renew() + sub = self._subscriptions.pop(entry_id, None) + if sub is None: + return + domain_data: ReolinkDomainData = self._storage.hass.data[DOMAIN] + entry_data = domain_data[entry_id] coordinator = entry_data["coordinator"] send = True if sub.expires: data = coordinator.data - camera_now = dt.utcnow() + data.drift + camera_now = dt.utcnow() + data.time_difference expires = sub.timestamp + sub.expires - send = (expires - camera_now).total_seconds() > 1 + send = (expires - camera_now).total_seconds() < 1 # no need to unsubscribe an expiring/expired subscription if send: @@ -463,7 +516,7 @@ async def _unsubscribe(self, entry_data: ReolinkEntryData, save: bool = True): ) except client_exceptions.ServerDisconnectedError: # this could mean our subscription is invalid for now log and ignore - self._logger.warning( + _LOGGER.warning( "Got disconnected on attempt to unsubscribe, assuming invalid subscription" ) @@ -473,34 +526,42 @@ async def _unsubscribe(self, entry_data: ReolinkEntryData, save: bool = True): status, response = response if status != 200: (code, reason) = _process_error_response(response) - self._logger.warning("bad response") + _LOGGER.warning("bad response") - self._subscription = None if save: - await self._store_subscription() - - def _cancel_renew(self): - if self._renew_task and not self._renew_task.cancelled(): - self._renew_task.cancel() - self._renew_task = None - - def _schedule_renew( - self, entry_data: ReolinkEntryData, loop: asyncio.AbstractEventLoop - ): - self._cancel_renew() - sub = self._subscription - if not sub or not sub.expires: - return + await self._save_subscriptions() + + async def async_subscribe(self, url: str, config_entry: ConfigEntry): + """Subcribe""" + await self._ensure_subscriptions() + + if config_entry.entry_id in self._subscriptions: + return await self._renew(config_entry.entry_id, url) + return await self._subscribe(url, config_entry.entry_id) + + async def async_unsubscribe(self, subscription: PushSubscription): + """Unsubscribe""" + await self._ensure_subscriptions() + entry_id = next( + ( + entry_id + for (entry_id, sub) in self._subscriptions.items() + if sub == subscription + ), + None, + ) + if entry_id is None: + return False + await self._unsubscribe(entry_id) + return True - data = entry_data["coordinator"].data - camera_now = dt.utcnow() + data.drift - expires = sub.timestamp + sub.expires - delay = max((expires - camera_now).total_seconds(), 0) + def async_on_subscription_failure(self, callback: Callable[[str], None]): + self._on_failure.append(callback) - def _task(): - loop.create_task(self._renew(entry_data)) + def _remove(): + self._on_failure.remove(callback) - self._renew_task = loop.call_later(delay, _task) + return _remove async def async_parse_notification(request: Request): @@ -510,6 +571,7 @@ async def async_parse_notification(request: Request): return None text = await request.text() + _LOGGER.debug("processing notification<-%r", text) env = et.fromstring(text) if env is None or env.tag != f"{{{_Namespaces.SOAP_ENV}}}Envelope": return None @@ -526,37 +588,16 @@ async def async_parse_notification(request: Request): if motion is None: return None - return motion.attrib["Value"] + return motion.attrib["Value"][0:1].lower() == "t" +@singleton(f"{DOMAIN}-push-manager") @callback -@bind_hass def async_get_push_manager( hass: HomeAssistant, - logger: logging.Logger, - entry: ConfigEntry, - webhook: WebhookManager, ) -> PushManager: """Get Push Manager""" - domain_data: dict = hass.data[DOMAIN] - entry_data: dict = domain_data[entry.entry_id] - - if DATA_MANAGER in entry_data: - return entry_data[DATA_MANAGER] - - storage: Store = domain_data.setdefault( - DATA_STORE, Store(hass, STORE_VERSION, f"{DOMAIN}.push_subs") - ) - - entry_data[DATA_MANAGER] = manager = PushManager( - logger, webhook.url, storage, entry.entry_id - ) - - def _unload(): - hass.create_task(manager.async_stop()) - - entry.async_on_unload(_unload) - hass.create_task(manager.async_start()) - + storage = Store(hass, STORE_VERSION, f"{DOMAIN}_push") + manager = PushManager(storage) return manager diff --git a/custom_components/reolink_rest/services.yaml b/custom_components/reolink_rest/services.yaml index e69de29..0b5203a 100644 --- a/custom_components/reolink_rest/services.yaml +++ b/custom_components/reolink_rest/services.yaml @@ -0,0 +1,6 @@ +reboot: + name: Reboot + description: Remotely Reboot ReoLink device + target: + device: + integration: reolink_rest diff --git a/custom_components/reolink_rest/typing.py b/custom_components/reolink_rest/typing.py index 4579fe6..3df1980 100644 --- a/custom_components/reolink_rest/typing.py +++ b/custom_components/reolink_rest/typing.py @@ -1,27 +1,62 @@ """Common Typings""" -from typing import Protocol, TypeVar, TypedDict +from datetime import timedelta +from typing import Mapping, Protocol, TypedDict from aiohttp.web import Request, Response from homeassistant.core import HomeAssistant, CALLBACK_TYPE +from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.helpers.device_registry import DeviceEntry +from async_reolink.api import system, network, ptz from async_reolink.rest import Client -from .models import DeviceData, MotionData +from .models import MotionData, PTZPosition, PTZDisabled -_T = TypeVar("_T") +class PTZData(Protocol): + """PTZ Data""" -class ReolinkEntryData(TypedDict, total=False): - """Reolink Entry Data""" + pan: PTZPosition + tilt: PTZPosition + zoom: PTZPosition + focus: PTZPosition + autofocus: PTZDisabled + presets: Mapping[ptz.PTZPresetId, ptz.PTZPreset] | None + patrol: Mapping[ptz.PTZPatrolId, ptz.PTZPatrol] | None + tattern: Mapping[ptz.PTZTrackId, ptz.PTZTrack] | None + + +class EntityData(Protocol): + """Entity Data and API""" client: Client device: DeviceEntry - coordinator: DataUpdateCoordinator[DeviceData] - motion_coordinator: DataUpdateCoordinator[MotionData] - motion_data_request: set[int] + time_difference: timedelta + abilities: system.abilities.Abilities + device_info: system.DeviceInfoType + channels: Mapping[int, DeviceInfo] + ports: network.NetworkPortsType + updated_motion: frozenset[int] + motion: Mapping[int, MotionData] + updated_ptz: frozenset[int] + ptz: Mapping[int, PTZData] + + def async_request_motion_update(self, channel: int = 0) -> None: + """Request motion update for channel""" + ... + + def async_request_ptz_update(self, channel: int = 0) -> None: + """Request PTZ update for channel""" + ... + + +class ReolinkEntryData(TypedDict, total=False): + """Common entry data""" + + coordinator: DataUpdateCoordinator[EntityData] + motion_coordinators: dict[int, DataUpdateCoordinator[EntityData]] ReolinkDomainData = dict[str, ReolinkEntryData] diff --git a/custom_components/reolink_rest/webhook.py b/custom_components/reolink_rest/webhook.py index d6871e6..bbdb0da 100644 --- a/custom_components/reolink_rest/webhook.py +++ b/custom_components/reolink_rest/webhook.py @@ -3,14 +3,14 @@ from __future__ import annotations import logging -from typing import Final +from typing import Callable, NamedTuple from aiohttp.web import Request, Response from homeassistant.core import HomeAssistant, callback from homeassistant.config_entries import ConfigEntry from homeassistant.helpers.network import get_url, NoURLAvailableError -from homeassistant.loader import bind_hass +from homeassistant.helpers.singleton import singleton from .typing import AsyncWebhookHandler @@ -21,7 +21,58 @@ from .const import DOMAIN -DATA_MANAGER: Final = "webhook" +_LOGGER = logging.getLogger(__name__) + + +class Webhook: + """Webhook""" + + def __init__( + self, manager: WebhookManager, webook_id: str, remove: Callable[[], None] + ) -> None: + self._id = webook_id + self._url = f"{manager.url}{webhook.async_generate_path(webook_id)}" + self._handlers: list[AsyncWebhookHandler] = [] + self._remove = remove + + @property + def id(self): + """id""" + return self._id + + @property + def url(self): + """url""" + return self._url + + def async_add_handler(self, handler: AsyncWebhookHandler): + """Add Handler""" + self._handlers.append(handler) + + def _remove(): + self._handlers.remove(handler) + + return _remove + + async def async_notify_handlers(self, hass: HomeAssistant, request: Request): + """Notify handlers of webhook call""" + + for handler in self._handlers.copy(): + response = await handler(hass, request) + if response is not None: + return response + + return None + + def async_remove(self): + """Remove webook""" + self._remove() + + +class _WebhookAndEntryId(NamedTuple): + + entry_id: str + webhook: Webhook class WebhookManager: @@ -29,81 +80,68 @@ class WebhookManager: def __init__( self, - hass: HomeAssistant, - logger: logging.Logger, - entry: ConfigEntry, base_url: str, ) -> None: - self._id = f"{DOMAIN}_{entry.unique_id}" - self._entry_id = entry.entry_id - self._logger = logger - self._handlers: list[AsyncWebhookHandler] = [] + self._base_url = base_url + self._webhooks: dict[str, _WebhookAndEntryId] = {} + + def async_register(self, hass: HomeAssistant, config_entry: ConfigEntry): + """Get or create webhook for entry""" + for t in self._webhooks.values(): + if t.entry_id == config_entry.entry_id: + return t.webhook + + def _unregister(): + if _webhook.id in self._webhooks: + del self._webhooks[_webhook.id] + webhook.async_unregister(hass, _webhook.id) + + _webhook = Webhook(self, f"{DOMAIN}_{config_entry.unique_id}", _unregister) + self._webhooks[_webhook.id] = _WebhookAndEntryId( + config_entry.entry_id, _webhook + ) + + config_entry.async_on_unload(_unregister) webhook.async_register( hass, DOMAIN, - entry.title + " Webhook", - self._id, + f"{config_entry.title} Webhook", + _webhook.id, self._handle_webhook, ) - self._webhook_url = f"{base_url}{webhook.async_generate_path(self._id)}" - - def _unregister(): - webhook.async_unregister(hass, self._id) - entry.async_on_unload(_unregister) + return _webhook @property def url(self): """Current url""" - return self._webhook_url + return self._base_url async def _handle_webhook( self, hass: HomeAssistant, webhook_id: str, request: Request ) -> Response | None: - if webhook_id != self._id: + if webhook_id not in self._webhooks: return None - request["entry_id"] = self._entry_id - - for handler in self._handlers.copy(): - response = await handler(hass, request) - if response is not None: - return response - - return None - - def async_add_handler(self, handler: AsyncWebhookHandler): - """Register Handler""" - - self._handlers.append(handler) - - def _remove(): - self._handlers.remove(handler) - - return _remove + _LOGGER.debug("Webhook hit for %s", webhook_id) + _entry = self._webhooks[webhook_id] + request["entry_id"] = _entry.entry_id + return await _entry.webhook.async_notify_handlers(hass, request) +@singleton(f"{DOMAIN}-webhook-manager") @callback -@bind_hass -def async_get_webhook_manager( - hass: HomeAssistant, logger: logging.Logger, entry: ConfigEntry -) -> WebhookManager: +def async_get_webhook_manager(hass: HomeAssistant): """Get or create new Webhook Manager for entry""" if not webhook: return None - domain_data: dict = hass.data[DOMAIN] - entry_data: dict = domain_data[entry.entry_id] - - if DATA_MANAGER in entry_data: - return entry_data[DATA_MANAGER] - try: url = get_url(hass, prefer_external=False, allow_cloud=False) except NoURLAvailableError: - logger.warning( + _LOGGER.warning( "Could not get internal url from system" ", will attempt external url but this is not preferred" ", please verify your installation." @@ -112,11 +150,12 @@ def async_get_webhook_manager( try: url = get_url(hass, allow_cloud=False) except NoURLAvailableError: - logger.warning("Could not get an addressable url, disabling webook support") + _LOGGER.warning( + "Could not get an addressable url, disabling webook support" + ) url = None if not url: return None - entry_data[DATA_MANAGER] = manager = WebhookManager(hass, logger, entry, url) - return manager + return WebhookManager(url)