From affc04aae58dafe46bbade54176e71e7a6ae7aa6 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Fri, 20 Dec 2024 12:22:43 +0100 Subject: [PATCH 1/9] device interface denied list: device denied as qube property --- Makefile | 3 ++ qubes/api/admin.py | 69 +++++++++++++++++++++++++++++ qubes/device_protocol.py | 14 +++--- qubes/devices.py | 2 - qubes/ext/admin.py | 41 ++--------------- qubes/tests/api_admin.py | 90 ++++++++++++++++++++++++++++++++++---- qubes/vm/qubesvm.py | 28 ++++++++++++ rpm_spec/core-dom0.spec.in | 3 ++ 8 files changed, 196 insertions(+), 54 deletions(-) diff --git a/Makefile b/Makefile index a52eb2e29..fe8014bd7 100644 --- a/Makefile +++ b/Makefile @@ -92,6 +92,9 @@ ADMIN_API_METHODS_SIMPLE = \ admin.vm.device.mic.Detach \ admin.vm.device.mic.Set.assignment \ admin.vm.device.mic.Unassign \ + admin.vm.device.denied.List \ + admin.vm.device.denied.Add \ + admin.vm.device.denied.Remove \ admin.vm.feature.CheckWithNetvm \ admin.vm.feature.CheckWithTemplate \ admin.vm.feature.CheckWithAdminVM \ diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 6340729a7..32361ac27 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -50,6 +50,7 @@ UnknownDevice, DeviceAssignment, AssignmentMode, + DeviceInterface, ) @@ -1630,6 +1631,74 @@ async def vm_device_set_required(self, endpoint, untrusted_payload): await self.dest.devices[devclass].update_assignment(dev, mode) self.app.save() + @qubes.api.method( + "admin.vm.device.denied.List", + no_payload=True, scope="local", read=True) + async def vm_device_denied_list(self): + """ + List all denied device interfaces for the VM. + + Returns a newline-separated string. + """ + self.enforce(not self.arg) + + self.fire_event_for_permission() + + denied = self.dest.devices_denied + return "\n".join(map(repr, DeviceInterface.from_str_bulk(denied))) + + @qubes.api.method( + "admin.vm.device.denied.Add", scope="local", write=True) + async def vm_device_denied_add(self, untrusted_payload): + """ + Add device interface(s) to the denied list for the VM. + + Payload: + Encoded device interface (can be repeated without any separator). + """ + payload = untrusted_payload.decode("ascii", errors="strict") + to_add = DeviceInterface.from_str_bulk(payload) + + # may contain duplicates + self.fire_event_for_permission(interfaces=to_add) + + to_add_enc = "".join(map(repr, to_add)) + + self.dest.devices_denied = self.dest.devices_denied + to_add_enc + self.app.save() + + @qubes.api.method( + "admin.vm.device.denied.Remove", scope="local", write=True) + async def vm_device_denied_remove(self, untrusted_payload): + """ + Remove device interface(s) from the denied list for the VM. + + Payload: + Encoded device interface (can be repeated without any separator). + If payload is empty, all interfaces are removed. + """ + denied = DeviceInterface.from_str_bulk(self.dest.devices_denied) + + payload = untrusted_payload.decode("ascii", errors="strict") + if payload: + to_remove = DeviceInterface.from_str_bulk(payload) + else: + to_remove = denied.copy() + + # may contain missing values + self.fire_event_for_permission(interfaces=to_remove) + + for interface in to_remove: + try: + denied.remove(interface) + except ValueError: + raise qubes.exc.QubesValueError( + f"Interface {interface} are not listed " + f"in the devices denied list of {self.dest} vm.") + new_denied = "".join(map(repr, denied)) + self.dest.devices_denied = new_denied + self.app.save() + @qubes.api.method( "admin.vm.firewall.Get", no_payload=True, scope="local", read=True ) diff --git a/qubes/device_protocol.py b/qubes/device_protocol.py index 69271ee43..a55b62bd3 100644 --- a/qubes/device_protocol.py +++ b/qubes/device_protocol.py @@ -733,6 +733,14 @@ def unknown(cls) -> "DeviceInterface": """Value for unknown device interface.""" return cls("?******") + @staticmethod + def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: + interfaces = interfaces or [] + return [ + DeviceInterface(interfaces[i: i + 7]) + for i in range(0, len(interfaces), 7) + ] + def __repr__(self): return self._interface_encoding @@ -1120,11 +1128,7 @@ def _deserialize( if "interfaces" in properties: interfaces = properties["interfaces"] - interfaces = [ - DeviceInterface(interfaces[i : i + 7]) - for i in range(0, len(interfaces), 7) - ] - properties["interfaces"] = interfaces + properties["interfaces"] = DeviceInterface.from_str_bulk(interfaces) if "parent_ident" in properties: properties["parent"] = Port( diff --git a/qubes/devices.py b/qubes/devices.py index bbdd55f82..69745b410 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -73,8 +73,6 @@ ) from qubes.exc import ProtocolError -DEVICE_DENY_LIST = "/etc/qubes/device-deny.list" - class DeviceNotAssigned(qubes.exc.QubesException, KeyError): """ diff --git a/qubes/ext/admin.py b/qubes/ext/admin.py index ef1c1b127..209436e57 100644 --- a/qubes/ext/admin.py +++ b/qubes/ext/admin.py @@ -17,7 +17,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, see . import importlib -import os import qubes.api import qubes.api.internal @@ -27,7 +26,6 @@ from qrexec.policy import utils, parser from qubes.device_protocol import DeviceInterface -from qubes.devices import DEVICE_DENY_LIST class JustEvaluateAskResolution(parser.AskResolution): @@ -184,44 +182,11 @@ def on_device_attach(self, vm, event, dest, arg, device, mode, **kwargs): if mode != "manual": return - # load device deny list - deny = {} - AdminExtension._load_deny_list(deny, DEVICE_DENY_LIST) - - # load drop ins - drop_in_path = DEVICE_DENY_LIST + ".d" - if os.path.isdir(drop_in_path): - for deny_list_name in os.listdir(drop_in_path): - deny_list_path = os.path.join(drop_in_path, deny_list_name) - - if os.path.isfile(deny_list_path): - AdminExtension._load_deny_list(deny, deny_list_path) - - # check if any presented interface is on deny list - for interface in deny.get(dest.name, set()): - pattern = DeviceInterface(interface) + # check if any presented interface is on denied list + denied = set(DeviceInterface.from_str_bulk(dest.devices_denied)) + for pattern in denied: for devint in device.interfaces: if pattern.matches(devint): raise qubes.exc.PermissionDenied( f"Device exposes a banned interface: {devint}" ) - - @staticmethod - def _load_deny_list(deny: dict, path: str) -> None: - try: - with open(path, "r", encoding="utf-8") as file: - for line in file: - line = line.strip() - - # skip comments - if line.startswith("#"): - continue - - if line: - name, *values = line.split() - values = " ".join(values).replace(",", " ").split() - values = [v for v in values if len(v) > 0] - - deny[name] = deny.get(name, set()).union(set(values)) - except IOError: - pass diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index ae27bf8e5..6bccf3f5b 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3912,6 +3912,78 @@ def test_655_vm_device_set_mode_invalid_value(self): ) self.assertFalse(self.app.save.called) + def test_6XX_vm_device_denied_list_empty(self): + actual = self.call_mgmt_func(b"admin.vm.device.denied.List", + b"test-vm1") + self.assertEqual(actual, "") + self.assertFalse(self.app.save.called) + + def test_6XX_vm_device_denied_list(self): + self.vm.devices_denied = "b******p012345p53**2*" + actual = self.call_mgmt_func(b"admin.vm.device.denied.List", + b"test-vm1") + self.assertEqual(actual, "b******\np012345\np53**2*") + self.assertFalse(self.app.save.called) + + def test_6XX_vm_device_denied_add(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"u112233") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*u112233") + self.assertTrue(self.app.save.called) + + def test_6XX_vm_device_denied_add_multiple(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"u112233m******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*u112233m******") + self.assertTrue(self.app.save.called) + + def test_6XX_vm_device_denied_add_repeated(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"u112233u112233") + self.assertFalse(self.app.save.called) + + def test_6XX_vm_device_denied_add_present(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"b******") + self.assertFalse(self.app.save.called) + + def test_6XX_vm_device_denied_remove(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"b******") + self.assertEqual(self.vm.devices_denied, + "p012345p53**2*") + self.assertTrue(self.app.save.called) + + def test_6XX_vm_device_denied_remove_repeated(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"b******b******") + self.assertFalse(self.app.save.called) + + def test_6XX_vm_device_denied_remove_all(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"") + self.assertEqual(self.vm.devices_denied, "") + self.assertTrue(self.app.save.called) + + def test_6XX_vm_device_denied_remove_missing(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"m******") + self.assertFalse(self.app.save.called) + def test_660_pool_set_revisions_to_keep(self): self.app.pools["test-pool"] = unittest.mock.Mock() value = self.call_mgmt_func( @@ -4208,7 +4280,7 @@ def test_990_vm_unexpected_payload(self): b"admin.vm.property.List", b"admin.vm.property.Get", b"admin.vm.property.Help", - # b'admin.vm.property.HelpRst', + # b"admin.vm.property.HelpRst", b"admin.vm.property.Reset", b"admin.vm.feature.List", b"admin.vm.feature.Get", @@ -4316,7 +4388,7 @@ def test_992_dom0_unexpected_payload(self): b"admin.property.List", b"admin.property.Get", b"admin.property.Help", - # b'admin.property.HelpRst', + # b"admin.property.HelpRst", b"admin.property.Reset", b"admin.pool.List", b"admin.pool.ListDrivers", @@ -4400,19 +4472,19 @@ def test_994_dom0_only_calls(self): b"admin.property.Get", b"admin.property.Set", b"admin.property.Help", - # b'admin.property.HelpRst', + # b"admin.property.HelpRst", b"admin.property.Reset", b"admin.pool.List", b"admin.pool.ListDrivers", b"admin.pool.Info", b"admin.pool.Add", b"admin.pool.Remove", - # b'admin.pool.volume.List', - # b'admin.pool.volume.Info', - # b'admin.pool.volume.ListSnapshots', - # b'admin.pool.volume.Snapshot', - # b'admin.pool.volume.Revert', - # b'admin.pool.volume.Resize', + # b"admin.pool.volume.List", + # b"admin.pool.volume.Info", + # b"admin.pool.volume.ListSnapshots", + # b"admin.pool.volume.Snapshot", + # b"admin.pool.volume.Revert", + # b"admin.pool.volume.Resize", b"admin.backup.Execute", b"admin.backup.Info", ] diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index 195b28510..e50f77477 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -45,6 +45,7 @@ import qubes.vm import qubes.vm.adminvm import qubes.vm.mix.net +from qubes.device_protocol import DeviceInterface qmemman_present = False try: @@ -133,6 +134,27 @@ def _setter_default_user(self, prop, value): return value +def _setter_denied_list(self, prop, value): + """ Helper for setting denied list """ + value = str(value) + if len(value) == 0: + return value + interfaces = DeviceInterface.from_str_bulk(value) + if len(interfaces) != len(set(interfaces)): + raise qubes.exc.QubesPropertyValueError( + self, prop, value, + "Interface code list contains duplicates.") + # block, usb, mic, pci, * + pattern = r"^([bump\*][0123456789\*]{6})*$" + if not re.fullmatch(pattern, value): + raise qubes.exc.QubesPropertyValueError( + self, prop, value, + "Interface code list should be in the form cddddddcdddddd...," + 'where c is one of "b", "u", "m", "p", "*" ' + 'and d is a digit or "*".') + return value + + def _setter_virt_mode(self, prop, value): value = str(value) value = value.lower() @@ -830,6 +852,12 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): doc="True if this machine may be updated on its own.", ) + devices_denied = qubes.property( + "devices_denied", default="", + type=str, + setter=_setter_denied_list, + doc="List of device interface codes that are denied for this VM.") + # for changes in keyboard_layout, see also the same property in AdminVM keyboard_layout = qubes.property( "keyboard_layout", diff --git a/rpm_spec/core-dom0.spec.in b/rpm_spec/core-dom0.spec.in index 0095cb8da..485799ad1 100644 --- a/rpm_spec/core-dom0.spec.in +++ b/rpm_spec/core-dom0.spec.in @@ -272,6 +272,9 @@ admin.vm.device.pci.Available admin.vm.device.pci.Detach admin.vm.device.pci.Set.assignment admin.vm.device.pci.Unassign +admin.vm.device.denied.List +admin.vm.device.denied.Add +admin.vm.device.denied.Remove admin.vm.feature.CheckWithAdminVM admin.vm.feature.CheckWithNetvm admin.vm.feature.CheckWithTemplate From f7e8503358685da000b3b0cdc60bb282334cac54 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Sun, 29 Dec 2024 13:12:27 +0100 Subject: [PATCH 2/9] device interface denied list: test renumbering --- qubes/tests/api_admin.py | 56 ++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 6bccf3f5b..f55da4d3d 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3912,20 +3912,20 @@ def test_655_vm_device_set_mode_invalid_value(self): ) self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_list_empty(self): + def test_660_vm_device_denied_list_empty(self): actual = self.call_mgmt_func(b"admin.vm.device.denied.List", b"test-vm1") self.assertEqual(actual, "") self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_list(self): + def test_661_vm_device_denied_list(self): self.vm.devices_denied = "b******p012345p53**2*" actual = self.call_mgmt_func(b"admin.vm.device.denied.List", b"test-vm1") self.assertEqual(actual, "b******\np012345\np53**2*") self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_add(self): + def test_662_vm_device_denied_add(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"u112233") @@ -3933,7 +3933,7 @@ def test_6XX_vm_device_denied_add(self): "b******p012345p53**2*u112233") self.assertTrue(self.app.save.called) - def test_6XX_vm_device_denied_add_multiple(self): + def test_663_vm_device_denied_add_multiple(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"u112233m******") @@ -3941,21 +3941,21 @@ def test_6XX_vm_device_denied_add_multiple(self): "b******p012345p53**2*u112233m******") self.assertTrue(self.app.save.called) - def test_6XX_vm_device_denied_add_repeated(self): + def test_664_vm_device_denied_add_repeated(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"u112233u112233") self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_add_present(self): + def test_665_vm_device_denied_add_present(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"b******") self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_remove(self): + def test_666_vm_device_denied_remove(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******") @@ -3963,28 +3963,28 @@ def test_6XX_vm_device_denied_remove(self): "p012345p53**2*") self.assertTrue(self.app.save.called) - def test_6XX_vm_device_denied_remove_repeated(self): + def test_667_vm_device_denied_remove_repeated(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******b******") self.assertFalse(self.app.save.called) - def test_6XX_vm_device_denied_remove_all(self): + def test_668_vm_device_denied_remove_all(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"") self.assertEqual(self.vm.devices_denied, "") self.assertTrue(self.app.save.called) - def test_6XX_vm_device_denied_remove_missing(self): + def test_669_vm_device_denied_remove_missing(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"m******") self.assertFalse(self.app.save.called) - def test_660_pool_set_revisions_to_keep(self): + def test_700_pool_set_revisions_to_keep(self): self.app.pools["test-pool"] = unittest.mock.Mock() value = self.call_mgmt_func( b"admin.pool.Set.revisions_to_keep", b"dom0", b"test-pool", b"2" @@ -3994,7 +3994,7 @@ def test_660_pool_set_revisions_to_keep(self): self.assertEqual(self.app.pools["test-pool"].revisions_to_keep, 2) self.app.save.assert_called_once_with() - def test_661_pool_set_revisions_to_keep_negative(self): + def test_701_pool_set_revisions_to_keep_negative(self): self.app.pools["test-pool"] = unittest.mock.Mock() with self.assertRaises(qubes.exc.ProtocolError): self.call_mgmt_func( @@ -4006,7 +4006,7 @@ def test_661_pool_set_revisions_to_keep_negative(self): self.assertEqual(self.app.pools["test-pool"].mock_calls, []) self.assertFalse(self.app.save.called) - def test_662_pool_set_revisions_to_keep_not_a_number(self): + def test_702_pool_set_revisions_to_keep_not_a_number(self): self.app.pools["test-pool"] = unittest.mock.Mock() with self.assertRaises(qubes.exc.ProtocolError): self.call_mgmt_func( @@ -4018,7 +4018,7 @@ def test_662_pool_set_revisions_to_keep_not_a_number(self): self.assertEqual(self.app.pools["test-pool"].mock_calls, []) self.assertFalse(self.app.save.called) - def test_663_pool_set_ephemeral(self): + def test_703_pool_set_ephemeral(self): self.app.pools["test-pool"] = unittest.mock.Mock() value = self.call_mgmt_func( b"admin.pool.Set.ephemeral_volatile", b"dom0", b"test-pool", b"true" @@ -4028,7 +4028,7 @@ def test_663_pool_set_ephemeral(self): self.assertEqual(self.app.pools["test-pool"].ephemeral_volatile, True) self.app.save.assert_called_once_with() - def test_664_pool_set_ephemeral_not_a_boolean(self): + def test_704_pool_set_ephemeral_not_a_boolean(self): self.app.pools["test-pool"] = unittest.mock.Mock() with self.assertRaises(qubes.exc.ProtocolError): self.call_mgmt_func( @@ -4040,7 +4040,7 @@ def test_664_pool_set_ephemeral_not_a_boolean(self): self.assertEqual(self.app.pools["test-pool"].mock_calls, []) self.assertFalse(self.app.save.called) - def test_670_vm_volume_set_revisions_to_keep(self): + def test_710_vm_volume_set_revisions_to_keep(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4061,7 +4061,7 @@ def test_670_vm_volume_set_revisions_to_keep(self): self.assertEqual(self.vm.volumes["private"].revisions_to_keep, 2) self.app.save.assert_called_once_with() - def test_671_vm_volume_set_revisions_to_keep_negative(self): + def test_711_vm_volume_set_revisions_to_keep_negative(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4076,7 +4076,7 @@ def test_671_vm_volume_set_revisions_to_keep_negative(self): b"-2", ) - def test_672_vm_volume_set_revisions_to_keep_not_a_number(self): + def test_712_vm_volume_set_revisions_to_keep_not_a_number(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4091,7 +4091,7 @@ def test_672_vm_volume_set_revisions_to_keep_not_a_number(self): b"abc", ) - def test_680_vm_volume_set_rw(self): + def test_720_vm_volume_set_rw(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4109,7 +4109,7 @@ def test_680_vm_volume_set_rw(self): self.assertEqual(self.vm.volumes["private"].rw, True) self.app.save.assert_called_once_with() - def test_681_vm_volume_set_rw_invalid(self): + def test_721_vm_volume_set_rw_invalid(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4122,7 +4122,7 @@ def test_681_vm_volume_set_rw_invalid(self): ) self.assertFalse(self.app.save.called) - def test_685_vm_volume_set_ephemeral(self): + def test_725_vm_volume_set_ephemeral(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4140,7 +4140,7 @@ def test_685_vm_volume_set_ephemeral(self): self.assertEqual(self.vm.volumes["volatile"].ephemeral, True) self.app.save.assert_called_once_with() - def test_686_vm_volume_set_ephemeral_invalid(self): + def test_726_vm_volume_set_ephemeral_invalid(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { "keys.return_value": ["root", "private", "volatile", "kernel"], @@ -4156,7 +4156,7 @@ def test_686_vm_volume_set_ephemeral_invalid(self): ) self.assertFalse(self.app.save.called) - def test_690_vm_console(self): + def test_730_vm_console(self): self.vm._libvirt_domain = unittest.mock.Mock() xml_desc = ( "\n" @@ -4176,7 +4176,7 @@ def test_690_vm_console(self): value = self.call_mgmt_func(b"admin.vm.Console", b"test-vm1") self.assertEqual(value, "/dev/pts/42") - def test_691_vm_console_not_running(self): + def test_731_vm_console_not_running(self): self.vm._libvirt_domain = unittest.mock.Mock() xml_desc = ( "\n" @@ -4195,7 +4195,7 @@ def test_691_vm_console_not_running(self): with self.assertRaises(qubes.exc.QubesVMNotRunningError): self.call_mgmt_func(b"admin.vm.Console", b"test-vm1") - def test_700_pool_volume_list(self): + def test_800_pool_volume_list(self): self.app.pools = { "pool1": unittest.mock.Mock( config={"param1": "value1", "param2": "value2"}, @@ -4212,7 +4212,7 @@ def test_700_pool_volume_list(self): ) self.assertEqual(value, "vol1\nvol2\n") - def test_710_vm_volume_clear(self): + def test_810_vm_volume_clear(self): with tempfile.TemporaryDirectory() as tmpdir: tmpfile = os.path.join(tmpdir, "testfile") @@ -4257,13 +4257,13 @@ async def coroutine_mock(*args, **kwargs): self.assertEventFired(self.emitter, "domain-volume-import-begin") self.assertEventFired(self.emitter, "domain-volume-import-end") - def test_800_current_state_default(self): + def test_900_current_state_default(self): value = self.call_mgmt_func(b"admin.vm.CurrentState", b"test-vm1") self.assertEqual( value, "mem=0 mem_static_max=0 cputime=0 power_state=Halted" ) - def test_801_current_state_changed(self): + def test_901_current_state_changed(self): self.vm.get_mem = lambda: 512 self.vm.get_mem_static_max = lambda: 1024 self.vm.get_cputime = lambda: 100 From 0815b1e00be22d758e6c250f70ef10eab3af2646 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Sun, 29 Dec 2024 13:37:38 +0100 Subject: [PATCH 3/9] device interface denied list: update no-payload/no-argument tests --- qubes/tests/api_admin.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index f55da4d3d..91a72f3ae 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -4292,9 +4292,13 @@ def test_990_vm_unexpected_payload(self): b"admin.vm.tag.Set", b"admin.vm.firewall.Get", b"admin.vm.firewall.Reload", - b"admin.vm.device.pci.Detach", - b"admin.vm.device.pci.List", + b"admin.deviceclass.List", b"admin.vm.device.pci.Available", + b"admin.vm.device.pci.Assigned", + b"admin.vm.device.pci.Attached", + b"admin.vm.device.pci.Unassign", + b"admin.vm.device.pci.Detach", + b"admin.vm.device.denied.List", b"admin.vm.volume.ListSnapshots", b"admin.vm.volume.List", b"admin.vm.volume.Info", @@ -4344,6 +4348,8 @@ def test_991_vm_unexpected_argument(self): b"admin.vm.firewall.Get", b"admin.vm.firewall.Set", b"admin.vm.firewall.Reload", + b"admin.deviceclass.List", + b"admin.vm.device.denied.List", b"admin.vm.volume.List", b"admin.vm.Start", b"admin.vm.Pause", From 868cc6dc98d5eb6d1e59b53a232a9d15e269f116 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 6 Jan 2025 08:23:52 +0100 Subject: [PATCH 4/9] device interface denied list: allow hexdigits --- qubes/device_protocol.py | 16 +++++++++++++--- qubes/tests/api_admin.py | 12 ++++++------ qubes/vm/qubesvm.py | 6 +++--- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/qubes/device_protocol.py b/qubes/device_protocol.py index a55b62bd3..2004f35ec 100644 --- a/qubes/device_protocol.py +++ b/qubes/device_protocol.py @@ -38,7 +38,7 @@ from typing import TYPE_CHECKING import qubes.utils -from qubes.exc import ProtocolError +from qubes.exc import ProtocolError, QubesValueError if TYPE_CHECKING: from qubes.vm.qubesvm import QubesVM @@ -693,7 +693,12 @@ def __init__(self, interface_encoding: str, devclass: Optional[str] = None): f"for given {devclass=}", file=sys.stderr, ) - ifc_full = devclass[0] + ifc_padded + if not all(c in string.hexdigits + "*" for c in ifc_padded): + raise ProtocolError("Invalid characters in interface encoding") + devclass_code = devclass[0].lower() + if devclass_code not in string.ascii_lowercase: + raise ProtocolError("Invalid characters in devclass encoding") + ifc_full = devclass_code + ifc_padded else: known_devclasses = { "p": "pci", @@ -735,7 +740,12 @@ def unknown(cls) -> "DeviceInterface": @staticmethod def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: - interfaces = interfaces or [] + interfaces = interfaces or "" + if len(interfaces) % 7 != 0: + QubesValueError( + f"Invalid length of {interfaces=} " + f"(is {len(interfaces)}, expected multiple of 7)", + ) return [ DeviceInterface(interfaces[i: i + 7]) for i in range(0, len(interfaces), 7) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 91a72f3ae..337240d47 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3919,26 +3919,26 @@ def test_660_vm_device_denied_list_empty(self): self.assertFalse(self.app.save.called) def test_661_vm_device_denied_list(self): - self.vm.devices_denied = "b******p012345p53**2*" + self.vm.devices_denied = "b******p012345pff**2*" actual = self.call_mgmt_func(b"admin.vm.device.denied.List", b"test-vm1") - self.assertEqual(actual, "b******\np012345\np53**2*") + self.assertEqual(actual, "b******\np012345\npff**2*") self.assertFalse(self.app.save.called) def test_662_vm_device_denied_add(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"u112233") + b"", b"uabcdef") self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*u112233") + "b******p012345p53**2*uabcdef") self.assertTrue(self.app.save.called) def test_663_vm_device_denied_add_multiple(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"u112233m******") + b"", b"uabcdefm******") self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*u112233m******") + "b******p012345p53**2*uabcdefm******") self.assertTrue(self.app.save.called) def test_664_vm_device_denied_add_repeated(self): diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index e50f77477..dd933eade 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -145,13 +145,13 @@ def _setter_denied_list(self, prop, value): self, prop, value, "Interface code list contains duplicates.") # block, usb, mic, pci, * - pattern = r"^([bump\*][0123456789\*]{6})*$" + pattern = r"^([bump\*][0123456789abcdef\*]{6})*$" if not re.fullmatch(pattern, value): raise qubes.exc.QubesPropertyValueError( self, prop, value, - "Interface code list should be in the form cddddddcdddddd...," + "Interface code list should be in the form chhhhhhchhhhhh...," 'where c is one of "b", "u", "m", "p", "*" ' - 'and d is a digit or "*".') + 'and h is a hexdigit or "*".') return value From b9dfda2baa04450efcf42278dc6aae9953690993 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 6 Jan 2025 09:11:26 +0100 Subject: [PATCH 5/9] device interface denied list: special value "all" for removing all items + tests for empty payloads --- qubes/api/admin.py | 10 ++++++++-- qubes/tests/api_admin.py | 24 +++++++++++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 32361ac27..00e3139d7 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1659,6 +1659,9 @@ async def vm_device_denied_add(self, untrusted_payload): payload = untrusted_payload.decode("ascii", errors="strict") to_add = DeviceInterface.from_str_bulk(payload) + if not to_add: + return + # may contain duplicates self.fire_event_for_permission(interfaces=to_add) @@ -1675,16 +1678,19 @@ async def vm_device_denied_remove(self, untrusted_payload): Payload: Encoded device interface (can be repeated without any separator). - If payload is empty, all interfaces are removed. + If payload is "all", all interfaces are removed. """ denied = DeviceInterface.from_str_bulk(self.dest.devices_denied) payload = untrusted_payload.decode("ascii", errors="strict") - if payload: + if payload != "all": to_remove = DeviceInterface.from_str_bulk(payload) else: to_remove = denied.copy() + if not to_remove: + return + # may contain missing values self.fire_event_for_permission(interfaces=to_remove) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 337240d47..c1c7ca9fe 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3955,7 +3955,14 @@ def test_665_vm_device_denied_add_present(self): b"", b"b******") self.assertFalse(self.app.save.called) - def test_666_vm_device_denied_remove(self): + def test_666_vm_device_denied_add_nothing(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"b******") + self.assertFalse(self.app.save.called) + + def test_670_vm_device_denied_remove(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******") @@ -3963,27 +3970,34 @@ def test_666_vm_device_denied_remove(self): "p012345p53**2*") self.assertTrue(self.app.save.called) - def test_667_vm_device_denied_remove_repeated(self): + def test_671_vm_device_denied_remove_repeated(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******b******") self.assertFalse(self.app.save.called) - def test_668_vm_device_denied_remove_all(self): + def test_672_vm_device_denied_remove_all(self): self.vm.devices_denied = "b******p012345p53**2*" self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"") + b"", b"all") self.assertEqual(self.vm.devices_denied, "") self.assertTrue(self.app.save.called) - def test_669_vm_device_denied_remove_missing(self): + def test_673_vm_device_denied_remove_missing(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"m******") self.assertFalse(self.app.save.called) + def test_674_vm_device_denied_remove_nothing(self): + self.vm.devices_denied = "b******p012345p53**2*" + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"") + self.assertFalse(self.app.save.called) + def test_700_pool_set_revisions_to_keep(self): self.app.pools["test-pool"] = unittest.mock.Mock() value = self.call_mgmt_func( From 73ab4a67a576708507c7a97248d32d4174b8c6de Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 6 Jan 2025 13:02:24 +0100 Subject: [PATCH 6/9] device interface denied list: make add/remove idempotent --- qubes/api/admin.py | 31 +++++++++++++++++++------------ qubes/tests/api_admin.py | 40 ++++++++++++++++++++++++++++------------ qubes/vm/qubesvm.py | 14 ++++++++------ 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 00e3139d7..1713916a4 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1662,13 +1662,20 @@ async def vm_device_denied_add(self, untrusted_payload): if not to_add: return - # may contain duplicates + if len(set(to_add)) != len(to_add): + raise qubes.exc.QubesValueError( + "Duplicated device interfaces in payload.") + self.fire_event_for_permission(interfaces=to_add) to_add_enc = "".join(map(repr, to_add)) + prev = self.dest.devices_denied + # "auto" ignoring of duplications self.dest.devices_denied = self.dest.devices_denied + to_add_enc - self.app.save() + # do not save if nothing changed + if prev != self.dest.devices_denied: + self.app.save() @qubes.api.method( "admin.vm.device.denied.Remove", scope="local", write=True) @@ -1691,19 +1698,19 @@ async def vm_device_denied_remove(self, untrusted_payload): if not to_remove: return + if len(set(to_remove)) != len(to_remove): + raise qubes.exc.QubesValueError( + "Duplicated device interfaces in payload.") + # may contain missing values self.fire_event_for_permission(interfaces=to_remove) - for interface in to_remove: - try: - denied.remove(interface) - except ValueError: - raise qubes.exc.QubesValueError( - f"Interface {interface} are not listed " - f"in the devices denied list of {self.dest} vm.") - new_denied = "".join(map(repr, denied)) - self.dest.devices_denied = new_denied - self.app.save() + # ignore missing values + new_denied = "".join(repr(i) for i in denied if i not in to_remove) + + if new_denied != self.dest.devices_denied: + self.dest.devices_denied = new_denied + self.app.save() @qubes.api.method( "admin.vm.firewall.Get", no_payload=True, scope="local", read=True diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index c1c7ca9fe..314fa23c8 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3946,20 +3946,24 @@ def test_664_vm_device_denied_add_repeated(self): with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"u112233u112233") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_665_vm_device_denied_add_present(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"b******") + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"b******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_666_vm_device_denied_add_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"b******") + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_670_vm_device_denied_remove(self): @@ -3975,6 +3979,8 @@ def test_671_vm_device_denied_remove_repeated(self): with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******b******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_672_vm_device_denied_remove_all(self): @@ -3986,16 +3992,26 @@ def test_672_vm_device_denied_remove_all(self): def test_673_vm_device_denied_remove_missing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"m******") + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"m******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) + def test_673_vm_device_denied_remove_missing_and_present(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"m******p53**2*") + self.assertEqual(self.vm.devices_denied, + "b******p012345") + self.assertTrue(self.app.save.called) + def test_674_vm_device_denied_remove_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"") + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_700_pool_set_revisions_to_keep(self): diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index dd933eade..c578e251e 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -139,12 +139,14 @@ def _setter_denied_list(self, prop, value): value = str(value) if len(value) == 0: return value - interfaces = DeviceInterface.from_str_bulk(value) - if len(interfaces) != len(set(interfaces)): - raise qubes.exc.QubesPropertyValueError( - self, prop, value, - "Interface code list contains duplicates.") - # block, usb, mic, pci, * + + # remove duplicates + value = "".join(map(repr, set(DeviceInterface.from_str_bulk(value)))) + + # The requirements for the interface encoding are more relaxed + # in the DeviceInterface class compared to the denied list. + # allowed classes: block, usb, mic, pci, * + # allowed interface encoding: hex digits + * pattern = r"^([bump\*][0123456789abcdef\*]{6})*$" if not re.fullmatch(pattern, value): raise qubes.exc.QubesPropertyValueError( From 408f72d5d2e8f4613740ea426007c2759bbc25b6 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 6 Jan 2025 14:45:41 +0100 Subject: [PATCH 7/9] device interface denied list: keep order --- qubes/api/admin.py | 6 ------ qubes/tests/api_admin.py | 2 +- qubes/vm/qubesvm.py | 3 ++- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 1713916a4..6f4d22bed 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1659,9 +1659,6 @@ async def vm_device_denied_add(self, untrusted_payload): payload = untrusted_payload.decode("ascii", errors="strict") to_add = DeviceInterface.from_str_bulk(payload) - if not to_add: - return - if len(set(to_add)) != len(to_add): raise qubes.exc.QubesValueError( "Duplicated device interfaces in payload.") @@ -1695,9 +1692,6 @@ async def vm_device_denied_remove(self, untrusted_payload): else: to_remove = denied.copy() - if not to_remove: - return - if len(set(to_remove)) != len(to_remove): raise qubes.exc.QubesValueError( "Duplicated device interfaces in payload.") diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 314fa23c8..ad9bc8b0d 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3938,7 +3938,7 @@ def test_663_vm_device_denied_add_multiple(self): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdefm******") self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*uabcdefm******") + "b******m******p012345p53**2*uabcdef") self.assertTrue(self.app.save.called) def test_664_vm_device_denied_add_repeated(self): diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index c578e251e..aa896f328 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -141,7 +141,8 @@ def _setter_denied_list(self, prop, value): return value # remove duplicates - value = "".join(map(repr, set(DeviceInterface.from_str_bulk(value)))) + value = "".join( + sorted(map(repr, set(DeviceInterface.from_str_bulk(value))))) # The requirements for the interface encoding are more relaxed # in the DeviceInterface class compared to the denied list. From 3fda64755813344361ebca17352e7862ac65c6f6 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 7 Jan 2025 10:30:53 +0100 Subject: [PATCH 8/9] device interface denied list: actually raise an exception --- qubes/device_protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qubes/device_protocol.py b/qubes/device_protocol.py index 2004f35ec..30b0e4190 100644 --- a/qubes/device_protocol.py +++ b/qubes/device_protocol.py @@ -742,7 +742,7 @@ def unknown(cls) -> "DeviceInterface": def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: interfaces = interfaces or "" if len(interfaces) % 7 != 0: - QubesValueError( + raise QubesValueError( f"Invalid length of {interfaces=} " f"(is {len(interfaces)}, expected multiple of 7)", ) From d3de44369ca1077dc869147e5a44039793f14447 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 7 Jan 2025 10:51:38 +0100 Subject: [PATCH 9/9] device interface denied list: reformat code --- qubes/api/admin.py | 16 +++--- qubes/device_protocol.py | 2 +- qubes/tests/api_admin.py | 106 ++++++++++++++++++++++----------------- qubes/vm/qubesvm.py | 18 ++++--- 4 files changed, 82 insertions(+), 60 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 6f4d22bed..7eb0448dd 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1632,8 +1632,8 @@ async def vm_device_set_required(self, endpoint, untrusted_payload): self.app.save() @qubes.api.method( - "admin.vm.device.denied.List", - no_payload=True, scope="local", read=True) + "admin.vm.device.denied.List", no_payload=True, scope="local", read=True + ) async def vm_device_denied_list(self): """ List all denied device interfaces for the VM. @@ -1647,8 +1647,7 @@ async def vm_device_denied_list(self): denied = self.dest.devices_denied return "\n".join(map(repr, DeviceInterface.from_str_bulk(denied))) - @qubes.api.method( - "admin.vm.device.denied.Add", scope="local", write=True) + @qubes.api.method("admin.vm.device.denied.Add", scope="local", write=True) async def vm_device_denied_add(self, untrusted_payload): """ Add device interface(s) to the denied list for the VM. @@ -1661,7 +1660,8 @@ async def vm_device_denied_add(self, untrusted_payload): if len(set(to_add)) != len(to_add): raise qubes.exc.QubesValueError( - "Duplicated device interfaces in payload.") + "Duplicated device interfaces in payload." + ) self.fire_event_for_permission(interfaces=to_add) @@ -1675,7 +1675,8 @@ async def vm_device_denied_add(self, untrusted_payload): self.app.save() @qubes.api.method( - "admin.vm.device.denied.Remove", scope="local", write=True) + "admin.vm.device.denied.Remove", scope="local", write=True + ) async def vm_device_denied_remove(self, untrusted_payload): """ Remove device interface(s) from the denied list for the VM. @@ -1694,7 +1695,8 @@ async def vm_device_denied_remove(self, untrusted_payload): if len(set(to_remove)) != len(to_remove): raise qubes.exc.QubesValueError( - "Duplicated device interfaces in payload.") + "Duplicated device interfaces in payload." + ) # may contain missing values self.fire_event_for_permission(interfaces=to_remove) diff --git a/qubes/device_protocol.py b/qubes/device_protocol.py index 30b0e4190..e84af6d13 100644 --- a/qubes/device_protocol.py +++ b/qubes/device_protocol.py @@ -747,7 +747,7 @@ def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: f"(is {len(interfaces)}, expected multiple of 7)", ) return [ - DeviceInterface(interfaces[i: i + 7]) + DeviceInterface(interfaces[i : i + 7]) for i in range(0, len(interfaces), 7) ] diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index ad9bc8b0d..89d48f9e4 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3913,105 +3913,119 @@ def test_655_vm_device_set_mode_invalid_value(self): self.assertFalse(self.app.save.called) def test_660_vm_device_denied_list_empty(self): - actual = self.call_mgmt_func(b"admin.vm.device.denied.List", - b"test-vm1") + actual = self.call_mgmt_func( + b"admin.vm.device.denied.List", b"test-vm1" + ) self.assertEqual(actual, "") self.assertFalse(self.app.save.called) def test_661_vm_device_denied_list(self): self.vm.devices_denied = "b******p012345pff**2*" - actual = self.call_mgmt_func(b"admin.vm.device.denied.List", - b"test-vm1") + actual = self.call_mgmt_func( + b"admin.vm.device.denied.List", b"test-vm1" + ) self.assertEqual(actual, "b******\np012345\npff**2*") self.assertFalse(self.app.save.called) def test_662_vm_device_denied_add(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"uabcdef") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*uabcdef") + self.call_mgmt_func( + b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdef" + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*uabcdef") self.assertTrue(self.app.save.called) def test_663_vm_device_denied_add_multiple(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"uabcdefm******") - self.assertEqual(self.vm.devices_denied, - "b******m******p012345p53**2*uabcdef") + self.call_mgmt_func( + b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdefm******" + ) + self.assertEqual( + self.vm.devices_denied, "b******m******p012345p53**2*uabcdef" + ) self.assertTrue(self.app.save.called) def test_664_vm_device_denied_add_repeated(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"u112233u112233") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Add", + b"test-vm1", + b"", + b"u112233u112233", + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_665_vm_device_denied_add_present(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"b******") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Add", b"test-vm1", b"", b"b******" + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_666_vm_device_denied_add_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Add", b"test-vm1", b"", b"" + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_670_vm_device_denied_remove(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"b******") - self.assertEqual(self.vm.devices_denied, - "p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******" + ) + self.assertEqual(self.vm.devices_denied, "p012345p53**2*") self.assertTrue(self.app.save.called) def test_671_vm_device_denied_remove_repeated(self): self.vm.devices_denied = "b******p012345p53**2*" with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"b******b******") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", + b"test-vm1", + b"", + b"b******b******", + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_672_vm_device_denied_remove_all(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"all") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"all" + ) self.assertEqual(self.vm.devices_denied, "") self.assertTrue(self.app.save.called) def test_673_vm_device_denied_remove_missing(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"m******") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"m******" + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_673_vm_device_denied_remove_missing_and_present(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"m******p53**2*") - self.assertEqual(self.vm.devices_denied, - "b******p012345") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", + b"test-vm1", + b"", + b"m******p53**2*", + ) + self.assertEqual(self.vm.devices_denied, "b******p012345") self.assertTrue(self.app.save.called) def test_674_vm_device_denied_remove_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"") - self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*") + self.call_mgmt_func( + b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"" + ) + self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_700_pool_set_revisions_to_keep(self): diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index aa896f328..165a4599e 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -135,14 +135,15 @@ def _setter_default_user(self, prop, value): def _setter_denied_list(self, prop, value): - """ Helper for setting denied list """ + """Helper for setting denied list""" value = str(value) if len(value) == 0: return value # remove duplicates value = "".join( - sorted(map(repr, set(DeviceInterface.from_str_bulk(value))))) + sorted(map(repr, set(DeviceInterface.from_str_bulk(value)))) + ) # The requirements for the interface encoding are more relaxed # in the DeviceInterface class compared to the denied list. @@ -151,10 +152,13 @@ def _setter_denied_list(self, prop, value): pattern = r"^([bump\*][0123456789abcdef\*]{6})*$" if not re.fullmatch(pattern, value): raise qubes.exc.QubesPropertyValueError( - self, prop, value, + self, + prop, + value, "Interface code list should be in the form chhhhhhchhhhhh...," 'where c is one of "b", "u", "m", "p", "*" ' - 'and h is a hexdigit or "*".') + 'and h is a hexdigit or "*".', + ) return value @@ -856,10 +860,12 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): ) devices_denied = qubes.property( - "devices_denied", default="", + "devices_denied", + default="", type=str, setter=_setter_denied_list, - doc="List of device interface codes that are denied for this VM.") + doc="List of device interface codes that are denied for this VM.", + ) # for changes in keyboard_layout, see also the same property in AdminVM keyboard_layout = qubes.property(