Skip to content

Commit

Permalink
Merge pull request #939 from cderici/green-ci-cleanup
Browse files Browse the repository at this point in the history
#939

#### Description

There's a couple of known intermittent failures in CI, some excessive warning logs and asyncio outputs. This PR attempts to clear up some of these issues for the new release.

Some major ones of the known CI failures are as follows (will be updated on the go) in the QA steps below. All the tests should be green before this lands.


#### QA Steps

```
tox -e integration --tests/integration/test_model.py::test_add_and_list_storage
```

```
tox -e integration -- tests/integration/test_controller.py::test_destroy_model_by_name
```

```
tox -e integration -- tests/integration/test_controller.py::test_secrets_backend_lifecycle
```

```
tox -e integration -- tests/integration/test_crossmodel.py::test_relate_with_offer
```

```
tox -e integration -- tests/integration/test_model.py::test_deploy_bundle_local_charms
```

```
tox -e integration -- tests/integration/test_model.py::test_deploy_bundle_local_charm_series_manifest
```

```
tox -e integration -- tests/integration/test_connection.py::test_reconnect
```

All CI tests need to pass.

Following examples should complete without any noise:
```sh
 $ python examples/connect_current_model.py
```

Also try to deliberately have it error. The error should only include the relevant parts, i.e. no clutter or noise from pending tasks etc.

```sh
 $ juju deploy ubuntu
 $ python examples/deploy.py
Connecting to model
Deploying ubuntu
Disconnecting from model
Traceback (most recent call last):
 ...........
 File "/home/caner/work/python-libjuju/juju/client/_client17.py", line 1051, in Deploy
 reply = await self.rpc(msg)
 File "/home/caner/work/python-libjuju/juju/client/facade.py", line 659, in rpc
 result = await self.connection.rpc(msg, encoder=TypeEncoder)
 File "/home/caner/work/python-libjuju/juju/client/connection.py", line 693, in rpc
 raise errors.JujuError(err_results)
juju.errors.JujuError: ['cannot add application "ubuntu": application already exists']
```

#### Notes & Discussion

JUJU-4549
  • Loading branch information
jujubot authored Sep 6, 2023
2 parents eb21baa + 0b8912a commit 0f577e2
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,4 @@ jobs:
run: pip install tox
- name: Run integration
# Force one single concurrent test
run: tox -e integration -- -n 1
run: tox -e integration
61 changes: 44 additions & 17 deletions juju/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import ssl
import signal
import urllib.request
import weakref
from http.client import HTTPSConnection
Expand Down Expand Up @@ -217,7 +218,7 @@ def status(self):
if connection.is_debug_log_connection:
stopped = connection._debug_log_task.cancelled()
else:
stopped = connection._receiver_task.cancelled()
stopped = connection._receiver_task is not None and connection._receiver_task.cancelled()

if stopped or not connection._ws.open:
return self.ERROR
Expand Down Expand Up @@ -418,6 +419,14 @@ async def _open(self, endpoint, cacert):
sock = self.proxy.socket()
server_hostname = "juju-app"

def _exit_tasks():
for task in jasyncio.all_tasks():
task.cancel()

loop = jasyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _exit_tasks)

