Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhim-um authored Oct 14, 2024
2 parents 0f2963f + 82f45bb commit 7b8bafd
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 8 deletions.
16 changes: 15 additions & 1 deletion src/core/src/package_managers/YumPackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ def get_all_updates(self, cached=False):
def get_security_updates(self):
"""Get missing security updates"""
self.composite_logger.log("\nDiscovering 'security' packages...")
self.install_yum_security_prerequisite()

if not self.__is_image_rhel8_or_higher():
self.install_yum_security_prerequisite()

out = self.invoke_package_manager(self.yum_check_security)
security_packages, security_package_versions = self.extract_packages_and_versions(out)

Expand Down Expand Up @@ -176,6 +179,17 @@ def get_other_updates(self):
self.composite_logger.log("Discovered " + str(len(other_packages)) + " 'other' package entries.")
return other_packages, other_package_versions

def __is_image_rhel8_or_higher(self):
""" Check if image is RHEL8+ return true else false """
if self.env_layer.platform.linux_distribution() is not None:
os_offer, os_version, os_code = self.env_layer.platform.linux_distribution()

if "Red Hat Enterprise Linux" in os_offer and int(os_version.split('.')[0]) >= 8:
self.composite_logger.log_debug("Verify RHEL image version: " + str(os_version))
return True

return False

def set_max_patch_publish_date(self, max_patch_publish_date=str()):
pass

Expand Down
62 changes: 62 additions & 0 deletions src/core/tests/Test_ConfigurePatchingProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import os
import re
import unittest
import sys
from io import StringIO

from core.src.CoreMain import CoreMain
from core.src.bootstrap.Constants import Constants
from core.tests.library.ArgumentComposer import ArgumentComposer
Expand All @@ -42,6 +45,8 @@ def mock_package_manager_get_current_auto_os_patch_state_returns_unknown(self):
return Constants.AutomaticOSPatchStates.DISABLED
else:
return Constants.AutomaticOSPatchStates.UNKNOWN
def mock_get_current_auto_os_patch_state(self):
raise Exception("Mocked Exception")
#endregion Mocks

def test_operation_success_for_configure_patching_request_for_apt_with_default_updates_config(self):
Expand Down Expand Up @@ -309,6 +314,63 @@ def test_configure_patching_with_patch_mode_and_assessment_mode_by_platform(self
# stop test runtime
runtime.stop()

def test_configure_patching_raise_exception_auto_os_patch_state(self):
# arrange capture std IO
captured_output = StringIO()
sys.stdout = captured_output

argument_composer = ArgumentComposer()
argument_composer.operation = Constants.CONFIGURE_PATCHING
argument_composer.patch_mode = Constants.PatchModes.AUTOMATIC_BY_PLATFORM
argument_composer.assessment_mode = Constants.AssessmentModes.AUTOMATIC_BY_PLATFORM
runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
runtime.package_manager.get_current_auto_os_patch_state = runtime.backup_get_current_auto_os_patch_state
runtime.set_legacy_test_type('HappyPath')

# mock swap
backup_package_manager_get_current_auto_os_patch_state = runtime.package_manager.get_current_auto_os_patch_state
runtime.package_manager.get_current_auto_os_patch_state = self.mock_get_current_auto_os_patch_state

runtime.configure_patching_processor.start_configure_patching()

# restore sdt.out ouptput
sys.stdout = sys.__stdout__

# assert
output = captured_output.getvalue()
self.assertIn("Error while processing patch mode configuration", output)

# check status file
with runtime.env_layer.file_system.open(runtime.execution_config.status_file_path, 'r') as file_handle:
substatus_file_data = json.load(file_handle)[0]["status"]["substatus"]
self.assertEqual(len(substatus_file_data), 1)
self.assertTrue(substatus_file_data[0]["name"] == Constants.CONFIGURE_PATCHING_SUMMARY)
self.assertTrue(substatus_file_data[0]["status"].lower() == Constants.STATUS_TRANSITIONING.lower())

# restore
runtime.package_manager.get_current_auto_os_patch_state = backup_package_manager_get_current_auto_os_patch_state

runtime.stop()

def test_configure_patching_raise_exception_auto_assessment_systemd(self):
argument_composer = ArgumentComposer()
argument_composer.operation = Constants.CONFIGURE_PATCHING
argument_composer.patch_mode = Constants.PatchModes.AUTOMATIC_BY_PLATFORM
argument_composer.assessment_mode = Constants.AssessmentModes.AUTOMATIC_BY_PLATFORM
runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.APT)
runtime.set_legacy_test_type('HappyPath')

