diff --git a/example.py b/example.py new file mode 100644 index 00000000..15f6b1b4 --- /dev/null +++ b/example.py @@ -0,0 +1,61 @@ +import logging +import pprint + +from juju import jasyncio +from juju.model import Model + + +async def main() -> None: + m = Model() + await m.connect(model_name="testm") + # from juju.client._client import CharmsFacade + # f = CharmsFacade.from_connection(m.connection()) + # rv = await f.CharmInfo("local:noble/fake-ingress-0") + # print(rv) + # print() + + #rv = await f.ApplicationsInfo(entities=[{"tag": "application-database"}]) + #print(rv) + #print() + + for app_name, app in m.applications.items(): + pprint.pprint(app.model.state.state["application"][app_name][-1]) + print(f"""{app_name}: + name............... {app.name!r} + charm_name......... {app.charm_name!r} + exposed............ {app.exposed!r} + charm_url.......... {app.charm_url!r} + owner_tag.......... {app.owner_tag!r} + life............... {app.life!r} + min_units.......... {app.min_units!r} + constraints["arch"] {app.constraints["arch"]!r} + subordinate........ {app.subordinate!r} + status............. {app.status!r} + workload_version... {app.workload_version!r} + """) + for u in app.units: + print(f"{u.name}: {u.agent_status!r} {u.workload_status!r}") + + await m.wait_for_idle() + + await m.disconnect() + + +class SymbolFilter(logging.Filter): + DEBUG = '🐛' + INFO = 'ℹī¸' + WARNING = '⚠ī¸' + ERROR = '❌' + CRITICAL = 'đŸ”Ĩ' + + def filter(self, record): + record.symbol = getattr(self, record.levelname, '#') + # FIXME can control log record origin here if needed + return True + + +if __name__ == "__main__": + # FIXME why is level=DEBUG broken? + #logging.basicConfig(level="INFO", format="%(symbol)s %(message)s") + #logging.root.addFilter(SymbolFilter()) + jasyncio.run(main()) diff --git a/fullstatus.json b/fullstatus.json new file mode 100644 index 00000000..d470a8c1 --- /dev/null +++ b/fullstatus.json @@ -0,0 +1,322 @@ +{ + "request-id": 7, + "response": { + "applications": { + "grafana-agent-k8s": { + "base": { + "channel": "22.04/stable", + "name": "ubuntu" + }, + "can-upgrade-to": "", + "charm": "ch:arm64/jammy/grafana-agent-k8s-75", + "charm-channel": "latest/stable", + "charm-profile": "", + "charm-version": "", + "endpoint-bindings": { + "": "alpha", + "certificates": "alpha", + "grafana-cloud-config": "alpha", + "grafana-dashboards-consumer": "alpha", + "grafana-dashboards-provider": "alpha", + "logging-consumer": "alpha", + "logging-provider": "alpha", + "metrics-endpoint": "alpha", + "peers": "alpha", + "receive-ca-cert": "alpha", + "send-remote-write": "alpha", + "tracing": "alpha" + }, + "exposed": false, + "int": 1, + "life": "", + "meter-statuses": null, + "provider-id": "4ecc75be-f038-4452-b1af-640d1b46f1c6", + "public-address": "10.152.183.55", + "relations": { + "peers": [ + "grafana-agent-k8s" + ] + }, + "status": { + "data": {}, + "info": "installing agent", + "kind": "", + "life": "", + "since": "2024-09-30T07:44:15.63582531Z", + "status": "waiting", + "version": "" + }, + "subordinate-to": [], + "units": { + "grafana-agent-k8s/0": { + "address": "10.1.121.164", + "agent-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T07:44:15.469295423Z", + "status": "idle", + "version": "3.5.1" + }, + "charm": "", + "leader": true, + "machine": "", + "opened-ports": [], + "provider-id": "grafana-agent-k8s-0", + "public-address": "", + "subordinates": null, + "workload-status": { + "data": {}, + "info": "Missing incoming (\"requires\") relation: metrics-endpoint|logging-provider|grafana-dashboards-consumer", + "kind": "", + "life": "", + "since": "2024-09-30T07:43:41.649319444Z", + "status": "blocked", + "version": "" + }, + "workload-version": "0.35.2" + } + }, + "workload-version": "0.35.2" + }, + "hexanator": { + "base": { + "channel": "24.04/stable", + "name": "ubuntu" + }, + "can-upgrade-to": "", + "charm": "local:noble/hexanator-1", + "charm-profile": "", + "charm-version": "", + "endpoint-bindings": { + "": "alpha", + "ingress": "alpha", + "rate-limit": "alpha" + }, + "exposed": false, + "int": 1, + "life": "", + "meter-statuses": null, + "provider-id": "b5efccf2-5a15-41a0-af0f-689a8d93a129", + "public-address": "10.152.183.113", + "relations": {}, + "status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T00:12:47.878239549Z", + "status": "active", + "version": "" + }, + "subordinate-to": [], + "units": { + "hexanator/0": { + "address": "10.1.121.184", + "agent-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T00:13:16.731257044Z", + "status": "idle", + "version": "3.5.1" + }, + "charm": "", + "leader": true, + "machine": "", + "opened-ports": [], + "provider-id": "hexanator-0", + "public-address": "", + "subordinates": null, + "workload-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T00:12:47.878239549Z", + "status": "active", + "version": "" + }, + "workload-version": "" + } + }, + "workload-version": "" + }, + "mysql-test-app": { + "base": { + "channel": "22.04/stable", + "name": "ubuntu" + }, + "can-upgrade-to": "", + "charm": "ch:arm64/jammy/mysql-test-app-62", + "charm-channel": "latest/edge", + "charm-profile": "", + "charm-version": "", + "endpoint-bindings": { + "": "alpha", + "application-peers": "alpha", + "database": "alpha", + "mysql": "alpha" + }, + "exposed": false, + "int": 2, + "life": "", + "meter-statuses": null, + "provider-id": "4338786a-a337-4779-820d-679a59ba1665", + "public-address": "10.152.183.118", + "relations": { + "application-peers": [ + "mysql-test-app" + ] + }, + "status": { + "data": {}, + "info": "installing agent", + "kind": "", + "life": "", + "since": "2024-09-30T07:48:25.106109123Z", + "status": "waiting", + "version": "" + }, + "subordinate-to": [], + "units": { + "mysql-test-app/0": { + "address": "10.1.121.142", + "agent-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-10-01T00:15:03.216904329Z", + "status": "idle", + "version": "3.5.1" + }, + "charm": "", + "leader": true, + "machine": "", + "opened-ports": [], + "provider-id": "mysql-test-app-0", + "public-address": "", + "subordinates": null, + "workload-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T07:47:54.212959856Z", + "status": "waiting", + "version": "" + }, + "workload-version": "0.0.2" + }, + "mysql-test-app/1": { + "address": "10.1.121.190", + "agent-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T23:49:39.923901864Z", + "status": "idle", + "version": "3.5.1" + }, + "charm": "", + "machine": "", + "opened-ports": [], + "provider-id": "mysql-test-app-1", + "public-address": "", + "subordinates": null, + "workload-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T07:47:54.211414881Z", + "status": "waiting", + "version": "" + }, + "workload-version": "0.0.2" + } + }, + "workload-version": "0.0.2" + } + }, + "branches": {}, + "controller-timestamp": "2024-10-01T07:25:22.51380313Z", + "machines": {}, + "model": { + "available-version": "", + "cloud-tag": "cloud-microk8s", + "meter-status": { + "color": "", + "message": "" + }, + "model-status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-27T08:21:45.368693216Z", + "status": "available", + "version": "" + }, + "name": "testm", + "region": "localhost", + "sla": "unsupported", + "type": "caas", + "version": "3.5.1" + }, + "offers": {}, + "relations": [ + { + "endpoints": [ + { + "application": "grafana-agent-k8s", + "name": "peers", + "role": "peer", + "subordinate": false + } + ], + "id": 0, + "interface": "grafana_agent_replica", + "key": "grafana-agent-k8s:peers", + "scope": "global", + "status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T07:43:31.018463595Z", + "status": "joined", + "version": "" + } + }, + { + "endpoints": [ + { + "application": "mysql-test-app", + "name": "application-peers", + "role": "peer", + "subordinate": false + } + ], + "id": 1, + "interface": "application-peers", + "key": "mysql-test-app:application-peers", + "scope": "global", + "status": { + "data": {}, + "info": "", + "kind": "", + "life": "", + "since": "2024-09-30T07:47:52.823202648Z", + "status": "joined", + "version": "" + } + } + ], + "remote-applications": {} + } +} diff --git a/juju/_sync.py b/juju/_sync.py new file mode 100644 index 00000000..36126c5f --- /dev/null +++ b/juju/_sync.py @@ -0,0 +1,109 @@ +# Copyright 2024 Canonical Ltd. +# Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations + +import asyncio +import dataclasses +import functools +import logging +import threading +from typing import ( + Any, + Callable, + Coroutine, + Dict, + Generic, + Optional, + Self, + TypeVar, +) + +import juju.client.connection +import juju.model + +R = TypeVar("R") + + +@dataclasses.dataclass +class SyncCacheLine(Generic[R]): + value: Optional[R] + exception: Optional[Exception] + + +def cache_until_await(f: Callable[..., R]) -> Callable[..., R]: + @functools.wraps(f) + def inner(self: juju.model.ModelEntity, *args, **kwargs) -> R: + try: + assert isinstance(self, juju.model.ModelEntity) + cached: SyncCacheLine[R] = self._sync_cache.setdefault( + f.__name__, + SyncCacheLine(None, None), + ) + + if cached.value is None and cached.exception is None: + asyncio.get_running_loop().call_soon(self._sync_cache.clear) + try: + cached.value = f(self, *args, **kwargs) + except Exception as e: + cached.exception = e + + if cached.exception: + raise cached.exception + + assert cached.value is not None + return cached.value + except AttributeError as e: + # The decorated functions are commonly used in @property's + # where the class or base class declares __getattr__ too. + # Python data model has is that AttributeError is special + # in this case, so wrap it into something else. + raise Exception(repr(e)) from e + + return inner + + +class ThreadedAsyncRunner(threading.Thread): + _conn: juju.client.connection.Connection | None + _loop: asyncio.AbstractEventLoop + + @classmethod + def new_connected(cls, *, connection_kwargs: Dict[str, Any]) -> Self: + rv = cls() + rv.start() + try: + rv._conn = asyncio.run_coroutine_threadsafe( + juju.client.connection.Connection.connect(**connection_kwargs), # type: ignore[reportUnknownMemberType] + rv._loop, + ).result() + return rv + except Exception: + logging.exception("Helper thread failed to connect") + # TODO: .stop vs .close + rv._loop.stop() + rv.join() + raise + + def call(self, coro: Coroutine[None, None, R]) -> R: + return asyncio.run_coroutine_threadsafe(coro, self._loop).result() + + def stop(self) -> None: + if self._conn: + self.call(self._conn.close()) + self._conn = None + self._loop.call_soon_threadsafe(self._loop.stop) + self.join() + + @property + def connection(self) -> juju.client.connection.Connection: + assert self._conn + return self._conn + + def __init__(self) -> None: + super().__init__() + self._conn = None + self._loop = asyncio.new_event_loop() + + def run(self) -> None: + asyncio.set_event_loop(self._loop) + self._loop.run_forever() + self._loop.close() diff --git a/juju/application.py b/juju/application.py index 3a80f23f..d6665f89 100644 --- a/juju/application.py +++ b/juju/application.py @@ -1,22 +1,33 @@ # Copyright 2023 Canonical Ltd. # Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations +import asyncio import hashlib import json import logging -import typing +import warnings from pathlib import Path +from typing import List, cast +from typing import reveal_type as reveal_type # FIXME temporary, don't merge from . import jasyncio, model, tag, utils from .annotationhelper import _get_annotations, _set_annotations from .bundle import get_charm_series, is_local_charm from .client import client +from .client._definitions import ( + ApplicationGetResults, + ApplicationInfoResult, + ApplicationResult, +) from .errors import JujuApplicationConfigError, JujuError from .origin import Channel from .placement import parse as parse_placement from .relation import Relation from .status import derive_status +from ._sync import cache_until_await from .url import URL +from .unit import Unit from .utils import block_until from .version import DEFAULT_ARCHITECTURE @@ -24,6 +35,86 @@ class Application(model.ModelEntity): + @property + def name(self) -> str: + return self.entity_id + + @property + def exposed(self) -> bool: + """ + Simon says to use this: + Applications[19].ApplicationInfo(*entities[tag:str]) + -> results[n].result.exposed: bool + """ + rv = self.safe_data["exposed"] + if (new := self._application_info().exposed) != rv: + warnings.warn(f"Mismatch in Application.exposed {(new, rv)}") + return rv + + @property + def owner_tag(self) -> str: + warnings.warn("Deprecated", DeprecationWarning) + return self.safe_data["owner-tag"] + + @property + def life(self) -> str: + rv = self.safe_data["life"] + if (new := self._application_info().life) != rv: + warnings.warn(f"Mismatch in Application.life {(new, rv)}") + return rv + + @property + def min_units(self) -> int: + warnings.warn("Deprecated", DeprecationWarning) + # TODO: default is 0 + return self.safe_data["min-units"] + + @property + def constraints(self) -> dict[str, str|int|bool]: + rv = self.safe_data["constraints"] + # FIXME old code returned a sparse dict + # new code returns a filled-in dict-like + # + # behaviour is the same for user code app.constraints["arch"] + # but is different for app.constraints == expected + if (new := self._application_get().constraints) != rv: + warnings.warn(f"Mismatch in Application.constraints {(new, rv)}") + return rv + + @property + def subordinate(self) -> bool: + # FIXME can be got from Charms.CharmInfo(charm_url).meta.subordinate + # Would it be easier to deprecate this? + warnings.warn("Deprecated", DeprecationWarning) + rv = self.safe_data["subordinate"] + return rv + + @property + def workload_version(self) -> str: + warnings.warn("Deprecated, use Unit.workload_version instead", DeprecationWarning) + rv = self.safe_data["workload-version"] + return rv + + @cache_until_await + def _application_get(self) -> ApplicationGetResults: + return self.model._sync_call( + self.model._helper_Application.Get( + application=self.name, + )) + + @cache_until_await + def _application_info(self) -> ApplicationResult: + first = self.model._sync_call( + self.model._helper_Application.ApplicationsInfo( + entities=[client.Entity(self.tag)], + )).results[0] + # This API can get a bunch of results for a bunch of entities, or "tags" + # For each, either .result or .error is set by Juju, and an exception is + # raised on any .error by juju.client.connection.Connection.rpc() + assert first # Work around #1111 + assert first.result + return first.result + @property def _unit_match_pattern(self): return r'^{}.*$'.format(self.entity_id) @@ -51,7 +142,8 @@ def on_unit_remove(self, callable_): callable_, 'unit', 'remove', self._unit_match_pattern) @property - def units(self): + def units(self) -> List[Unit]: + # FIXME need a live call to query units of a given app return [ unit for unit in self.model.units.values() if unit.application == self.name @@ -63,8 +155,8 @@ def subordinate_units(self): return [u for u in self.units if u.is_subordinate] @property - def relations(self) -> typing.List[Relation]: - return [rel for rel in self.model.relations if rel.matches(self.name)] + def relations(self) -> List[Relation]: + return [cast(Relation, rel) for rel in self.model.relations if rel.matches(self.name)] def related_applications(self, endpoint_name=None): apps = {} @@ -81,7 +173,7 @@ def is_us(ep): return apps @property - def status(self): + def status(self) -> str: """Get the application status. If the application is unknown it will attempt to derive the unit @@ -108,7 +200,7 @@ def status_message(self): return self.safe_data['status']['message'] @property - def tag(self): + def tag(self) -> str: return tag.application(self.name) async def add_relation(self, local_relation, remote_relation): @@ -487,6 +579,12 @@ async def get_status(self): client_facade = client.ClientFacade.from_connection(self.connection) full_status = await client_facade.FullStatus(patterns=None) + + import pprint + with open("/tmp/full.status.jsonl", "a") as f: + print(file=f) + print(pprint.pformat(full_status.serialize()), file=f) + _app = full_status.applications.get(self.name, None) if not _app: raise JujuError(f"application is not in FullStatus : {self.name}") @@ -510,7 +608,7 @@ def attach_resource(self, resource_name, file_name, file_obj): data = file_obj.read() headers['Content-Type'] = 'application/octet-stream' - headers['Content-Length'] = len(data) + headers['Content-Length'] = len(data) # type: ignore # https://github.com/python/typeshed/pull/12704 data_bytes = data if isinstance(data, bytes) else bytes(data, 'utf-8') headers['Content-Sha384'] = hashlib.sha384(data_bytes).hexdigest() @@ -520,7 +618,7 @@ def attach_resource(self, resource_name, file_name, file_obj): headers['Content-Disposition'] = "form-data; filename=\"{}\"".format(file_name) headers['Accept-Encoding'] = 'gzip' - headers['Bakery-Protocol-Version'] = 3 + headers['Bakery-Protocol-Version'] = 3 # type: ignore # https://github.com/python/typeshed/pull/12704 headers['Connection'] = 'close' conn.request('PUT', url, data, headers) @@ -569,21 +667,70 @@ async def run(self, command, timeout=None): units=[], ) + _agc_value: ApplicationGetResults | None = None + _agc_exception: Exception | None = None + + @property + def _application_get_cache(self) -> ApplicationGetResults: + if self._agc_value is None and self._agc_exception is None: + try: + self._agc_value = self.model._sync_call(self.model._helper_Application.Get(self.name)) + except Exception as e: + self._agc_exception = e + + def invalidate(): + self._agc_value = self._agc_exception = None + + asyncio.get_running_loop().call_soon(invalidate) + + if self._agc_exception: + raise self._agc_exception + assert self._agc_value is not None + return self._agc_value + + _aic_value: ApplicationInfoResult | None = None + _aic_exception: Exception | None = None + + @property + def _application_info_cache(self) -> ApplicationInfoResult: + if self._aic_value is None and self._aic_exception is None: + try: + self._aic_value = self.model._sync_call( + self.model._helper_Application.ApplicationsInfo( + entities=[client.Entity(self.tag)], + )).results[0] + except Exception as e: + self._aic_exception = e + + if self._aic_exception: + raise self._aic_exception + assert self._aic_value is not None + return self._aic_value + @property - def charm_name(self): + def charm_name(self) -> str: """Get the charm name of this application :return str: The name of the charm """ - return URL.parse(self.charm_url).name + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + rv = URL.parse(self.charm_url).name + reveal_type(URL) + reveal_type(URL.parse) + if (new := self._application_get().charm) != rv: + warnings.warn(f"Mismatch in .charm_name {(new, rv)}") + return rv @property - def charm_url(self): + def charm_url(self) -> str: """Get the charm url for this application :return str: The charm url """ - return self.safe_data['charm-url'] + warnings.warn("Deprecated (FIXME: most likely)", DeprecationWarning) + rv = self.safe_data["charm-url"] + return rv async def get_annotations(self): """Get annotations on this application. diff --git a/juju/client/_client.py b/juju/client/_client.py index 4bfc5720..568ff799 100644 --- a/juju/client/_client.py +++ b/juju/client/_client.py @@ -1,7 +1,26 @@ # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py. # Changes will be overwritten/lost when the file is regenerated. +from __future__ import annotations + +from typing import Protocol, Literal, List, Dict from juju.client._definitions import * +from juju.client._definitions import ( + ApplicationGetResults, + ApplicationInfoResults, + Entity, + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... +) from juju.client import _client7, _client1, _client3, _client4, _client2, _client17, _client6, _client11, _client10, _client5, _client9, _client18, _client19 diff --git a/juju/client/_client19.py b/juju/client/_client19.py index 476ed112..cc4c4aba 100644 --- a/juju/client/_client19.py +++ b/juju/client/_client19.py @@ -1,8 +1,13 @@ # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py. # Changes will be overwritten/lost when the file is regenerated. +from __future__ import annotations from juju.client.facade import Type, ReturnMapping from juju.client._definitions import * +from juju.client._definitions import ( + ApplicationGetResults, + ApplicationInfoResults, +) class ApplicationFacade(Type): @@ -1041,7 +1046,7 @@ async def AddUnits(self, application=None, attach_storage=None, num_units=None, @ReturnMapping(ApplicationInfoResults) - async def ApplicationsInfo(self, entities=None): + async def ApplicationsInfo(self, entities=None) -> ApplicationInfoResults: ''' ApplicationsInfo returns applications information. @@ -1322,8 +1327,10 @@ async def Expose(self, application=None, exposed_endpoints=None): + # FIXME see if this change really needed + # if so, update the codegen @ReturnMapping(ApplicationGetResults) - async def Get(self, application=None, branch=None): + async def Get(self, application=None, branch=None) -> ApplicationGetResults: ''' Get returns the charm configuration for an application. @@ -1349,7 +1356,6 @@ async def Get(self, application=None, branch=None): return reply - @ReturnMapping(CharmURLOriginResult) async def GetCharmURLOrigin(self, application=None, branch=None): ''' diff --git a/juju/client/_client6.py b/juju/client/_client6.py index 95b3ca84..6f184789 100644 --- a/juju/client/_client6.py +++ b/juju/client/_client6.py @@ -1338,6 +1338,12 @@ async def FullStatus(self, patterns=None): params=_params) _params['patterns'] = patterns reply = await self.rpc(msg) + + import pprint + with open("/tmp/full.status.jsonl", "a") as f: + print(file=f) + print("#6", file=f) + print(pprint.pformat(reply), file=f) return reply diff --git a/juju/client/_client7.py b/juju/client/_client7.py index 7b54560a..8bb2df91 100644 --- a/juju/client/_client7.py +++ b/juju/client/_client7.py @@ -1,8 +1,10 @@ # DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py. # Changes will be overwritten/lost when the file is regenerated. +from __future__ import annotations from juju.client.facade import Type, ReturnMapping from juju.client._definitions import * +from juju.client._definitions import Charm class ActionFacade(Type): @@ -1005,7 +1007,7 @@ async def AddCharm(self, charm_origin=None, force=None, url=None): @ReturnMapping(Charm) - async def CharmInfo(self, url=None): + async def CharmInfo(self, url=None) -> Charm: ''' CharmInfo returns information about the requested charm. @@ -1799,6 +1801,13 @@ async def FullStatus(self, include_storage=None, patterns=None): _params['include-storage'] = include_storage _params['patterns'] = patterns reply = await self.rpc(msg) + + import pprint + with open("/tmp/full.status.jsonl", "a") as f: + print(file=f) + print("#7", file=f) + print(pprint.pformat(reply), file=f) + return reply diff --git a/juju/client/connection.py b/juju/client/connection.py index cb87f118..9a4976af 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -4,6 +4,7 @@ import base64 import json import logging +from typing import Self import ssl import urllib.request import weakref @@ -257,7 +258,7 @@ async def connect( proxy=None, debug_log_conn=None, debug_log_params={} - ): + ) -> Self: """Connect to the websocket. If uuid is None, the connection will be to the controller. Otherwise it diff --git a/juju/client/connector.py b/juju/client/connector.py index 3a901c7d..2d4ab043 100644 --- a/juju/client/connector.py +++ b/juju/client/connector.py @@ -97,6 +97,9 @@ async def connect(self, **kwargs): if not ({'username', 'password'}.issubset(kwargs)): required = {'username', 'password'}.difference(kwargs) raise ValueError(f'Some authentication parameters are required : {",".join(required)}') + # FIXME ugly hack + # what if some values are not copyable or thread-safe? + self._kwargs_cache = kwargs.copy() self._connection = await Connection.connect(**kwargs) # Check if we support the target controller diff --git a/juju/client/facade.py b/juju/client/facade.py index 8211ee5b..b2bf159c 100644 --- a/juju/client/facade.py +++ b/juju/client/facade.py @@ -1,5 +1,6 @@ # Copyright 2023 Canonical Ltd. # Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations import argparse import builtins @@ -13,7 +14,7 @@ from collections import defaultdict from glob import glob from pathlib import Path -from typing import Any, Mapping, Sequence, TypeVar +from typing import overload, Any, Dict, Mapping, Optional, Sequence, TypeVar, Type as TypingType import typing_inspect @@ -482,31 +483,42 @@ def ReturnMapping(cls): def decorator(f): @functools.wraps(f) async def wrapper(*args, **kwargs): - nonlocal cls reply = await f(*args, **kwargs) - if cls is None: - return reply - if 'error' in reply: + return _convert_response(reply, cls=cls) + return wrapper + return decorator + + +@overload +def _convert_response(response: Dict, *, cls: TypingType[SomeType]) -> SomeType: ... + + +@overload +def _convert_response(response: Dict, *, cls: None) -> Dict: ... + + +def _convert_response(response: dict, *, cls: Optional[TypingType[Type]]): + if cls is None: + return response + if 'error' in response: + cls = CLASSES['Error'] + if typing_inspect.is_generic_type(cls) and issubclass(typing_inspect.get_origin(cls), Sequence): + parameters = typing_inspect.get_parameters(cls) + result = [] + item_cls = parameters[0] + for item in response: + result.append(item_cls.from_json(item)) + """ + if 'error' in item: cls = CLASSES['Error'] - if typing_inspect.is_generic_type(cls) and issubclass(typing_inspect.get_origin(cls), Sequence): - parameters = typing_inspect.get_parameters(cls) - result = [] - item_cls = parameters[0] - for item in reply: - result.append(item_cls.from_json(item)) - """ - if 'error' in item: - cls = CLASSES['Error'] - else: - cls = item_cls - result.append(cls.from_json(item)) - """ else: - result = cls.from_json(reply['response']) + cls = item_cls + result.append(cls.from_json(item)) + """ + else: + result = cls.from_json(response['response']) - return result - return wrapper - return decorator + return result def makeFunc(cls, name, description, params, result, _async=True): @@ -663,6 +675,7 @@ async def rpc(self, msg): def from_json(cls, data): def _parse_nested_list_entry(expr, result_dict): if isinstance(expr, str): + # FIXME uinreachable code, see #1111 if '>' in expr or '>=' in expr: # something like juju >= 2.9.31 i = expr.index('>') @@ -673,9 +686,11 @@ def _parse_nested_list_entry(expr, result_dict): # this is a simple entry result_dict[expr] = '' elif isinstance(expr, dict): + # FIXME uinreachable code, see #1111 for _, v in expr.items(): _parse_nested_list_entry(v, result_dict) elif isinstance(expr, list): + # FIXME am I crazy? or is this crazy? for v in expr: _parse_nested_list_entry(v, result_dict) else: @@ -734,6 +749,9 @@ def get(self, key, default=None): return getattr(self, attr, default) +SomeType = TypeVar("SomeType", bound=Type) + + class Schema(dict): def __init__(self, schema): self.name = schema['Name'] diff --git a/juju/client/protocols.py b/juju/client/protocols.py new file mode 100644 index 00000000..d524ce9d --- /dev/null +++ b/juju/client/protocols.py @@ -0,0 +1,30 @@ +from typing import List, Protocol + +from juju.client._definitions import ( + ApplicationGetResults, + ApplicationInfoResults, + Entity, +) + + +class ApplicationFacadeProtocol(Protocol): + async def Get(self, application=None, branch=None) -> ApplicationGetResults: ... + + # jRRC Params={"entities":[{"tag": "yada-yada"}]} + # codegen unpacks top-level keys into keyword arguments + async def ApplicationsInfo(self, entities: List[Entity]) -> ApplicationInfoResults: ... + + # etc... + # etc... + # etc... + # etc... + # etc... + # etc... + + +class CharmsFacadeProtocol(Protocol): + ... + + +class UniterFacadeProtocol(Protocol): + ... diff --git a/juju/model.py b/juju/model.py index a51944de..f3829cfa 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,5 +1,6 @@ # Copyright 2023 Canonical Ltd. # Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations import base64 import collections @@ -18,6 +19,21 @@ from datetime import datetime, timedelta from functools import partial from pathlib import Path +from typing import ( + Any, + Coroutine, + Dict, + List, + Literal, + Mapping, + Optional, + overload, + Set, + TypeVar, + TYPE_CHECKING, + # Union, +) +from typing import reveal_type as reveal_type import yaml import websockets @@ -26,7 +42,7 @@ from .annotationhelper import _get_annotations, _set_annotations from .bundle import BundleHandler, get_charm_series, is_local_charm from .charmhub import CharmHub -from .client import client, connector +from .client import client, connection, connector, protocols from .client.overrides import Caveat, Macaroon from .constraints import parse as parse_constraints from .constraints import parse_storage_constraint @@ -44,6 +60,15 @@ from .tag import application as application_tag from .url import URL, Schema from .version import DEFAULT_ARCHITECTURE +from ._sync import SyncCacheLine, ThreadedAsyncRunner + +if TYPE_CHECKING: + from .client._definitions import FullStatus + from .application import Application + from .machine import Machine + from .relation import Relation + from .remoteapplication import ApplicationOffer, RemoteApplication + from .unit import Unit log = logging.getLogger(__name__) @@ -124,7 +149,27 @@ def __init__(self, model): self.model = model self.state = dict() - def _live_entity_map(self, entity_type): + @overload + def _live_entity_map(self, entity_type: Literal["application"]) -> Dict[str, Application]: ... + + @overload + def _live_entity_map(self, entity_type: Literal["applicationOffer"]) -> Dict[str, ApplicationOffer]: ... + + @overload + def _live_entity_map(self, entity_type: Literal["machine"]) -> Dict[str, Machine]: ... + + @overload + def _live_entity_map(self, entity_type: Literal["relation"]) -> Dict[str, Relation]: ... + + @overload + def _live_entity_map(self, entity_type: Literal["remoteApplication"]) -> Dict[str, RemoteApplication]: ... + + @overload + def _live_entity_map(self, entity_type: Literal["unit"]) -> Dict[str, Unit]: ... + + # FIXME and all the other types + + def _live_entity_map(self, entity_type: str) -> Mapping[str, ModelEntity]: """Return an id:Entity map of all the living entities of type ``entity_type``. @@ -136,7 +181,7 @@ def _live_entity_map(self, entity_type): } @property - def applications(self): + def applications(self) -> dict[str, Application]: """Return a map of application-name:Application for all applications currently in the model. @@ -144,7 +189,7 @@ def applications(self): return self._live_entity_map('application') @property - def remote_applications(self): + def remote_applications(self) -> Dict[str, RemoteApplication]: """Return a map of application-name:Application for all remote applications currently in the model. @@ -152,14 +197,14 @@ def remote_applications(self): return self._live_entity_map('remoteApplication') @property - def application_offers(self): + def application_offers(self) -> Dict[str, ApplicationOffer]: """Return a map of application-name:Application for all applications offers currently in the model. """ return self._live_entity_map('applicationOffer') @property - def machines(self): + def machines(self) -> Dict[str, Machine]: # FIXME validate that key is in fact a string """Return a map of machine-id:Machine for all machines currently in the model. @@ -167,7 +212,7 @@ def machines(self): return self._live_entity_map('machine') @property - def units(self): + def units(self) -> Dict[str, Unit]: """Return a map of unit-id:Unit for all units currently in the model. @@ -180,7 +225,7 @@ def subordinate_units(self): return {u_name: u for u_name, u in self.units.items() if u.is_subordinate} @property - def relations(self): + def relations(self) -> Dict[str, Relation]: """Return a map of relation-id:Relation for all relations currently in the model. @@ -224,10 +269,12 @@ def apply_delta(self, delta): history.append(None) entity = self.get_entity(delta.entity, delta.get_id()) + assert entity return entity.previous(), entity + # FIXME this function may explicitly return None, but is the rest of the code prepared for that? def get_entity( - self, entity_type, entity_id, history_index=-1, connected=True): + self, entity_type, entity_id, history_index=-1, connected=True) -> Optional[ModelEntity]: """Return an object instance for the given entity_type and id. By default the object state matches the most recent state from @@ -254,8 +301,15 @@ def get_entity( class ModelEntity: """An object in the Model tree""" - - def __init__(self, entity_id, model, history_index=-1, connected=True): + entity_id: str + model: Model + _history_index: int + connected: bool + connection: connection.Connection + _status: str + _sync_cache: Dict[str, SyncCacheLine] + + def __init__(self, entity_id, model: Model, history_index=-1, connected=True): """Initialize a new entity :param entity_id str: The unique id of the object in the model @@ -273,6 +327,7 @@ def __init__(self, entity_id, model, history_index=-1, connected=True): self.connected = connected self.connection = model.connection() self._status = 'unknown' + self._sync_cache = {} def __repr__(self): return '<{} entity_id="{}">'.format(type(self).__name__, @@ -543,11 +598,16 @@ async def resolve(self, url, architecture, is_bundle=is_bundle, ) +R = TypeVar("R") class Model: """ The main API for interacting with a Juju model. """ + connector: connector.Connector + state: ModelState + _sync: ThreadedAsyncRunner | None = None + def __init__( self, max_frame_size=None, @@ -588,6 +648,28 @@ def __init__( Schema.CHARM_HUB: CharmhubDeployType(self._resolve_charm), } + def _sync_call(self, coro: Coroutine[None, None, R]) -> R: + assert self._sync + return self._sync.call(coro) + + @property + def _helper_Application(self) -> protocols.ApplicationFacadeProtocol: + """An ApplicationFacade suitable for ._sync.call(...)""" + assert self._sync + return client.ApplicationFacade.from_connection(self._sync.connection) + + @property + def _helper_Charms(self) -> protocols.CharmsFacadeProtocol: + assert self._sync + return client.CharmsFacade.from_connection(self._sync.connection) + + @property + def _helper_Uniter(self) -> protocols.UniterFacadeProtocol: + """A UniterFacade suitable for ._sync.call(...)""" + assert self._sync + return client.UniterFacade.from_connection(self._sync.connection) + + def is_connected(self): """Reports whether the Model is currently connected.""" return self._connector.is_connected() @@ -705,6 +787,9 @@ async def connect(self, *args, **kwargs): if not is_debug_log_conn: await self._after_connect(model_name, model_uuid) + self._sync = ThreadedAsyncRunner.new_connected( + connection_kwargs=self._connector._kwargs_cache) + async def connect_model(self, model_name, **kwargs): """ .. deprecated:: 0.6.2 @@ -779,6 +864,10 @@ async def disconnect(self): """Shut down the watcher task and close websockets. """ + if self._sync: + self._sync.stop() + self._sync = None + if not self._watch_stopped.is_set(): log.debug('Stopping watcher task') self._watch_stopping.set() @@ -1054,7 +1143,7 @@ def tag(self): return tag.model(self.uuid) @property - def applications(self): + def applications(self) -> dict[str, Application]: """Return a map of application-name:Application for all applications currently in the model. @@ -1062,7 +1151,7 @@ def applications(self): return self.state.applications @property - def remote_applications(self): + def remote_applications(self) -> Dict[str, RemoteApplication]: """Return a map of application-name:Application for all remote applications currently in the model. @@ -1070,14 +1159,14 @@ def remote_applications(self): return self.state.remote_applications @property - def application_offers(self): + def application_offers(self) -> Dict[str, ApplicationOffer]: """Return a map of application-name:Application for all applications offers currently in the model. """ return self.state.application_offers @property - def machines(self): + def machines(self) -> Dict[str, Machine]: # FIXME validate that key is string and not an int """Return a map of machine-id:Machine for all machines currently in the model. @@ -1085,7 +1174,7 @@ def machines(self): return self.state.machines @property - def units(self): + def units(self) -> Dict[str, Unit]: """Return a map of unit-id:Unit for all units currently in the model. @@ -1125,11 +1214,12 @@ def name(self): return self._info.name @property - def info(self): + def info(self) -> ModelInfo: """Return the cached client.ModelInfo object for this Model. If Model.get_info() has not been called, this will return None. """ + assert self._info is not None return self._info @property @@ -2457,7 +2547,7 @@ async def get_action_status(self, uuid_or_prefix=None, name=None): results[tag.untag('action-', a.action.tag)] = a.status return results - async def get_status(self, filters=None, utc=False): + async def get_status(self, filters=None, utc=False) -> FullStatus: """Return the status of the model. :param str filters: Optional list of applications, units, or machines @@ -2490,6 +2580,7 @@ async def get_metrics(self, *tags): for entity_metrics in metrics_result.results: error = entity_metrics.error if error: + # FIXME why is bad tag handling specific to this one method? if "is not a valid tag" in error: raise ValueError(error.message) else: @@ -2789,9 +2880,35 @@ async def _get_source_api(self, url): await controller.connect(controller_name=controller_name) return controller - async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=False, - wait_for_active=False, timeout=10 * 60, idle_period=15, check_freq=0.5, - status=None, wait_for_at_least_units=None, wait_for_exact_units=None): + # FIXME scraped from charm collection + # apps + # - most common: [explicit, list, of, apps] + # - rare: implicit None + # status + # - common implicit None + # - common "active" + # - rare "blocked" + # - very rare "unknown" + # + # wait_for_at_least_units: + # - is used, const values like 1, 2, or 3i + # - may be used with 2 apps, 1 or 2 units + # wait_for_exact_units: + # - pretty common, const value or parametric + # - rarely used with 0 units, after explicit .scale(scale=0) + # - there's a test for 0 units, implying that all apps are gone + async def wait_for_idle(self, + apps: Optional[List[str]] = None, + raise_on_error: bool = True, + raise_on_blocked: bool = False, + wait_for_active: bool = False, + timeout: Optional[float] = 10 * 60, + idle_period: float = 15, + check_freq=0.5, + status: Optional[str] = None, + wait_for_at_least_units: Optional[int] = None, + wait_for_exact_units: Optional[int] = None, + ): """Wait for applications in the model to settle into an idle state. :param List[str] apps: Optional list of specific app names to wait on. @@ -2842,8 +2959,6 @@ async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=F _wait_for_units = wait_for_at_least_units if wait_for_at_least_units is not None else 1 - timeout = timedelta(seconds=timeout) if timeout is not None else None - idle_period = timedelta(seconds=idle_period) start_time = datetime.now() # Type check against the common error of passing a str for apps if apps is not None and (not isinstance(apps, list) or @@ -2851,127 +2966,253 @@ async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=F for o in apps)): raise JujuError(f'Expected a List[str] for apps, given {apps}') - apps = apps or self.applications - idle_times = {} - units_ready = set() # The units that are in the desired state - last_log_time = None - log_interval = timedelta(seconds=30) - - def _raise_for_status(entities, status): - if not entities: - return - for entity_name, error_type in (("Machine", JujuMachineError), - ("Agent", JujuAgentError), - ("Unit", JujuUnitError), - ("App", JujuAppError)): - errored = entities.get(entity_name, []) - if not errored: - continue - raise error_type("{}{} in {}: {}".format( - entity_name, - "s" if len(errored) > 1 else "", - status, - ", ".join(errored), - )) + # apps narrowed to List[str] + # FIXME switch to lazy evaluations of "expected apps" + apps = apps or list(self.applications) + idle_times: Dict[str, datetime] = {} + units_ready: Set[str] = set() # The units that are in the desired state + last_log_time: List[Optional[datetime]] = [None] if wait_for_exact_units is not None: assert isinstance(wait_for_exact_units, int) and wait_for_exact_units >= 0, \ 'Invalid value for wait_for_exact_units : %s' % wait_for_exact_units while True: - # The list 'busy' is what keeps this loop going, - # i.e. it'll stop when busy is empty after all the - # units are scanned - busy = [] - errors = {} - blocks = {} - for app_name in apps: - if app_name not in self.applications: - busy.append(app_name + " (missing)") - continue - app = self.applications[app_name] - app_status = await app.get_status() - if raise_on_error and app_status == "error": - errors.setdefault("App", []).append(app.name) - if raise_on_blocked and app_status == "blocked": - blocks.setdefault("App", []).append(app.name) - - # Check if wait_for_exact_units flag is used - if wait_for_exact_units is not None: - if len(app.units) != wait_for_exact_units: - busy.append(app.name + " (waiting for exactly %s units, current : %s)" % - (wait_for_exact_units, len(app.units))) - continue - # If we have less # of units then required, then wait a bit more - elif len(app.units) < _wait_for_units: - busy.append(app.name + " (not enough units yet - %s/%s)" % - (len(app.units), _wait_for_units)) - continue - # User is waiting for at least a certain # of units, and we have enough - elif wait_for_at_least_units and len(units_ready) >= _wait_for_units: - # So no need to keep looking, we have the desired number of units ready to go, - # exit the loop. Don't just return here, though, we might still have some - # errors to raise at the end - break - for unit in app.units: - if raise_on_error and unit.machine is not None and unit.machine.status == "error": - errors.setdefault("Machine", []).append(unit.machine.id) - continue - if raise_on_error and unit.agent_status == "error": - errors.setdefault("Agent", []).append(unit.name) - continue - if raise_on_error and unit.workload_status == "error": - errors.setdefault("Unit", []).append(unit.name) - continue - if raise_on_blocked and unit.workload_status == "blocked": - blocks.setdefault("Unit", []).append(unit.name) - continue - # TODO (cderici): we need two versions of wait_for_idle, one for waiting on - # individual units, another one for waiting for an application. - # The convoluted logic below is the result of trying to do both at the same - # time - need_to_wait_more_for_a_particular_status = status and (unit.workload_status != status) - app_is_in_desired_status = (not status) or (app_status == status) - if not need_to_wait_more_for_a_particular_status and \ - unit.agent_status == "idle" and \ - (wait_for_at_least_units or app_is_in_desired_status): - # A unit is ready if either: - # 1) Don't need to wait more for a particular status and the agent is "idle" - # 2) We're looking for a particular status and the unit's workload, - # as well as the application, is in that status. If the user wants to - # see only a particular number of units in that state -- i.e. a subset of - # the units is needed, then we don't care about the application status - # (because e.g. app can be in 'waiting' while unit.0 is 'active' and unit.1 - # is 'waiting') - - # Either way, the unit is ready, start measuring the time period that - # it needs to stay in that state (i.e. idle_period) - units_ready.add(unit.name) - now = datetime.now() - idle_start = idle_times.setdefault(unit.name, now) - - if now - idle_start < idle_period: - busy.append("{} [{}] {}: {}".format(unit.name, - unit.agent_status, - unit.workload_status, - unit.workload_status_message)) - else: - idle_times.pop(unit.name, None) + exc: Optional[Exception] = None + legacy_exc: Optional[Exception] = None + idle = legacy_idle = False + try: + idle = await self._check_idle( + apps=apps, + raise_on_error=raise_on_error, + raise_on_blocked=raise_on_blocked, + status=status, + wait_for_at_least_units=wait_for_at_least_units, + wait_for_exact_units=wait_for_exact_units, + timeout=timeout, + idle_period=idle_period, + _wait_for_units=_wait_for_units, + idle_times=idle_times, + units_ready=units_ready, + last_log_time=last_log_time, + start_time=start_time, + ) + except Exception as e: + exc = e + + try: + legacy_idle = await self._legacy_check_idle( + apps=apps, + raise_on_error=raise_on_error, + raise_on_blocked=raise_on_blocked, + status=status, + wait_for_at_least_units=wait_for_at_least_units, + wait_for_exact_units=wait_for_exact_units, + timeout=timeout, + idle_period=idle_period, + _wait_for_units=_wait_for_units, + idle_times=idle_times, + units_ready=units_ready, + last_log_time=last_log_time, + start_time=start_time, + ) + except Exception as e: + legacy_exc = e + + if bool(exc) ^ bool(legacy_exc): + warnings.warn(f"Idle loop mismatch: {[exc, legacy_exc]}") + + if exc: + raise exc + if legacy_exc: + raise legacy_exc + + if idle ^ legacy_idle: + warnings.warn(f"Idle loop mismatch: {[idle, legacy_idle]}") + + if idle or legacy_idle: + return + + await jasyncio.sleep(check_freq) + + async def _check_idle( + self, + *, + apps: List[str], + raise_on_error: bool, + raise_on_blocked: bool, + status: Optional[str], + wait_for_at_least_units: Optional[int], + wait_for_exact_units: Optional[int], + timeout: Optional[float], + idle_period: float, + _wait_for_units: int, + idle_times: Dict[str, datetime] = {}, + units_ready: Set[str] = set(), # The units that are in the desired state + last_log_time: List[Optional[datetime]] = [None], + start_time: datetime = datetime.now(), + ) -> bool: + now = datetime.now() + tmp = await self.get_status() + reveal_type(tmp.applications) + for app_name in apps: + app = tmp.applications.get(app_name) + if not app: + logging.info("Waiting for app %r", app_name) + return False + reveal_type(app) + # FIXME what if app has zero units? + # what if the caller is waiting for units to go down to zero? + reveal_type(app.units) + if len(app.units) < _wait_for_units: + logging.info("Waitinf for app %r units %s/%s", + app_name, len(app.units), _wait_for_units) + return False + + # TODO: refactor to simplify later + # datetime -> float; check vs outer loop + idle_times.setdefault(app_name, now) + # TODO continue here... + return True + + async def _legacy_check_idle( + self, + *, + apps: List[str], + raise_on_error: bool, + raise_on_blocked: bool, + status: Optional[str], + wait_for_at_least_units: Optional[int], + wait_for_exact_units: Optional[int], + timeout: Optional[float], + idle_period: float, + _wait_for_units: int, + idle_times: Dict[str, datetime] = {}, + units_ready: Set[str] = set(), # The units that are in the desired state + last_log_time: List[Optional[datetime]] = [None], + start_time: datetime = datetime.now(), + ): + #__import__("pdb").set_trace() + _timeout = timedelta(seconds=timeout) if timeout is not None else None + _idle_period = timedelta(seconds=idle_period) + log_interval = timedelta(seconds=30) + # The list 'busy' is what keeps this loop going, + # i.e. it'll stop when busy is empty after all the + # units are scanned + busy: List[str] = [] + errors: Dict[Literal["Machine", "Agent", "App", "Unit"], List[Any]] = {} + blocks: Dict[Literal["Machine", "Agent", "App", "Unit"], List[Any]] = {} + + for app_name in apps: + if app_name not in self.applications: + busy.append(app_name + " (missing)") + return False + app = self.applications[app_name] + app_status = await app.get_status() + if raise_on_error and app_status == "error": + errors.setdefault("App", []).append(app.name) + if raise_on_blocked and app_status == "blocked": + blocks.setdefault("App", []).append(app.name) + + # Check if wait_for_exact_units flag is used + if wait_for_exact_units is not None: + if len(app.units) != wait_for_exact_units: + busy.append(app.name + " (waiting for exactly %s units, current : %s)" % + (wait_for_exact_units, len(app.units))) + return False + # If we have less # of units then required, then wait a bit more + elif len(app.units) < _wait_for_units: + busy.append(app.name + " (not enough units yet - %s/%s)" % + (len(app.units), _wait_for_units)) + return False + # User is waiting for at least a certain # of units, and we have enough + # FIXME seems to skip unit status checks, is that expected? + elif wait_for_at_least_units and len(units_ready) >= _wait_for_units: + # So no need to keep looking, we have the desired number of units ready to go, + # exit the loop. Don't just return here, though, we might still have some + # errors to raise at the end + return True + for unit in app.units: + if raise_on_error and unit.machine is not None and unit.machine.status == "error": + errors.setdefault("Machine", []).append(unit.machine.id) + return False + if raise_on_error and unit.agent_status == "error": + errors.setdefault("Agent", []).append(unit.name) + return False + if raise_on_error and unit.workload_status == "error": + errors.setdefault("Unit", []).append(unit.name) + return False + if raise_on_blocked and unit.workload_status == "blocked": + blocks.setdefault("Unit", []).append(unit.name) + return False + # TODO (cderici): we need two versions of wait_for_idle, one for waiting on + # individual units, another one for waiting for an application. + # The convoluted logic below is the result of trying to do both at the same + # time + need_to_wait_more_for_a_particular_status = status and (unit.workload_status != status) + app_is_in_desired_status = (not status) or (app_status == status) + if not need_to_wait_more_for_a_particular_status and \ + unit.agent_status == "idle" and \ + (wait_for_at_least_units or app_is_in_desired_status): + # A unit is ready if either: + # 1) Don't need to wait more for a particular status and the agent is "idle" + # 2) We're looking for a particular status and the unit's workload, + # as well as the application, is in that status. If the user wants to + # see only a particular number of units in that state -- i.e. a subset of + # the units is needed, then we don't care about the application status + # (because e.g. app can be in 'waiting' while unit.0 is 'active' and unit.1 + # is 'waiting') + + # Either way, the unit is ready, start measuring the time period that + # it needs to stay in that state (i.e. idle_period) + units_ready.add(unit.name) + now = datetime.now() + idle_start = idle_times.setdefault(unit.name, now) + + if now - idle_start < _idle_period: busy.append("{} [{}] {}: {}".format(unit.name, unit.agent_status, unit.workload_status, unit.workload_status_message)) - _raise_for_status(errors, "error") - _raise_for_status(blocks, "blocked") - if not busy: - break - busy = "\n ".join(busy) - if timeout is not None and datetime.now() - start_time > timeout: - raise jasyncio.TimeoutError("Timed out waiting for model:\n" + busy) - if last_log_time is None or datetime.now() - last_log_time > log_interval: - log.info("Waiting for model:\n " + busy) - last_log_time = datetime.now() - await jasyncio.sleep(check_freq) + else: + idle_times.pop(unit.name, None) + busy.append("{} [{}] {}: {}".format(unit.name, + unit.agent_status, + unit.workload_status, + unit.workload_status_message)) + _raise_for_status(errors, "error") + _raise_for_status(blocks, "blocked") + + if not busy: + return True + + if _timeout is not None and datetime.now() - start_time > _timeout: + raise jasyncio.TimeoutError("\n ".join(["Timed out waiting for model:", *busy])) + + if last_log_time[0] is None or datetime.now() - last_log_time[0] > log_interval: + log.info("\n ".join(["Waiting for model:", *busy])) + last_log_time[0] = datetime.now() + + return False + + +def _raise_for_status(entities: Dict, status: Any) -> None: + if not entities: + return + for entity_name, error_type in (("Machine", JujuMachineError), + ("Agent", JujuAgentError), + ("Unit", JujuUnitError), + ("App", JujuAppError)): + errored = entities.get(entity_name, []) + if not errored: + continue + raise error_type("{}{} in {}: {}".format( + entity_name, + "s" if len(errored) > 1 else "", + status, + ", ".join(errored), + )) + def _create_consume_args(offer, macaroon, controller_info): diff --git a/juju/unit.py b/juju/unit.py index 6c8b03d8..8a7f953f 100644 --- a/juju/unit.py +++ b/juju/unit.py @@ -15,6 +15,10 @@ class Unit(model.ModelEntity): + @property + def name(self) -> str: + return self.entity_id + @property def agent_status(self): """Returns the current agent status string. diff --git a/juju/url.py b/juju/url.py index a00798d4..dc1fe9fe 100644 --- a/juju/url.py +++ b/juju/url.py @@ -1,7 +1,9 @@ # Copyright 2023 Canonical Ltd. # Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations from enum import Enum + from .errors import JujuError from urllib.parse import urlparse @@ -19,7 +21,17 @@ def __str__(self): class URL: - def __init__(self, schema, user=None, name=None, revision=None, series=None, architecture=None): + name: str + + # FIXME looks like defaults for parts should be empty strings rather than None + def __init__(self, + schema, + user=None, + name: str = None, # FIXME + revision=None, + series=None, + architecture=None, + ): self.schema = schema self.user = user self.name = name @@ -32,7 +44,7 @@ def __init__(self, schema, user=None, name=None, revision=None, series=None, arc self.architecture = architecture @staticmethod - def parse(s, default_store=Schema.CHARM_HUB): + def parse(s: str, default_store=Schema.CHARM_HUB) -> URL: """parse parses the provided charm URL string into its respective structure. @@ -87,7 +99,7 @@ def __str__(self): return "{}:{}".format(str(self.schema), self.path()) -def parse_v1_url(schema, u, s): +def parse_v1_url(schema, u, s) -> URL: c = URL(schema) parts = u.path.split("/") @@ -119,7 +131,7 @@ def parse_v1_url(schema, u, s): return c -def parse_v2_url(u, s, default_store): +def parse_v2_url(u, s, default_store) -> URL: if not u.scheme: c = URL(default_store) elif Schema.CHARM_HUB.matches(u.scheme): diff --git a/setup.py b/setup.py index 14852310..a6228d0b 100644 --- a/setup.py +++ b/setup.py @@ -22,11 +22,12 @@ packages=find_packages( exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), package_data={'juju': ['py.typed']}, + python_requires=">=3.8", install_requires=[ 'macaroonbakery>=1.1,<2.0', 'pyRFC3339>=1.0,<2.0', 'pyyaml>=5.1.2', - 'websockets>=8.1', + 'websockets>=13.0.1', 'paramiko>=2.4.0', 'pyasn1>=0.4.4', 'toposort>=1.5,<2', @@ -35,6 +36,12 @@ 'hvac', 'packaging', ], + extras_require={ + "dev": [ + "types-setuptools", + "pyright~=1.1", + ], + }, include_package_data=True, maintainer='Juju Ecosystem Engineering', maintainer_email='juju@lists.ubuntu.com', @@ -51,6 +58,9 @@ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ], entry_points={ 'console_scripts': [ diff --git a/tests/unit/test_wait_for_idle.py b/tests/unit/test_wait_for_idle.py new file mode 100644 index 00000000..f1409701 --- /dev/null +++ b/tests/unit/test_wait_for_idle.py @@ -0,0 +1,248 @@ +# Copyright 2024 Canonical Ltd. +# Licensed under the Apache V2, see LICENCE file for details. +from __future__ import annotations + +import json +import pytest +from datetime import datetime, timedelta +from typing import Any, Dict, List, Tuple, Union +from typing import reveal_type as reveal_type +from unittest.mock import Mock + +from juju.application import Application +from juju.client.facade import _convert_response +from juju.client._definitions import FullStatus +from juju.model import Model +from juju.unit import Unit + + +async def test_no_apps(full_status_response: dict, kwargs: Dict[str, Any]): + kwargs["apps"] = [] + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert idle and legacy + + +async def test_missing_app(full_status_response: dict, kwargs: Dict[str, Any]): + kwargs["apps"] = ["missing"] + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert not idle and not legacy + + +async def test_no_units(full_status_response: dict, kwargs: Dict[str, Any]): + full_status_response["response"]["applications"]["hexanator"]["units"].clear() + kwargs["apps"] = ["hexanator"] + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert idle == legacy + assert not idle and not legacy + + +async def test_idle_app(full_status_response: dict, kwargs: Dict[str, Any]): + kwargs["apps"] = ["hexanator"] + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert idle and legacy + + +async def test_idle_period(full_status_response: dict, kwargs: Dict[str, Any]): + kwargs["apps"] = ["hexanator"] + kwargs["idle_period"] = 1 + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert not idle and not legacy + + +async def test_after_idle_period(full_status_response: dict, kwargs: Dict[str, Any]): + kwargs["apps"] = ["hexanator"] + kwargs["idle_period"] = 1 + kwargs["idle_times"] = {"hexanator": datetime.now() - timedelta(seconds=2)} + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert idle and legacy + + +async def test_something_useful(full_status_response: dict, kwargs: Dict[str, Any]): + # tweak the state + full_status_response["response"]["applications"]["hexanator"]["status"]["status"] = "BROKEN" + kwargs["wait_for_exact_units"] = 2 + + idle, legacy = await model_fake(full_status_response).check_both(**kwargs) + assert idle == legacy + + +@pytest.fixture +def kwargs() -> Dict[str, Any]: + return dict( + apps=["hexanator", "grafana-agent-k8s", "mysql-test-app"], + raise_on_error=False, + raise_on_blocked=False, + status=None, + wait_for_at_least_units=None, + wait_for_exact_units=None, + timeout=100, + idle_period=0, + _wait_for_units=1, + ) + + +@pytest.fixture +def full_status_response(pytestconfig: pytest.Config) -> dict: + return json.loads(((pytestconfig.rootpath / "fullstatus.json")).read_text()) + + +def model_fake(resp: dict) -> ModelFake: + m = ModelFake() + m._response = resp + + fs = _convert_response(resp, cls=FullStatus) + assert fs.applications + + for name in fs.applications: + app = m._applications[name] = ApplicationFake(name, m) + + fsapp = fs.applications[name] + assert fsapp + assert fsapp.status # DetailedStatus + assert isinstance(fsapp.status.status, str) + app._status = fsapp.status.status + + assert isinstance(fsapp.status.info, str) + app._status_message = fsapp.status.info + + for uname in fsapp.units: + app._units.append(unit := UnitFake(uname, m)) + + fsunit = fsapp.units[uname] + assert fsunit + + assert fsunit.agent_status # DetailedStatus + assert isinstance(fsunit.agent_status.status, str) + unit._agent_status = fsunit.agent_status.status + + assert isinstance(fsunit.agent_status.info, str) + unit._agent_status_message = fsunit.agent_status.info + + assert fsunit.workload_status # DetailedStatus + assert isinstance(fsunit.workload_status.status, str) + unit._workload_status = fsunit.workload_status.status + + assert isinstance(fsunit.workload_status.info, str) + unit._workload_status_message = fsunit.workload_status.info + + return m + +class ModelFake(Model): + _applications: Dict[str, Application] + _response: Dict + + async def check_both(self, **kwargs) -> Tuple[Union[bool, Exception], Union[bool, Exception]]: + try: + idle = await self._check_idle(**kwargs) + except Exception as e: + idle = e + + try: + legacy = await self._legacy_check_idle(**kwargs) + except Exception as e: + raise + legacy = e + + return idle, legacy + + @property + def applications(self) -> Dict[str, Application]: + return self._applications + + def __init__(self): + super().__init__() + self._applications = {} + + def connection(self): + rv = Mock() + rv.facades = {"Client": 6} # Must match juju.client.connection.client_facades + rv.rpc = self.rpc + return rv + + async def rpc(self, msg, encoder=None): + return self._response + + +class ApplicationFake(Application): + _status: str = "" + _status_message: str = "" + _units: List[Unit] + + @property + def status(self) -> str: + return self._status + + @property + def status_message(self) -> str: + return self._status_message + + @property + def units(self) -> List[Unit]: + return self._units + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._units = [] + + +class UnitFake(Unit): + _agent_status: str = "" + _agent_status_message: str = "" + _workload_status: str = "" + _workload_status_message: str = "" + + @property + def agent_status(self) -> str: + return self._agent_status + + @property + def agent_status_message(self) -> str: + return self._agent_status_message + + @property + def workload_status(self) -> str: + return self._workload_status + + @property + def workload_status_message(self) -> str: + return self._workload_status_message + + + +async def test_model_fake(full_status_response): + """Validate parity between the FullStatus response and data from the library API""" + m = model_fake(full_status_response) + + app = m._applications["hexanator"] + assert len(app._units) == 1 + + u = app._units[0] + assert u._agent_status == "idle" + assert not u._agent_status_message + assert u._workload_status == "active" + assert not u._workload_status_message + + app = m._applications["grafana-agent-k8s"] + assert len(app._units) == 1 + + u = app._units[0] + assert u._agent_status == "idle" + assert not u._agent_status_message + assert u._workload_status == "blocked" + assert u._workload_status_message.startswith("Missing incoming") + + + app = m._applications["mysql-test-app"] + assert len(app._units) == 2 + + u = [u for u in app._units if u.name.endswith("/0")][0] + assert u._agent_status == "idle" + assert not u._agent_status_message + assert u._workload_status == "waiting" + assert not u._workload_status_message + + u = [u for u in app._units if u.name.endswith("/1")][0] + assert u._agent_status == "idle" + assert not u._agent_status_message + assert u._workload_status == "waiting" + assert not u._workload_status_message