From 077f06b039d008b13ab047e69fb1ecd46cfdb9b8 Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Fri, 20 Sep 2024 15:32:59 +0900 Subject: [PATCH] first hacky results --- example.py | 118 ++++++++++++++++++++++++++++++++++++++ juju/client/connection.py | 3 +- juju/client/connector.py | 3 + juju/model.py | 13 +++-- 4 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 example.py diff --git a/example.py b/example.py new file mode 100644 index 00000000..437770ca --- /dev/null +++ b/example.py @@ -0,0 +1,118 @@ +import asyncio +import logging +import threading +from typing import Coroutine, TypeVar + +from juju import jasyncio +from juju.client import client, connection +from juju.model import Model + + +async def main() -> None: + m = Model() + await m.connect(model_name="testm") + app_facade = client.ApplicationFacade.from_connection(m.connection()) + + helper = SyncModelConnection() + helper.start() + # FIXME wait for it to start + import time + time.sleep(1) + helper.call(helper.ext_init(**m._connector._kwargs_cache), low_level=True) + + for app_name in m.applications: + print(f"main loop {'-'*40}") + # print(m.applications[app_name].constraints.arch) # temporarily broken, but why? + print(m.applications[app_name].name) + print(m.applications[app_name].exposed) + print(m.applications[app_name].charm_url) + print(m.applications[app_name].owner_tag) + # print(m.applications[app_name].life) + print(m.applications[app_name].min_units) + print(m.applications[app_name].constraints) + print(m.applications[app_name].subordinate) + # print(m.applications[app_name].status) + print(m.applications[app_name].workload_version) + + print() + app = await app_facade.Get(app_name) + print(app.application, app.charm, app.constraints.arch) + + print(f"helper loop {'+'*40}") + app = helper.call(client.ApplicationFacade.from_connection(helper._conn).Get(app_name)) + print(app.application, app.charm, app.constraints.arch) + + helper.call(helper._conn.close()) + helper.stop() + + +R = TypeVar("R") + + +class SyncModelConnection(threading.Thread): + _cond: threading.Condition + _conn: connection.Connection | None + + def __init__(self, *, connection_kwargs: dict = {}): # FIXME + super().__init__() + # FIXME check if we really need this explicit locking + self._cond = threading.Condition() + self._conn = None + self._connection_kwargs = connection_kwargs + + def call(self, coro: Coroutine[None, None, R], *, low_level=False) -> R: + assert self._loop + assert self._conn or low_level + f = asyncio.run_coroutine_threadsafe(coro, self._loop) + return f.result() + + # FIXME one or the other + async def ext_init(self, **kw): + self._conn = await connection.Connection.connect(**kw) + + async def init(self): + try: + self._conn = await connection.Connection.connect(**self._connection_kwargs) + except Exception: + logging.exception("Helper thread failed to connect") + assert self._loop + self._loop.stop() + + def run(self): + with self._cond: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + self._loop.run_forever() + + with self._cond: + self._loop.close() + self._loop = None + # In case another thread was waiting for a result when we were asked to stop + self._cond.notify_all() + + def stop(self): + with self._cond: + if self._loop: + self._loop.call_soon_threadsafe(self._loop.stop) + self.join() + + +class SymbolFilter(logging.Filter): + DEBUG = '🐛' + INFO = 'ℹī¸' + WARNING = '⚠ī¸' + ERROR = '❌' + CRITICAL = 'đŸ”Ĩ' + + def filter(self, record): + record.symbol = getattr(self, record.levelname, '#') + # FIXME can control log record origin here if needed + return True + + +if __name__ == "__main__": + # FIXME why is level=DEBUG broken? + logging.basicConfig(level="INFO", format="%(symbol)s %(message)s") + logging.root.addFilter(SymbolFilter()) + jasyncio.run(main()) diff --git a/juju/client/connection.py b/juju/client/connection.py index 45ee6d95..164c9097 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -4,6 +4,7 @@ import base64 import json import logging +from typing import Self import ssl import urllib.request import weakref @@ -257,7 +258,7 @@ async def connect( proxy=None, debug_log_conn=None, debug_log_params={} - ): + ) -> Self: """Connect to the websocket. If uuid is None, the connection will be to the controller. Otherwise it diff --git a/juju/client/connector.py b/juju/client/connector.py index 3a901c7d..2d4ab043 100644 --- a/juju/client/connector.py +++ b/juju/client/connector.py @@ -97,6 +97,9 @@ async def connect(self, **kwargs): if not ({'username', 'password'}.issubset(kwargs)): required = {'username', 'password'}.difference(kwargs) raise ValueError(f'Some authentication parameters are required : {",".join(required)}') + # FIXME ugly hack + # what if some values are not copyable or thread-safe? + self._kwargs_cache = kwargs.copy() self._connection = await Connection.connect(**kwargs) # Check if we support the target controller diff --git a/juju/model.py b/juju/model.py index a51944de..c5da1765 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,5 +1,6 @@ # Copyright 2023 Canonical Ltd. # Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations import base64 import collections @@ -18,6 +19,7 @@ from datetime import datetime, timedelta from functools import partial from pathlib import Path +from typing import TYPE_CHECKING import yaml import websockets @@ -45,6 +47,9 @@ from .url import URL, Schema from .version import DEFAULT_ARCHITECTURE +if TYPE_CHECKING: + from .application import Application + log = logging.getLogger(__name__) @@ -124,7 +129,7 @@ def __init__(self, model): self.model = model self.state = dict() - def _live_entity_map(self, entity_type): + def _live_entity_map(self, entity_type) -> dict[str, ModelEntity]: """Return an id:Entity map of all the living entities of type ``entity_type``. @@ -136,7 +141,7 @@ def _live_entity_map(self, entity_type): } @property - def applications(self): + def applications(self) -> dict[str, Application]: """Return a map of application-name:Application for all applications currently in the model. @@ -227,7 +232,7 @@ def apply_delta(self, delta): return entity.previous(), entity def get_entity( - self, entity_type, entity_id, history_index=-1, connected=True): + self, entity_type, entity_id, history_index=-1, connected=True) -> ModelEntity: """Return an object instance for the given entity_type and id. By default the object state matches the most recent state from @@ -1054,7 +1059,7 @@ def tag(self): return tag.model(self.uuid) @property - def applications(self): + def applications(self) -> dict[str, Application]: """Return a map of application-name:Application for all applications currently in the model.