diff --git a/src/core/src/core_logic/ExecutionConfig.py b/src/core/src/core_logic/ExecutionConfig.py index a783464c..cfff4d7d 100644 --- a/src/core/src/core_logic/ExecutionConfig.py +++ b/src/core/src/core_logic/ExecutionConfig.py @@ -19,6 +19,7 @@ import json import os import uuid + from core.src.bootstrap.Constants import Constants @@ -58,8 +59,9 @@ def __init__(self, env_layer, composite_logger, execution_parameters): self.included_package_name_mask_list = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.PATCHES_TO_INCLUDE, []) self.excluded_package_name_mask_list = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.PATCHES_TO_EXCLUDE, []) self.maintenance_run_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.MAINTENANCE_RUN_ID) - self.health_store_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.HEALTH_STORE_ID) - self.max_patch_publish_date = self.__get_max_patch_publish_date(self.health_store_id) + self.max_patch_publish_date = self.__get_max_patch_publish_date_from_inclusions(self.included_package_name_mask_list) # supersedes in a mitigation scenario + self.health_store_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.HEALTH_STORE_ID) if self.max_patch_publish_date == str() else str() + self.max_patch_publish_date = self.__get_max_patch_publish_date(self.health_store_id) if self.max_patch_publish_date == str() else self.max_patch_publish_date if self.operation == Constants.INSTALLATION: self.reboot_setting = self.config_settings[Constants.ConfigSettings.REBOOT_SETTING] # expected to throw if not present else: @@ -113,6 +115,47 @@ def __get_max_patch_publish_date(self, health_store_id): self.composite_logger.log_debug("[EC] Getting max patch publish date. [MaxPatchPublishDate={0}][HealthStoreId={1}]".format(str(max_patch_publish_date), str(health_store_id))) return max_patch_publish_date + def __get_max_patch_publish_date_from_inclusions(self, included_package_name_mask_list): + # type (str) -> str + # This is for AzGPS mitigation mode execution for Strict safe-deployment of patches. + if included_package_name_mask_list is None or included_package_name_mask_list == str(): + return str() + + mitigation_mode_flag = False + mitigation_mode_flag_pos = -1 + candidate = str() + candidate_pos = -1 + + for i in range(0, len(included_package_name_mask_list)): + if mitigation_mode_flag and candidate != str(): + break # everything we're looking for has been found + + if included_package_name_mask_list[i] == "AzGPS_Mitigation_Mode_No_SLA": + mitigation_mode_flag = True + mitigation_mode_flag_pos = i + continue # mitigation mode flag found + + if candidate != str() or not included_package_name_mask_list[i].startswith("MaxPatchPublishDate="): + continue # good candidate already found, or candidate not found and does not match what we are looking for + + candidate = included_package_name_mask_list[i].replace("MaxPatchPublishDate=", "") + try: + datetime.datetime.strptime(candidate, "%Y%m%dT%H%M%SZ") + self.composite_logger.log_debug("[EC] Discovered effective MaxPatchPublishDate in patch inclusions. [MaxPatchPublishDate={0}]".format(str(candidate))) + candidate_pos = i + except ValueError: + self.composite_logger.log_debug("[EC] Invalid match on MaxPatchPublishDate in patch inclusions. [MaxPatchPublishDate={0}]".format(str(candidate))) + candidate = str() + + # if everything we're looking for is present, remove them from the list + if mitigation_mode_flag and candidate != str() and mitigation_mode_flag_pos != -1 and candidate_pos != -1: + self.composite_logger.log_warning("AzGPS Mitigation Mode: There is no support or SLA for execution in this mode without explicit direction from AzGPS engineering.") + included_package_name_mask_list.pop(mitigation_mode_flag_pos) + included_package_name_mask_list.pop(candidate_pos - 1 if mitigation_mode_flag_pos < candidate_pos else candidate_pos) + return candidate + + return str() + @staticmethod def __get_value_from_argv(argv, key, default_value=Constants.DEFAULT_UNSPECIFIED_VALUE): """ Discovers the value associated with a specific parameter in input arguments. """ diff --git a/src/core/tests/Test_AptitudePackageManager.py b/src/core/tests/Test_AptitudePackageManager.py index 62992185..667732c5 100644 --- a/src/core/tests/Test_AptitudePackageManager.py +++ b/src/core/tests/Test_AptitudePackageManager.py @@ -588,6 +588,62 @@ def test_eula_not_accepted_for_patches(self): self.assertTrue("ACCEPT_EULA=Y" not in package_manager_for_test.single_package_dependency_resolution_template) self.assertTrue("ACCEPT_EULA=Y" not in package_manager_for_test.single_package_upgrade_cmd) + def test_maxpatchpublishdate_mitigation_mode(self): + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertIsNotNone(package_manager) + self.runtime.stop() + + # classic happy path mode + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.CRITICAL] + argument_composer.patches_to_include = ["AzGPS_Mitigation_Mode_No_SLA", "MaxPatchPublishDate=20250101T010203Z", "*kernel*"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + execution_config = self.runtime.container.get('execution_config') + self.assertEqual(execution_config.max_patch_publish_date, "20250101T010203Z") + self.assertEqual(len(execution_config.included_package_name_mask_list), 1) # inclusion list is sanitized + self.runtime.stop() + + # retains valid inclusions while honoring mitigation mode entries + argument_composer = ArgumentComposer() + argument_composer.patches_to_include = ["*kernel*", "MaxPatchPublishDate=20250101T010203Z", "AzGPS_Mitigation_Mode_No_SLA"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + execution_config = self.runtime.container.get('execution_config') + self.assertEqual(execution_config.max_patch_publish_date, "20250101T010203Z") + self.assertEqual(len(execution_config.included_package_name_mask_list), 1) # inclusion list is sanitized + self.assertEqual(execution_config.included_package_name_mask_list[0], "*kernel*") + self.runtime.stop() + + # missing required disclaimer entry + argument_composer = ArgumentComposer() + argument_composer.patches_to_include = ["MaxPatchPublishDate=20250101T010203Z", "*firefox=1.1"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + execution_config = self.runtime.container.get('execution_config') + self.assertEqual(execution_config.max_patch_publish_date, "") # because no mitigation mode + self.assertEqual(len(execution_config.included_package_name_mask_list), 2) # addition is ignored for removal + self.assertEqual(execution_config.included_package_name_mask_list[1], "*firefox=1.1") + self.runtime.stop() + + # badly formatted date + argument_composer = ArgumentComposer() + argument_composer.patches_to_include = ["*firefox*", "MaxPatchPublishDate=20250101010203Z", "AzGPS_Mitigation_Mode_No_SLA", "*kernel*"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + execution_config = self.runtime.container.get('execution_config') + self.assertEqual(execution_config.max_patch_publish_date, "") + self.assertEqual(len(execution_config.included_package_name_mask_list), 4) + self.assertEqual(execution_config.included_package_name_mask_list[0], "*firefox*") + self.assertEqual(execution_config.included_package_name_mask_list[3], "*kernel*") # because nothing is removed + self.runtime.stop() + + # no patches to include set (assessment) + argument_composer = ArgumentComposer() + argument_composer.patches_to_include = None + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + execution_config = self.runtime.container.get('execution_config') + self.assertEqual(execution_config.max_patch_publish_date, "") + self.assertEqual(execution_config.included_package_name_mask_list, None) + self.runtime.stop() + def test_eula_acceptance_file_read_success(self): self.runtime.stop()