From 43deb5c53668ecd6980ccdc248f2416ccb43b163 Mon Sep 17 00:00:00 2001 From: Sassan Haradji Date: Sat, 10 Aug 2024 21:49:54 +0400 Subject: [PATCH] feat(core): add signal management for ubo_app process - closes #156 --- CHANGELOG.md | 1 + ubo_app/display.py | 28 ++++++++++++------------- ubo_app/services/040-camera/setup.py | 4 ++-- ubo_app/setup.py | 31 ++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbfa1350..a21c6c1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - fix(core): updating items of the pages after the first page, not being reflected on the screen - closes #149 - feat(rpi-connect): implement `rpi-connect` under `Remote` menu - closes #139 - fix(core): update manager downloads the latest `install.sh` and runs it to do the update - closes #152 +- feat(core): add signal management for ubo_app process - closes #156 ## Version 0.15.5 diff --git a/ubo_app/display.py b/ubo_app/display.py index ef4901a1..929bf38a 100644 --- a/ubo_app/display.py +++ b/ubo_app/display.py @@ -82,23 +82,13 @@ def render_on_display( state.block(rectangle, data_bytes) -def pause() -> None: - """Pause the display.""" - state.is_running = False - - -def resume() -> None: - """Resume the display.""" - state.is_running = True - - -class _State: +class _DisplayState: """The state of the display.""" is_running = True display = setup_display() - def __init__(self: _State, splash_screen: bytes | None = None) -> None: + def __init__(self: _DisplayState, splash_screen: bytes | None = None) -> None: if IS_RPI: from RPi import GPIO # pyright: ignore [reportMissingModuleSource] @@ -117,7 +107,15 @@ def __init__(self: _State, splash_screen: bytes | None = None) -> None: atexit.register(self.turn_off) - def turn_off(self: _State) -> None: + def pause(self: _DisplayState) -> None: + """Pause the display.""" + self.is_running = False + + def resume(self: _DisplayState) -> None: + """Resume the display.""" + self.is_running = True + + def turn_off(self: _DisplayState) -> None: """Destroy the display.""" from ubo_app.constants import HEIGHT, WIDTH @@ -135,7 +133,7 @@ def turn_off(self: _State) -> None: GPIO.cleanup(26) def block( - self: _State, + self: _DisplayState, rectangle: tuple[int, int, int, int], data_bytes: bytes, *, @@ -146,4 +144,4 @@ def block( self.display._block(*rectangle, data_bytes) # noqa: SLF001 -state = _State() +state = _DisplayState() diff --git a/ubo_app/services/040-camera/setup.py b/ubo_app/services/040-camera/setup.py index ec200e49..f70e43c7 100644 --- a/ubo_app/services/040-camera/setup.py +++ b/ubo_app/services/040-camera/setup.py @@ -187,7 +187,7 @@ def feed_viewfinder_locked(_: object) -> None: feed_viewfinder_scheduler = Clock.schedule_interval(feed_viewfinder_locked, 0.04) - display.pause() + display.state.pause() def handle_stop_viewfinder() -> None: with fs_lock: @@ -195,7 +195,7 @@ def handle_stop_viewfinder() -> None: is_running = False feed_viewfinder_scheduler.cancel() dispatch(CloseApplicationEvent(application=application)) - display.resume() + display.state.resume() cancel_subscription() if picamera2: picamera2.stop() diff --git a/ubo_app/setup.py b/ubo_app/setup.py index ccc9c514..eda7a733 100644 --- a/ubo_app/setup.py +++ b/ubo_app/setup.py @@ -2,11 +2,15 @@ from __future__ import annotations +import signal from pathlib import Path from typing import Any, cast import numpy as np from fake import Fake +from redux import FinishAction + +from ubo_app.store.main import dispatch class _FakeAsyncProcess(Fake): @@ -98,3 +102,30 @@ async def fake_create_subprocess_exec( ) import ubo_app.display as _ # noqa: F401 + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + +def signal_handler(signum: int, _: object) -> None: + """Handle the signal.""" + from ubo_app import display + from ubo_app.logging import logger + + logger.info('Received signal %s, turning off the display...', signum) + + display.state.turn_off() + display.state.pause() + + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + if signum == signal.SIGINT: + logger.info('Exiting gracefully, sending the signal again will force exit!') + dispatch(FinishAction()) + elif signum == signal.SIGTERM: + logger.info( + 'Exiting forcefully, sending the signal again will not be caught!', + ) + import os + + os.kill(os.getpid(), signal.SIGTERM)