Skip to content

Commit

Permalink
[DPE-5686] Fix flaky CI (#261)
Browse files: browse the repository at this point in the history
  • Loading branch information
Batalex authored Oct 23, 2024
1 parent 97b255b commit d78ed3f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 61 deletions.
10 changes: 8 additions & 2 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DataPeerUnitData,
)
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
from kazoo.client import AuthFailedError, NoNodeError
from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError
from kazoo.exceptions import NoAuthError
from lightkube.resources.core_v1 import Node, Pod
from ops.model import Application, Relation, Unit
Expand Down Expand Up @@ -714,7 +714,13 @@ def broker_active(self) -> bool:
zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
try:
brokers = zk.leader_znodes(path=path)
except (NoNodeError, AuthFailedError, QuorumLeaderNotFoundError, NoAuthError) as e:
except (
NoNodeError,
AuthFailedError,
QuorumLeaderNotFoundError,
ConnectionLoss,
NoAuthError,
) as e:
logger.debug(str(e))
brokers = set()

Expand Down
35 changes: 21 additions & 14 deletions tests/integration/test_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import pytest
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from literals import (
PEER_CLUSTER_ORCHESTRATOR_RELATION,
Expand Down Expand Up @@ -141,6 +142,7 @@ async def test_minimum_brokers_balancer_starts(self, ops_test: OpsTest):
status="active",
timeout=1800,
idle_period=60,
raise_on_error=False,
)

assert balancer_is_running(
Expand All @@ -157,10 +159,9 @@ async def test_balancer_monitor_state(self, ops_test: OpsTest):
assert balancer_is_ready(ops_test=ops_test, app_name=self.balancer_app)

@pytest.mark.abort_on_fail
@pytest.mark.skip
# @pytest.mark.skipif(
# deployment_strat == "single", reason="Testing full rebalance on large deployment"
# )
@pytest.mark.skipif(
deployment_strat == "single", reason="Testing full rebalance on large deployment"
)
async def test_add_unit_full_rebalance(self, ops_test: OpsTest):
await ops_test.model.applications[APP_NAME].add_units(
count=1 # up to 4, new unit won't have any partitions
Expand Down Expand Up @@ -279,14 +280,17 @@ async def test_balancer_prepare_unit_removal(self, ops_test: OpsTest):

assert balancer_is_ready(ops_test=ops_test, app_name=self.balancer_app)

rebalance_action = await leader_unit.run_action(
"rebalance",
mode="remove",
brokerid=new_broker_id,
dryrun=False,
)
response = await rebalance_action.wait()
assert not response.results.get("error", "")
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(60), reraise=True):
with attempt:
rebalance_action = await leader_unit.run_action(
"rebalance",
mode="remove",
brokerid=new_broker_id,
dryrun=False,
)

response = await rebalance_action.wait()
assert not response.results.get("error", "")

post_rebalance_replica_counts = get_replica_count_by_broker_id(ops_test, self.balancer_app)

Expand Down Expand Up @@ -320,9 +324,12 @@ async def test_tls(self, ops_test: OpsTest):
await ops_test.model.add_relation(TLS_NAME, f"{BALANCER_APP}:{TLS_RELATION}")

await ops_test.model.wait_for_idle(
apps=list({APP_NAME, ZK_NAME, self.balancer_app}), idle_period=30, timeout=1800
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
status="active",
idle_period=30,
timeout=1800,
)
async with ops_test.fast_forward(fast_interval="20s"):
async with ops_test.fast_forward(fast_interval="30s"):
await asyncio.sleep(120) # ensure update-status adds broker-capacities if missed

# Assert that balancer is running and using certificates
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,5 +352,5 @@ async def test_deploy_with_existing_storage(ops_test: OpsTest):
add_unit_cmd = f"add-unit {APP_NAME} --model={ops_test.model.info.name} --attach-storage={data_storage_id}".split()
await ops_test.juju(*add_unit_cmd)
await ops_test.model.wait_for_idle(
apps=[APP_NAME], status="active", timeout=1000, idle_period=60
apps=[APP_NAME], status="active", timeout=2000, idle_period=30, raise_on_error=False
)
14 changes: 10 additions & 4 deletions tests/integration/test_password_rotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,27 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm):
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], timeout=2000, idle_period=30, raise_on_error=False
)

assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

# needed to open localhost ports
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
await ops_test.model.add_relation(APP_NAME, ZK_NAME)

await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], status="active", idle_period=30, timeout=3600
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], status="active", idle_period=30, timeout=3600
)


async def test_password_rotation(ops_test: OpsTest):
"""Check that password stored on ZK has changed after a password rotation."""
initial_sync_user = get_user(
username="sync",
model_full_name=ops_test.model_full_name,
)

Expand All @@ -54,6 +59,7 @@ async def test_password_rotation(ops_test: OpsTest):
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], status="active", idle_period=30)

new_sync_user = get_user(
username="sync",
model_full_name=ops_test.model_full_name,
)

Expand Down
12 changes: 8 additions & 4 deletions tests/integration/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ async def test_deploy_charms_relate_active(
await ops_test.model.add_relation(APP_NAME, ZK)
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME_1}:{REL_NAME_CONSUMER}")

# async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, DUMMY_NAME_1, ZK], idle_period=30, status="active", timeout=1000
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK, DUMMY_NAME_1],
idle_period=30,
status="active",
timeout=2000,
raise_on_error=False,
)

usernames.update(get_client_usernames(ops_test))

Expand Down
66 changes: 30 additions & 36 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from pytest_operator.plugin import OpsTest

from literals import (
CHARM_KEY,
REL_NAME,
SECURITY_PROTOCOL_PORTS,
TLS_RELATION,
Expand All @@ -20,6 +19,7 @@
)

from .helpers import (
APP_NAME,
REL_NAME_ADMIN,
check_tls,
extract_ca,
Expand All @@ -34,8 +34,6 @@

logger = logging.getLogger(__name__)

pytestmark = pytest.mark.broker

TLS_NAME = "self-signed-certificates"
CERTS_NAME = "tls-certificates-operator"
MTLS_NAME = "mtls"
Expand All @@ -54,19 +52,17 @@ async def test_deploy_tls(ops_test: OpsTest, kafka_charm):
ops_test.model.deploy(ZK, channel="edge", series="jammy", application_name=ZK),
ops_test.model.deploy(
kafka_charm,
application_name=CHARM_KEY,
application_name=APP_NAME,
series="jammy",
config={
"ssl_principal_mapping_rules": "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT"
},
),
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK].units) == 1)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK, TLS_NAME], idle_period=15, timeout=1800
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK, TLS_NAME], idle_period=15, timeout=1800)

assert ops_test.model.applications[CHARM_KEY].status == "blocked"
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK].status == "active"
assert ops_test.model.applications[TLS_NAME].status == "active"

