From 4ec73c879c88a33c39ab61da6436d0716b028196 Mon Sep 17 00:00:00 2001 From: Sassan Haradji Date: Mon, 15 Apr 2024 19:35:18 +0400 Subject: [PATCH] refactor(keypad): reduce complexities feat(keypad): dispatch release actions `KeypadKeyReleaseAction` #39 fix(keypad): dispatch the state of mic key when keypad service initializes #1 --- CHANGELOG.md | 3 + ubo_app/services/020-keypad/setup.py | 216 ++++++--------------------- ubo_app/store/main/reducer.py | 22 +-- ubo_app/store/services/keypad.py | 6 + 4 files changed, 71 insertions(+), 176 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9c1f172..3a780da4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ screen to render the extra information - fix(docker): open_webui now runs in its own network as `hostname` and `network_mode` can't be used together #63 +- refactor(keypad): reduce complexities +- feat(keypad): dispatch release actions `KeypadKeyReleaseAction` #39 +- fix(keypad): dispatch the state of mic key when keypad service initializes #1 ## Version 0.12.5 diff --git a/ubo_app/services/020-keypad/setup.py b/ubo_app/services/020-keypad/setup.py index f234ee93..e87ff843 100644 --- a/ubo_app/services/020-keypad/setup.py +++ b/ubo_app/services/020-keypad/setup.py @@ -5,13 +5,15 @@ import logging import math import time -from enum import StrEnum -from typing import TYPE_CHECKING, Literal +from typing import TYPE_CHECKING, Literal, cast import board -from immutable import Immutable -from ubo_app.store.services.keypad import Key, KeypadKeyPressAction +from ubo_app.store.services.keypad import ( + Key, + KeypadKeyPressAction, + KeypadKeyReleaseAction, +) from ubo_app.store.services.sound import SoundDevice, SoundSetMuteStatusAction from ubo_app.utils import IS_RPI @@ -28,88 +30,22 @@ class KeypadError(Exception): ... -class ButtonName(StrEnum): - TOP_LEFT = 'top_left' - MIDDLE_LEFT = 'middle_left' - BOTTOM_LEFT = 'bottom_left' - UP = 'up' - DOWN = 'down' - BACK = 'back' - HOME = 'home' - MIC = 'mic' - - -class ButtonEvent(Immutable): - status: ButtonStatus - timestamp: float - - -class Event(Immutable): - inputs: UnaryStruct - timestamp: float - - -class KeypadStatus: - """Class to keep track of button status. - - 0: "TOP_LEFT": Top left button - 1: "MIDDLE_LEFT": Middle left button - 2: "BOTTOM_LEFT": Bottom left button - 3: "BACK": Button with back arrow label (under LCD) - 4: "HOME": Button with home label (Under LCD) - 5: "UP": Right top button with upward arrow label - 6: "DOWN": Right bottom button with downward arrow label - 7: "MIC": Microphone mute switch - """ - - buttons: dict[ButtonName, ButtonEvent] - - def __init__(self: KeypadStatus) -> None: - self.buttons = { - button_name: ButtonEvent(status='released', timestamp=time.time()) - for button_name in ButtonName - } - - def update_status( - self: KeypadStatus, - button_name: ButtonName, - new_status: Literal['pressed', 'released'], - ) -> None: - if new_status not in {'pressed', 'released'}: - msg = 'Invalid status' - raise KeypadError(msg) - if button_name in self.buttons: - self.buttons[button_name] = ButtonEvent( - status=new_status, - timestamp=time.time(), - ) - - def get_status( - self: KeypadStatus, - button_name: ButtonName, - ) -> Literal['pressed', 'released']: - if button_name in self.buttons: - return self.buttons[button_name].status - msg = 'Invalid button name' - raise KeypadError(msg) - - def get_timestamp(self: KeypadStatus, button_name: ButtonName) -> float: - if button_name in self.buttons: - return self.buttons[button_name].timestamp - msg = 'Invalid button name' - raise KeypadError(msg) - - def get_label(self: KeypadStatus, index: int) -> ButtonName: - if index > len(ButtonName): - msg = 'Invalid index' - raise KeypadError(msg) - return list(ButtonName)[index] +KEY_INDEX = { + 0: Key.L1, + 1: Key.L2, + 2: Key.L3, + 3: Key.UP, + 4: Key.DOWN, + 5: Key.BACK, + 6: Key.HOME, +} +MIC_INDEX = 7 class Keypad: """Class to handle keypad events.""" - event_queue: list[Event] + previous_inputs: int aw: AW9523 inputs: UnaryStruct @@ -122,14 +58,11 @@ def __init__(self: Keypad) -> None: self.logger = logging.getLogger('keypad') self.logger.setLevel(logging.WARNING) self.logger.debug('Initialising keypad...') - self.event_queue = [] + self.previous_inputs = 0 self.aw = None - self.inputs = None self.bus_address = 0x58 self.model = 'aw9523' self.enabled = True - self.buttons = KeypadStatus() - self.index = 0 self.init_i2c() def clear_interrupt_flags(self: Keypad, i2c: i2c_device.I2CDevice) -> None: @@ -197,17 +130,23 @@ def init_i2c(self: Keypad) -> None: self.clear_interrupt_flags(new_i2c) # read register values - self.inputs = self.aw.inputs + inputs = cast(int, self.aw.inputs) self.logger.debug( 'Initializing inputs', - extra={'inputs': f'{self.inputs:016b}'}, + extra={'inputs': f'{inputs:016b}'}, ) - self.event_queue = [Event(inputs=self.inputs, timestamp=time.time())] + self.previous_inputs = inputs time.sleep(0.5) # Interrupt callback when any button is pressed btn.when_pressed = self.key_press_cb + is_mic_active = inputs & 1 << MIC_INDEX != 0 + self.on_button_event( + index=MIC_INDEX, + status='released' if is_mic_active else 'pressed', + ) + def key_press_cb(self: Keypad, _: object) -> None: """Handle key press dispatched by GPIO interrupt. @@ -228,117 +167,60 @@ def key_press_cb(self: Keypad, _: object) -> None: if self.aw is None: return # read register values - self.inputs = self.aw.inputs - event = Event(inputs=self.inputs, timestamp=time.time()) + inputs = cast(int, self.aw.inputs) # append the event to the queue. The queue has a depth of 2 and # keeps the current and last event. - self.event_queue.append(event) - self.logger.info(self.event_queue) - self.logger.debug('Current Inputs', extra={'inputs': f'{self.inputs:016b}'}) - previous_event = self.event_queue.pop(0) + self.logger.debug('Current Inputs', extra={'inputs': f'{inputs:016b}'}) self.logger.debug( 'Previous Inputs', - extra={'inputs': f'{previous_event.inputs:016b}'}, + extra={'inputs': f'{self.previous_inputs:016b}'}, ) # XOR the last recorded input values with the current input values # to see which bits have changed. Technically there can only be one # bit change in every callback - change_mask = previous_event.inputs ^ self.inputs + change_mask = self.previous_inputs ^ inputs self.logger.debug('Change', extra={'change_mask': f'{change_mask:016b}'}) if change_mask == 0: return # use the change mask to see if the button was the change was # falling (1->0) indicating a pressed action # or risign (0->1) indicating a release action - self.index = (int)(math.log2(change_mask)) - self.logger.info('button index', extra={'button_index': self.index}) + index = (int)(math.log2(change_mask)) + self.logger.info('button index', extra={'button_index': index}) # Check for rising edge or falling edge action (press or release) - self.button_label = self.buttons.get_label(self.index) - if (previous_event.inputs & change_mask) == 0: + if (self.previous_inputs & change_mask) == 0: self.logger.info( 'Button pressed', - extra={'button': str(self.index), 'label': self.button_label}, + extra={'button': str(index)}, ) - - # calculate how long the button was held down - # and print the time - last_time_stamp = self.buttons.get_timestamp(self.button_label) - held_down_time = time.time() - last_time_stamp - self.logger.info( - 'Button was pressed down', - extra={'held_down_time': held_down_time}, - ) - - self.buttons.update_status(self.button_label, 'released') - self.on_button_event(self.button_label, 'released') - + self.on_button_event(index=index, status='released') else: self.logger.info( 'Button released', - extra={'button': str(self.index), 'label': self.button_label}, + extra={'button': str(index)}, ) - self.buttons.update_status(self.button_label, 'pressed') - self.on_button_event(self.button_label, 'pressed') + self.on_button_event(index=index, status='pressed') - self.logger.info(self.buttons.buttons) + self.previous_inputs = inputs def on_button_event( self: Keypad, - button_pressed: ButtonName, - button_status: ButtonStatus, + *, + index: int, + status: ButtonStatus, ) -> None: from ubo_app.store import dispatch - if button_status == 'pressed': - if button_pressed == ButtonName.UP: - dispatch( - KeypadKeyPressAction( - key=Key.UP, - ), - ) - elif button_pressed == ButtonName.DOWN: - dispatch( - KeypadKeyPressAction( - key=Key.DOWN, - ), - ) - elif button_pressed == ButtonName.TOP_LEFT: - dispatch( - KeypadKeyPressAction( - key=Key.L1, - ), - ) - elif button_pressed == ButtonName.MIDDLE_LEFT: - dispatch( - KeypadKeyPressAction( - key=Key.L2, - ), - ) - elif button_pressed == ButtonName.BOTTOM_LEFT: - dispatch( - KeypadKeyPressAction( - key=Key.L3, - ), - ) - elif button_pressed == ButtonName.BACK: - dispatch( - KeypadKeyPressAction( - key=Key.BACK, - ), - ) - elif button_pressed == ButtonName.HOME: - dispatch( - KeypadKeyPressAction( - key=Key.HOME, - ), - ) - if button_pressed == ButtonName.MIC: - from ubo_app.store import dispatch - + if index in KEY_INDEX: + if status == 'pressed': + dispatch(KeypadKeyPressAction(key=KEY_INDEX[index])) + elif status == 'released': + dispatch(KeypadKeyReleaseAction(key=KEY_INDEX[index])) + if index == MIC_INDEX: dispatch( SoundSetMuteStatusAction( device=SoundDevice.INPUT, - mute=button_status == 'pressed', + mute=status == 'pressed', ), ) diff --git a/ubo_app/store/main/reducer.py b/ubo_app/store/main/reducer.py index 3200506c..4be6f624 100644 --- a/ubo_app/store/main/reducer.py +++ b/ubo_app/store/main/reducer.py @@ -28,6 +28,8 @@ KeypadEvent, KeypadKeyPressAction, KeypadKeyPressEvent, + KeypadKeyReleaseAction, + KeypadKeyReleaseEvent, ) from ubo_app.store.services.sound import SoundChangeVolumeAction, SoundDevice @@ -52,26 +54,28 @@ def reducer( if isinstance(action, KeypadKeyPressAction): actions: list[SoundChangeVolumeAction] = [] + events: list[KeypadKeyPressEvent] = [] if action.key == Key.UP and len(state.path) == 1: - actions.append( + actions = [ SoundChangeVolumeAction( amount=0.05, device=SoundDevice.OUTPUT, ), - ) - if action.key == Key.DOWN and len(state.path) == 1: - actions.append( + ] + elif action.key == Key.DOWN and len(state.path) == 1: + actions = [ SoundChangeVolumeAction( amount=-0.05, device=SoundDevice.OUTPUT, ), - ) + ] + else: + events = [KeypadKeyPressEvent(key=action.key)] + return CompleteReducerResult(state=state, actions=actions, events=events) + if isinstance(action, KeypadKeyReleaseAction): return CompleteReducerResult( state=state, - actions=actions, - events=[ - KeypadKeyPressEvent(key=action.key), - ], + events=[KeypadKeyReleaseEvent(key=action.key)], ) if isinstance(action, RegisterAppAction): diff --git a/ubo_app/store/services/keypad.py b/ubo_app/store/services/keypad.py index 474f5293..885c61ce 100644 --- a/ubo_app/store/services/keypad.py +++ b/ubo_app/store/services/keypad.py @@ -29,8 +29,14 @@ class KeypadKeyDownAction(KeypadAction): ... class KeypadKeyPressAction(KeypadAction): ... +class KeypadKeyReleaseAction(KeypadAction): ... + + class KeypadEvent(BaseEvent): key: Key class KeypadKeyPressEvent(KeypadEvent): ... + + +class KeypadKeyReleaseEvent(KeypadEvent): ...