return (await websockets.connect(
url,
ssl=self._get_ssl(cacert),
Expand All @@ -431,25 +440,41 @@ async def close(self, to_reconnect=False):
return
self.monitor.close_called.set()

# Cancel all the tasks (that we started):
if self._pinger_task:
self._pinger_task.cancel()
self._pinger_task = None
if self._receiver_task:
self._receiver_task.cancel()
self._receiver_task = None
if self._debug_log_task:
self._debug_log_task.cancel()
self._debug_log_task = None
# Allow a second for tasks to be cancelled
await jasyncio.sleep(1)

if self._ws and not self._ws.closed:
await self._ws.close()
self._ws = None

if not to_reconnect:
try:
log.debug('Gathering all tasks for connection close')

# Avoid gathering the current task
tasks_need_to_be_gathered = [task for task in jasyncio.all_tasks() if task != jasyncio.current_task()]
await jasyncio.gather(*tasks_need_to_be_gathered)
except jasyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosed:
pass

self._pinger_task = None
self._receiver_task = None
self._debug_log_task = None

if self.proxy is not None:
self.proxy.close()

# Remove signal handlers
loop = jasyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.remove_signal_handler(sig)

async def _recv(self, request_id):
if not self.is_open:
raise websockets.exceptions.ConnectionClosed(0, 'websocket closed')
Expand Down Expand Up @@ -517,15 +542,15 @@ async def _debug_logger(self):
self.debug_log_shown_lines += number_of_lines_written

if self.debug_log_shown_lines >= self.debug_log_params['limit']:
jasyncio.create_task(self.close())
jasyncio.create_task(self.close(), name="Task_Close")
return

except KeyError as e:
log.exception('Unexpected debug line -- %s' % e)
jasyncio.create_task(self.close())
jasyncio.create_task(self.close(), name="Task_Close")
raise
except jasyncio.CancelledError:
jasyncio.create_task(self.close())
jasyncio.create_task(self.close(), name="Task_Close")
raise
except websockets.exceptions.ConnectionClosed:
log.warning('Debug Logger: Connection closed, reconnecting')
Expand All @@ -536,7 +561,7 @@ async def _debug_logger(self):
return
except Exception as e:
log.exception("Error in debug logger : %s" % e)
jasyncio.create_task(self.close())
jasyncio.create_task(self.close(), name="Task_Close")
raise

async def _receiver(self):
Expand All @@ -552,7 +577,8 @@ async def _receiver(self):
result = json.loads(result)
await self.messages.put(result['request-id'], result)
except jasyncio.CancelledError:
raise
log.debug('Receiver: Cancelled')
pass
except websockets.exceptions.ConnectionClosed as e:
log.warning('Receiver: Connection closed, reconnecting')
await self.messages.put_all(e)
Expand Down Expand Up @@ -592,7 +618,8 @@ async def _do_ping():
break
await jasyncio.sleep(10)
except jasyncio.CancelledError:
raise
log.debug('Pinger: Cancelled')
pass
except websockets.exceptions.ConnectionClosed:
# The connection has closed - we can't do anything
# more until the connection is restarted.
Expand Down Expand Up @@ -769,7 +796,7 @@ async def reconnect(self):
if not self.is_debug_log_connection:
self._build_facades(res.get('facades', {}))
if not self._pinger_task:
self._pinger_task = jasyncio.create_task(self._pinger())
self._pinger_task = jasyncio.create_task(self._pinger(), name="Task_Pinger")

async def _connect(self, endpoints):
if len(endpoints) == 0:
Expand Down Expand Up @@ -820,12 +847,12 @@ async def _try_endpoint(endpoint, cacert, delay):
# If this is a debug-log connection, and the _debug_log_task
# is not created yet, then go ahead and schedule it
if self.is_debug_log_connection and not self._debug_log_task:
self._debug_log_task = jasyncio.create_task(self._debug_logger())
self._debug_log_task = jasyncio.create_task(self._debug_logger(), name="Task_Debug_Log")

# If this is regular connection, and we dont have a
# receiver_task yet, then schedule a _receiver_task
elif not self.is_debug_log_connection and not self._receiver_task:
self._receiver_task = jasyncio.create_task(self._receiver())
self._receiver_task = jasyncio.create_task(self._receiver(), name="Task_Receiver")

log.debug("Driver connected to juju %s", self.addr)
self.monitor.close_called.clear()
Expand Down Expand Up @@ -880,7 +907,7 @@ async def _connect_with_redirect(self, endpoints):
login_result = await self._connect_with_login(e.endpoints)
self._build_facades(login_result.get('facades', {}))
if not self._pinger_task:
self._pinger_task = jasyncio.create_task(self._pinger())
self._pinger_task = jasyncio.create_task(self._pinger(), name="Task_Pinger")

# _build_facades takes the facade list that comes from the connection with the controller,
# validates that the client knows about them (client_facades) and builds the facade list
Expand Down
2 changes: 1 addition & 1 deletion juju/jasyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
gather, sleep, wait_for, create_subprocess_exec, subprocess, \
wait, FIRST_COMPLETED, Lock, as_completed, new_event_loop, \
get_event_loop_policy, CancelledError, get_running_loop, \
create_task # noqa
create_task, ALL_COMPLETED, all_tasks, current_task, shield # noqa


def create_task_with_handler(coro, task_name, logger=ROOT_LOGGER):
Expand Down
9 changes: 7 additions & 2 deletions juju/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def block_until(*conditions, timeout=None, wait_period=0.5):
async def _block():
while not all(c() for c in conditions):
await jasyncio.sleep(wait_period)
await jasyncio.wait_for(_block(), timeout)
await jasyncio.shield(jasyncio.wait_for(_block(), timeout))


async def block_until_with_coroutine(condition_coroutine, timeout=None, wait_period=0.5):
Expand All @@ -139,7 +139,7 @@ async def block_until_with_coroutine(condition_coroutine, timeout=None, wait_per
async def _block():
while not await condition_coroutine():
await jasyncio.sleep(wait_period)
await jasyncio.wait_for(_block(), timeout=timeout)
await jasyncio.shield(jasyncio.wait_for(_block(), timeout=timeout))


async def wait_for_bundle(model, bundle, **kwargs):
Expand Down Expand Up @@ -181,6 +181,11 @@ async def run_with_interrupt(task, *events, log=None):
return_when=jasyncio.FIRST_COMPLETED)
for f in pending:
f.cancel() # cancel unfinished tasks
for f in pending:
try:
await f
except jasyncio.CancelledError:
pass
for f in done:
f.exception() # prevent "exception was not retrieved" errors
if task in done:
Expand Down
Empty file added tests/integration/a.file
Empty file.
4 changes: 4 additions & 0 deletions tests/integration/charm-assumes/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
options:
status:
type: string
default: "active"
13 changes: 13 additions & 0 deletions tests/integration/charm-assumes/dispatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

status="$(config-get status)"

if [[ "$status" == "error" ]]; then
if [[ -e .errored ]]; then
status="active"
else
touch .errored
exit 1
fi
fi
status-set "$status"
5 changes: 5 additions & 0 deletions tests/integration/charm-assumes/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
bases:
- architectures:
- amd64
channel: '22.04'
name: ubuntu
13 changes: 13 additions & 0 deletions tests/integration/charm-assumes/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: assumes-charm
summary: "test"
description: "test"
maintainers: ["test"]
assumes:
- juju
- any-of:
- all-of:
- juju >= 2.9
- juju < 3
- all-of:
- juju >= 3.1
- juju < 4
2 changes: 1 addition & 1 deletion tests/integration/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_monitor(event_loop):
assert conn.monitor.status == 'connected'
await conn.close()

assert conn.monitor.status == 'disconnected'
assert conn.monitor.status == 'disconnecting'


@base.bootstrapped
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async def test_secrets_backend_lifecycle(event_loop):
# deploy postgresql
await m.deploy('postgresql', base='[email protected]')
# deploy vault
await m.deploy("vault", base='ubuntu@20.04')
await m.deploy("vault", channel='1.8/stable', base='ubuntu@22.04')
# relate/integrate
await m.integrate("vault:db", "postgresql:db")
# wait for the postgresql app
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_crossmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async def test_relate_with_offer(event_loop):
channel='14/stable',
)
assert 'postgresql' in model_1.applications
await model_1.wait_for_idle(status="active")
await model_1.wait_for_idle()
await model_1.create_offer("postgresql:db")

offers = await model_1.list_offers()
Expand Down
41 changes: 10 additions & 31 deletions tests/integration/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import string
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

import mock
import paramiko
Expand Down Expand Up @@ -663,32 +662,6 @@ async def mock_AddRelation(*args, **kwargs):
assert isinstance(my_relation, Relation)


async def _deploy_in_loop(new_loop, model_name, jujudata):
new_model = Model(jujudata=jujudata)
await new_model.connect(model_name)
try:
await new_model.deploy('ubuntu', channel='stable')
assert 'ubuntu' in new_model.applications
finally:
await new_model.disconnect()


@base.bootstrapped
async def test_explicit_loop_threaded(event_loop):
async with base.CleanModel() as model:
model_name = model.name
new_loop = jasyncio.new_event_loop()
with ThreadPoolExecutor(1) as executor:
f = executor.submit(
new_loop.run_until_complete,
_deploy_in_loop(new_loop,
model_name,
model._connector.jujudata))
f.result()
await model._wait_for_new('application', 'ubuntu')
assert 'ubuntu' in model.applications


@base.bootstrapped
async def test_store_resources_charm(event_loop):
pytest.skip('Revise: test_store_resources_charm intermittent test failure')
Expand Down Expand Up @@ -1086,7 +1059,7 @@ async def test_application_annotations(event_loop):
async def test_unit_annotations(event_loop):

async with base.CleanModel() as model:
app = await model.deploy('ubuntu', channel="stable")
app = await model.deploy('ubuntu')
await model.wait_for_idle()
unit = app.units[0]

Expand All @@ -1096,8 +1069,8 @@ async def test_unit_annotations(event_loop):
expected = {"foo": "bar", "another": "value"}
await unit.set_annotations(expected)

annotations = await unit.get_annotations()
assert annotations == expected
annotations_2 = await unit.get_annotations()
assert annotations_2 == expected


@base.bootstrapped
Expand Down Expand Up @@ -1274,7 +1247,13 @@ async def test_detach_storage(event_loop):
async def test_add_and_list_storage(event_loop):
async with base.CleanModel() as model:
app = await model.deploy('postgresql', base='[email protected]')
await model.wait_for_idle(status="active", timeout=900)
# TODO (cderici):
# This is a good use case for waiting on individual unit status
# (i.e. not caring about the app status)
# All we need is to make sure a unit is up, doesn't even need to
# be in 'active' or 'idle', i.e.
# await model.wait_for_idle(status="waiting", wait_for_exact_units=1)
await jasyncio.sleep(5)
unit = app.units[0]
await unit.add_storage("pgdata", size=512)
storages = await model.list_storage()
Expand Down

0 comments on commit 0f577e2

Please sign in to comment.