diff --git a/securedrop_client/export.py b/securedrop_client/export.py index 2dc616e12..fabc421a9 100644 --- a/securedrop_client/export.py +++ b/securedrop_client/export.py @@ -21,10 +21,23 @@ def __init__(self, status: str): class ExportStatus(Enum): + """ + Status codes representing results of export- and print- related calls. + + Codes may represent failure states (`USB_BAD_PASSPHRASE`) or success states + (`USB_CONNECTED`). + + Warning: do not make changes to existing values without reviewing + `securedrop-export/securedrop_export/export.py`. + """ + # On the way to success USB_CONNECTED = "USB_CONNECTED" DISK_ENCRYPTED = "USB_ENCRYPTED" + # (Success) Drive is compatible and unlocked; do not prompt for passphrase + USB_ENCRYPTED_UNLOCKED = "USB_ENCRYPTED_UNLOCKED" + # Not too far from success USB_NOT_CONNECTED = "USB_NOT_CONNECTED" BAD_PASSPHRASE = "USB_BAD_PASSPHRASE" @@ -37,6 +50,9 @@ class ExportStatus(Enum): PRINTER_NOT_FOUND = "ERROR_PRINTER_NOT_FOUND" MISSING_PRINTER_URI = "ERROR_MISSING_PRINTER_URI" + # Used internally + WARNING_QUBESOS_NOT_DETECTED = "WARNING_QUBESOS_NOT_DETECTED" + class Export(QObject): """ @@ -66,10 +82,15 @@ class Export(QObject): DISK_ENCRYPTION_KEY_NAME = "encryption_key" DISK_EXPORT_DIR = "export_data" + # Set of supported states for external disks that pass initial usb testing + # (meaning, they are attached and configured with a supported encryption and partition + # scheme, and may optionally be unlocked). + SUPPORTED_DISK_STATUSES = [ExportStatus.DISK_ENCRYPTED, ExportStatus.USB_ENCRYPTED_UNLOCKED] + # Set up signals for communication with the GUI thread begin_preflight_check = pyqtSignal() preflight_check_call_failure = pyqtSignal(object) - preflight_check_call_success = pyqtSignal() + preflight_check_call_success = pyqtSignal(str) begin_usb_export = pyqtSignal(list, str) export_usb_call_failure = pyqtSignal(object) export_usb_call_success = pyqtSignal() @@ -204,28 +225,35 @@ def _run_usb_test(self, archive_dir: str) -> None: if status != ExportStatus.USB_CONNECTED.value: raise ExportError(status) - def _run_disk_test(self, archive_dir: str) -> None: + def _run_disk_test(self, archive_dir: str) -> str: """ Run disk-test. Args: archive_dir (str): The path to the directory in which to create the archive. + Returns: + status (str): the status code resulting from the disk-test. + Raises: - ExportError: Raised if the usb-test does not return a DISK_ENCRYPTED status. + ExportError: Raised if the disk-test does not return a supported status + (Currently supported: DISK_ENCRYPTED, USB_ENCRYPTED_UNLOCKED). """ archive_path = self._create_archive(archive_dir, self.DISK_TEST_FN, self.DISK_TEST_METADATA) status = self._export_archive(archive_path) - if status != ExportStatus.DISK_ENCRYPTED.value: + if status not in (item.value for item in Export.SUPPORTED_DISK_STATUSES): raise ExportError(status) + return status def _run_disk_export(self, archive_dir: str, filepaths: List[str], passphrase: str) -> None: """ - Run disk-test. + Run disk-export. Args: archive_dir (str): The path to the directory in which to create the archive. + filepaths (List[str]): the list of files to add to the archive + passphrase (str): passphrase for the encrypted USB. Raises: ExportError: Raised if the usb-test does not return a DISK_ENCRYPTED status. @@ -256,6 +284,8 @@ def _run_print(self, archive_dir: str, filepaths: List[str]) -> None: def run_preflight_checks(self) -> None: """ Run preflight checks to verify that the usb device is connected and luks-encrypted. + Emits disk status (an ExportStatus enum, whose value describes the encryption type and + whether the drive is already unlocked). """ with TemporaryDirectory() as temp_dir: try: @@ -264,10 +294,14 @@ def run_preflight_checks(self) -> None: threading.current_thread().ident ) ) + + # Checks to see if USB is connected self._run_usb_test(temp_dir) - self._run_disk_test(temp_dir) + + # Checks whether disk state is supported + disk_status = self._run_disk_test(temp_dir) logger.debug("completed preflight checks: success") - self.preflight_check_call_success.emit() + self.preflight_check_call_success.emit(disk_status) except ExportError as e: logger.debug("completed preflight checks: failure") self.preflight_check_call_failure.emit(e) diff --git a/securedrop_client/gui/widgets.py b/securedrop_client/gui/widgets.py index 5b3cef90f..9b4d09164 100644 --- a/securedrop_client/gui/widgets.py +++ b/securedrop_client/gui/widgets.py @@ -2465,8 +2465,8 @@ def _print_file(self) -> None: self.controller.print_file(self.file_uuid) self.close() - @pyqtSlot() - def _on_preflight_success(self) -> None: + @pyqtSlot(str) + def _on_preflight_success(self, status: str = None) -> None: # If the continue button is disabled then this is the result of a background preflight check self.stop_animate_header() self.header_icon.update_image("printer.svg", svg_size=QSize(64, 64)) @@ -2696,20 +2696,33 @@ def _export_file(self, checked: bool = False) -> None: self.passphrase_field.setDisabled(True) self.controller.export_file_to_usb_drive(self.file_uuid, self.passphrase_field.text()) - @pyqtSlot() - def _on_preflight_success(self) -> None: - # If the continue button is disabled then this is the result of a background preflight check + @pyqtSlot(str) + def _on_preflight_success(self, status: str = None) -> None: self.stop_animate_header() self.header_icon.update_image("savetodisk.svg", QSize(64, 64)) self.header.setText(self.ready_header) + + # Check if drive is unlocked. If so, skip passowrd prompt screen. + is_drive_unlocked = status == ExportStatus.USB_ENCRYPTED_UNLOCKED.value + + # If the continue button is disabled then this is the result of a background preflight check if not self.continue_button.isEnabled(): self.continue_button.clicked.disconnect() - self.continue_button.clicked.connect(self._show_passphrase_request_message) + if is_drive_unlocked: + self.continue_button.clicked.connect(self._export_file) + else: + self.continue_button.clicked.connect(self._show_passphrase_request_message) self.continue_button.setEnabled(True) self.continue_button.setFocus() return - self._show_passphrase_request_message() + # If the continue button is already enabled, either export files or + # prompt for passphrase and export files, depending on disk status. + else: + if is_drive_unlocked: + self._export_file() + else: + self._show_passphrase_request_message() @pyqtSlot(object) def _on_preflight_failure(self, error: ExportError) -> None: diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 4e9098a5a..0bd5e3bae 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -60,7 +60,7 @@ SendReplyJobTimeoutError, ) from securedrop_client.crypto import GpgHelper -from securedrop_client.export import Export +from securedrop_client.export import Export, ExportStatus from securedrop_client.queue import ApiJobQueue from securedrop_client.sync import ApiSync from securedrop_client.utils import check_dir_permissions @@ -933,31 +933,39 @@ def run_printer_preflight_checks(self) -> None: """ Run preflight checks to make sure the Export VM is configured correctly. """ - logger.info("Running printer preflight check") if not self.qubes: - self.export.printer_preflight_success.emit() + logger.warning("QubesOS not detected, skipping printer preflight check") + self.export.printer_preflight_success.emit( + ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value + ) return + logger.info("Running printer preflight check") self.export.begin_printer_preflight.emit() def run_export_preflight_checks(self) -> None: """ Run preflight checks to make sure the Export VM is configured correctly. """ - logger.info("Running export preflight check") if not self.qubes: - self.export.preflight_check_call_success.emit() + logger.warning("QubesOS not detected; skipping export preflight check") + self.export.preflight_check_call_success.emit( + ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value + ) return + logger.info("Running export preflight check") self.export.begin_preflight_check.emit() - def export_file_to_usb_drive(self, file_uuid: str, passphrase: str) -> None: + def export_file_to_usb_drive(self, file_uuid: str, passphrase: str = "") -> None: """ Send the file specified by file_uuid to the Export VM with the user-provided passphrase for unlocking the attached transfer device. If the file is missing, update the db so that is_downloaded is set to False. + + If the drive is already unlocked, send an empty string as the passphrase. """ file = self.get_file(file_uuid) file_location = file.location(self.data_dir) @@ -967,7 +975,9 @@ def export_file_to_usb_drive(self, file_uuid: str, passphrase: str) -> None: return if not self.qubes: - self.export.export_usb_call_success.emit() + self.export.export_usb_call_success.emit( + ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value + ) return self.export.begin_usb_export.emit([file_location], passphrase) diff --git a/tests/gui/test_widgets.py b/tests/gui/test_widgets.py index 3579dec2e..c1d0c5d09 100644 --- a/tests/gui/test_widgets.py +++ b/tests/gui/test_widgets.py @@ -3805,6 +3805,32 @@ def test_ExportDialog__on_preflight_success_when_continue_enabled(mocker, export export_dialog._show_passphrase_request_message.assert_called_once_with() +def test_ExportDialog__on_preflight_success_drive_unlocked(mocker, export_dialog): + export_dialog._show_passphrase_request_message = mocker.MagicMock() + export_dialog.continue_button = mocker.MagicMock() + mocker.patch.object(export_dialog.continue_button, "isEnabled", return_value=False) + export_dialog.continue_button.clicked = mocker.MagicMock() + + # Call with the signal that is emitted when a drive is already unlocked + export_dialog._on_preflight_success(status=ExportStatus.USB_ENCRYPTED_UNLOCKED.value) + + export_dialog.continue_button.clicked.connect.assert_called_once_with( + export_dialog._export_file + ) + + +def test_ExportDialog__on_preflight_success_drive_unlocked_continue_enabled(mocker, export_dialog): + export_dialog._show_passphrase_request_message = mocker.MagicMock() + export_dialog.continue_button.setEnabled(True) + mock_export_file = mocker.patch.object(export_dialog, "_export_file") + + # Call with the signal that is emitted when a drive is already unlocked + export_dialog._on_preflight_success(status=ExportStatus.USB_ENCRYPTED_UNLOCKED.value) + + export_dialog._show_passphrase_request_message.assert_not_called() + mock_export_file.assert_called_once_with() + + def test_ExportDialog__on_preflight_success_enabled_after_preflight_success(mocker, export_dialog): assert not export_dialog.continue_button.isEnabled() export_dialog._on_preflight_success() diff --git a/tests/test_export.py b/tests/test_export.py index dd78df0d2..5725d87dd 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -4,7 +4,7 @@ import pytest -from securedrop_client.export import Export, ExportError +from securedrop_client.export import Export, ExportError, ExportStatus def test_run_printer_preflight(mocker): @@ -50,7 +50,7 @@ def test__run_printer_preflight(mocker): """ Ensure _export_archive and _create_archive are called with the expected parameters, _export_archive is called with the return value of _create_archive, and - _run_disk_test returns without error if 'USB_CONNECTED' is the return value of _export_archive. + _run_usb_test returns without error if 'USB_CONNECTED' is the return value of _export_archive. """ export = Export() export._create_archive = mocker.MagicMock(return_value="mock_archive_path") @@ -66,7 +66,7 @@ def test__run_printer_preflight(mocker): def test__run_printer_preflight_raises_ExportError_if_not_empty_string(mocker): """ - Ensure ExportError is raised if _run_disk_test returns anything other than 'USB_CONNECTED'. + Ensure ExportError is raised if _run_usb_test returns anything other than 'USB_CONNECTED'. """ export = Export() export._create_archive = mocker.MagicMock(return_value="mock_archive_path") @@ -199,7 +199,8 @@ def test_send_file_to_usb_device_error(mocker): export.export_completed.emit.assert_called_once_with(["path1", "path2"]) -def test_run_preflight_checks(mocker): +@pytest.mark.parametrize("supported_status", (i.value for i in Export.SUPPORTED_DISK_STATUSES)) +def test_run_preflight_checks(mocker, supported_status): """ Ensure TemporaryDirectory is used when creating and sending the archives during the preflight checks and that the success signal is emitted by Export. @@ -210,14 +211,14 @@ def test_run_preflight_checks(mocker): export = Export() export.preflight_check_call_success = mocker.MagicMock() export.preflight_check_call_success.emit = mocker.MagicMock() - _run_usb_export = mocker.patch.object(export, "_run_usb_test") - _run_disk_export = mocker.patch.object(export, "_run_disk_test") + _run_usb_test = mocker.patch.object(export, "_run_usb_test") + _run_disk_test = mocker.patch.object(export, "_run_disk_test", return_value=supported_status) export.run_preflight_checks() - _run_usb_export.assert_called_once_with("mock_temp_dir") - _run_disk_export.assert_called_once_with("mock_temp_dir") - export.preflight_check_call_success.emit.assert_called_once_with() + _run_usb_test.assert_called_once_with("mock_temp_dir") + _run_disk_test.assert_called_once_with("mock_temp_dir") + export.preflight_check_call_success.emit.assert_called_once_with(supported_status) def test_run_preflight_checks_error(mocker): @@ -232,13 +233,13 @@ def test_run_preflight_checks_error(mocker): export.preflight_check_call_failure = mocker.MagicMock() export.preflight_check_call_failure.emit = mocker.MagicMock() error = ExportError("bang!") - _run_usb_export = mocker.patch.object(export, "_run_usb_test") - _run_disk_export = mocker.patch.object(export, "_run_disk_test", side_effect=error) + _run_usb_test = mocker.patch.object(export, "_run_usb_test") + _run_disk_test = mocker.patch.object(export, "_run_disk_test", side_effect=error) export.run_preflight_checks() - _run_usb_export.assert_called_once_with("mock_temp_dir") - _run_disk_export.assert_called_once_with("mock_temp_dir") + _run_usb_test.assert_called_once_with("mock_temp_dir") + _run_disk_test.assert_called_once_with("mock_temp_dir") export.preflight_check_call_failure.emit.assert_called_once_with(error) @@ -263,27 +264,30 @@ def test__run_disk_export(mocker): ) -def test__run_disk_export_raises_ExportError_if_not_empty_string(mocker): +def test__run_disk_export_raises_ExportError_if_not_supported_status(mocker): """ - Ensure ExportError is raised if _run_disk_test returns anything other than ''. + Ensure ExportError is raised if _run_disk_test returns anything other than a + supported status. """ export = Export() export._create_archive = mocker.MagicMock(return_value="mock_archive_path") - export._export_archive = mocker.MagicMock(return_value="SOMETHING_OTHER_THAN_EMPTY_STRING") + export._export_archive = mocker.MagicMock(return_value="DEFINITELY_NOT_SUPPORTED_DISK_STATUS") with pytest.raises(ExportError): export._run_disk_export("mock_archive_dir", ["mock_filepath"], "mock_passphrase") -def test__run_disk_test(mocker): +# We have to sort the list or pytest complains +@pytest.mark.parametrize("supported_status", (i.value for i in Export.SUPPORTED_DISK_STATUSES)) +def test__run_disk_test(mocker, supported_status): """ Ensure _export_archive and _create_archive are called with the expected parameters, _export_archive is called with the return value of _create_archive, and - _run_disk_test returns without error if 'USB_ENCRYPTED' is the ouput status of _export_archive. + _run_disk_test returns without error if a supported status is the ouput status of _export_archive. """ export = Export() export._create_archive = mocker.MagicMock(return_value="mock_archive_path") - export._export_archive = mocker.MagicMock(return_value="USB_ENCRYPTED") + export._export_archive = mocker.MagicMock(return_value=supported_status) export._run_disk_test("mock_archive_dir") @@ -293,13 +297,13 @@ def test__run_disk_test(mocker): ) -def test__run_disk_test_raises_ExportError_if_not_USB_ENCRYPTED(mocker): +def test__run_disk_test_raises_ExportError_if_not_supported_status(mocker): """ - Ensure ExportError is raised if _run_disk_test returns anything other than 'USB_ENCRYPTED'. + Ensure ExportError is raised if _run_disk_test returns an unsupported status. """ export = Export() export._create_archive = mocker.MagicMock(return_value="mock_archive_path") - export._export_archive = mocker.MagicMock(return_value="SOMETHING_OTHER_THAN_USB_ENCRYPTED") + export._export_archive = mocker.MagicMock(return_value="DEFINITELY_NOT_SUPPORTED_DISK_STATUS") with pytest.raises(ExportError): export._run_disk_test("mock_archive_dir")