Skip to content

Commit

Permalink
test: use scenario for unit testing (pt.2) (#225)
Browse files Browse the repository at this point in the history
Signed-off-by: guillaume <[email protected]>
  • Loading branch information
gruyaume authored Aug 22, 2024
1 parent 3c5590a commit 08776fc
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 190 deletions.
25 changes: 10 additions & 15 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
generate_certificate,
generate_private_key,
)
from ops.charm import ActionEvent, CharmBase, CollectStatusEvent, RelationJoinedEvent
from ops.charm import ActionEvent, CharmBase, CollectStatusEvent
from ops.framework import EventBase
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus, SecretNotFoundError
Expand Down Expand Up @@ -66,7 +66,7 @@ def __init__(self, *args):
)
self.framework.observe(
self.on[SEND_CA_CERT_REL_NAME].relation_joined,
self._on_send_ca_cert_relation_joined,
self._configure,
)

def _on_collect_unit_status(self, event: CollectStatusEvent):
Expand Down Expand Up @@ -297,28 +297,23 @@ def _on_get_ca_certificate(self, event: ActionEvent):
ca_certificate_secret_content = ca_certificate_secret.get_content(refresh=True)
event.set_results({"ca-certificate": ca_certificate_secret_content["ca-certificate"]})

def _on_send_ca_cert_relation_joined(self, event: RelationJoinedEvent):
self._send_ca_cert(rel_id=event.relation.id)

def _send_ca_cert(self, *, rel_id=None):
"""There is one (and only one) CA cert that we need to forward to multiple apps.
Args:
rel_id: Relation id. If not given, update all relations.
"""
if not self._root_certificate_is_stored:
return
send_ca_cert = CertificateTransferProvides(self, SEND_CA_CERT_REL_NAME)
if self._root_certificate_is_stored:
secret = self.model.get_secret(label=CA_CERTIFICATES_SECRET_LABEL)
secret_content = secret.get_content(refresh=True)
ca = secret_content["ca-certificate"]
if rel_id:
send_ca_cert.set_certificate("", ca, [], relation_id=rel_id)
else:
for relation in self.model.relations.get(SEND_CA_CERT_REL_NAME, []):
send_ca_cert.set_certificate("", ca, [], relation_id=relation.id)
secret = self.model.get_secret(label=CA_CERTIFICATES_SECRET_LABEL)
secret_content = secret.get_content(refresh=True)
ca = secret_content["ca-certificate"]
if rel_id:
send_ca_cert.set_certificate("", ca, [], relation_id=rel_id)
else:
for relation in self.model.relations.get(SEND_CA_CERT_REL_NAME, []):
send_ca_cert.remove_certificate(relation.id)
send_ca_cert.set_certificate("", ca, [], relation_id=relation.id)