# mock swap
back_up_auto_assess_service_manager = runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists
runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists = lambda: False

self.assertRaises(Exception, runtime.configure_patching_processor.start_configure_patching())

# restore
runtime.configure_patching_processor.auto_assess_service_manager.systemd_exists = back_up_auto_assess_service_manager

runtime.stop()

def __check_telemetry_events(self, runtime):
all_events = os.listdir(runtime.telemetry_writer.events_folder_path)
self.assertTrue(len(all_events) > 0)
Expand Down
39 changes: 39 additions & 0 deletions src/core/tests/Test_MaintenanceWindow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# Requires Python 2.7+

import datetime
from io import StringIO
import sys
import unittest
from core.tests.library.ArgumentComposer import ArgumentComposer
from core.tests.library.RuntimeCompositor import RuntimeCompositor
Expand Down Expand Up @@ -62,6 +64,43 @@ def test_RemainingTime_after_duration_complete(self):
self.assertEqual(int(remaining_time), 0)
runtime.stop()

def test_RemainingTime_log_to_stdout_true(self):
# Arrange, Capture stdout
captured_output = StringIO()
sys.stdout = captured_output # Redirect stdout to the StringIO object

argument_composer = ArgumentComposer()
argument_composer.start_time = "2017-02-15T18:15:12.9828835Z"
argument_composer.maximum_duration = "PT1H"
runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True)

current_time = datetime.datetime.strptime('2017-02-15 18:30:20', "%Y-%m-%d %H:%M:%S")
remaining_time = runtime.maintenance_window.get_remaining_time_in_minutes(current_time, log_to_stdout=True)

# Restore stdout
sys.stdout = sys.__stdout__

# Assert
output = captured_output.getvalue()
self.assertEqual(int(remaining_time), 44)
self.assertIn("Maintenance Window Utilization:", output) # Verify the log output contains the expected text

runtime.stop()

def test_RemainingTime_raise_exception(self):
# Arrange
argument_composer = ArgumentComposer()
argument_composer.start_time = "Invalid datetime format"
argument_composer.maximum_duration = "PT1H"
runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True)

# Assert
with self.assertRaises(ValueError) as context:
runtime.maintenance_window.get_remaining_time_in_minutes()
self.assertIn("Invalid datetime format", str(context.exception))

runtime.stop()

def test_check_available_time(self):
argument_composer = ArgumentComposer()
argument_composer.start_time = (datetime.datetime.utcnow() - datetime.timedelta(minutes=39)).strftime("%Y-%m-%dT%H:%M:%S.9999Z")
Expand Down
4 changes: 2 additions & 2 deletions src/core/tests/Test_StatusHandlerTruncation.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,9 @@ def __set_up_patches_func(self, val, random_char=None):
test_patches_list.append('python-samba' + str(i))

if random_char is not None:
test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu' + random_char)
test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu\u20ac' + random_char)
else:
test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu')
test_patches_version_list.append('2:4.4.5+dfsg-2ubuntu\u20ac')

return test_patches_list, test_patches_version_list

Expand Down
13 changes: 8 additions & 5 deletions src/core/tests/Test_TelemetryWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,22 @@ def test_write_event_msg_size_limit(self):
f.close()

def test_write_event_msg_size_limit_char_more_than_1_bytes(self):
""" Perform 1 byte truncation on char that is more than 1 byte, use decode('utf-8', errors='replace') to replace bad unicode with a good 1 byte char () """
""" Perform 1 byte truncation on char that is more than 1 byte, use decode('utf-8', errors='replace') to replace bad unicode with a good 1 byte char (\uFFFD) """

