From 69ea93f12dc52327ab3bc02a89c41d450feaab4a Mon Sep 17 00:00:00 2001 From: James Falcon Date: Thu, 9 Jun 2022 14:59:52 -0500 Subject: [PATCH] Fix cc_phone_home requiring 'tries' (#1500) As part of dee8231, the code was updated to use a ValueError rather than catching the top level Exception. This should have also included catching TypeError. Additionally, add unit tests for module as it effectively had none. LP: #1977952 --- cloudinit/config/cc_phone_home.py | 13 +-- tests/unittests/config/test_cc_phone_home.py | 87 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/cloudinit/config/cc_phone_home.py b/cloudinit/config/cc_phone_home.py index 681c3729863..bae54134a3e 100644 --- a/cloudinit/config/cc_phone_home.py +++ b/cloudinit/config/cc_phone_home.py @@ -130,7 +130,7 @@ def handle(name, cfg, cloud, log, args): tries = ph_cfg.get("tries") try: tries = int(tries) # type: ignore - except ValueError: + except (ValueError, TypeError): tries = 10 util.logexc( log, @@ -141,10 +141,11 @@ def handle(name, cfg, cloud, log, args): if post_list == "all": post_list = POST_LIST_ALL - all_keys = {} - all_keys["instance_id"] = cloud.get_instance_id() - all_keys["hostname"] = cloud.get_hostname() - all_keys["fqdn"] = cloud.get_hostname(fqdn=True) + all_keys = { + "instance_id": cloud.get_instance_id(), + "hostname": cloud.get_hostname(), + "fqdn": cloud.get_hostname(fqdn=True), + } pubkeys = { "pub_key_dsa": "/etc/ssh/ssh_host_dsa_key.pub", @@ -190,7 +191,7 @@ def handle(name, cfg, cloud, log, args): url_helper.read_file_or_url( url, data=real_submit_keys, - retries=tries, + retries=tries - 1, sec_between=3, ssl_details=util.fetch_ssl_details(cloud.paths), ) diff --git a/tests/unittests/config/test_cc_phone_home.py b/tests/unittests/config/test_cc_phone_home.py index 7264dda1ea0..7964705d9b0 100644 --- a/tests/unittests/config/test_cc_phone_home.py +++ b/tests/unittests/config/test_cc_phone_home.py @@ -1,11 +1,98 @@ +import logging +from functools import partial +from itertools import count +from unittest import mock + import pytest +from cloudinit.config.cc_phone_home import POST_LIST_ALL, handle from cloudinit.config.schema import ( SchemaValidationError, get_schema, validate_cloudconfig_schema, ) from tests.unittests.helpers import skipUnlessJsonSchema +from tests.unittests.util import get_cloud + +LOG = logging.getLogger("TestNoConfig") +phone_home = partial(handle, name="test", cloud=get_cloud(), log=LOG, args=[]) + + +@pytest.fixture(autouse=True) +def common_mocks(mocker): + mocker.patch("cloudinit.util.load_file", side_effect=count()) + + +@mock.patch("cloudinit.url_helper.readurl") +class TestPhoneHome: + def test_default_call(self, m_readurl): + cfg = {"phone_home": {"url": "myurl"}} + phone_home(cfg=cfg) + assert m_readurl.call_args == mock.call( + "myurl", + data={ + "pub_key_dsa": "0", + "pub_key_rsa": "1", + "pub_key_ecdsa": "2", + "pub_key_ed25519": "3", + "instance_id": "iid-datasource-none", + "hostname": "hostname", + "fqdn": "hostname", + }, + retries=9, + sec_between=3, + ssl_details={}, + ) + + def test_no_url(self, m_readurl, caplog): + cfg = {"phone_home": {}} + phone_home(cfg=cfg) + assert "Skipping module named" in caplog.text + assert m_readurl.call_count == 0 + + @pytest.mark.parametrize( + "tries, expected_retries", + [ + (-1, -2), + (0, -1), + (1, 0), + (2, 1), + ("2", 1), + ("two", 9), + (None, 9), + ({}, 9), + ], + ) + def test_tries(self, m_readurl, tries, expected_retries, caplog): + cfg = {"phone_home": {"url": "dontcare"}} + if tries is not None: + cfg["phone_home"]["tries"] = tries + phone_home(cfg=cfg) + assert m_readurl.call_args[1]["retries"] == expected_retries + + def test_post_all(self, m_readurl): + cfg = {"phone_home": {"url": "test", "post": "all"}} + phone_home(cfg=cfg) + for key in POST_LIST_ALL: + assert key in m_readurl.call_args[1]["data"] + + def test_custom_post_list(self, m_readurl): + post_list = ["pub_key_rsa, hostname"] + cfg = {"phone_home": {"url": "test", "post": post_list}} + phone_home(cfg=cfg) + for key in post_list: + assert key in m_readurl.call_args[1]["data"] + assert len(m_readurl.call_args[1]["data"]) == len(post_list) + + def test_invalid_post(self, m_readurl, caplog): + post_list = ["spam", "hostname"] + cfg = {"phone_home": {"url": "test", "post": post_list}} + phone_home(cfg=cfg) + assert "hostname" in m_readurl.call_args[1]["data"] + assert m_readurl.call_args[1]["data"]["spam"] == "N/A" + assert ( + "spam from 'post' configuration list not available" in caplog.text + ) class TestPhoneHomeSchema: