Skip to content

Commit

Permalink
refactor all things!
Browse files Browse the repository at this point in the history
  • Loading branch information
dimaqq committed Sep 20, 2024
1 parent 077f06b commit 18dde2f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 87 deletions.
102 changes: 16 additions & 86 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,31 @@
import asyncio
import logging
import threading
from typing import Coroutine, TypeVar

from juju import jasyncio
from juju.client import client, connection
from juju.model import Model


async def main() -> None:
m = Model()
await m.connect(model_name="testm")
app_facade = client.ApplicationFacade.from_connection(m.connection())

helper = SyncModelConnection()
helper.start()
# FIXME wait for it to start
import time
time.sleep(1)
helper.call(helper.ext_init(**m._connector._kwargs_cache), low_level=True)

for app_name in m.applications:
for app_name, app in m.applications.items():
print(f"main loop {'-'*40}")
# print(m.applications[app_name].constraints.arch) # temporarily broken, but why?
print(m.applications[app_name].name)
print(m.applications[app_name].exposed)
print(m.applications[app_name].charm_url)
print(m.applications[app_name].owner_tag)
# print(m.applications[app_name].life)
print(m.applications[app_name].min_units)
print(m.applications[app_name].constraints)
print(m.applications[app_name].subordinate)
# print(m.applications[app_name].status)
print(m.applications[app_name].workload_version)

print()
app = await app_facade.Get(app_name)
print(app.application, app.charm, app.constraints.arch)

print(f"helper loop {'+'*40}")
app = helper.call(client.ApplicationFacade.from_connection(helper._conn).Get(app_name))
print(app.application, app.charm, app.constraints.arch)

helper.call(helper._conn.close())
helper.stop()


R = TypeVar("R")


class SyncModelConnection(threading.Thread):
_cond: threading.Condition
_conn: connection.Connection | None

def __init__(self, *, connection_kwargs: dict = {}): # FIXME
super().__init__()
# FIXME check if we really need this explicit locking
self._cond = threading.Condition()
self._conn = None
self._connection_kwargs = connection_kwargs

def call(self, coro: Coroutine[None, None, R], *, low_level=False) -> R:
assert self._loop
assert self._conn or low_level
f = asyncio.run_coroutine_threadsafe(coro, self._loop)
return f.result()

# FIXME one or the other
async def ext_init(self, **kw):
self._conn = await connection.Connection.connect(**kw)

async def init(self):
try:
self._conn = await connection.Connection.connect(**self._connection_kwargs)
except Exception:
logging.exception("Helper thread failed to connect")
assert self._loop
self._loop.stop()

def run(self):
with self._cond:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

self._loop.run_forever()

with self._cond:
self._loop.close()
self._loop = None
# In case another thread was waiting for a result when we were asked to stop
self._cond.notify_all()
print(app, app.charm_name)
#print(m.applications[app_name].exposed)
#print(m.applications[app_name].charm_url)
#print(m.applications[app_name].owner_tag)
# print(m.applications[app_name].life)
#print(m.applications[app_name].min_units)
#print(m.applications[app_name].constraints)
# print(m.applications[app_name].constraints.arch)
#print(m.applications[app_name].subordinate)
# print(m.applications[app_name].status)
#print(m.applications[app_name].workload_version)

def stop(self):
with self._cond:
if self._loop:
self._loop.call_soon_threadsafe(self._loop.stop)
self.join()
# FIXME why sudden framing/asyncio error on disconnect?
await m.disconnect()


class SymbolFilter(logging.Filter):
Expand All @@ -113,6 +43,6 @@ def filter(self, record):

if __name__ == "__main__":
# FIXME why is level=DEBUG broken?
logging.basicConfig(level="INFO", format="%(symbol)s %(message)s")
logging.root.addFilter(SymbolFilter())
#logging.basicConfig(level="INFO", format="%(symbol)s %(message)s")
#logging.root.addFilter(SymbolFilter())
jasyncio.run(main())
86 changes: 86 additions & 0 deletions juju/_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2024 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.
from __future__ import annotations

