Skip to content

Commit

Permalink
refactor: drop all the fps handling and render scheduling logic in fa…
Browse files Browse the repository at this point in the history
…vor of `Canvas`'s `Callback` instruction which gets called only when new drawing is done
  • Loading branch information
sassanh committed Oct 25, 2024
1 parent 8626a75 commit 4813481
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 203 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,8 @@ headless_kivy/_version.py
# logs
*.log

# debug
*.raw

# demo
demo.png
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Upcoming

- refactor: drop all the fps handling and render scheduling logic in favor of `Canvas`'s `Callback` instruction which gets called only when new drawing is done

## Version 0.10.1

- chore: set dependencies for `[test]` extra
Expand Down
29 changes: 4 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ the last rendered frame.
pip install headless-kivy
```

To work on a non-RPi environment, run this:
To use its test tools, you can install it with the following command:

```sh
# pip:
pip install headless-kivy[dev]
# poetry:
poetry --group dev headless-kivy
```

## 🛠 Usage
Expand All @@ -32,14 +29,11 @@ poetry --group dev headless-kivy

```python
setup_headless(
min_fps=1,
max_fps=30,
width=240,
height=240,
is_debug_mode=False,
display_class=ST7789,
double_buffering=True,
automatic_fps=True,
)
```

Expand Down Expand Up @@ -86,14 +80,6 @@ and debugging purposes.
It always runs in a new thread, the previous thread is provided so that it can call
its `join` if desired.

#### `min_fps`

Minimum frames per second for when the Kivy application is idle.

#### `max_fps`

Maximum frames per second for the Kivy application.

#### `width`

The width of the display in pixels.
Expand All @@ -111,12 +97,6 @@ If set to True, the application will print debug information, including FPS.
Is set to `True`, it will let Kivy generate the next frame while sending the last
frame to the display.

#### `automatic_fps`

If set to `True`, it will monitor the hash of the screen data, if this hash changes,
it will increase the fps to the maximum and if the hash doesn't change for a while,
it will drop the fps to the minimum.

#### `rotation`

The rotation of the display. It will be multiplied by 90 degrees.
Expand All @@ -131,13 +111,12 @@ If set to `True`, it will flip the display vertically.

## 🤝 Contributing

You need to have [Poetry](https://python-poetry.org/) installed on your machine.
You need to have [uv](https://github.com/astral-sh/uv) installed on your machine.

After having poetry, to install the required dependencies, run the following command
in the root directory of the project:
To install the required dependencies, run the following command in the root directory of the project:

```sh
poetry install
uv sync
```

## ⚠️ Important Note
Expand Down
3 changes: 1 addition & 2 deletions demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ def render(
*,
rectangle: tuple[int, int, int, int],
data: NDArray[np.uint8],
data_hash: int,
last_render_thread: Thread,
) -> None:
"""Render the data to a png file."""
_ = rectangle, data_hash, last_render_thread
_ = rectangle, last_render_thread
with Path('demo.png').open('wb') as file:
png.Writer(
alpha=True,
Expand Down
177 changes: 59 additions & 118 deletions headless_kivy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
A Kivy widget rendered in memory which doesn't create any window in any display manager
(a.k.a "headless").
When no animation is running, you can drop fps to `min_fps` by calling
`activate_low_fps_mode`.
To increase fps to `max_fps` call `activate_high_fps_mode`.
"""

from __future__ import annotations
Expand All @@ -18,25 +13,20 @@
from pathlib import Path
from queue import Empty, Queue
from threading import Thread
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, ClassVar

import numpy as np
from kivy.app import App
from kivy.clock import Clock
from kivy.graphics.context_instructions import Color
from kivy.graphics.fbo import Fbo
from kivy.graphics.gl_instructions import ClearBuffers, ClearColor
from kivy.graphics.instructions import Canvas
from kivy.graphics.instructions import Callback, Canvas
from kivy.graphics.vertex_instructions import Rectangle
from kivy.metrics import dp
from kivy.properties import ObjectProperty
from kivy.uix.widget import Widget

from headless_kivy import config
from headless_kivy.logger import logger

if TYPE_CHECKING:
from kivy.graphics.texture import Texture
from numpy._typing import NDArray


Expand All @@ -52,28 +42,22 @@ def apply_tranformations(data: NDArray[np.uint8]) -> NDArray[np.uint8]:
class HeadlessWidget(Widget):
"""A Kivy widget that renders everything in memory."""

_is_setup_headless_called: bool = False
should_ignore_hash: bool = False
texture = ObjectProperty(None, allownone=True)

last_second: int
rendered_frames: int
skipped_frames: int
pending_render_threads: Queue[Thread]
last_hash: int
fps: int

previous_data: NDArray[np.uint8] | None = None
previous_frame: NDArray[np.uint8] | None = None
fbo: Fbo
fbo_rect: Rectangle
fbo_rectangle: Rectangle

raw_data: NDArray[np.uint8]
raw_data: ClassVar[NDArray[np.uint8]]

def __init__(self: HeadlessWidget, **kwargs: dict[str, object]) -> None:
"""Initialize a `HeadlessWidget`."""
config.check_initialized()

self.should_ignore_hash = False

__import__('kivy.core.window')

if config.is_debug_mode():
Expand All @@ -82,38 +66,22 @@ def __init__(self: HeadlessWidget, **kwargs: dict[str, object]) -> None:
self.skipped_frames = 0

self.pending_render_threads = Queue(2 if config.double_buffering() else 1)
self.last_hash = 0
self.last_change = time.time()
self.fps = config.max_fps()

self.canvas = Canvas()

with self.canvas:
self.fbo = Fbo(size=self.size, with_stencilbuffer=True)
self.fbo_color = Color(1, 1, 1, 1)
self.fbo_rect = Rectangle()
if config.is_debug_mode():
self.fbo_rectangle = Rectangle(size=self.size, texture=self.fbo.texture)

with self.fbo:
with self.fbo.before:
ClearColor(0, 0, 0, 0)
ClearBuffers()

self.texture = self.fbo.texture
with self.fbo.after:
Callback(self.render_on_display)

super().__init__(**kwargs)

self.render_trigger = Clock.create_trigger(
self.render_on_display,
1 / self.fps,
interval=True,
)
self.render_trigger()
app = App.get_running_app()

def clear(*_: object) -> None:
self.render_trigger.cancel()

if app:
app.bind(on_stop=clear)

def add_widget(
self: HeadlessWidget,
*args: object,
Expand Down Expand Up @@ -143,60 +111,23 @@ def on_size(
) -> None:
"""Update size of `fbo` and size of `fbo_rect` when widget's size changes."""
self.fbo.size = value
self.texture = self.fbo.texture
self.fbo_rect.size = value
if config.is_debug_mode():
self.fbo_rectangle.size = value

def on_pos(
self: HeadlessWidget,
_: HeadlessWidget,
value: tuple[int, int],
) -> None:
"""Update position of `fbo_rect` when widget's position changes."""
self.fbo_rect.pos = value

def on_texture(self: HeadlessWidget, _: HeadlessWidget, value: Texture) -> None:
"""Update texture of `fbo_rect` when widget's texture changes."""
self.fbo_rect.texture = value

def on_alpha(self: HeadlessWidget, _: HeadlessWidget, value: float) -> None:
"""Update alpha value of `fbo_rect` when widget's alpha value changes."""
self.fbo_color.rgba = (1, 1, 1, value)

def render(self: HeadlessWidget) -> None:
"""Schedule a force render."""
if not self:
return
Clock.schedule_once(self.render_on_display, 0)

def _activate_high_fps_mode(self: HeadlessWidget) -> None:
"""Increase fps to `max_fps`."""
if not self:
return
logger.info('Activating high fps mode, setting FPS to `max_fps`')
self.fps = config.max_fps()
self.render_trigger.timeout = 1.0 / self.fps
self.last_hash = 0

def activate_high_fps_mode(self: HeadlessWidget) -> None:
"""Schedule increasing fps to `max_fps`."""
self.render()
Clock.schedule_once(lambda _: self._activate_high_fps_mode(), 0)

def _activate_low_fps_mode(self: HeadlessWidget) -> None:
"""Drop fps to `min_fps`."""
logger.info('Activating low fps mode, dropping FPS to `min_fps`')
self.fps = config.min_fps()
self.render_trigger.timeout = 1.0 / self.fps

def activate_low_fps_mode(self: HeadlessWidget) -> None:
"""Schedule dropping fps to `min_fps`."""
self.render()
Clock.schedule_once(lambda _: self._activate_low_fps_mode(), 0)
if config.is_debug_mode():
self.fbo_rectangle.pos = value

def render_on_display(self: HeadlessWidget, *_: object) -> None: # noqa: C901
"""Render the current frame on the display."""
# Log the number of skipped and rendered frames in the last second
if config.is_debug_mode():
self.fbo_rectangle.texture = self.fbo.texture
# Increment rendered_frames/skipped_frames count every frame and reset their
# values to zero every second.
current_second = int(time.time())
Expand All @@ -210,46 +141,44 @@ def render_on_display(self: HeadlessWidget, *_: object) -> None: # noqa: C901
self.rendered_frames = 0
self.skipped_frames = 0

data = np.frombuffer(self.texture.pixels, dtype=np.uint8)
data_hash = hash(data.data.tobytes())
if data_hash == self.last_hash and not self.should_ignore_hash:
# Only drop FPS when the screen has not changed for at least one second
if (
config.automatic_fps()
and time.time() - self.last_change > 1
and self.fps != config.min_fps()
):
logger.debug('Frame content has not changed for 1 second')
self.activate_low_fps_mode()

# Considering the content has not changed, this frame can safely be ignored
data = np.frombuffer(self.fbo.texture.pixels, dtype=np.uint8)
if self.previous_data is not None and np.array_equal(data, self.previous_data):
self.skipped_frames += 1
return

self.should_ignore_hash = False

self.last_change = time.time()
self.last_hash = data_hash
if config.automatic_fps() and self.fps != config.max_fps():
logger.debug('Frame content has changed')
self.activate_high_fps_mode()

self.previous_data = data
# Render the current frame on the display asynchronously
try:
last_thread = self.pending_render_threads.get(False)
except Empty:
last_thread = None

height = int(min(self.texture.height, dp(config.height()) - self.y))
width = int(min(self.texture.width, dp(config.width()) - self.x))
height = int(min(self.fbo.texture.height, dp(config.height()) - self.y))
width = int(min(self.fbo.texture.width, dp(config.width()) - self.x))

data = data.reshape(int(self.texture.height), int(self.texture.width), -1)
data = data.reshape(
int(self.fbo.texture.height),
int(self.fbo.texture.width),
-1,
)
data = data[:height, :width, :]
data = apply_tranformations(data)
x, y = int(self.x), int(dp(config.height()) - self.y - self.height)

mask = np.any(data != self.previous_frame, axis=2)
alpha_mask = np.repeat(mask[:, :, np.newaxis], 4, axis=2)
self.previous_frame = data
if config.rotation() % 2 == 0:
HeadlessWidget.raw_data[y : y + height, x : x + width, :][alpha_mask] = (
data[alpha_mask]
)
else:
HeadlessWidget.raw_data[x : x + width, y : y + height, :][alpha_mask] = (
data[alpha_mask]
)

if config.is_debug_mode():
self.rendered_frames += 1
raw_file_path = Path('headless_kivy_buffer.raw')
raw_file_path = Path(f'headless_kivy_buffer-{self.x}_{self.y}.raw')

if not raw_file_path.exists():
with raw_file_path.open('wb') as file:
Expand All @@ -261,17 +190,29 @@ def render_on_display(self: HeadlessWidget, *_: object) -> None: # noqa: C901
file.seek(int((x + (y + i) * dp(config.width())) * 4))
file.write(bytes(data[i, :, :].flatten().tolist()))

if config.rotation() % 2 == 0:
HeadlessWidget.raw_data[y : y + height, x : x + width, :] = data
else:
HeadlessWidget.raw_data[x : x + width, y : y + height, :] = data
raw_file_path = Path('headless_kivy_buffer.raw')

if not raw_file_path.exists():
with raw_file_path.open('wb') as file:
file.write(
b'\x00' * int(dp(config.width()) * dp(config.height()) * 4),
)
with raw_file_path.open('r+b') as file:
for i in range(height):
file.seek(int((x + (y + i) * dp(config.width())) * 4))
file.write(
bytes(
HeadlessWidget.raw_data[y + i, x : x + width, :]
.flatten()
.tolist(),
),
)

thread = Thread(
target=config.callback(),
kwargs={
'rectangle': (self.x, self.y, width, height),
'rectangle': (x, y, x + width - 1, y + height - 1),
'data': data,
'data_hash': data_hash,
'last_render_thread': last_thread,
},
daemon=True,
Expand Down
Loading

0 comments on commit 4813481

Please sign in to comment.