From 04b74b758b50c2ce33402389bfba698a16c64cdc Mon Sep 17 00:00:00 2001 From: mcw-work <111986660+mcw-work@users.noreply.github.com> Date: Tue, 5 Dec 2023 17:26:07 +0000 Subject: [PATCH] use dbus in shutdown manager instead of subprocess (#195) --- landscape/client/manager/shutdownmanager.py | 232 ++++++------------ .../manager/tests/test_shutdownmanager.py | 194 ++------------- landscape/client/package/changer.py | 31 +-- .../client/package/tests/test_changer.py | 145 +---------- snap/snapcraft.yaml | 1 + 5 files changed, 119 insertions(+), 484 deletions(-) diff --git a/landscape/client/manager/shutdownmanager.py b/landscape/client/manager/shutdownmanager.py index eba2be77d..4be7c9717 100644 --- a/landscape/client/manager/shutdownmanager.py +++ b/landscape/client/manager/shutdownmanager.py @@ -1,94 +1,102 @@ import logging -from twisted.internet.defer import Deferred -from twisted.internet.error import ProcessDone -from twisted.internet.protocol import ProcessProtocol +import dbus +from twisted.internet import reactor +from twisted.internet import task from landscape.client.manager.plugin import FAILED from landscape.client.manager.plugin import ManagerPlugin from landscape.client.manager.plugin import SUCCEEDED -class ShutdownFailedError(Exception): - """Raised when a call to C{/sbin/shutdown} fails. - - @ivar data: The data that the process printed before failing. +class ShutdownManager(ManagerPlugin): """ + Plugin that either shuts down or reboots the device. - def __init__(self, data): - self.data = data + In both cases, the manager sends the success command + before attempting the shutdown/reboot. + With reboot - the call is instanteous but the success + message will be send as soon as the device comes back up. + For shutdown there is a 120 second delay between + sending the success and firing the shutdown. + This is usually sufficent. + """ -class ShutdownManager(ManagerPlugin): - def __init__(self, process_factory=None): - if process_factory is None: - from twisted.internet import reactor as process_factory - self._process_factory = process_factory + def __init__(self, dbus=dbus, shutdown_delay=120): + self.dbus_sysbus = dbus.SystemBus() + self.shutdown_delay = shutdown_delay def register(self, registry): - """Add this plugin to C{registry}. - - The shutdown manager handles C{shutdown} activity messages broadcast - from the server. - """ super().register(registry) - registry.register_message("shutdown", self.perform_shutdown) + self.config = registry.config - def perform_shutdown(self, message): - """Request a system restart or shutdown. + registry.register_message("shutdown", self._handle_shutdown) - If the call to C{/sbin/shutdown} runs without errors the activity - specified in the message will be responded as succeeded. Otherwise, - it will be responded as failed. + def _handle_shutdown(self, message, DBus_System_Bus=None): + """ + Choose shutdown or reboot """ operation_id = message["operation-id"] reboot = message["reboot"] - protocol = ShutdownProcessProtocol() - protocol.set_timeout(self.registry.reactor) - protocol.result.addCallback(self._respond_success, operation_id) - protocol.result.addErrback(self._respond_failure, operation_id, reboot) - command, args = self._get_command_and_args(protocol, reboot) - self._process_factory.spawnProcess(protocol, command, args=args) - - def _respond_success(self, data, operation_id): - logging.info("Shutdown request succeeded.") + + if reboot: + logging.info("Reboot Requested") + deferred = self._respond_reboot_success( + "Reboot requested of the system", + operation_id, + ) + return deferred + else: + logging.info("Shutdown Requested") + deferred = self._respond_shutdown_success( + "Shutdown requested of the system", + operation_id, + ) + return deferred + + def _Reboot(self, _, Dbus_System_bus=None): + logging.info("Sending Reboot Command") + + bus_object = self.dbus_sysbus.get_object( + "org.freedesktop.login1", + "/org/freedesktop/login1", + ) + bus_object.Reboot( + True, + dbus_interface="org.freedesktop.login1.Manager", + ) + + def _Shutdown(self): + logging.info("Sending Shutdown Command") + bus_object = self.dbus_sysbus.get_object( + "org.freedesktop.login1", + "/org/freedesktop/login1", + ) + bus_object.PowerOff( + True, + dbus_interface="org.freedesktop.login1.Manager", + ) + + def _respond_reboot_success(self, data, operation_id): deferred = self._respond(SUCCEEDED, data, operation_id) - # After sending the result to the server, stop accepting messages and - # wait for the reboot/shutdown. - deferred.addCallback(lambda _: self.registry.broker.stop_exchanger()) + deferred.addCallback(self._Reboot) + deferred.addErrback(self._respond_fail) return deferred - def _respond_failure(self, failure, operation_id, reboot): - logging.info("Shutdown request failed.") - failure_report = "\n".join( - [ - failure.value.data, - "", - "Attempting to force {operation}. Please note that if this " - "succeeds, Landscape will have no way of knowing and will " - "still mark this activity as having failed. It is recommended " - "you check the state of the machine manually to determine " - "whether {operation} succeeded.".format( - operation="reboot" if reboot else "shutdown", - ), - ], - ) - deferred = self._respond(FAILED, failure_report, operation_id) - # Add another callback spawning the poweroff or reboot command (which - # seem more reliable in aberrant situations like a post-trusty release - # upgrade where upstart has been replaced with systemd). If this - # succeeds, we won't have any opportunity to report it and if it fails - # we'll already have responded indicating we're attempting to force - # the operation so either way there's no sense capturing output - protocol = ProcessProtocol() - command, args = self._get_command_and_args(protocol, reboot, True) - deferred.addCallback( - lambda _: self._process_factory.spawnProcess( - protocol, - command, - args=args, - ), + def _respond_shutdown_success(self, data, operation_id): + deferred = self._respond(SUCCEEDED, data, operation_id) + self.shutdown_deferred = task.deferLater( + reactor, + self.shutdown_delay, + self._Shutdown, ) + deferred.addErrback(self._respond_fail) + return deferred + + def _respond_fail(self, data, operation_id): + logging.info("Shutdown/Reboot request failed.") + deferred = self._respond(FAILED, data, operation_id) return deferred def _respond(self, status, data, operation_id): @@ -103,93 +111,3 @@ def _respond(self, status, data, operation_id): self._session_id, True, ) - - def _get_command_and_args(self, protocol, reboot, force=False): - """ - Returns a C{command, args} 2-tuple suitable for use with - L{IReactorProcess.spawnProcess}. - """ - minutes = None if force else f"+{protocol.delay//60:d}" - args = { - (False, False): [ - "/sbin/shutdown", - "-h", - minutes, - "Landscape is shutting down the system", - ], - (False, True): [ - "/sbin/shutdown", - "-r", - minutes, - "Landscape is rebooting the system", - ], - (True, False): ["/sbin/poweroff"], - (True, True): ["/sbin/reboot"], - }[force, reboot] - return args[0], args - - -class ShutdownProcessProtocol(ProcessProtocol): - """A ProcessProtocol for calling C{/sbin/shutdown}. - - C{shutdown} doesn't return immediately when a time specification is - provided. Failures are reported immediately after it starts and return a - non-zero exit code. The process protocol calls C{shutdown} and waits for - failures for C{timeout} seconds. If no failures are reported it fires - C{result}'s callback with whatever output was received from the process. - If failures are reported C{result}'s errback is fired. - - @ivar result: A L{Deferred} fired when C{shutdown} fails or - succeeds. - @ivar reboot: A flag indicating whether a shutdown or reboot should be - performed. Default is C{False}. - @ivar delay: The time in seconds from now to schedule the shutdown. - Default is 240 seconds. The time will be converted to minutes using - integer division when passed to C{shutdown}. - """ - - def __init__(self, reboot=False, delay=240): - self.result = Deferred() - self.reboot = reboot - self.delay = delay - self._data = [] - self._waiting = True - - def get_data(self): - """Get the data printed by the subprocess.""" - return b"".join(self._data).decode("utf-8", "replace") - - def set_timeout(self, reactor, timeout=10): - """ - Set the error checking timeout, after which C{result}'s callback will - be fired. - """ - reactor.call_later(timeout, self._succeed) - - def childDataReceived(self, fd, data): # noqa: N802 - """Some data was received from the child. - - Add it to our buffer to pass to C{result} when it's fired. - """ - if self._waiting: - self._data.append(data) - - def processEnded(self, reason): # noqa: N802 - """Fire back the C{result} L{Deferred}. - - C{result}'s callback will be fired with the string of data received - from the subprocess, or if the subprocess failed C{result}'s errback - will be fired with the string of data received from the subprocess. - """ - if self._waiting: - if reason.check(ProcessDone): - self._succeed() - else: - self.result.errback(ShutdownFailedError(self.get_data())) - self._waiting = False - - def _succeed(self): - """Fire C{result}'s callback with data accumulated from the process.""" - if self._waiting: - self.result.callback(self.get_data()) - self._waiting = False diff --git a/landscape/client/manager/tests/test_shutdownmanager.py b/landscape/client/manager/tests/test_shutdownmanager.py index 4d2a28e57..796710720 100644 --- a/landscape/client/manager/tests/test_shutdownmanager.py +++ b/landscape/client/manager/tests/test_shutdownmanager.py @@ -1,15 +1,10 @@ -from twisted.internet.error import ProcessDone -from twisted.internet.error import ProcessTerminated -from twisted.internet.protocol import ProcessProtocol -from twisted.python.failure import Failure +from unittest.mock import Mock + +import twisted.internet.defer -from landscape.client.manager.plugin import FAILED -from landscape.client.manager.plugin import SUCCEEDED from landscape.client.manager.shutdownmanager import ShutdownManager -from landscape.client.manager.shutdownmanager import ShutdownProcessProtocol from landscape.client.tests.helpers import LandscapeTest from landscape.client.tests.helpers import ManagerHelper -from landscape.lib.testing import StubProcessFactory class ShutdownManagerTest(LandscapeTest): @@ -18,179 +13,40 @@ class ShutdownManagerTest(LandscapeTest): def setUp(self): super().setUp() + self.broker_service.message_store.set_accepted_types( ["shutdown", "operation-result"], ) self.broker_service.pinger.start() - self.process_factory = StubProcessFactory() - self.plugin = ShutdownManager(process_factory=self.process_factory) + + self.dbus_mock = Mock() + self.dbus_sysbus_mock = Mock() + self.dbus_mock.get_object.return_value = self.dbus_sysbus_mock + self.plugin = ShutdownManager(dbus=self.dbus_mock, shutdown_delay=0) self.manager.add(self.plugin) - def test_restart(self): - """ - C{shutdown} processes run until the shutdown is to be performed. The - L{ShutdownProcessProtocol} watches a process for errors, for 10 - seconds by default, and if none occur the activity is marked as - L{SUCCEEDED}. Data printed by the process is included in the - activity's result text. - """ + def test_reboot(self): message = {"type": "shutdown", "reboot": True, "operation-id": 100} - self.plugin.perform_shutdown(message) - [arguments] = self.process_factory.spawns - protocol = arguments[0] - self.assertTrue(isinstance(protocol, ShutdownProcessProtocol)) - self.assertEqual( - arguments[1:3], - ( - "/sbin/shutdown", - [ - "/sbin/shutdown", - "-r", - "+4", - "Landscape is rebooting the system", - ], - ), - ) + deferred = self.plugin._handle_shutdown(message) - def restart_performed(ignore): - self.assertTrue(self.broker_service.exchanger.is_urgent()) - self.assertEqual( - self.broker_service.message_store.get_pending_messages(), - [ - { - "type": "operation-result", - "api": b"3.2", - "operation-id": 100, - "timestamp": 10, - "status": SUCCEEDED, - "result-text": "Data may arrive in batches.", - }, - ], - ) + def check(_): + self.plugin.dbus_sysbus.get_object.assert_called_once() + self.plugin.dbus_sysbus.get_object().Reboot.assert_called_once() - protocol.result.addCallback(restart_performed) - protocol.childDataReceived(0, b"Data may arrive ") - protocol.childDataReceived(0, b"in batches.") - # We need to advance both reactors to simulate that fact they - # are loosely in sync with each other - self.broker_service.reactor.advance(10) - self.manager.reactor.advance(10) - return protocol.result + deferred.addCallback(check) + return deferred def test_shutdown(self): - """ - C{shutdown} messages have a flag that indicates whether a reboot or - shutdown has been requested. The C{shutdown} command is called - appropriately. - """ message = {"type": "shutdown", "reboot": False, "operation-id": 100} - self.plugin.perform_shutdown(message) - [arguments] = self.process_factory.spawns - self.assertEqual( - arguments[1:3], - ( - "/sbin/shutdown", - [ - "/sbin/shutdown", - "-h", - "+4", - "Landscape is shutting down the system", - ], - ), - ) - - def test_restart_fails(self): - """ - If an error occurs before the error checking timeout the activity will - be failed. Data printed by the process prior to the failure is - included in the activity's result text. - """ - message = {"type": "shutdown", "reboot": True, "operation-id": 100} - self.plugin.perform_shutdown(message) - - def restart_failed(message_id): - self.assertTrue(self.broker_service.exchanger.is_urgent()) - messages = self.broker_service.message_store.get_pending_messages() - self.assertEqual(len(messages), 1) - message = messages[0] - self.assertEqual(message["type"], "operation-result") - self.assertEqual(message["api"], b"3.2") - self.assertEqual(message["operation-id"], 100) - self.assertEqual(message["timestamp"], 0) - self.assertEqual(message["status"], FAILED) - self.assertIn("Failure text is reported.", message["result-text"]) - - # Check that after failing, we attempt to force the shutdown by - # switching the binary called - [spawn1_args, spawn2_args] = self.process_factory.spawns - protocol = spawn2_args[0] - self.assertIsInstance(protocol, ProcessProtocol) - self.assertEqual( - spawn2_args[1:3], - ("/sbin/reboot", ["/sbin/reboot"]), - ) - - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.result.addCallback(restart_failed) - protocol.childDataReceived(0, b"Failure text is reported.") - protocol.processEnded(Failure(ProcessTerminated(exitCode=1))) - return protocol.result + deferred = self.plugin._handle_shutdown(message) - def test_process_ends_after_timeout(self): - """ - If the process ends after the error checking timeout has passed - C{result} will not be re-fired. - """ - message = {"type": "shutdown", "reboot": False, "operation-id": 100} - self.plugin.perform_shutdown(message) - - stash = [] - - def restart_performed(ignore): - self.assertEqual(stash, []) - stash.append(True) - - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.result.addCallback(restart_performed) - self.manager.reactor.advance(10) - protocol.processEnded(Failure(ProcessTerminated(exitCode=1))) - return protocol.result - - def test_process_data_is_not_collected_after_firing_result(self): - """ - Data printed in the sub-process is not collected after C{result} has - been fired. - """ - message = {"type": "shutdown", "reboot": False, "operation-id": 100} - self.plugin.perform_shutdown(message) - - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.childDataReceived(0, b"Data may arrive ") - protocol.childDataReceived(0, b"in batches.") - self.manager.reactor.advance(10) - self.assertEqual(protocol.get_data(), "Data may arrive in batches.") - protocol.childDataReceived(0, b"Even when you least expect it.") - self.assertEqual(protocol.get_data(), "Data may arrive in batches.") - - def test_restart_stops_exchanger(self): - """ - After a successful shutdown, the broker stops processing new messages. - """ - message = {"type": "shutdown", "reboot": False, "operation-id": 100} - self.plugin.perform_shutdown(message) + def check(_): + self.plugin.dbus_sysbus.get_object.assert_called_once() + self.plugin.dbus_sysbus.get_object().PowerOff.assert_called_once() - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.processEnded(Failure(ProcessDone(status=0))) - self.broker_service.reactor.advance(100) - self.manager.reactor.advance(100) + self.plugin.shutdown_deferred.addCallback(check) + return deferred - # New messages will not be exchanged after a reboot process is in - # process. - self.manager.broker.exchanger.schedule_exchange() - payloads = self.manager.broker.exchanger._transport.payloads - self.assertEqual(0, len(payloads)) - return protocol.result + def test_shutdown_failed(self): + deferred = self.plugin._respond_fail("", 100) + self.assertIsInstance(deferred, twisted.internet.defer.Deferred) diff --git a/landscape/client/package/changer.py b/landscape/client/package/changer.py index f3d78ee32..3bd0cdaa2 100644 --- a/landscape/client/package/changer.py +++ b/landscape/client/package/changer.py @@ -4,6 +4,7 @@ import pwd import time +import dbus from twisted.internet import reactor from twisted.internet.defer import maybeDeferred from twisted.internet.defer import succeed @@ -11,7 +12,6 @@ from landscape.client import GROUP from landscape.client import USER from landscape.client.manager.manager import FAILED -from landscape.client.manager.shutdownmanager import ShutdownProcessProtocol from landscape.client.monitor.rebootrequired import REBOOT_REQUIRED_FILENAME from landscape.client.package.reporter import find_reporter_command from landscape.client.package.taskhandler import PackageTaskError @@ -30,7 +30,6 @@ from landscape.lib import base64 from landscape.lib.config import get_bindir from landscape.lib.fs import create_binary_file -from landscape.lib.log import log_failure class UnknownPackageData(Exception): @@ -367,27 +366,19 @@ def handle_change_packages(self, message): def _reboot_later(self, result): self._landscape_reactor.call_later(5, self._run_reboot) - def _run_reboot(self): + def _run_reboot(self, bus=dbus.SystemBus()): """ - Create a C{ShutdownProcessProtocol} and return its result deferred. + Fire a dbus system shutdown """ - protocol = ShutdownProcessProtocol() - minutes = "now" - protocol.set_timeout(self._landscape_reactor) - protocol.result.addCallback(self._log_reboot, minutes) - protocol.result.addErrback(log_failure, "Reboot failed.") - args = [ - "/sbin/shutdown", - "-r", - minutes, - "Landscape is rebooting the system", - ] - self._process_factory.spawnProcess( - protocol, - "/sbin/shutdown", - args=args, + self.bus = bus + self.bus_object = self.bus.get_object( + "org.freedesktop.login1", + "/org/freedesktop/login1", + ) + self.bus_object.Reboot( + True, + dbus_interface="org.freedesktop.login1.Manager", ) - return protocol.result def _log_reboot(self, result, minutes): """Log the reboot.""" diff --git a/landscape/client/package/tests/test_changer.py b/landscape/client/package/tests/test_changer.py index be5d4c9c5..30de8a89c 100644 --- a/landscape/client/package/tests/test_changer.py +++ b/landscape/client/package/tests/test_changer.py @@ -6,12 +6,8 @@ from unittest.mock import patch from twisted.internet.defer import Deferred -from twisted.internet.error import ProcessDone -from twisted.internet.error import ProcessTerminated -from twisted.python.failure import Failure from landscape.client.manager.manager import FAILED -from landscape.client.manager.shutdownmanager import ShutdownFailedError from landscape.client.package.changer import ChangePackagesResult from landscape.client.package.changer import DEPENDENCY_ERROR_RESULT from landscape.client.package.changer import ERROR_RESULT @@ -1617,140 +1613,6 @@ def got_result(result): return result.addCallback(got_result) - def test_change_packages_with_reboot_flag(self): - """ - When a C{reboot-if-necessary} flag is passed in the C{change-packages}, - A C{ShutdownProtocolProcess} is created and the package result change - is returned. - """ - self.store.add_task( - "changer", - { - "type": "change-packages", - "install": [2], - "binaries": [(HASH2, 2, PKGDEB2)], - "operation-id": 123, - "reboot-if-necessary": True, - }, - ) - - def return_good_result(self): - return "Yeah, I did whatever you've asked for!" - - self.replace_perform_changes(return_good_result) - - result = self.changer.handle_tasks() - - def got_result(result): - self.assertIn( - "Landscape is rebooting the system", - self.logfile.getvalue(), - ) - self.assertMessages( - self.get_pending_messages(), - [ - { - "operation-id": 123, - "result-code": 1, - "result-text": "Yeah, I did whatever you've " - "asked for!", - "type": "change-packages-result", - }, - ], - ) - - self.landscape_reactor.advance(5) - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.processEnded(Failure(ProcessDone(status=0))) - self.broker_service.reactor.advance(100) - self.landscape_reactor.advance(10) - return result.addCallback(got_result) - - def test_change_packages_with_failed_reboot(self): - """ - When a C{reboot-if-necessary} flag is passed in the C{change-packages}, - A C{ShutdownProtocol} is created and the package result change is - returned, even if the reboot fails. - """ - self.store.add_task( - "changer", - { - "type": "change-packages", - "install": [2], - "binaries": [(HASH2, 2, PKGDEB2)], - "operation-id": 123, - "reboot-if-necessary": True, - }, - ) - - def return_good_result(self): - return "Yeah, I did whatever you've asked for!" - - self.replace_perform_changes(return_good_result) - - result = self.changer.handle_tasks() - - def got_result(result): - self.assertMessages( - self.get_pending_messages(), - [ - { - "operation-id": 123, - "result-code": 1, - "result-text": "Yeah, I did whatever you've " - "asked for!", - "type": "change-packages-result", - }, - ], - ) - self.log_helper.ignore_errors(ShutdownFailedError) - - self.landscape_reactor.advance(5) - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.processEnded(Failure(ProcessTerminated(exitCode=1))) - self.landscape_reactor.advance(10) - return result.addCallback(got_result) - - def test_no_exchange_after_reboot(self): - """ - After initiating a reboot process, no more messages are exchanged. - """ - self.store.add_task( - "changer", - { - "type": "change-packages", - "install": [2], - "binaries": [(HASH2, 2, PKGDEB2)], - "operation-id": 123, - "reboot-if-necessary": True, - }, - ) - - def return_good_result(self): - return "Yeah, I did whatever you've asked for!" - - self.replace_perform_changes(return_good_result) - - result = self.changer.handle_tasks() - - def got_result(result): - # Advance both reactors so the pending messages are exchanged. - self.broker_service.reactor.advance(100) - self.landscape_reactor.advance(10) - payloads = self.broker_service.exchanger._transport.payloads - self.assertEqual(0, len(payloads)) - - self.landscape_reactor.advance(5) - - [arguments] = self.process_factory.spawns - protocol = arguments[0] - protocol.processEnded(Failure(ProcessDone(status=0))) - self.broker_service.reactor.advance(100) - self.landscape_reactor.advance(10) - return result.addCallback(got_result) - def test_run_gets_session_id(self): """ Invoking L{PackageChanger.run} results in the session ID being fetched. @@ -1762,3 +1624,10 @@ def assert_session_id(ignored): self.changer._session_id = None result = self.changer.run() return result.addCallback(assert_session_id) + + def test_reboot_flag(self): + self.dbus_mock = Mock() + + self.changer._run_reboot(bus=self.dbus_mock) + self.changer.bus.get_object.assert_called_once() + self.changer.bus_object.Reboot.assert_called_once() diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index d56bdd02e..7e933712b 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -108,6 +108,7 @@ parts: - python3-netifaces - python3-pycurl - python3-twisted + - python3-dbus - ubuntu-advantage-tools override-prime: | craftctl default