import asyncio
import logging
import threading
from typing import Coroutine, TypeVar, Self

from juju.client import connection

R = TypeVar("R")


class SyncProxy(threading.Thread):
_cond: threading.Condition
_conn: connection.Connection | None
_loop: asyncio.AbstractEventLoop | None

@classmethod
def new_running(cls, *, connection_kwargs: dict) -> Self:
self = cls(connection_kwargs=connection_kwargs)
self.start()
return self

def __init__(self, *, connection_kwargs: dict):
super().__init__()
self._cond = threading.Condition()
self._conn = None
self._connection_kwargs = connection_kwargs

def start(self):
super().start()

with self._cond:
while not self._loop:
self._cond.wait()

try:
asyncio.run_coroutine_threadsafe(self._connect(), self._loop).result()
except Exception:
logging.exception("Helper thread failed to connect")
assert self._loop
self._loop.stop()
raise

def call(self, coro: Coroutine[None, None, R]) -> R:
assert self._loop
assert self._conn # for sanity, really
return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

async def _connect(self):
try:
self._conn = await connection.Connection.connect(**self._connection_kwargs)
finally:
with self._cond:
self._cond.notify_all()

def run(self) -> None:
with self._cond:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._cond.notify_all()

self._loop.run_forever()

# FIXME not needed, as caller is expected to .join()
with self._cond:
self._loop.close()
self._loop = None
self._cond.notify_all()

def stop(self) -> None:
try:
if self._conn:
# FIXME somehow I'm getting Bad file description here
# and it's deep in the websockets/legacy/framing...
# And I was not getting the same when in prev revision with manual helper...
self.call(self._conn.close())
self._conn = None
with self._cond:
if self._loop:
self._loop.call_soon_threadsafe(self._loop.stop)
self.join()
except Exception:
logging.exception("XXXXXXXXX")
6 changes: 6 additions & 0 deletions juju/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ def charm_name(self):
:return str: The name of the charm
"""
# FIXME just testing stuff
print(">>>> sync call")
tmp = self.model._sync_proxy.call(
client.ApplicationFacade.from_connection(
self.model._sync_proxy._conn).Get(self.name))
print("<<<<", tmp)
return URL.parse(self.charm_url).name

@property
Expand Down
10 changes: 10 additions & 0 deletions juju/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from .tag import application as application_tag
from .url import URL, Schema
from .version import DEFAULT_ARCHITECTURE
from ._sync import SyncProxy

if TYPE_CHECKING:
from .application import Application
Expand Down Expand Up @@ -553,6 +554,9 @@ class Model:
"""
The main API for interacting with a Juju model.
"""
# FIXME better name
_sync_proxy: SyncProxy | None = None

def __init__(
self,
max_frame_size=None,
Expand Down Expand Up @@ -710,6 +714,8 @@ async def connect(self, *args, **kwargs):
if not is_debug_log_conn:
await self._after_connect(model_name, model_uuid)

self._sync_proxy = SyncProxy.new_running(connection_kwargs=self._connector._kwargs_cache)

async def connect_model(self, model_name, **kwargs):
"""
.. deprecated:: 0.6.2
Expand Down Expand Up @@ -784,6 +790,10 @@ async def disconnect(self):
"""Shut down the watcher task and close websockets.
"""
if self._sync_proxy:
self._sync_proxy.stop()
self._sync_proxy = None

if not self._watch_stopped.is_set():
log.debug('Stopping watcher task')
self._watch_stopping.set()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
'macaroonbakery>=1.1,<2.0',
'pyRFC3339>=1.0,<2.0',
'pyyaml>=5.1.2',
'websockets>=8.1',
'websockets>=13.0.1',
'paramiko>=2.4.0',
'pyasn1>=0.4.4',
'toposort>=1.5,<2',
Expand Down

0 comments on commit 18dde2f

Please sign in to comment.