Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solves #9674 #643

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ ADMIN_API_METHODS_SIMPLE = \
admin.vm.device.mic.Detach \
admin.vm.device.mic.Set.assignment \
admin.vm.device.mic.Unassign \
admin.vm.device.denied.List \
admin.vm.device.denied.Add \
admin.vm.device.denied.Remove \
admin.vm.feature.CheckWithNetvm \
admin.vm.feature.CheckWithTemplate \
admin.vm.feature.CheckWithAdminVM \
Expand Down
78 changes: 78 additions & 0 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
UnknownDevice,
DeviceAssignment,
AssignmentMode,
DeviceInterface,
)


Expand Down Expand Up @@ -1630,6 +1631,83 @@ async def vm_device_set_required(self, endpoint, untrusted_payload):
await self.dest.devices[devclass].update_assignment(dev, mode)
self.app.save()

@qubes.api.method(
"admin.vm.device.denied.List", no_payload=True, scope="local", read=True
)
async def vm_device_denied_list(self):
"""
List all denied device interfaces for the VM.

Returns a newline-separated string.
"""
self.enforce(not self.arg)
marmarek marked this conversation as resolved.
Show resolved Hide resolved

self.fire_event_for_permission()

denied = self.dest.devices_denied
return "\n".join(map(repr, DeviceInterface.from_str_bulk(denied)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe assert that there are no newlines in the repr output?


@qubes.api.method("admin.vm.device.denied.Add", scope="local", write=True)
async def vm_device_denied_add(self, untrusted_payload):
"""
Add device interface(s) to the denied list for the VM.

Payload:
Encoded device interface (can be repeated without any separator).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is intended behavior on duplicates? IIUC right now if at least one interface you try to add is already on deny list, the whole operation will fail. Is that intentional? IMHO better would be to ignore duplicates (as in - adding interface that is already denied is no-op, but others non-duplicated added at the same time are still added). Excluding duplicates at the client side is racy (somebody can modify the value between client reads it (to exclude duplicates) and writes it back).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was intentional; I followed the "fail fast" principle. However, now I understand that in the mentioned example, admin methods should be idempotent.
To keep consistency, in the case of removing an interface that is not currently on the list nothing should be done, am I right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so

"""
payload = untrusted_payload.decode("ascii", errors="strict")
to_add = DeviceInterface.from_str_bulk(payload)

if len(set(to_add)) != len(to_add):
raise qubes.exc.QubesValueError(
"Duplicated device interfaces in payload."
)

self.fire_event_for_permission(interfaces=to_add)

to_add_enc = "".join(map(repr, to_add))

prev = self.dest.devices_denied
# "auto" ignoring of duplications
self.dest.devices_denied = self.dest.devices_denied + to_add_enc
# do not save if nothing changed
if prev != self.dest.devices_denied:
self.app.save()

@qubes.api.method(
"admin.vm.device.denied.Remove", scope="local", write=True
)
async def vm_device_denied_remove(self, untrusted_payload):
"""
Remove device interface(s) from the denied list for the VM.

Payload:
Encoded device interface (can be repeated without any separator).
If payload is "all", all interfaces are removed.
"""
denied = DeviceInterface.from_str_bulk(self.dest.devices_denied)

payload = untrusted_payload.decode("ascii", errors="strict")
if payload != "all":
to_remove = DeviceInterface.from_str_bulk(payload)
else:
to_remove = denied.copy()

if len(set(to_remove)) != len(to_remove):
raise qubes.exc.QubesValueError(
"Duplicated device interfaces in payload."
)

# may contain missing values
self.fire_event_for_permission(interfaces=to_remove)

# ignore missing values
new_denied = "".join(repr(i) for i in denied if i not in to_remove)

if new_denied != self.dest.devices_denied:
self.dest.devices_denied = new_denied
self.app.save()

@qubes.api.method(
"admin.vm.firewall.Get", no_payload=True, scope="local", read=True
)
Expand Down
28 changes: 21 additions & 7 deletions qubes/device_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from typing import TYPE_CHECKING

import qubes.utils
from qubes.exc import ProtocolError
from qubes.exc import ProtocolError, QubesValueError

if TYPE_CHECKING:
from qubes.vm.qubesvm import QubesVM
Expand Down Expand Up @@ -693,7 +693,12 @@
f"for given {devclass=}",
file=sys.stderr,
)
ifc_full = devclass[0] + ifc_padded
if not all(c in string.hexdigits + "*" for c in ifc_padded):
raise ProtocolError("Invalid characters in interface encoding")

Check warning on line 697 in qubes/device_protocol.py

View check run for this annotation

Codecov / codecov/patch

qubes/device_protocol.py#L697

Added line #L697 was not covered by tests
devclass_code = devclass[0].lower()
if devclass_code not in string.ascii_lowercase:
raise ProtocolError("Invalid characters in devclass encoding")

Check warning on line 700 in qubes/device_protocol.py

View check run for this annotation

Codecov / codecov/patch

qubes/device_protocol.py#L700

Added line #L700 was not covered by tests
ifc_full = devclass_code + ifc_padded
else:
known_devclasses = {
"p": "pci",
Expand Down Expand Up @@ -733,6 +738,19 @@
"""Value for unknown device interface."""
return cls("?******")

@staticmethod
def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]:
interfaces = interfaces or ""
if len(interfaces) % 7 != 0:
raise QubesValueError(

Check warning on line 745 in qubes/device_protocol.py

View check run for this annotation

Codecov / codecov/patch

qubes/device_protocol.py#L745

Added line #L745 was not covered by tests
f"Invalid length of {interfaces=} "
f"(is {len(interfaces)}, expected multiple of 7)",
)
return [
DeviceInterface(interfaces[i : i + 7])
for i in range(0, len(interfaces), 7)
]

def __repr__(self):
return self._interface_encoding

Expand Down Expand Up @@ -1120,11 +1138,7 @@

if "interfaces" in properties:
interfaces = properties["interfaces"]
interfaces = [
DeviceInterface(interfaces[i : i + 7])
for i in range(0, len(interfaces), 7)
]
properties["interfaces"] = interfaces
properties["interfaces"] = DeviceInterface.from_str_bulk(interfaces)

if "parent_ident" in properties:
properties["parent"] = Port(
Expand Down
2 changes: 0 additions & 2 deletions qubes/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@
)
from qubes.exc import ProtocolError

DEVICE_DENY_LIST = "/etc/qubes/device-deny.list"


class DeviceNotAssigned(qubes.exc.QubesException, KeyError):
"""
Expand Down
41 changes: 3 additions & 38 deletions qubes/ext/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import importlib
import os

import qubes.api
import qubes.api.internal
Expand All @@ -27,7 +26,6 @@
from qrexec.policy import utils, parser

from qubes.device_protocol import DeviceInterface
from qubes.devices import DEVICE_DENY_LIST


class JustEvaluateAskResolution(parser.AskResolution):
Expand Down Expand Up @@ -184,44 +182,11 @@
if mode != "manual":
return

# load device deny list
deny = {}
AdminExtension._load_deny_list(deny, DEVICE_DENY_LIST)

# load drop ins
drop_in_path = DEVICE_DENY_LIST + ".d"
if os.path.isdir(drop_in_path):
for deny_list_name in os.listdir(drop_in_path):
deny_list_path = os.path.join(drop_in_path, deny_list_name)

if os.path.isfile(deny_list_path):
AdminExtension._load_deny_list(deny, deny_list_path)

# check if any presented interface is on deny list
for interface in deny.get(dest.name, set()):
pattern = DeviceInterface(interface)
# check if any presented interface is on denied list
denied = set(DeviceInterface.from_str_bulk(dest.devices_denied))
for pattern in denied:

Check warning on line 187 in qubes/ext/admin.py

View check run for this annotation

Codecov / codecov/patch

qubes/ext/admin.py#L186-L187

Added lines #L186 - L187 were not covered by tests
for devint in device.interfaces:
if pattern.matches(devint):
raise qubes.exc.PermissionDenied(
f"Device exposes a banned interface: {devint}"
)

@staticmethod
def _load_deny_list(deny: dict, path: str) -> None:
try:
with open(path, "r", encoding="utf-8") as file:
for line in file:
line = line.strip()

# skip comments
if line.startswith("#"):
continue

if line:
name, *values = line.split()
values = " ".join(values).replace(",", " ").split()
values = [v for v in values if len(v) > 0]

deny[name] = deny.get(name, set()).union(set(values))
except IOError:
pass
Loading