diff --git a/example.py b/example.py index 437770ca..593f48f3 100644 --- a/example.py +++ b/example.py @@ -1,101 +1,31 @@ -import asyncio import logging -import threading -from typing import Coroutine, TypeVar from juju import jasyncio -from juju.client import client, connection from juju.model import Model async def main() -> None: m = Model() await m.connect(model_name="testm") - app_facade = client.ApplicationFacade.from_connection(m.connection()) - - helper = SyncModelConnection() - helper.start() - # FIXME wait for it to start - import time - time.sleep(1) - helper.call(helper.ext_init(**m._connector._kwargs_cache), low_level=True) - - for app_name in m.applications: + for app_name, app in m.applications.items(): print(f"main loop {'-'*40}") - # print(m.applications[app_name].constraints.arch) # temporarily broken, but why? print(m.applications[app_name].name) - print(m.applications[app_name].exposed) - print(m.applications[app_name].charm_url) - print(m.applications[app_name].owner_tag) - # print(m.applications[app_name].life) - print(m.applications[app_name].min_units) - print(m.applications[app_name].constraints) - print(m.applications[app_name].subordinate) - # print(m.applications[app_name].status) - print(m.applications[app_name].workload_version) - - print() - app = await app_facade.Get(app_name) - print(app.application, app.charm, app.constraints.arch) print(f"helper loop {'+'*40}") - app = helper.call(client.ApplicationFacade.from_connection(helper._conn).Get(app_name)) - print(app.application, app.charm, app.constraints.arch) - - helper.call(helper._conn.close()) - helper.stop() - - -R = TypeVar("R") - - -class SyncModelConnection(threading.Thread): - _cond: threading.Condition - _conn: connection.Connection | None - - def __init__(self, *, connection_kwargs: dict = {}): # FIXME - super().__init__() - # FIXME check if we really need this explicit locking - self._cond = threading.Condition() - self._conn = None - self._connection_kwargs = connection_kwargs - - def call(self, coro: Coroutine[None, None, R], *, low_level=False) -> R: - assert self._loop - assert self._conn or low_level - f = asyncio.run_coroutine_threadsafe(coro, self._loop) - return f.result() - - # FIXME one or the other - async def ext_init(self, **kw): - self._conn = await connection.Connection.connect(**kw) - - async def init(self): - try: - self._conn = await connection.Connection.connect(**self._connection_kwargs) - except Exception: - logging.exception("Helper thread failed to connect") - assert self._loop - self._loop.stop() - - def run(self): - with self._cond: - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - - self._loop.run_forever() - - with self._cond: - self._loop.close() - self._loop = None - # In case another thread was waiting for a result when we were asked to stop - self._cond.notify_all() + print(app, app.charm_name) + #print(m.applications[app_name].exposed) + #print(m.applications[app_name].charm_url) + #print(m.applications[app_name].owner_tag) + # print(m.applications[app_name].life) + #print(m.applications[app_name].min_units) + #print(m.applications[app_name].constraints) + # print(m.applications[app_name].constraints.arch) + #print(m.applications[app_name].subordinate) + # print(m.applications[app_name].status) + #print(m.applications[app_name].workload_version) - def stop(self): - with self._cond: - if self._loop: - self._loop.call_soon_threadsafe(self._loop.stop) - self.join() + # FIXME why sudden framing/asyncio error on disconnect? + await m.disconnect() class SymbolFilter(logging.Filter): @@ -113,6 +43,6 @@ def filter(self, record): if __name__ == "__main__": # FIXME why is level=DEBUG broken? - logging.basicConfig(level="INFO", format="%(symbol)s %(message)s") - logging.root.addFilter(SymbolFilter()) + #logging.basicConfig(level="INFO", format="%(symbol)s %(message)s") + #logging.root.addFilter(SymbolFilter()) jasyncio.run(main()) diff --git a/juju/_sync.py b/juju/_sync.py new file mode 100644 index 00000000..de84443b --- /dev/null +++ b/juju/_sync.py @@ -0,0 +1,86 @@ +# Copyright 2024 Canonical Ltd. +# Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations + +import asyncio +import logging +import threading +from typing import Coroutine, TypeVar, Self + +from juju.client import connection + +R = TypeVar("R") + + +class SyncProxy(threading.Thread): + _cond: threading.Condition + _conn: connection.Connection | None + _loop: asyncio.AbstractEventLoop | None + + @classmethod + def new_running(cls, *, connection_kwargs: dict) -> Self: + self = cls(connection_kwargs=connection_kwargs) + self.start() + return self + + def __init__(self, *, connection_kwargs: dict): + super().__init__() + self._cond = threading.Condition() + self._conn = None + self._connection_kwargs = connection_kwargs + + def start(self): + super().start() + + with self._cond: + while not self._loop: + self._cond.wait() + + try: + asyncio.run_coroutine_threadsafe(self._connect(), self._loop).result() + except Exception: + logging.exception("Helper thread failed to connect") + assert self._loop + self._loop.stop() + raise + + def call(self, coro: Coroutine[None, None, R]) -> R: + assert self._loop + assert self._conn # for sanity, really + return asyncio.run_coroutine_threadsafe(coro, self._loop).result() + + async def _connect(self): + try: + self._conn = await connection.Connection.connect(**self._connection_kwargs) + finally: + with self._cond: + self._cond.notify_all() + + def run(self) -> None: + with self._cond: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._cond.notify_all() + + self._loop.run_forever() + + # FIXME not needed, as caller is expected to .join() + with self._cond: + self._loop.close() + self._loop = None + self._cond.notify_all() + + def stop(self) -> None: + try: + if self._conn: + # FIXME somehow I'm getting Bad file description here + # and it's deep in the websockets/legacy/framing... + # And I was not getting the same when in prev revision with manual helper... + self.call(self._conn.close()) + self._conn = None + with self._cond: + if self._loop: + self._loop.call_soon_threadsafe(self._loop.stop) + self.join() + except Exception: + logging.exception("XXXXXXXXX") diff --git a/juju/application.py b/juju/application.py index 3a80f23f..e374ba74 100644 --- a/juju/application.py +++ b/juju/application.py @@ -575,6 +575,12 @@ def charm_name(self): :return str: The name of the charm """ + # FIXME just testing stuff + print(">>>> sync call") + tmp = self.model._sync_proxy.call( + client.ApplicationFacade.from_connection( + self.model._sync_proxy._conn).Get(self.name)) + print("<<<<", tmp) return URL.parse(self.charm_url).name @property diff --git a/juju/model.py b/juju/model.py index c5da1765..b18d1b11 100644 --- a/juju/model.py +++ b/juju/model.py @@ -46,6 +46,7 @@ from .tag import application as application_tag from .url import URL, Schema from .version import DEFAULT_ARCHITECTURE +from ._sync import SyncProxy if TYPE_CHECKING: from .application import Application @@ -553,6 +554,9 @@ class Model: """ The main API for interacting with a Juju model. """ + # FIXME better name + _sync_proxy: SyncProxy | None = None + def __init__( self, max_frame_size=None, @@ -710,6 +714,8 @@ async def connect(self, *args, **kwargs): if not is_debug_log_conn: await self._after_connect(model_name, model_uuid) + self._sync_proxy = SyncProxy.new_running(connection_kwargs=self._connector._kwargs_cache) + async def connect_model(self, model_name, **kwargs): """ .. deprecated:: 0.6.2 @@ -784,6 +790,10 @@ async def disconnect(self): """Shut down the watcher task and close websockets. """ + if self._sync_proxy: + self._sync_proxy.stop() + self._sync_proxy = None + if not self._watch_stopped.is_set(): log.debug('Stopping watcher task') self._watch_stopping.set() diff --git a/setup.py b/setup.py index a2e8ee94..cba1c85b 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ 'macaroonbakery>=1.1,<2.0', 'pyRFC3339>=1.0,<2.0', 'pyyaml>=5.1.2', - 'websockets>=8.1', + 'websockets>=13.0.1', 'paramiko>=2.4.0', 'pyasn1>=0.4.4', 'toposort>=1.5,<2',