def _push_ca_cert_to_container(self, ca_certificate: str):
"""Store the CA certificate in the charm container.
Expand Down
69 changes: 69 additions & 0 deletions tests/unit/test_charm_collect_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

from unittest.mock import patch

import pytest
import scenario
from ops.model import ActiveStatus, BlockedStatus

from charm import SelfSignedCertificatesCharm


class TestCharmCollectStatus:
@pytest.fixture(autouse=True)
def context(self):
self.ctx = scenario.Context(
charm_type=SelfSignedCertificatesCharm,
)

def test_given_invalid_config_when_collect_unit_status_then_status_is_blocked(self):
state_in = scenario.State(
config={
"ca-common-name": "",
"certificate-validity": 100,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == BlockedStatus(
"The following configuration values are not valid: ['ca-common-name']"
)

def test_given_invalid_validity_config_when_collect_unit_status_then_status_is_blocked(self):
state_in = scenario.State(
config={
"ca-common-name": "pizza.com",
"certificate-validity": 0,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == BlockedStatus(
"The following configuration values are not valid: ['certificate-validity']"
)

@patch("charm.generate_private_key")
@patch("charm.generate_ca")
def test_given_valid_config_when_collect_unit_status_then_status_is_active(
self,
patch_generate_ca,
patch_generate_private_key,
):
patch_generate_ca.return_value = "whatever CA certificate"
patch_generate_private_key.return_value = "whatever private key"
state_in = scenario.State(
config={
"ca-common-name": "pizza.com",
"certificate-validity": 100,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == ActiveStatus()
155 changes: 25 additions & 130 deletions tests/unit/test_charm.py → tests/unit/test_charm_configure.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
from datetime import datetime, timedelta
from unittest.mock import mock_open, patch

Expand All @@ -15,15 +14,13 @@
generate_csr,
generate_private_key,
)
from ops.model import ActiveStatus, BlockedStatus

from charm import SelfSignedCertificatesCharm

TLS_LIB_PATH = "charms.tls_certificates_interface.v4.tls_certificates"
CA_CERT_PATH = "/tmp/ca-cert.pem"


class TestCharm:
class TestCharmConfigure:
@pytest.fixture(autouse=True)
def setup(self):
self.mock_open = mock_open()
Expand All @@ -36,57 +33,6 @@ def context(self):
charm_type=SelfSignedCertificatesCharm,
)

def test_given_invalid_config_when_collect_unit_status_then_status_is_blocked(self):
state_in = scenario.State(
config={
"ca-common-name": "",
"certificate-validity": 100,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == BlockedStatus(
"The following configuration values are not valid: ['ca-common-name']"
)

def test_given_invalid_validity_config_when_collect_unit_status_then_status_is_blocked(self):
state_in = scenario.State(
config={
"ca-common-name": "pizza.com",
"certificate-validity": 0,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == BlockedStatus(
"The following configuration values are not valid: ['certificate-validity']"
)

@patch("charm.generate_private_key")
@patch("charm.generate_ca")
def test_given_valid_config_when_collect_unit_status_then_status_is_active(
self,
patch_generate_ca,
patch_generate_private_key,
):
patch_generate_ca.return_value = "whatever CA certificate"
patch_generate_private_key.return_value = "whatever private key"
state_in = scenario.State(
config={
"ca-common-name": "pizza.com",
"certificate-validity": 100,
},
leader=True,
)

state_out = self.ctx.run(event="collect_unit_status", state=state_in)

assert state_out.unit_status == ActiveStatus()

@patch("charm.generate_private_key")
@patch("charm.generate_ca")
def test_given_valid_config_when_config_changed_then_ca_certificate_is_pushed_to_charm_container( # noqa: E501
Expand Down Expand Up @@ -390,94 +336,43 @@ def test_given_initial_config_when_config_changed_then_stored_ca_common_name_use
assert ca_certificates_secret["ca-certificate"] == str(new_ca_certificate)
assert ca_certificates_secret["private-key"] == str(new_ca_private_key)

def test_given_no_certificates_issued_when_get_issued_certificates_action_then_action_fails(
self,
):
state_in = scenario.State()

action_output = self.ctx.run_action("get-issued-certificates", state=state_in)

assert not action_output.success
assert action_output.failure == "No certificates issued yet."

@patch(f"{TLS_LIB_PATH}.TLSCertificatesProvidesV4.get_issued_certificates")
def test_given_certificates_issued_when_get_issued_certificates_action_then_action_returns_certificates( # noqa: E501
self,
patch_get_issued_certificates,
):
ca_private_key = generate_private_key()
ca_certificate = generate_ca(
private_key=ca_private_key,
common_name="example.com",
validity=100,
def test_given_certificate_transfer_relations_when_configure_then_ca_cert_is_advertised(self):
traefik_relation = scenario.Relation(
endpoint="send-ca-cert",
interface="certificate_transfer",
)
requirer_private_key = generate_private_key()
csr = generate_csr(private_key=requirer_private_key, common_name="example.com")
certificate = generate_certificate(
csr=csr,
ca=ca_certificate,
ca_private_key=ca_private_key,
validity=100,
another_relation = scenario.Relation(
endpoint="send-ca-cert",
interface="certificate_transfer",
)
chain = [ca_certificate, certificate]
revoked = False
patch_get_issued_certificates.return_value = [
ProviderCertificate(
relation_id=1,
certificate_signing_request=csr,
certificate=certificate,
ca=ca_certificate,
chain=chain,
revoked=revoked,
)
]
state_in = scenario.State(
config={
"ca-common-name": "example.com",
"certificate-validity": 100,
},
leader=True,
provider_private_key = generate_private_key()
provider_ca = generate_ca(
private_key=provider_private_key,
common_name="example.com",
validity=100,
)

action_output = self.ctx.run_action("get-issued-certificates", state=state_in)

assert action_output.results
output_certificate = json.loads(action_output.results["certificates"][0])
assert output_certificate["csr"] == str(csr)
assert output_certificate["certificate"] == str(certificate)
assert output_certificate["ca"] == str(ca_certificate)
assert output_certificate["chain"] == [str(ca_certificate), str(certificate)]
assert output_certificate["revoked"] == revoked

def test_given_ca_cert_generated_when_get_ca_certificate_action_then_returns_ca_certificate(
self,
):
ca_certificate = "whatever CA certificate"
ca_certificates_secret = scenario.Secret(
secret = scenario.Secret(
id="0",
label="ca-certificates",
contents={
0: {
"ca-certificate": ca_certificate,
"ca-certificate": str(provider_ca),
"private-key": str(provider_private_key),
}
},
owner="app",
)
state_in = scenario.State(
relations=[traefik_relation, another_relation],
secrets=[secret],
leader=True,
secrets=[ca_certificates_secret],
)

action_output = self.ctx.run_action("get-ca-certificate", state=state_in)
assert action_output.results
assert action_output.results["ca-certificate"] == ca_certificate

def test_given_ca_cert_not_generated_when_get_ca_certificate_action_then_action_fails(self):
state_in = scenario.State(
leader=True,
config={
"ca-common-name": "example.com",
"certificate-validity": 100,
},
)

action_output = self.ctx.run_action("get-ca-certificate", state=state_in)
state_out = self.ctx.run(event="config_changed", state=state_in)

assert not action_output.success
assert action_output.failure == "Root Certificate is not yet generated"
assert state_out.relations[0].local_unit_data["ca"] == str(provider_ca)
assert state_out.relations[1].local_unit_data["ca"] == str(provider_ca)
49 changes: 49 additions & 0 deletions tests/unit/test_charm_get_ca_certificate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.


import pytest
import scenario

from charm import SelfSignedCertificatesCharm


class TestCharmGetCACertificate:
@pytest.fixture(autouse=True)
def context(self):
self.ctx = scenario.Context(
charm_type=SelfSignedCertificatesCharm,
)

def test_given_ca_cert_generated_when_get_ca_certificate_action_then_returns_ca_certificate(
self,
):
ca_certificate = "whatever CA certificate"
ca_certificates_secret = scenario.Secret(
id="0",
label="ca-certificates",
contents={
0: {
"ca-certificate": ca_certificate,
}
},
owner="app",
)
state_in = scenario.State(
leader=True,
secrets=[ca_certificates_secret],
)

action_output = self.ctx.run_action("get-ca-certificate", state=state_in)
assert action_output.results
assert action_output.results["ca-certificate"] == ca_certificate

def test_given_ca_cert_not_generated_when_get_ca_certificate_action_then_action_fails(self):
state_in = scenario.State(
leader=True,
)

action_output = self.ctx.run_action("get-ca-certificate", state=state_in)

assert not action_output.success
assert action_output.failure == "Root Certificate is not yet generated"
Loading

0 comments on commit 08776fc

Please sign in to comment.