From 08776fc4024013c09aa6e681bc0cac26073bfc86 Mon Sep 17 00:00:00 2001 From: Guillaume Belanger Date: Thu, 22 Aug 2024 10:55:55 -0400 Subject: [PATCH] test: use scenario for unit testing (pt.2) (#225) Signed-off-by: guillaume --- src/charm.py | 25 ++- tests/unit/test_charm_collect_status.py | 69 ++++++++ ...{test_charm.py => test_charm_configure.py} | 155 +++--------------- tests/unit/test_charm_get_ca_certificate.py | 49 ++++++ .../test_charm_get_issued_certificates.py | 86 ++++++++++ tests/unit/test_send_ca_cert.py | 45 ----- 6 files changed, 239 insertions(+), 190 deletions(-) create mode 100644 tests/unit/test_charm_collect_status.py rename tests/unit/{test_charm.py => test_charm_configure.py} (72%) create mode 100644 tests/unit/test_charm_get_ca_certificate.py create mode 100644 tests/unit/test_charm_get_issued_certificates.py delete mode 100644 tests/unit/test_send_ca_cert.py diff --git a/src/charm.py b/src/charm.py index 6639c9b..2117c93 100755 --- a/src/charm.py +++ b/src/charm.py @@ -23,7 +23,7 @@ generate_certificate, generate_private_key, ) -from ops.charm import ActionEvent, CharmBase, CollectStatusEvent, RelationJoinedEvent +from ops.charm import ActionEvent, CharmBase, CollectStatusEvent from ops.framework import EventBase from ops.main import main from ops.model import ActiveStatus, BlockedStatus, SecretNotFoundError @@ -66,7 +66,7 @@ def __init__(self, *args): ) self.framework.observe( self.on[SEND_CA_CERT_REL_NAME].relation_joined, - self._on_send_ca_cert_relation_joined, + self._configure, ) def _on_collect_unit_status(self, event: CollectStatusEvent): @@ -297,28 +297,23 @@ def _on_get_ca_certificate(self, event: ActionEvent): ca_certificate_secret_content = ca_certificate_secret.get_content(refresh=True) event.set_results({"ca-certificate": ca_certificate_secret_content["ca-certificate"]}) - def _on_send_ca_cert_relation_joined(self, event: RelationJoinedEvent): - self._send_ca_cert(rel_id=event.relation.id) - def _send_ca_cert(self, *, rel_id=None): """There is one (and only one) CA cert that we need to forward to multiple apps. Args: rel_id: Relation id. If not given, update all relations. """ + if not self._root_certificate_is_stored: + return send_ca_cert = CertificateTransferProvides(self, SEND_CA_CERT_REL_NAME) - if self._root_certificate_is_stored: - secret = self.model.get_secret(label=CA_CERTIFICATES_SECRET_LABEL) - secret_content = secret.get_content(refresh=True) - ca = secret_content["ca-certificate"] - if rel_id: - send_ca_cert.set_certificate("", ca, [], relation_id=rel_id) - else: - for relation in self.model.relations.get(SEND_CA_CERT_REL_NAME, []): - send_ca_cert.set_certificate("", ca, [], relation_id=relation.id) + secret = self.model.get_secret(label=CA_CERTIFICATES_SECRET_LABEL) + secret_content = secret.get_content(refresh=True) + ca = secret_content["ca-certificate"] + if rel_id: + send_ca_cert.set_certificate("", ca, [], relation_id=rel_id) else: for relation in self.model.relations.get(SEND_CA_CERT_REL_NAME, []): - send_ca_cert.remove_certificate(relation.id) + send_ca_cert.set_certificate("", ca, [], relation_id=relation.id) def _push_ca_cert_to_container(self, ca_certificate: str): """Store the CA certificate in the charm container. diff --git a/tests/unit/test_charm_collect_status.py b/tests/unit/test_charm_collect_status.py new file mode 100644 index 0000000..bf43416 --- /dev/null +++ b/tests/unit/test_charm_collect_status.py @@ -0,0 +1,69 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +from unittest.mock import patch + +import pytest +import scenario +from ops.model import ActiveStatus, BlockedStatus + +from charm import SelfSignedCertificatesCharm + + +class TestCharmCollectStatus: + @pytest.fixture(autouse=True) + def context(self): + self.ctx = scenario.Context( + charm_type=SelfSignedCertificatesCharm, + ) + + def test_given_invalid_config_when_collect_unit_status_then_status_is_blocked(self): + state_in = scenario.State( + config={ + "ca-common-name": "", + "certificate-validity": 100, + }, + leader=True, + ) + + state_out = self.ctx.run(event="collect_unit_status", state=state_in) + + assert state_out.unit_status == BlockedStatus( + "The following configuration values are not valid: ['ca-common-name']" + ) + + def test_given_invalid_validity_config_when_collect_unit_status_then_status_is_blocked(self): + state_in = scenario.State( + config={ + "ca-common-name": "pizza.com", + "certificate-validity": 0, + }, + leader=True, + ) + + state_out = self.ctx.run(event="collect_unit_status", state=state_in) + + assert state_out.unit_status == BlockedStatus( + "The following configuration values are not valid: ['certificate-validity']" + ) + + @patch("charm.generate_private_key") + @patch("charm.generate_ca") + def test_given_valid_config_when_collect_unit_status_then_status_is_active( + self, + patch_generate_ca, + patch_generate_private_key, + ): + patch_generate_ca.return_value = "whatever CA certificate" + patch_generate_private_key.return_value = "whatever private key" + state_in = scenario.State( + config={ + "ca-common-name": "pizza.com", + "certificate-validity": 100, + }, + leader=True, + ) + + state_out = self.ctx.run(event="collect_unit_status", state=state_in) + + assert state_out.unit_status == ActiveStatus() diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm_configure.py similarity index 72% rename from tests/unit/test_charm.py rename to tests/unit/test_charm_configure.py index 2c06646..616f307 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm_configure.py @@ -1,7 +1,6 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -import json from datetime import datetime, timedelta from unittest.mock import mock_open, patch @@ -15,15 +14,13 @@ generate_csr, generate_private_key, ) -from ops.model import ActiveStatus, BlockedStatus from charm import SelfSignedCertificatesCharm TLS_LIB_PATH = "charms.tls_certificates_interface.v4.tls_certificates" -CA_CERT_PATH = "/tmp/ca-cert.pem" -class TestCharm: +class TestCharmConfigure: @pytest.fixture(autouse=True) def setup(self): self.mock_open = mock_open() @@ -36,57 +33,6 @@ def context(self): charm_type=SelfSignedCertificatesCharm, ) - def test_given_invalid_config_when_collect_unit_status_then_status_is_blocked(self): - state_in = scenario.State( - config={ - "ca-common-name": "", - "certificate-validity": 100, - }, - leader=True, - ) - - state_out = self.ctx.run(event="collect_unit_status", state=state_in) - - assert state_out.unit_status == BlockedStatus( - "The following configuration values are not valid: ['ca-common-name']" - ) - - def test_given_invalid_validity_config_when_collect_unit_status_then_status_is_blocked(self): - state_in = scenario.State( - config={ - "ca-common-name": "pizza.com", - "certificate-validity": 0, - }, - leader=True, - ) - - state_out = self.ctx.run(event="collect_unit_status", state=state_in) - - assert state_out.unit_status == BlockedStatus( - "The following configuration values are not valid: ['certificate-validity']" - ) - - @patch("charm.generate_private_key") - @patch("charm.generate_ca") - def test_given_valid_config_when_collect_unit_status_then_status_is_active( - self, - patch_generate_ca, - patch_generate_private_key, - ): - patch_generate_ca.return_value = "whatever CA certificate" - patch_generate_private_key.return_value = "whatever private key" - state_in = scenario.State( - config={ - "ca-common-name": "pizza.com", - "certificate-validity": 100, - }, - leader=True, - ) - - state_out = self.ctx.run(event="collect_unit_status", state=state_in) - - assert state_out.unit_status == ActiveStatus() - @patch("charm.generate_private_key") @patch("charm.generate_ca") def test_given_valid_config_when_config_changed_then_ca_certificate_is_pushed_to_charm_container( # noqa: E501 @@ -390,94 +336,43 @@ def test_given_initial_config_when_config_changed_then_stored_ca_common_name_use assert ca_certificates_secret["ca-certificate"] == str(new_ca_certificate) assert ca_certificates_secret["private-key"] == str(new_ca_private_key) - def test_given_no_certificates_issued_when_get_issued_certificates_action_then_action_fails( - self, - ): - state_in = scenario.State() - - action_output = self.ctx.run_action("get-issued-certificates", state=state_in) - - assert not action_output.success - assert action_output.failure == "No certificates issued yet." - - @patch(f"{TLS_LIB_PATH}.TLSCertificatesProvidesV4.get_issued_certificates") - def test_given_certificates_issued_when_get_issued_certificates_action_then_action_returns_certificates( # noqa: E501 - self, - patch_get_issued_certificates, - ): - ca_private_key = generate_private_key() - ca_certificate = generate_ca( - private_key=ca_private_key, - common_name="example.com", - validity=100, + def test_given_certificate_transfer_relations_when_configure_then_ca_cert_is_advertised(self): + traefik_relation = scenario.Relation( + endpoint="send-ca-cert", + interface="certificate_transfer", ) - requirer_private_key = generate_private_key() - csr = generate_csr(private_key=requirer_private_key, common_name="example.com") - certificate = generate_certificate( - csr=csr, - ca=ca_certificate, - ca_private_key=ca_private_key, - validity=100, + another_relation = scenario.Relation( + endpoint="send-ca-cert", + interface="certificate_transfer", ) - chain = [ca_certificate, certificate] - revoked = False - patch_get_issued_certificates.return_value = [ - ProviderCertificate( - relation_id=1, - certificate_signing_request=csr, - certificate=certificate, - ca=ca_certificate, - chain=chain, - revoked=revoked, - ) - ] - state_in = scenario.State( - config={ - "ca-common-name": "example.com", - "certificate-validity": 100, - }, - leader=True, + provider_private_key = generate_private_key() + provider_ca = generate_ca( + private_key=provider_private_key, + common_name="example.com", + validity=100, ) - - action_output = self.ctx.run_action("get-issued-certificates", state=state_in) - - assert action_output.results - output_certificate = json.loads(action_output.results["certificates"][0]) - assert output_certificate["csr"] == str(csr) - assert output_certificate["certificate"] == str(certificate) - assert output_certificate["ca"] == str(ca_certificate) - assert output_certificate["chain"] == [str(ca_certificate), str(certificate)] - assert output_certificate["revoked"] == revoked - - def test_given_ca_cert_generated_when_get_ca_certificate_action_then_returns_ca_certificate( - self, - ): - ca_certificate = "whatever CA certificate" - ca_certificates_secret = scenario.Secret( + secret = scenario.Secret( id="0", label="ca-certificates", contents={ 0: { - "ca-certificate": ca_certificate, + "ca-certificate": str(provider_ca), + "private-key": str(provider_private_key), } }, owner="app", ) state_in = scenario.State( + relations=[traefik_relation, another_relation], + secrets=[secret], leader=True, - secrets=[ca_certificates_secret], - ) - - action_output = self.ctx.run_action("get-ca-certificate", state=state_in) - assert action_output.results - assert action_output.results["ca-certificate"] == ca_certificate - - def test_given_ca_cert_not_generated_when_get_ca_certificate_action_then_action_fails(self): - state_in = scenario.State( - leader=True, + config={ + "ca-common-name": "example.com", + "certificate-validity": 100, + }, ) - action_output = self.ctx.run_action("get-ca-certificate", state=state_in) + state_out = self.ctx.run(event="config_changed", state=state_in) - assert not action_output.success - assert action_output.failure == "Root Certificate is not yet generated" + assert state_out.relations[0].local_unit_data["ca"] == str(provider_ca) + assert state_out.relations[1].local_unit_data["ca"] == str(provider_ca) diff --git a/tests/unit/test_charm_get_ca_certificate.py b/tests/unit/test_charm_get_ca_certificate.py new file mode 100644 index 0000000..583de98 --- /dev/null +++ b/tests/unit/test_charm_get_ca_certificate.py @@ -0,0 +1,49 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + + +import pytest +import scenario + +from charm import SelfSignedCertificatesCharm + + +class TestCharmGetCACertificate: + @pytest.fixture(autouse=True) + def context(self): + self.ctx = scenario.Context( + charm_type=SelfSignedCertificatesCharm, + ) + + def test_given_ca_cert_generated_when_get_ca_certificate_action_then_returns_ca_certificate( + self, + ): + ca_certificate = "whatever CA certificate" + ca_certificates_secret = scenario.Secret( + id="0", + label="ca-certificates", + contents={ + 0: { + "ca-certificate": ca_certificate, + } + }, + owner="app", + ) + state_in = scenario.State( + leader=True, + secrets=[ca_certificates_secret], + ) + + action_output = self.ctx.run_action("get-ca-certificate", state=state_in) + assert action_output.results + assert action_output.results["ca-certificate"] == ca_certificate + + def test_given_ca_cert_not_generated_when_get_ca_certificate_action_then_action_fails(self): + state_in = scenario.State( + leader=True, + ) + + action_output = self.ctx.run_action("get-ca-certificate", state=state_in) + + assert not action_output.success + assert action_output.failure == "Root Certificate is not yet generated" diff --git a/tests/unit/test_charm_get_issued_certificates.py b/tests/unit/test_charm_get_issued_certificates.py new file mode 100644 index 0000000..76f7c36 --- /dev/null +++ b/tests/unit/test_charm_get_issued_certificates.py @@ -0,0 +1,86 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import json +from unittest.mock import patch + +import pytest +import scenario +from charms.tls_certificates_interface.v4.tls_certificates import ( + ProviderCertificate, + generate_ca, + generate_certificate, + generate_csr, + generate_private_key, +) + +from charm import SelfSignedCertificatesCharm + +TLS_LIB_PATH = "charms.tls_certificates_interface.v4.tls_certificates" + + +class TestCharmGetIssuedCertificates: + @pytest.fixture(autouse=True) + def context(self): + self.ctx = scenario.Context( + charm_type=SelfSignedCertificatesCharm, + ) + + def test_given_no_certificates_issued_when_get_issued_certificates_action_then_action_fails( + self, + ): + state_in = scenario.State() + + action_output = self.ctx.run_action("get-issued-certificates", state=state_in) + + assert not action_output.success + assert action_output.failure == "No certificates issued yet." + + @patch(f"{TLS_LIB_PATH}.TLSCertificatesProvidesV4.get_issued_certificates") + def test_given_certificates_issued_when_get_issued_certificates_action_then_action_returns_certificates( # noqa: E501 + self, + patch_get_issued_certificates, + ): + ca_private_key = generate_private_key() + ca_certificate = generate_ca( + private_key=ca_private_key, + common_name="example.com", + validity=100, + ) + requirer_private_key = generate_private_key() + csr = generate_csr(private_key=requirer_private_key, common_name="example.com") + certificate = generate_certificate( + csr=csr, + ca=ca_certificate, + ca_private_key=ca_private_key, + validity=100, + ) + chain = [ca_certificate, certificate] + revoked = False + patch_get_issued_certificates.return_value = [ + ProviderCertificate( + relation_id=1, + certificate_signing_request=csr, + certificate=certificate, + ca=ca_certificate, + chain=chain, + revoked=revoked, + ) + ] + state_in = scenario.State( + config={ + "ca-common-name": "example.com", + "certificate-validity": 100, + }, + leader=True, + ) + + action_output = self.ctx.run_action("get-issued-certificates", state=state_in) + + assert action_output.results + output_certificate = json.loads(action_output.results["certificates"][0]) + assert output_certificate["csr"] == str(csr) + assert output_certificate["certificate"] == str(certificate) + assert output_certificate["ca"] == str(ca_certificate) + assert output_certificate["chain"] == [str(ca_certificate), str(certificate)] + assert output_certificate["revoked"] == revoked diff --git a/tests/unit/test_send_ca_cert.py b/tests/unit/test_send_ca_cert.py deleted file mode 100644 index cbdac88..0000000 --- a/tests/unit/test_send_ca_cert.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import unittest -from unittest.mock import mock_open, patch - -import ops -import ops.testing - -from charm import CA_CERTIFICATES_SECRET_LABEL, SelfSignedCertificatesCharm - - -class TestSendCaCert(unittest.TestCase): - def setUp(self): - self.harness = ops.testing.Harness(SelfSignedCertificatesCharm) - self.addCleanup(self.harness.cleanup) - self.harness.set_leader(is_leader=True) - self.mock_open = mock_open() - self.patcher = patch("builtins.open", self.mock_open) - self.patcher.start() - self.harness.begin_with_initial_hooks() - - def tearDown(self): - self.patcher.stop() - - def test_when_relation_join_then_ca_cert_is_advertised(self): - # Add a few apps - apps = ["traefik", "another"] - rel_ids = [ - self.harness.add_relation(relation_name="send-ca-cert", remote_app=app) for app in apps - ] - for app, rel_id in zip(apps, rel_ids): - self.harness.add_relation_unit(relation_id=rel_id, remote_unit_name=f"{app}/0") - - # Now make sure all the apps have the same ca - secret = self.harness.charm.model.get_secret( - label=CA_CERTIFICATES_SECRET_LABEL - ).get_content() - ca_from_secret = secret["ca-certificate"] - - for rel_id in rel_ids: - with self.subTest(rel_id=rel_id): - data = self.harness.get_relation_data(rel_id, self.harness.charm.unit) - ca_from_rel_data = data["ca"] - self.assertEqual(ca_from_secret, ca_from_rel_data)