Skip to content

Commit

Permalink
Manual MaxPatchPublishDate selection through date versioning (#257)
Browse files Browse the repository at this point in the history
* Manual MaxPatchPublishDate selection through date versioning

* Mitigation mode

* Enhanced mitigation mode

* Additional test coverage

* Modified check flow

* One additional line of code coverage

* Comments for code readability

* Different data validation method

* Additional error condition check

* Force codecov run

* Codecov reset

* Soft reset over master

* Test merge

* Additional asserts
  • Loading branch information
kjohn-msft authored Nov 8, 2024
1 parent e89747a commit 82b15b2
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
47 changes: 45 additions & 2 deletions src/core/src/core_logic/ExecutionConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import os
import uuid

from core.src.bootstrap.Constants import Constants


Expand Down Expand Up @@ -58,8 +59,9 @@ def __init__(self, env_layer, composite_logger, execution_parameters):
self.included_package_name_mask_list = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.PATCHES_TO_INCLUDE, [])
self.excluded_package_name_mask_list = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.PATCHES_TO_EXCLUDE, [])
self.maintenance_run_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.MAINTENANCE_RUN_ID)
self.health_store_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.HEALTH_STORE_ID)
self.max_patch_publish_date = self.__get_max_patch_publish_date(self.health_store_id)
self.max_patch_publish_date = self.__get_max_patch_publish_date_from_inclusions(self.included_package_name_mask_list) # supersedes in a mitigation scenario
self.health_store_id = self.__get_execution_configuration_value_safely(self.config_settings, Constants.ConfigSettings.HEALTH_STORE_ID) if self.max_patch_publish_date == str() else str()
self.max_patch_publish_date = self.__get_max_patch_publish_date(self.health_store_id) if self.max_patch_publish_date == str() else self.max_patch_publish_date
if self.operation == Constants.INSTALLATION:
self.reboot_setting = self.config_settings[Constants.ConfigSettings.REBOOT_SETTING] # expected to throw if not present
else:
Expand Down Expand Up @@ -113,6 +115,47 @@ def __get_max_patch_publish_date(self, health_store_id):
self.composite_logger.log_debug("[EC] Getting max patch publish date. [MaxPatchPublishDate={0}][HealthStoreId={1}]".format(str(max_patch_publish_date), str(health_store_id)))
return max_patch_publish_date

def __get_max_patch_publish_date_from_inclusions(self, included_package_name_mask_list):
# type (str) -> str
# This is for AzGPS mitigation mode execution for Strict safe-deployment of patches.
if included_package_name_mask_list is None or included_package_name_mask_list == str():
return str()

mitigation_mode_flag = False
mitigation_mode_flag_pos = -1
candidate = str()
candidate_pos = -1

for i in range(0, len(included_package_name_mask_list)):
if mitigation_mode_flag and candidate != str():
break # everything we're looking for has been found

if included_package_name_mask_list[i] == "AzGPS_Mitigation_Mode_No_SLA":
mitigation_mode_flag = True
mitigation_mode_flag_pos = i
continue # mitigation mode flag found

if candidate != str() or not included_package_name_mask_list[i].startswith("MaxPatchPublishDate="):
continue # good candidate already found, or candidate not found and does not match what we are looking for

candidate = included_package_name_mask_list[i].replace("MaxPatchPublishDate=", "")
try:
datetime.datetime.strptime(candidate, "%Y%m%dT%H%M%SZ")
self.composite_logger.log_debug("[EC] Discovered effective MaxPatchPublishDate in patch inclusions. [MaxPatchPublishDate={0}]".format(str(candidate)))
candidate_pos = i
except ValueError:
self.composite_logger.log_debug("[EC] Invalid match on MaxPatchPublishDate in patch inclusions. [MaxPatchPublishDate={0}]".format(str(candidate)))
candidate = str()

