Skip to content

Commit

Permalink
fix(core): keep a reference for background tasks created with `async_…
Browse files Browse the repository at this point in the history
….create_task` to avoid them getting garbage collected and cancelled
sassanh committed Aug 27, 2024

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent f1ff576 commit e0d2c3c
Showing 6 changed files with 49 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
## Version 0.15.12

- build(packer): remove /etc/xdg/autostart/piwiz.desktop to avoid running piwiz as we already set ubo user
- fix(core): keep a reference for background tasks created with `async_.create_task` to avoid them getting garbage collected and cancelled

## Version 0.15.11

1 change: 1 addition & 0 deletions ubo_app/constants.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
)
INSTALLATION_PATH = os.environ.get('UBO_INSTALLATION_PATH', '/opt/ubo')
DEBUG_MODE = str_to_bool(os.environ.get('UBO_DEBUG', 'False')) == 1
DEBUG_MODE_TASKS = str_to_bool(os.environ.get('UBO_DEBUG_TASKS', 'False')) == 1
LOG_LEVEL = os.environ.get('UBO_LOG_LEVEL', 'DEBUG' if DEBUG_MODE else None)
GUI_LOG_LEVEL = os.environ.get('UBO_GUI_LOG_LEVEL', 'DEBUG' if DEBUG_MODE else None)
SERVICES_PATH = (
14 changes: 14 additions & 0 deletions ubo_app/error_handlers.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import sys
import threading
import traceback
import weakref
from typing import TYPE_CHECKING, cast

from ubo_app.utils.eeprom import read_serial_number
@@ -89,23 +90,34 @@ def thread_exception_handler(args: threading.ExceptHookArgs) -> None:
)


STACKS = weakref.WeakKeyDictionary(dict[asyncio.Task, str]())


def loop_exception_handler(
loop: asyncio.AbstractEventLoop,
context: dict[str, object],
) -> None:
from ubo_app.constants import DEBUG_MODE_TASKS
from ubo_app.logging import logger

threads_info = get_all_thread_stacks()

exception = context.get('exception')

if DEBUG_MODE_TASKS:
task = cast(asyncio.Task, context.get('future') or context.get('task'))
parent_stack = STACKS.get(task, '<unavailable>') if task else '<unavailable>'
else:
parent_stack = None

if exception and not isinstance(exception, asyncio.CancelledError):
logger.exception(
'Event loop exception',
extra={
'loop': loop,
'error_message': context.get('message'),
'future': context.get('future'),
'parent_stack': parent_stack,
},
exc_info=cast(Exception, exception),
)
@@ -116,6 +128,7 @@ def loop_exception_handler(
'error_message': context.get('message'),
'future': context.get('future'),
'threads': threads_info,
'parent_stack': parent_stack,
},
exc_info=cast(Exception, exception),
)
@@ -125,6 +138,7 @@ def loop_exception_handler(
extra={
'loop': loop,
'context': context,
'parent_stack': parent_stack,
},
)

21 changes: 12 additions & 9 deletions ubo_app/load_services.py
Original file line number Diff line number Diff line change
@@ -21,12 +21,12 @@
from redux import CombineReducerRegisterAction, ReducerType

from ubo_app.constants import (
DEBUG_MODE,
DEBUG_MODE_TASKS,
DISABLED_SERVICES,
SERVICES_LOOP_GRACE_PERIOD,
SERVICES_PATH,
)
from ubo_app.error_handlers import loop_exception_handler
from ubo_app.error_handlers import STACKS, loop_exception_handler
from ubo_app.logging import logger

if TYPE_CHECKING:
@@ -290,8 +290,6 @@ def run(self: UboServiceThread) -> None:
},
)
asyncio.set_event_loop(self.loop)
if DEBUG_MODE:
self.loop.set_debug(enabled=True)

REGISTERED_PATHS[self.path] = self
result = None
@@ -313,15 +311,20 @@ def __repr__(self: UboServiceThread) -> str:

def run_task(
self: UboServiceThread,
task: Coroutine,
coroutine: Coroutine,
callback: Callable[[Task], None] | None = None,
) -> asyncio.Handle:
def task_wrapper() -> None:
result = self.loop.create_task(task)
def task_wrapper(stack: str) -> None:
task = self.loop.create_task(coroutine)
if DEBUG_MODE_TASKS:
STACKS[task] = stack
if callback:
callback(result)
callback(task)

return self.loop.call_soon_threadsafe(task_wrapper)
return self.loop.call_soon_threadsafe(
task_wrapper,
''.join(traceback.format_stack()[:-3]) if DEBUG_MODE_TASKS else '',
)

async def shutdown(self: UboServiceThread) -> None:
from ubo_app.logging import logger
23 changes: 15 additions & 8 deletions ubo_app/service.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import asyncio
import contextlib
import threading
import traceback
from typing import TYPE_CHECKING, TypeVarTuple

from typing_extensions import TypeVar
@@ -40,15 +41,24 @@ def run(self: WorkerThread) -> None:

def run_task(
self: WorkerThread,
task: Coroutine,
coroutine: Coroutine,
callback: Callable[[Task], None] | None = None,
) -> Handle:
def task_wrapper() -> None:
result = self.loop.create_task(task)
from ubo_app.constants import DEBUG_MODE_TASKS

def task_wrapper(stack: str) -> None:
task = self.loop.create_task(coroutine)
if DEBUG_MODE_TASKS:
from ubo_app.error_handlers import STACKS

STACKS[task] = stack
if callback:
callback(result)
callback(task)

return self.loop.call_soon_threadsafe(task_wrapper)
return self.loop.call_soon_threadsafe(
task_wrapper,
''.join(traceback.format_stack()[:-3]) if DEBUG_MODE_TASKS else '',
)

async def shutdown(self: WorkerThread) -> None:
from ubo_app.constants import MAIN_LOOP_GRACE_PERIOD
@@ -95,12 +105,9 @@ def stop(self: WorkerThread) -> None:


def start_event_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
from ubo_app.constants import DEBUG_MODE
from ubo_app.error_handlers import loop_exception_handler

loop.set_exception_handler(loop_exception_handler)
if DEBUG_MODE:
loop.set_debug(enabled=True)

worker_thread.loop = loop
worker_thread.start()
7 changes: 6 additions & 1 deletion ubo_app/utils/async_.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@
from redux.basic_types import TaskCreatorCallback


background_tasks = []


def create_task(
task: Coroutine,
callback: TaskCreatorCallback | None = None,
@@ -23,7 +26,9 @@ def callback_(task: asyncio.Task) -> None:
if callback:
callback(task)

return ubo_app.service._create_task(task, callback_) # noqa: SLF001
handle = ubo_app.service._create_task(task, callback_) # noqa: SLF001
background_tasks.append(handle)
return handle


T = TypeVar('T', infer_variance=True)

0 comments on commit e0d2c3c

Please sign in to comment.