diff --git a/src/core/src/package_managers/YumPackageManager.py b/src/core/src/package_managers/YumPackageManager.py index ed9a090e..1568906e 100644 --- a/src/core/src/package_managers/YumPackageManager.py +++ b/src/core/src/package_managers/YumPackageManager.py @@ -143,7 +143,10 @@ def get_all_updates(self, cached=False): def get_security_updates(self): """Get missing security updates""" self.composite_logger.log("\nDiscovering 'security' packages...") - self.install_yum_security_prerequisite() + + if not self.__is_image_rhel8_or_higher(): + self.install_yum_security_prerequisite() + out = self.invoke_package_manager(self.yum_check_security) security_packages, security_package_versions = self.extract_packages_and_versions(out) @@ -176,6 +179,17 @@ def get_other_updates(self): self.composite_logger.log("Discovered " + str(len(other_packages)) + " 'other' package entries.") return other_packages, other_package_versions + def __is_image_rhel8_or_higher(self): + """ Check if image is RHEL8+ return true else false """ + if self.env_layer.platform.linux_distribution() is not None: + os_offer, os_version, os_code = self.env_layer.platform.linux_distribution() + + if "Red Hat Enterprise Linux" in os_offer and int(os_version.split('.')[0]) >= 8: + self.composite_logger.log_debug("Verify RHEL image version: " + str(os_version)) + return True + + return False + def set_max_patch_publish_date(self, max_patch_publish_date=str()): pass diff --git a/src/core/tests/Test_ConfigurePatchingProcessor.py b/src/core/tests/Test_ConfigurePatchingProcessor.py index e953a839..76db40e3 100644 --- a/src/core/tests/Test_ConfigurePatchingProcessor.py +++ b/src/core/tests/Test_ConfigurePatchingProcessor.py @@ -17,6 +17,9 @@ import os import re import unittest +import sys +from io import StringIO + from core.src.CoreMain import CoreMain from core.src.bootstrap.Constants import Constants from core.tests.library.ArgumentComposer import ArgumentComposer @@ -42,6 +45,8 @@ def mock_package_manager_get_current_auto_os_patch_state_returns_unknown(self): return Constants.AutomaticOSPatchStates.DISABLED else: return Constants.AutomaticOSPatchStates.UNKNOWN + def mock_get_current_auto_os_patch_state(self): + raise Exception("Mocked Exception") #endregion Mocks def test_operation_success_for_configure_patching_request_for_apt_with_default_updates_config(self): @@ -309,6 +314,63 @@ def test_configure_patching_with_patch_mode_and_assessment_mode_by_platform(self # stop test runtime runtime.stop() + def test_configure_patching_raise_exception_auto_os_patch_state(self): + # arrange capture std IO + captured_output = StringIO() + sys.stdout = captured_output + + argument_composer = ArgumentComposer() + argument_composer.operation = Constants.CONFIGURE_PATCHING + argument_composer.patch_mode = Constants.PatchModes.AUTOMATIC_BY_PLATFORM + argument_composer.assessment_mode = Constants.AssessmentModes.AUTOMATIC_BY_PLATFORM + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + runtime.package_manager.get_current_auto_os_patch_state = runtime.backup_get_current_auto_os_patch_state + runtime.set_legacy_test_type('HappyPath') + + # mock swap + backup_package_manager_get_current_auto_os_patch_state = runtime.package_manager.get_current_auto_os_patch_state + runtime.package_manager.get_current_auto_os_patch_state = self.mock_get_current_auto_os_patch_state + + runtime.configure_patching_processor.start_configure_patching() + + # restore sdt.out ouptput + sys.stdout = sys.__stdout__ + + # assert + output = captured_output.getvalue() + self.assertIn("Error while processing patch mode configuration", output) + + # check status file + with runtime.env_layer.file_system.open(runtime.execution_config.status_file_path, 'r') as file_handle: + substatus_file_data = json.load(file_handle)[0]["status"]["substatus"] + self.assertEqual(len(substatus_file_data), 1) + self.assertTrue(substatus_file_data[0]["name"] == Constants.CONFIGURE_PATCHING_SUMMARY) + self.assertTrue(substatus_file_data[0]["status"].lower() == Constants.STATUS_TRANSITIONING.lower()) + + # restore + runtime.package_manager.get_current_auto_os_patch_state = backup_package_manager_get_current_auto_os_patch_state + + runtime.stop() + + def test_configure_patching_raise_exception_auto_assessment_systemd(self): + argument_composer = ArgumentComposer() + argument_composer.operation = Constants.CONFIGURE_PATCHING + argument_composer.patch_mode = Constants.PatchModes.AUTOMATIC_BY_PLATFORM + argument_composer.assessment_mode = Constants.AssessmentModes.AUTOMATIC_BY_PLATFORM + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT) + runtime.set_legacy_test_type('HappyPath') + + # mock swap + back_up_auto_assess_service_manager = runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists + runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists = lambda: False + + self.assertRaises(Exception, runtime.configure_patching_processor.start_configure_patching()) + + # restore + runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists = back_up_auto_assess_service_manager + + runtime.stop() + def __check_telemetry_events(self, runtime): all_events = os.listdir(runtime.telemetry_writer.events_folder_path) self.assertTrue(len(all_events) > 0) diff --git a/src/core/tests/Test_MaintenanceWindow.py b/src/core/tests/Test_MaintenanceWindow.py index ae7c6c84..e9e0fb29 100644 --- a/src/core/tests/Test_MaintenanceWindow.py +++ b/src/core/tests/Test_MaintenanceWindow.py @@ -15,6 +15,8 @@ # Requires Python 2.7+ import datetime +from io import StringIO +import sys import unittest from core.tests.library.ArgumentComposer import ArgumentComposer from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -62,6 +64,43 @@ def test_RemainingTime_after_duration_complete(self): self.assertEqual(int(remaining_time), 0) runtime.stop() + def test_RemainingTime_log_to_stdout_true(self): + # Arrange, Capture stdout + captured_output = StringIO() + sys.stdout = captured_output # Redirect stdout to the StringIO object + + argument_composer = ArgumentComposer() + argument_composer.start_time = "2017-02-15T18:15:12.9828835Z" + argument_composer.maximum_duration = "PT1H" + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True) + + current_time = datetime.datetime.strptime('2017-02-15 18:30:20', "%Y-%m-%d %H:%M:%S") + remaining_time = runtime.maintenance_window.get_remaining_time_in_minutes(current_time, log_to_stdout=True) + + # Restore stdout + sys.stdout = sys.__stdout__ + + # Assert + output = captured_output.getvalue() + self.assertEqual(int(remaining_time), 44) + self.assertIn("Maintenance Window Utilization:", output) # Verify the log output contains the expected text + + runtime.stop() + + def test_RemainingTime_raise_exception(self): + # Arrange + argument_composer = ArgumentComposer() + argument_composer.start_time = "Invalid datetime format" + argument_composer.maximum_duration = "PT1H" + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True) + + # Assert + with self.assertRaises(ValueError) as context: + runtime.maintenance_window.get_remaining_time_in_minutes() + self.assertIn("Invalid datetime format", str(context.exception)) + + runtime.stop() + def test_check_available_time(self): argument_composer = ArgumentComposer() argument_composer.start_time = (datetime.datetime.utcnow() - datetime.timedelta(minutes=39)).strftime("%Y-%m-%dT%H:%M:%S.9999Z") diff --git a/src/core/tests/Test_StatusHandlerTruncation.py b/src/core/tests/Test_StatusHandlerTruncation.py index 73b44f90..ba2ff8e8 100644 --- a/src/core/tests/Test_StatusHandlerTruncation.py +++ b/src/core/tests/Test_StatusHandlerTruncation.py @@ -937,9 +937,9 @@ def __set_up_patches_func(self, val, random_char=None): test_patches_list.append('python-samba' + str(i)) if random_char is not None: - test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu€' + random_char) + test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu\u20ac' + random_char) else: - test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu€') + test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu\u20ac') return test_patches_list, test_patches_version_list diff --git a/src/core/tests/Test_TelemetryWriter.py b/src/core/tests/Test_TelemetryWriter.py index eb716f21..96b60aee 100644 --- a/src/core/tests/Test_TelemetryWriter.py +++ b/src/core/tests/Test_TelemetryWriter.py @@ -102,9 +102,11 @@ def test_write_event_msg_size_limit(self): f.close() def test_write_event_msg_size_limit_char_more_than_1_bytes(self): - """ Perform 1 byte truncation on char that is more than 1 byte, use decode('utf-8', errors='replace') to replace bad unicode with a good 1 byte char (�) """ + """ Perform 1 byte truncation on char that is more than 1 byte, use decode('utf-8', errors='replace') to replace bad unicode with a good 1 byte char (\uFFFD) """ - message = "a€bc"*3074 # €(\xe2\x82\xac) is 3 bytes char can be written in windows console w/o encoding + message = "a\u20acbc" * 3074 # (\xe2\x82\xac) is 3 bytes char can be written in windows console w/o encoding + min_msg_limit_in_bytes = Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS - Constants.TELEMETRY_BUFFER_FOR_DROPPED_COUNT_MSG_IN_CHARS - Constants.TELEMETRY_EVENT_COUNTER_MSG_SIZE_LIMIT_IN_CHARS + max_msg_limit_in_bytes = Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS self.runtime.telemetry_writer.write_event(message, Constants.TelemetryEventLevel.Error, "Test Task") latest_event_file = [pos_json for pos_json in os.listdir(self.runtime.telemetry_writer.events_folder_path) if re.search('^[0-9]+.json$', pos_json)][-1] with open(os.path.join(self.runtime.telemetry_writer.events_folder_path, latest_event_file), 'r+') as f: @@ -112,9 +114,10 @@ def test_write_event_msg_size_limit_char_more_than_1_bytes(self): self.assertTrue(events is not None) self.assertEqual(events[-1]["TaskName"], "Test Task") self.assertTrue(len(events[-1]["Message"]) < len(message.encode('utf-8'))) - chars_dropped = len(message.encode('utf-8')) - Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS + Constants.TELEMETRY_BUFFER_FOR_DROPPED_COUNT_MSG_IN_CHARS + Constants.TELEMETRY_EVENT_COUNTER_MSG_SIZE_LIMIT_IN_CHARS - self.assertTrue("a€bc" in events[-1]["Message"]) - self.assertTrue("a€bc" * (len(message) + 1 - chars_dropped) + ". [{0} chars dropped]".format(chars_dropped) in events[-1]["Message"]) # len(message) + 1 due to bad unicode will be replaced by � + self.assertTrue("a\u20acbc" in events[-1]["Message"]) + self.assertTrue(len(events[-1]["Message"].encode('utf-8')) > min_msg_limit_in_bytes) + self.assertTrue(len(events[-1]["Message"].encode('utf-8')) < max_msg_limit_in_bytes) + f.close() # TODO: The following 3 tests cause widespread test suite failures (on master), so leaving it out. And tracking in: Task 10912099: [Bug] Bug in telemetry writer - overwriting prior events in fast execution diff --git a/src/core/tests/Test_YumPackageManager.py b/src/core/tests/Test_YumPackageManager.py index e8c2c13f..db9be641 100644 --- a/src/core/tests/Test_YumPackageManager.py +++ b/src/core/tests/Test_YumPackageManager.py @@ -18,6 +18,7 @@ import unittest from core.src.bootstrap.Constants import Constants from core.tests.library.ArgumentComposer import ArgumentComposer +from core.tests.library.LegacyEnvLayerExtensions import LegacyEnvLayerExtensions from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -35,6 +36,12 @@ def mock_do_processes_require_restart(self): def mock_write_with_retry_raise_exception(self, file_path_or_handle, data, mode='a+'): raise Exception + + def mock_linux7_distribution_to_return_redhat(self): + return ['Red Hat Enterprise Linux Server', '7', 'Maipo'] + + def mock_linux8_distribution_to_return_redhat(self): + return ['Red Hat Enterprise Linux Server', '8', 'Ootpa'] #endregion Mocks def mock_do_processes_require_restart_raise_exception(self): @@ -619,5 +626,55 @@ def test_obsolete_packages_should_not_considered_in_available_updates(self): self.assertTrue(available_updates[0] == "grub2-tools.x86_64") self.assertTrue(package_versions[0] == "1:2.02-142.el8") + def test_rhel7_image_with_security_plugin(self): + """Unit test for yum package manager rhel images below 8 and Classification = Security""" + # mock linux_distribution + backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux7_distribution_to_return_redhat + + self.__assert_test_rhel8_image() + + # restore linux_distribution + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + + def test_rhel8_image_higher_no_security_plugin(self): + """Unit test for yum package manager rhel images >= 8 and Classification = Security""" + # mock linux_distribution + backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux8_distribution_to_return_redhat + + self.__assert_test_rhel8_image() + + # restore linux_distribution + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + + def __assert_test_rhel8_image(self): + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertIsNotNone(package_manager) + self.runtime.stop() + + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.SECURITY] + argument_composer.patches_to_include = ["ssh", "tcpdump"] + argument_composer.patches_to_exclude = ["ssh*", "test"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.YUM) + self.container = self.runtime.container + + package_filter = self.container.get('package_filter') + self.assertIsNotNone(package_filter) + + available_updates, package_versions = package_manager.get_available_updates(package_filter) + + # test for get_available_updates + self.assertIsNotNone(available_updates) + self.assertIsNotNone(package_versions) + self.assertEqual(len(available_updates), 2) + self.assertEqual(len(package_versions), 2) + self.assertEqual(available_updates[0], "libgcc.i686") + self.assertEqual(package_versions[0], "4.8.5-28.el7") + self.assertEqual(available_updates[1], "tcpdump.x86_64") + self.assertEqual(package_versions[1], "14:4.9.2-3.el7") + if __name__ == '__main__': unittest.main()