# if everything we're looking for is present, remove them from the list
if mitigation_mode_flag and candidate != str() and mitigation_mode_flag_pos != -1 and candidate_pos != -1:
self.composite_logger.log_warning("AzGPS Mitigation Mode: There is no support or SLA for execution in this mode without explicit direction from AzGPS engineering.")
included_package_name_mask_list.pop(mitigation_mode_flag_pos)
included_package_name_mask_list.pop(candidate_pos - 1 if mitigation_mode_flag_pos < candidate_pos else candidate_pos)
return candidate

return str()

@staticmethod
def __get_value_from_argv(argv, key, default_value=Constants.DEFAULT_UNSPECIFIED_VALUE):
""" Discovers the value associated with a specific parameter in input arguments. """
Expand Down
56 changes: 56 additions & 0 deletions src/core/tests/Test_AptitudePackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,62 @@ def test_eula_not_accepted_for_patches(self):
self.assertTrue("ACCEPT_EULA=Y" not in package_manager_for_test.single_package_dependency_resolution_template)
self.assertTrue("ACCEPT_EULA=Y" not in package_manager_for_test.single_package_upgrade_cmd)

def test_maxpatchpublishdate_mitigation_mode(self):
self.runtime.set_legacy_test_type('HappyPath')
package_manager = self.container.get('package_manager')
self.assertIsNotNone(package_manager)
self.runtime.stop()

# classic happy path mode
argument_composer = ArgumentComposer()
argument_composer.classifications_to_include = [Constants.PackageClassification.CRITICAL]
argument_composer.patches_to_include = ["AzGPS_Mitigation_Mode_No_SLA", "MaxPatchPublishDate=20250101T010203Z", "*kernel*"]
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
execution_config = self.runtime.container.get('execution_config')
self.assertEqual(execution_config.max_patch_publish_date, "20250101T010203Z")
self.assertEqual(len(execution_config.included_package_name_mask_list), 1) # inclusion list is sanitized
self.runtime.stop()

# retains valid inclusions while honoring mitigation mode entries
argument_composer = ArgumentComposer()
argument_composer.patches_to_include = ["*kernel*", "MaxPatchPublishDate=20250101T010203Z", "AzGPS_Mitigation_Mode_No_SLA"]
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
execution_config = self.runtime.container.get('execution_config')
self.assertEqual(execution_config.max_patch_publish_date, "20250101T010203Z")
self.assertEqual(len(execution_config.included_package_name_mask_list), 1) # inclusion list is sanitized
self.assertEqual(execution_config.included_package_name_mask_list[0], "*kernel*")
self.runtime.stop()

# missing required disclaimer entry
argument_composer = ArgumentComposer()
argument_composer.patches_to_include = ["MaxPatchPublishDate=20250101T010203Z", "*firefox=1.1"]
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
execution_config = self.runtime.container.get('execution_config')
self.assertEqual(execution_config.max_patch_publish_date, "") # because no mitigation mode
self.assertEqual(len(execution_config.included_package_name_mask_list), 2) # addition is ignored for removal
self.assertEqual(execution_config.included_package_name_mask_list[1], "*firefox=1.1")
self.runtime.stop()

# badly formatted date
argument_composer = ArgumentComposer()
argument_composer.patches_to_include = ["*firefox*", "MaxPatchPublishDate=20250101010203Z", "AzGPS_Mitigation_Mode_No_SLA", "*kernel*"]
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
execution_config = self.runtime.container.get('execution_config')
self.assertEqual(execution_config.max_patch_publish_date, "")
self.assertEqual(len(execution_config.included_package_name_mask_list), 4)
self.assertEqual(execution_config.included_package_name_mask_list[0], "*firefox*")
self.assertEqual(execution_config.included_package_name_mask_list[3], "*kernel*") # because nothing is removed
self.runtime.stop()

# no patches to include set (assessment)
argument_composer = ArgumentComposer()
argument_composer.patches_to_include = None
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
execution_config = self.runtime.container.get('execution_config')
self.assertEqual(execution_config.max_patch_publish_date, "")
self.assertEqual(execution_config.included_package_name_mask_list, None)
self.runtime.stop()

def test_eula_acceptance_file_read_success(self):
self.runtime.stop()

Expand Down

0 comments on commit 82b15b2

Please sign in to comment.