Expand All @@ -86,13 +82,13 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
"""
# Relate Zookeeper[TLS] to Kafka[Non-TLS]
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.add_relation(ZK, CHARM_KEY)
await ops_test.model.add_relation(ZK, APP_NAME)
await ops_test.model.wait_for_idle(
apps=[ZK], idle_period=15, timeout=1000, status="active"
)

# Unit is on 'blocked' but whole app is on 'waiting'
assert ops_test.model.applications[CHARM_KEY].status == "blocked"
assert ops_test.model.applications[APP_NAME].status == "blocked"

# Set a custom private key, by running set-tls-private-key action with no parameters,
# as this will generate a random one
Expand All @@ -102,19 +98,19 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
# Extract the key
private_key = extract_private_key(
ops_test=ops_test,
unit_name=f"{CHARM_KEY}/{num_unit}",
unit_name=f"{APP_NAME}/{num_unit}",
)

# ensuring at least a few update-status
await ops_test.model.add_relation(f"{CHARM_KEY}:{TLS_RELATION}", TLS_NAME)
await ops_test.model.add_relation(f"{APP_NAME}:{TLS_RELATION}", TLS_NAME)
async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(60)

await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK, TLS_NAME], idle_period=30, timeout=1200, status="active"
apps=[APP_NAME, ZK, TLS_NAME], idle_period=30, timeout=1200, status="active"
)

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME)

assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
Expand All @@ -123,15 +119,15 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
await asyncio.gather(
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)
await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME], timeout=1000, idle_period=30)
await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], timeout=1000, idle_period=30)

# ensuring at least a few update-status
await ops_test.model.add_relation(CHARM_KEY, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(60)

await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, DUMMY_NAME], idle_period=30, status="active"
apps=[APP_NAME, DUMMY_NAME], idle_period=30, status="active"
)

assert check_tls(
Expand All @@ -150,7 +146,7 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
# Extract the key
private_key_2 = extract_private_key(
ops_test=ops_test,
unit_name=f"{CHARM_KEY}/{num_unit}",
unit_name=f"{APP_NAME}/{num_unit}",
)

assert private_key != private_key_2
Expand Down Expand Up @@ -181,16 +177,16 @@ async def test_mtls(ops_test: OpsTest):
)
await ops_test.model.wait_for_idle(apps=[MTLS_NAME], timeout=1000, idle_period=15)
await ops_test.model.add_relation(
f"{CHARM_KEY}:{TRUSTED_CERTIFICATE_RELATION}", f"{MTLS_NAME}:{TLS_RELATION}"
f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", f"{MTLS_NAME}:{TLS_RELATION}"
)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, MTLS_NAME], idle_period=60, timeout=2000, status="active"
apps=[APP_NAME, MTLS_NAME], idle_period=60, timeout=2000, status="active"
)

# getting kafka ca and address
broker_ca = extract_ca(ops_test=ops_test, unit_name=f"{CHARM_KEY}/0")
broker_ca = extract_ca(ops_test=ops_test, unit_name=f"{APP_NAME}/0")

address = await get_address(ops_test, app_name=CHARM_KEY)
address = await get_address(ops_test, app_name=APP_NAME)
ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
ssl_bootstrap_server = f"{address}:{ssl_port}"
Expand Down Expand Up @@ -235,7 +231,7 @@ async def test_mtls(ops_test: OpsTest):
async def test_mtls_broken(ops_test: OpsTest):
await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY],
apps=[APP_NAME],
status="active",
idle_period=30,
timeout=2000,
Expand All @@ -245,20 +241,18 @@ async def test_mtls_broken(ops_test: OpsTest):
@pytest.mark.abort_on_fail
async def test_kafka_tls_scaling(ops_test: OpsTest):
"""Scale the application while using TLS to check that new units will configure correctly."""
await ops_test.model.applications[CHARM_KEY].add_units(count=2)
await ops_test.model.applications[APP_NAME].add_units(count=2)
await ops_test.model.block_until(
lambda: len(ops_test.model.applications[CHARM_KEY].units) == 3, timeout=1000
lambda: len(ops_test.model.applications[APP_NAME].units) == 3, timeout=1000
)

# Wait for model to settle
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY],
status="active",
idle_period=40,
timeout=1000,
apps=[APP_NAME], status="active", idle_period=40, timeout=1000, raise_on_error=False
)

kafka_zk_relation_data = get_kafka_zk_relation_data(
unit_name=f"{CHARM_KEY}/2",
unit_name=f"{APP_NAME}/2",
ops_test=ops_test,
owner=ZK,
)
Expand All @@ -268,16 +262,16 @@ async def test_kafka_tls_scaling(ops_test: OpsTest):
assert f"{chroot}/brokers/ids/1" in active_brokers
assert f"{chroot}/brokers/ids/2" in active_brokers

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY, unit_num=2)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=2)
assert check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)

# remove relation and check connection again
await ops_test.model.applications[CHARM_KEY].remove_relation(
f"{CHARM_KEY}:{REL_NAME}", f"{DUMMY_NAME}:{REL_NAME_ADMIN}"
await ops_test.model.applications[APP_NAME].remove_relation(
f"{APP_NAME}:{REL_NAME}", f"{DUMMY_NAME}:{REL_NAME_ADMIN}"
)
await ops_test.model.wait_for_idle(apps=[CHARM_KEY])
await ops_test.model.wait_for_idle(apps=[APP_NAME])
assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)
Expand All @@ -286,10 +280,10 @@ async def test_kafka_tls_scaling(ops_test: OpsTest):
async def test_tls_removed(ops_test: OpsTest):
await ops_test.model.remove_application(TLS_NAME, block_until_done=True)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK], timeout=3600, idle_period=30, status="active"
apps=[APP_NAME, ZK], timeout=3600, idle_period=30, status="active"
)

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME)
assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)

0 comments on commit d78ed3f

Please sign in to comment.