message = "a€bc"*3074 # €(\xe2\x82\xac) is 3 bytes char can be written in windows console w/o encoding
message = "a\u20acbc" * 3074 # (\xe2\x82\xac) is 3 bytes char can be written in windows console w/o encoding
min_msg_limit_in_bytes = Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS - Constants.TELEMETRY_BUFFER_FOR_DROPPED_COUNT_MSG_IN_CHARS - Constants.TELEMETRY_EVENT_COUNTER_MSG_SIZE_LIMIT_IN_CHARS
max_msg_limit_in_bytes = Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS
self.runtime.telemetry_writer.write_event(message, Constants.TelemetryEventLevel.Error, "Test Task")
latest_event_file = [pos_json for pos_json in os.listdir(self.runtime.telemetry_writer.events_folder_path) if re.search('^[0-9]+.json$', pos_json)][-1]
with open(os.path.join(self.runtime.telemetry_writer.events_folder_path, latest_event_file), 'r+') as f:
events = json.load(f)
self.assertTrue(events is not None)
self.assertEqual(events[-1]["TaskName"], "Test Task")
self.assertTrue(len(events[-1]["Message"]) < len(message.encode('utf-8')))
chars_dropped = len(message.encode('utf-8')) - Constants.TELEMETRY_MSG_SIZE_LIMIT_IN_CHARS + Constants.TELEMETRY_BUFFER_FOR_DROPPED_COUNT_MSG_IN_CHARS + Constants.TELEMETRY_EVENT_COUNTER_MSG_SIZE_LIMIT_IN_CHARS
self.assertTrue("a€bc" in events[-1]["Message"])
self.assertTrue("a€bc" * (len(message) + 1 - chars_dropped) + ". [{0} chars dropped]".format(chars_dropped) in events[-1]["Message"]) # len(message) + 1 due to bad unicode will be replaced by �
self.assertTrue("a\u20acbc" in events[-1]["Message"])
self.assertTrue(len(events[-1]["Message"].encode('utf-8')) > min_msg_limit_in_bytes)
self.assertTrue(len(events[-1]["Message"].encode('utf-8')) < max_msg_limit_in_bytes)

f.close()

# TODO: The following 3 tests cause widespread test suite failures (on master), so leaving it out. And tracking in: Task 10912099: [Bug] Bug in telemetry writer - overwriting prior events in fast execution
Expand Down
57 changes: 57 additions & 0 deletions src/core/tests/Test_YumPackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import unittest
from core.src.bootstrap.Constants import Constants
from core.tests.library.ArgumentComposer import ArgumentComposer
from core.tests.library.LegacyEnvLayerExtensions import LegacyEnvLayerExtensions
from core.tests.library.RuntimeCompositor import RuntimeCompositor


Expand All @@ -35,6 +36,12 @@ def mock_do_processes_require_restart(self):

def mock_write_with_retry_raise_exception(self, file_path_or_handle, data, mode='a+'):
raise Exception

def mock_linux7_distribution_to_return_redhat(self):
return ['Red Hat Enterprise Linux Server', '7', 'Maipo']

def mock_linux8_distribution_to_return_redhat(self):
return ['Red Hat Enterprise Linux Server', '8', 'Ootpa']
#endregion Mocks

def mock_do_processes_require_restart_raise_exception(self):
Expand Down Expand Up @@ -619,5 +626,55 @@ def test_obsolete_packages_should_not_considered_in_available_updates(self):
self.assertTrue(available_updates[0] == "grub2-tools.x86_64")
self.assertTrue(package_versions[0] == "1:2.02-142.el8")

def test_rhel7_image_with_security_plugin(self):
"""Unit test for yum package manager rhel images below 8 and Classification = Security"""
# mock linux_distribution
backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution
LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux7_distribution_to_return_redhat

self.__assert_test_rhel8_image()

# restore linux_distribution
LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution

def test_rhel8_image_higher_no_security_plugin(self):
"""Unit test for yum package manager rhel images >= 8 and Classification = Security"""
# mock linux_distribution
backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution
LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux8_distribution_to_return_redhat

self.__assert_test_rhel8_image()

# restore linux_distribution
LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution

def __assert_test_rhel8_image(self):
self.runtime.set_legacy_test_type('HappyPath')
package_manager = self.container.get('package_manager')
self.assertIsNotNone(package_manager)
self.runtime.stop()

argument_composer = ArgumentComposer()
argument_composer.classifications_to_include = [Constants.PackageClassification.SECURITY]
argument_composer.patches_to_include = ["ssh", "tcpdump"]
argument_composer.patches_to_exclude = ["ssh*", "test"]
self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.YUM)
self.container = self.runtime.container

package_filter = self.container.get('package_filter')
self.assertIsNotNone(package_filter)

available_updates, package_versions = package_manager.get_available_updates(package_filter)

# test for get_available_updates
self.assertIsNotNone(available_updates)
self.assertIsNotNone(package_versions)
self.assertEqual(len(available_updates), 2)
self.assertEqual(len(package_versions), 2)
self.assertEqual(available_updates[0], "libgcc.i686")
self.assertEqual(package_versions[0], "4.8.5-28.el7")
self.assertEqual(available_updates[1], "tcpdump.x86_64")
self.assertEqual(package_versions[1], "14:4.9.2-3.el7")

if __name__ == '__main__':
unittest.main()

0 comments on commit 7b8bafd

Please sign in to comment.