diff --git a/CHANGELOG.md b/CHANGELOG.md index 023ec2c..e2b4842 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Upcoming + +- feat: divide each frame into multiple rectangles, compare rectangle with the same rectangle in the previous frame and update only the changed ones + ## Version 0.11.1 - fix: change type of `kwargs` in `HeadlessWidget.__init__` to `object` diff --git a/demo.py b/demo.py index bc9f08c..f945386 100644 --- a/demo.py +++ b/demo.py @@ -3,7 +3,6 @@ from __future__ import annotations -import functools from pathlib import Path from typing import TYPE_CHECKING @@ -14,8 +13,6 @@ if TYPE_CHECKING: from threading import Thread - import numpy as np - from numpy._typing import NDArray WIDTH = 400 HEIGHT = 240 @@ -23,12 +20,12 @@ def render( *, - rectangle: tuple[int, int, int, int], - data: NDArray[np.uint8], + regions: list[config.Region], last_render_thread: Thread, ) -> None: """Render the data to a png file.""" - _ = rectangle, last_render_thread + _ = last_render_thread + data = regions[0]['data'] with Path('demo.png').open('wb') as file: png.Writer( alpha=True, @@ -38,7 +35,7 @@ def render( bitdepth=8, ).write( file, - data.reshape(-1, data.shape[1] * 4).tolist(), + data.reshape(data.shape[0], -1).tolist(), ) @@ -48,13 +45,13 @@ def render( 'width': WIDTH, 'height': HEIGHT, 'flip_vertical': True, - 'rotation': 1, + 'rotation': 3, }, ) from kivy.app import App from kivy.graphics.context_instructions import Color -from kivy.graphics.vertex_instructions import Rectangle +from kivy.graphics.vertex_instructions import Ellipse from kivy.uix.gridlayout import GridLayout from kivy.uix.label import Label @@ -75,18 +72,34 @@ def build(self) -> FboGridLayout: ): label = Label(text=f'Hello {index}') - def render( - label: Label, - *_: object, - color: tuple[float, float, float, float], - ) -> None: - with label.canvas.before: - Color(*color) - Rectangle(pos=label.pos, size=label.size) + with label.canvas.before: + Color(*color) + label.shape = Ellipse() label.bind( - size=functools.partial(render, color=color), - pos=functools.partial(render, color=color), + size=lambda label, *_: ( + setattr( + label.shape, + 'size', + (label.size[0] / 2, label.size[1] / 2), + ), + setattr( + label.shape, + 'pos', + ( + label.pos[0] + label.size[0] / 4, + label.pos[1] + label.size[1] / 4, + ), + ), + ), + pos=lambda label, *_: setattr( + label.shape, + 'pos', + ( + label.pos[0] + label.size[0] / 4, + label.pos[1] + label.size[1] / 4, + ), + ), ) fbg.add_widget(label) diff --git a/headless_kivy/__init__.py b/headless_kivy/__init__.py index c94b349..a46e00c 100644 --- a/headless_kivy/__init__.py +++ b/headless_kivy/__init__.py @@ -1,237 +1,17 @@ -"""Implement a Kivy widget that renders everything in memory. +"""Headless widget useful for testing Kivy applications or driving embedded displays. * IMPORTANT: You need to run `setup_headless` function before instantiating `HeadlessWidget`. A Kivy widget rendered in memory which doesn't create any window in any display manager (a.k.a "headless"). -""" - -from __future__ import annotations - -import time -from pathlib import Path -from queue import Empty, Queue -from threading import Thread -from typing import TYPE_CHECKING, ClassVar - -import numpy as np -from kivy.graphics.fbo import Fbo -from kivy.graphics.gl_instructions import ClearBuffers, ClearColor -from kivy.graphics.instructions import Callback, Canvas -from kivy.graphics.vertex_instructions import Rectangle -from kivy.metrics import dp -from kivy.uix.widget import Widget - -from headless_kivy import config -from headless_kivy.logger import logger - -if TYPE_CHECKING: - from numpy._typing import NDArray - - -def apply_tranformations(data: NDArray[np.uint8]) -> NDArray[np.uint8]: - data = np.rot90(data, config.rotation()) - if config.flip_horizontal(): - data = np.fliplr(data) - if config.flip_vertical(): - data = np.flipud(data) - return data - - -class HeadlessWidget(Widget): - """A Kivy widget that renders everything in memory.""" - - last_second: int - rendered_frames: int - skipped_frames: int - pending_render_threads: Queue[Thread] - - previous_data: NDArray[np.uint8] | None = None - previous_frame: NDArray[np.uint8] | None = None - fbo: Fbo - fbo_rectangle: Rectangle - - raw_data: ClassVar[NDArray[np.uint8]] - - def __init__(self: HeadlessWidget, **kwargs: object) -> None: - """Initialize a `HeadlessWidget`.""" - config.check_initialized() - - __import__('kivy.core.window') - - if config.is_debug_mode(): - self.last_second = int(time.time()) - self.rendered_frames = 0 - self.skipped_frames = 0 - - self.pending_render_threads = Queue(2 if config.double_buffering() else 1) - self.canvas = Canvas() - - with self.canvas: - self.fbo = Fbo(size=self.size, with_stencilbuffer=True) - if config.is_debug_mode(): - self.fbo_rectangle = Rectangle(size=self.size, texture=self.fbo.texture) - with self.fbo.before: - ClearColor(0, 0, 0, 0) - ClearBuffers() +It provides tooling for test environment and optimizations for custom displays in +embedded systems. - with self.fbo.after: - Callback(self.render_on_display) - - super().__init__(**kwargs) - - def add_widget( - self: HeadlessWidget, - *args: object, - **kwargs: object, - ) -> None: - """Extend `Widget.add_widget` and handle `canvas`.""" - canvas = self.canvas - self.canvas = self.fbo - super().add_widget(*args, **kwargs) - self.canvas = canvas - - def remove_widget( - self: HeadlessWidget, - *args: object, - **kwargs: object, - ) -> None: - """Extend `Widget.remove_widget` and handle `canvas`.""" - canvas = self.canvas - self.canvas = self.fbo - super().remove_widget(*args, **kwargs) - self.canvas = canvas - - def on_size( - self: HeadlessWidget, - _: HeadlessWidget, - value: tuple[int, int], - ) -> None: - """Update size of `fbo` and size of `fbo_rect` when widget's size changes.""" - self.fbo.size = value - if config.is_debug_mode(): - self.fbo_rectangle.size = value - - def on_pos( - self: HeadlessWidget, - _: HeadlessWidget, - value: tuple[int, int], - ) -> None: - """Update position of `fbo_rect` when widget's position changes.""" - if config.is_debug_mode(): - self.fbo_rectangle.pos = value - - def render_on_display(self: HeadlessWidget, *_: object) -> None: # noqa: C901 - """Render the current frame on the display.""" - # Log the number of skipped and rendered frames in the last second - if config.is_debug_mode(): - self.fbo_rectangle.texture = self.fbo.texture - # Increment rendered_frames/skipped_frames count every frame and reset their - # values to zero every second. - current_second = int(time.time()) - - if current_second != self.last_second: - logger.debug( - f"""Frames in {self.last_second}: \ -[Skipped: {self.skipped_frames}] [Rendered: {self.rendered_frames}]""", - ) - self.last_second = current_second - self.rendered_frames = 0 - self.skipped_frames = 0 - - data = np.frombuffer(self.fbo.texture.pixels, dtype=np.uint8) - if self.previous_data is not None and np.array_equal(data, self.previous_data): - if config.is_debug_mode(): - self.skipped_frames += 1 - return - self.previous_data = data - # Render the current frame on the display asynchronously - try: - last_thread = self.pending_render_threads.get(False) - except Empty: - last_thread = None - - height = int(min(self.fbo.texture.height, dp(config.height()) - self.y)) - width = int(min(self.fbo.texture.width, dp(config.width()) - self.x)) - - data = data.reshape( - int(self.fbo.texture.height), - int(self.fbo.texture.width), - -1, - ) - data = data[:height, :width, :] - data = apply_tranformations(data) - x, y = int(self.x), int(dp(config.height()) - self.y - self.height) - - mask = np.any(data != self.previous_frame, axis=2) - alpha_mask = np.repeat(mask[:, :, np.newaxis], 4, axis=2) - self.previous_frame = data - if config.rotation() % 2 == 0: - HeadlessWidget.raw_data[y : y + height, x : x + width, :][alpha_mask] = ( - data[alpha_mask] - ) - else: - HeadlessWidget.raw_data[x : x + width, y : y + height, :][alpha_mask] = ( - data[alpha_mask] - ) - - if config.is_debug_mode(): - self.rendered_frames += 1 - raw_file_path = Path(f'headless_kivy_buffer-{self.x}_{self.y}.raw') - - if not raw_file_path.exists(): - with raw_file_path.open('wb') as file: - file.write( - b'\x00' * int(dp(config.width()) * dp(config.height()) * 4), - ) - with raw_file_path.open('r+b') as file: - for i in range(height): - file.seek(int((x + (y + i) * dp(config.width())) * 4)) - file.write(bytes(data[i, :, :].flatten().tolist())) - - raw_file_path = Path('headless_kivy_buffer.raw') - - if not raw_file_path.exists(): - with raw_file_path.open('wb') as file: - file.write( - b'\x00' * int(dp(config.width()) * dp(config.height()) * 4), - ) - with raw_file_path.open('r+b') as file: - for i in range(height): - file.seek(int((x + (y + i) * dp(config.width())) * 4)) - file.write( - bytes( - HeadlessWidget.raw_data[y + i, x : x + width, :] - .flatten() - .tolist(), - ), - ) - - thread = Thread( - target=config.callback(), - kwargs={ - 'rectangle': (x, y, x + width - 1, y + height - 1), - 'data': data, - 'last_render_thread': last_thread, - }, - daemon=True, - ) - self.pending_render_threads.put(thread) - thread.start() - - @classmethod - def get_instance( - cls: type[HeadlessWidget], - widget: Widget, - ) -> HeadlessWidget | None: - """Get the nearest instance of `HeadlessWidget`.""" - if isinstance(widget, HeadlessWidget): - return widget - if widget.parent: - return cls.get_instance(widget.parent) - return None +headless_kivy automatically throttles the frame rate to the value set by the `max_fps` +""" +from headless_kivy.widget import HeadlessWidget -__all__ = ['HeadlessWidget'] +__all__ = ('HeadlessWidget',) diff --git a/headless_kivy/_debug.py b/headless_kivy/_debug.py new file mode 100644 index 0000000..9dda04a --- /dev/null +++ b/headless_kivy/_debug.py @@ -0,0 +1,87 @@ +"""DebugMixin adds debug information to the HeadlessWidget class.""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import TYPE_CHECKING, ClassVar, Self + +from kivy.metrics import dp + +from headless_kivy import config +from headless_kivy.logger import logger + +if TYPE_CHECKING: + import numpy as np + from kivy.graphics.context_instructions import Color + from kivy.graphics.fbo import Fbo + from kivy.graphics.vertex_instructions import Rectangle + from numpy._typing import NDArray + + +class DebugMixin: + fbo: Fbo + fbo_render_color: Color + fbo_render_rectangle: Rectangle + + x: int + y: int + + raw_data: ClassVar[NDArray[np.uint8]] + + def __init__(self: Self, **kwargs: dict[str, object]) -> None: + super().__init__(**kwargs) + if config.is_debug_mode(): + self.last_second = int(time.time()) + self.rendered_frames = 0 + self.skipped_frames = 0 + + def set_debug_info(self: Self) -> None: + # Log the number of skipped and rendered frames in the last second + if config.is_debug_mode(): + self.fbo_render_rectangle.texture = self.fbo.texture + # Increment rendered_frames/skipped_frames count every frame and reset their + # values to zero every second. + current_second = int(time.time()) + + if current_second != self.last_second: + logger.debug( + f"""Frames in {self.last_second}: \ +[Skipped: {self.skipped_frames}] [Rendered: {self.rendered_frames}]""", + ) + self.last_second = current_second + self.rendered_frames = 0 + self.skipped_frames = 0 + + def render_debug_info( + self: Self, + rect_: tuple[int, int, int, int], + _: list[tuple[int, int, int, int]], + data: NDArray[np.uint8], + ) -> None: + if config.is_debug_mode(): + x1, y1, x2, y2 = rect_ + self.rendered_frames += 1 + w = int(dp(config.width())) + h = int(dp(config.height())) + + raw_file_path = Path('headless_kivy_buffer.raw') + + with raw_file_path.open('w+b') as file: + file.truncate(w * h * 4) + for i in range(y1, y2): + file.seek((x1 + i * w) * 4) + file.write( + bytes(self.raw_data[h - i - 1, x1:x2, :].flatten().tolist()), + ) + + raw_file_path = Path(f'headless_kivy_buffer-{self.x}_{self.y}.raw') + if config.rotation() % 2 != 0: + w, h = h, w + x1, y1, x2, y2 = y1, x1, y2, x2 + + with raw_file_path.open('w+b') as file: + file.truncate(w * h * 4) + for i in range(y1, y2): + file.seek((x1 + i * w) * 4) + file.write(bytes(data[i - y1, x1:x2, :].flatten().tolist())) diff --git a/headless_kivy/config.py b/headless_kivy/config.py index 899f3bb..4070b88 100644 --- a/headless_kivy/config.py +++ b/headless_kivy/config.py @@ -13,8 +13,13 @@ from headless_kivy.constants import ( DOUBLE_BUFFERING, + FLIP_HORIZONTAL, + FLIP_VERTICAL, HEIGHT, IS_DEBUG_MODE, + MAX_FPS, + REGION_SIZE, + ROTATION, WIDTH, ) from headless_kivy.logger import add_file_handler, add_stdout_handler @@ -28,26 +33,46 @@ class SetupHeadlessConfig(TypedDict): - """Arguments of `setup_headless_kivy` function.""" + """Arguments of `setup_headless_kivy` function. + + Attributes + ---------- + callback: `Callback` + The callback function that will render the data to the screen. + max_fps: `int`, optional + Maximum frames per second for the Kivy application. + width: `int`, optional + The width of the display in pixels. + height: `int`, optional + The height of the display in pixels. + is_debug_mode: `bool`, optional + If set to True, the application will consume computational resources to log + additional debug information. + double_buffering: `bool`, optional + If set to True, it will let Kivy generate the next frame while sending the last + frame to the display. + rotation: `int`, optional + The rotation of the display clockwise, it will be multiplied by 90. + flip_horizontal: `bool`, optional + Whether the screen should be flipped horizontally or not. + flip_vertical: `bool`, optional + Whether the screen should be flipped vertically or not. + region_size: `int`, optional + Approximate size of rectangles to divide the screen into and see if they need to + be updated. + + """ - """The callback function that will render the data to the screen.""" callback: Callback - """The width of the display in pixels.""" + max_fps: NotRequired[int] width: NotRequired[int] - """The height of the display in pixels.""" height: NotRequired[int] - """If set to True, the application will consume computational resources to log - additional debug information.""" is_debug_mode: NotRequired[bool] - """Is set to `True`, it will let Kivy to generate the next frame while sending the - last frame to the display.""" double_buffering: NotRequired[bool] - """The rotation of the display clockwise, it will be multiplied by 90.""" rotation: NotRequired[int] - """Whether the screen should be flipped horizontally or not""" flip_horizontal: NotRequired[bool] - """Whether the screen should be flipped vertically or not""" flip_vertical: NotRequired[bool] + region_size: NotRequired[int] _config: SetupHeadlessConfig | None = None @@ -76,16 +101,20 @@ def setup_headless_kivy(config: SetupHeadlessConfig) -> None: add_stdout_handler() add_file_handler() + Config.set('kivy', 'kivy_clock', 'default') Config.set('graphics', 'fbo', 'force-hardware') + Config.set('graphics', 'fullscreen', '0') + Config.set('graphics', 'maxfps', f'{max_fps()}') + Config.set('graphics', 'multisamples', '1') + Config.set('graphics', 'resizable', '0') + Config.set('graphics', 'vsync', '0') Config.set('graphics', 'width', f'{width()}') Config.set('graphics', 'height', f'{height()}') - from headless_kivy import HeadlessWidget + from headless_kivy.widget import HeadlessWidget HeadlessWidget.raw_data = np.zeros( - (int(dp(height())), int(dp(width())), 4) - if rotation() % 2 == 0 - else (int(dp(width())), int(dp(height())), 4), + (int(dp(height())), int(dp(width())), 4), dtype=np.uint8, ) @@ -96,14 +125,20 @@ def check_initialized() -> None: report_uninitialized() +class Region(TypedDict): + """A region of the screen to be updated.""" + + rectangle: tuple[int, int, int, int] + data: NDArray[np.uint8] + + class Callback(Protocol): """The signature of the renderer function.""" def __call__( self: Callback, *, - rectangle: tuple[int, int, int, int], - data: NDArray[np.uint8], + regions: list[Region], last_render_thread: Thread, ) -> None: """Render the data to the screen.""" @@ -117,6 +152,14 @@ def callback() -> Callback: report_uninitialized() +@cache +def max_fps() -> int: + """Return the maximum frames per second for the Kivy application.""" + if _config: + return _config.get('max_fps', MAX_FPS) + report_uninitialized() + + @cache def width() -> int: """Return the width of the display in pixels.""" @@ -153,7 +196,7 @@ def double_buffering() -> bool: def rotation() -> int: """Return the rotation of the display.""" if _config: - return _config.get('rotation', 0) + return _config.get('rotation', ROTATION) report_uninitialized() @@ -161,7 +204,7 @@ def rotation() -> int: def flip_horizontal() -> bool: """Return `True` if the display is flipped horizontally.""" if _config: - return _config.get('flip_horizontal', False) + return _config.get('flip_horizontal', FLIP_HORIZONTAL) report_uninitialized() @@ -169,5 +212,13 @@ def flip_horizontal() -> bool: def flip_vertical() -> bool: """Return `True` if the display is flipped vertically.""" if _config: - return _config.get('flip_vertical', False) + return _config.get('flip_vertical', FLIP_VERTICAL) + report_uninitialized() + + +@cache +def region_size() -> int: + """Return the approximate size of rectangles to divide the screen into.""" + if _config: + return _config.get('region_size', REGION_SIZE) report_uninitialized() diff --git a/headless_kivy/constants.py b/headless_kivy/constants.py index fe9aa7c..c976416 100644 --- a/headless_kivy/constants.py +++ b/headless_kivy/constants.py @@ -4,6 +4,7 @@ from str_to_bool import str_to_bool +MAX_FPS = int(os.environ.get('HEADLESS_KIVY_MAX_FPS', '32')) WIDTH = int(os.environ.get('HEADLESS_KIVY_WIDTH', '240')) HEIGHT = int(os.environ.get('HEADLESS_KIVY_HEIGHT', '240')) IS_DEBUG_MODE = str_to_bool(os.environ.get('HEADLESS_KIVY_DEBUG', 'False')) == 1 @@ -13,3 +14,9 @@ ) == 1 ) +ROTATION = int(os.environ.get('HEADLESS_KIVY_ROTATION', '0')) +FLIP_HORIZONTAL = ( + str_to_bool(os.environ.get('HEADLESS_KIVY_FLIP_HORIZONTAL', 'False')) == 1 +) +FLIP_VERTICAL = str_to_bool(os.environ.get('HEADLESS_KIVY_FLIP_VERTICAL', 'False')) == 1 +REGION_SIZE = int(os.environ.get('HEADLESS_KIVY_REGION_SIZE', '60')) diff --git a/headless_kivy/utils.py b/headless_kivy/utils.py new file mode 100644 index 0000000..4460c0a --- /dev/null +++ b/headless_kivy/utils.py @@ -0,0 +1,148 @@ +"""Utility functions for the project.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, TypeVar + +import numpy as np +from kivy.metrics import dp + +from headless_kivy import config + +if TYPE_CHECKING: + from numpy._typing import NDArray + + +def divide_number(n: int) -> list[int]: + """Divide a number into a list of smaller numbers.""" + k = round(n / config.region_size()) + base, rem = divmod(n, k) + return [base + 1] * rem + [base] * (k - rem) + + +T = TypeVar('T') + + +def get(array: list[T], index: int) -> T | None: + """Get an element from a list.""" + return next(iter(array[index : index + 1]), None) + + +def transform_data(data: NDArray[np.uint8]) -> NDArray[np.uint8]: + """Apply transformations to the data.""" + data = np.rot90(data, config.rotation()) + if config.flip_horizontal(): + data = np.fliplr(data) + if config.flip_vertical(): + data = np.flipud(data) + return data + + +def transform_coordinates( + region: tuple[int, int, int, int], +) -> tuple[int, int, int, int]: + """Transform the coordinates of a region.""" + y1, x1, y2, x2 = region[:4] + h, w = int(dp(config.width())), int(dp(config.height())) + positions = { + 0: (x1, y1, x2, y2), + 1: (y1, w - x2, y2, w - x1), + 2: (w - x2, h - y2, w - x1, h - y1), + 3: (h - y2, x1, h - y1, x2), + } + x1, y1, x2, y2 = positions[config.rotation() % 4] + if config.flip_horizontal(): + if config.rotation() % 2 == 0: + x1, x2 = w - x2, w - x1 + else: + x1, x2 = h - x2, h - x1 + if config.flip_vertical(): + if config.rotation() % 2 == 0: + y1, y2 = h - y2, h - y1 + else: + y1, y2 = w - y2, w - y1 + return y1, x1, y2, x2 + + +def divide_array_into_rectangles( + array: np.ndarray, +) -> list[list[tuple[int, int, int, int]]]: + """Divide a 2D array into multiple rectangles.""" + width_splits = divide_number(array.shape[0]) + height_splits = divide_number(array.shape[1]) + + height_indices = np.cumsum([0, *height_splits]) + width_indices = np.cumsum([0, *width_splits]) + + return [ + [ + ( + int(width_indices[j]), + int(height_indices[i]), + int(width_indices[j] + width_splits[j]), + int(height_indices[i] + height_splits[i]), + ) + for j in range(len(width_splits)) + ] + for i in range(len(height_splits)) + ] + + +def divide_into_regions( + mask: NDArray[np.bool_], +) -> list[tuple[int, int, int, int]]: + """Divide a mask into regions.""" + blocks = [ + [ + block + if np.any( + mask[ + block[0] : block[2], + block[1] : block[3], + ], + ) + else None + for block in row + ] + for row in divide_array_into_rectangles(mask) + ] + remaining = [ + (column[0], row[0]) + for row in enumerate(blocks) + for column in enumerate(row[1]) + if column[1] is not None + ] + regions: list[tuple[int, int, int, int]] = [] + while remaining: + y, x = end_y, end_x = remaining[0] + block = blocks[x][y] + assert block is not None # noqa: S101 + x1, y1, x2, y2 = block + for i in range(x + 1, len(blocks)): + candidate_block = blocks[i][y] + if not candidate_block: + break + end_x = i + y2 = candidate_block[3] + + for j in range(y + 1, len(blocks[0])): + for i in range(x, end_x + 1): + if not blocks[i][j]: + break + else: + end_y = j + candidate_block = blocks[x][j] + assert candidate_block is not None # noqa: S101 + x2 = candidate_block[2] + continue + break + remaining = [ + block + for block in remaining + if block[0] < y or block[1] < x or block[0] > end_y or block[1] > end_x + ] + for i in range(x, end_x + 1): + for j in range(y, end_y + 1): + blocks[i][j] = None + regions.append((x1, y1, x2, y2)) + return regions diff --git a/headless_kivy/widget.py b/headless_kivy/widget.py new file mode 100644 index 0000000..e434eac --- /dev/null +++ b/headless_kivy/widget.py @@ -0,0 +1,221 @@ +"""Implement a Kivy widget that renders everything in memory. + +* IMPORTANT: You need to run `setup_headless` function before instantiating +`HeadlessWidget`. +""" + +from __future__ import annotations + +import time +from queue import Empty, Queue +from threading import Thread +from typing import TYPE_CHECKING, ClassVar + +import numpy as np +from kivy.graphics.context_instructions import Color +from kivy.graphics.fbo import Fbo +from kivy.graphics.gl_instructions import ClearBuffers, ClearColor +from kivy.graphics.instructions import Callback, Canvas +from kivy.graphics.vertex_instructions import Rectangle +from kivy.metrics import dp +from kivy.properties import NumericProperty +from kivy.uix.widget import Widget + +from headless_kivy import config +from headless_kivy._debug import DebugMixin +from headless_kivy.utils import ( + divide_into_regions, + transform_coordinates, + transform_data, +) + +if TYPE_CHECKING: + from numpy._typing import NDArray + + +class HeadlessWidget(Widget, DebugMixin): + """A Kivy widget that renders everything in memory.""" + + fps = NumericProperty(0) + + update_region_seed = 0 + last_second: int + rendered_frames: int + skipped_frames: int + + last_render: float + pending_render_threads: Queue[Thread] + + previous_data: NDArray[np.uint8] | None = None + previous_frame: NDArray[np.uint8] | None = None + fbo: Fbo + fbo_background_color: Color + fbo_background_rectangle: Rectangle + + fbo_render_color: Color + fbo_render_rectangle: Rectangle + + raw_data: ClassVar[NDArray[np.uint8]] + + def __init__(self: HeadlessWidget, **kwargs: object) -> None: + """Initialize a `HeadlessWidget`.""" + config.check_initialized() + + __import__('kivy.core.window') + + self.fps = config.max_fps() + + self.last_render = time.time() + self.pending_render_threads = Queue(2 if config.double_buffering() else 1) + + self.canvas = Canvas() + with self.canvas: + self.fbo = Fbo(size=self.size, with_stencilbuffer=True) + self.fbo_background_color = Color(0, 0, 0, 1) + self.fbo_background_rectangle = Rectangle(size=self.size) + self.fbo_render_color = Color(1, 1, 1, 1) + if config.is_debug_mode(): + self.fbo_render_rectangle = Rectangle( + size=self.size, + texture=self.fbo.texture, + ) + + with self.fbo.before: + ClearColor(0, 0, 0, 0) + ClearBuffers() + + with self.fbo.after: + Callback(self.render_on_display) + + super().__init__(**kwargs) + + def add_widget( + self: HeadlessWidget, + *args: object, + **kwargs: object, + ) -> None: + """Extend `Widget.add_widget` and handle `canvas`.""" + canvas = self.canvas + self.canvas = self.fbo + super().add_widget(*args, **kwargs) + self.canvas = canvas + + def remove_widget( + self: HeadlessWidget, + *args: object, + **kwargs: object, + ) -> None: + """Extend `Widget.remove_widget` and handle `canvas`.""" + canvas = self.canvas + self.canvas = self.fbo + super().remove_widget(*args, **kwargs) + self.canvas = canvas + + def on_size( + self: HeadlessWidget, + _: HeadlessWidget, + value: tuple[int, int], + ) -> None: + """Update size of fbo related elements when widget's size changes.""" + self.fbo.size = value + self.fbo_background_rectangle.size = value + if config.is_debug_mode(): + self.fbo_render_rectangle.size = value + + def on_pos( + self: HeadlessWidget, + _: HeadlessWidget, + value: tuple[int, int], + ) -> None: + """Update position of fbo related elements when widget's position changes.""" + self.fbo_background_rectangle.pos = value + if config.is_debug_mode(): + self.fbo_render_rectangle.pos = value + + def render_on_display(self: HeadlessWidget, *_: object) -> None: + """Render the current frame on the display.""" + data = np.frombuffer(self.fbo.texture.pixels, dtype=np.uint8) + if self.previous_data is not None and np.array_equal(data, self.previous_data): + return + self.last_render = time.time() + self.previous_data = data + # Render the current frame on the display asynchronously + try: + last_thread = self.pending_render_threads.get(False) + except Empty: + last_thread = None + + x, y = int(self.x), int(self.y) + height = int(min(self.fbo.texture.height, dp(config.height()) - y)) + width = int(min(self.fbo.texture.width, dp(config.width()) - x)) + + if x < 0: + width += x + x = 0 + if y < 0: + height += y + y = 0 + + data = data.reshape( + int(self.fbo.texture.height), + int(self.fbo.texture.width), + -1, + ) + data = data[:height, :width, :] + + mask = np.any(data != self.previous_frame, axis=2) + regions = divide_into_regions(mask) + + alpha_mask = np.repeat(mask[:, :, np.newaxis], 4, axis=2) + self.previous_frame = data + HeadlessWidget.raw_data[y : y + height, x : x + width, :][alpha_mask] = data[ + alpha_mask + ] + chunk = transform_data( + HeadlessWidget.raw_data[y : y + height, x : x + width, :], + ) + + regions = [ + (*transform_coordinates(region[:4]), *region[4:]) for region in regions + ] + self.render_debug_info( + (x, y, x + width, y + height), + regions, + chunk, + ) + + thread = Thread( + target=config.callback(), + kwargs={ + 'regions': [ + { + 'rectangle': region[:4], + 'data': chunk[ + region[0] : region[2], + region[1] : region[3], + :, + ], + } + for region in regions + ], + 'last_render_thread': last_thread, + }, + daemon=False, + ) + self.pending_render_threads.put(thread) + thread.start() + + @classmethod + def get_instance( + cls: type[HeadlessWidget], + widget: Widget, + ) -> HeadlessWidget | None: + """Get the nearest instance of `HeadlessWidget`.""" + if isinstance(widget, HeadlessWidget): + return widget + if widget.parent: + return cls.get_instance(widget.parent) + return None + + +__all__ = ['HeadlessWidget'] diff --git a/pyproject.toml b/pyproject.toml index 4955546..d742184 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,9 @@ lint = "ruff check . --unsafe-fixes" typecheck = "pyright -p pyproject.toml ." sanity = ["typecheck", "lint"] +[tool.poe.tasks.deploy-to-device] +cmd = 'scripts/deploy.sh' + [tool.ruff] target-version = 'py311' diff --git a/scripts/deploy.sh b/scripts/deploy.sh new file mode 100755 index 0000000..d73a033 --- /dev/null +++ b/scripts/deploy.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env sh + +set -e -o errexit + +uv build + +LATEST_VERSION=$(basename $(ls -rt dist/*.whl | tail -n 1)) + +function run_on_pod() { + if [ $# -lt 1 ]; then + echo "Usage: run_on_pod " + return 1 + fi + if [ $# -eq 1 ]; then + ssh ubo-development-pod "sudo XDG_RUNTIME_DIR=/run/user/\$(id -u ubo) -u ubo bash -c 'source \$HOME/.profile && source /etc/profile && source /opt/ubo/env/bin/activate && $1'" + return 0 + fi + return 1 +} + +scp dist/$LATEST_VERSION ubo-development-pod:/tmp/ + +run_on_pod "pip install --upgrade --force-reinstall --no-deps /tmp/$LATEST_VERSION"