diff --git a/clinica/pipelines/engine.py b/clinica/pipelines/engine.py
index 6c46b09d5..e4c8da1e9 100644
--- a/clinica/pipelines/engine.py
+++ b/clinica/pipelines/engine.py
@@ -467,7 +467,12 @@ def __init__(
         try:
             check_caps_folder(self._caps_directory)
         except ClinicaCAPSError as e:
-            desc = build_caps_dataset_description(self._name, self._caps_directory)
+            desc = build_caps_dataset_description(
+                self._caps_directory,
+                self._caps_directory,
+                self._name,
+                f"subjects/*/*/{self._name}",
+            )
             raise ClinicaCAPSError(
                 f"{e}\nYou might want to create a 'dataset_description.json' "
                 f"file with the following content:\n{desc}"
@@ -482,7 +487,12 @@ def __init__(
             or len([f for f in self._caps_directory.iterdir()]) == 0
         ):
             self._caps_directory.mkdir(parents=True, exist_ok=True)
-            write_caps_dataset_description(self._name, self._caps_directory)
+            write_caps_dataset_description(
+                self._bids_directory,
+                self._caps_directory,
+                self._name,
+                f"subjects/*/*/{self._name}",
+            )
             check_caps_folder(self._caps_directory)
         self._compute_subjects_and_sessions()
         self._init_nodes()
diff --git a/clinica/utils/caps.py b/clinica/utils/caps.py
index d24a87a15..4c4fcc3bd 100644
--- a/clinica/utils/caps.py
+++ b/clinica/utils/caps.py
@@ -1,15 +1,17 @@
+import datetime
 import json
-from enum import Enum
+import warnings
 from pathlib import Path
-from typing import IO, Optional
+from typing import IO, List, MutableSequence, Optional
 
 from attrs import define, fields
-from cattr.gen import make_dict_unstructure_fn, override
+from cattr.gen import make_dict_structure_fn, make_dict_unstructure_fn, override
 from cattr.preconf.json import make_converter
 
 from clinica.utils.bids import BIDS_VERSION
-from clinica.utils.exceptions import ClinicaCAPSError
+from clinica.utils.exceptions import ClinicaBIDSError, ClinicaCAPSError
 from clinica.utils.inputs import DatasetType
+from clinica.utils.stream import log_and_raise
 
 __all__ = [
     "CAPS_VERSION",
@@ -22,6 +24,130 @@
 CAPS_VERSION = "1.0.0"
 
 
+def _get_username() -> str:
+    import os
+    import pwd
+
+    return pwd.getpwuid(os.getuid()).pw_name
+
+
+def _get_machine_name() -> str:
+    import platform
+
+    return platform.node()
+
+
+def _get_current_timestamp() -> datetime.datetime:
+    return datetime.datetime.now()
+
+
+def _generate_random_name() -> str:
+    import uuid
+
+    return str(uuid.uuid4())
+
+
+def _get_bids_version(dataset_folder: Path):
+    """Returns the BIDS version number of a BIDS or CAPS dataset."""
+    try:
+        with open(dataset_folder / "dataset_description.json", "r") as fp:
+            bids_metadata = json.load(fp)
+        return bids_metadata["BIDSVersion"]
+    except FileNotFoundError:
+        log_and_raise(
+            (
+                f"File {dataset_folder / 'dataset_description.json'} is missing "
+                "while it is mandatory for a BIDS/CAPS dataset."
+            ),
+            ClinicaBIDSError,
+        )
+    except KeyError:
+        log_and_raise(
+            (
+                f"File {dataset_folder / 'dataset_description.json'} is missing a "
+                "'BIDSVersion' key while it is mandatory."
+            ),
+            ClinicaBIDSError,
+        )
+    except json.JSONDecodeError as e:
+        log_and_raise(
+            f"File {dataset_folder / 'dataset_description.json'} is not formatted correctly:\n{e}.",
+            ClinicaBIDSError,
+        )
+
+
+@define
+class CAPSProcessingDescription:
+    """This class models the metadata of a CAPS processing pipeline.
+
+    Attributes
+    ----------
+    name : str
+        The name of the processing pipeline.
+        Example: 't1-linear'.
+
+    date : datetime
+        The date at which the processing pipeline has been run.
+        More precisely, this is the date at which the dataset_description.json
+        file is written to disk, which precedes the date at which the pipeline
+        finishes processing.
+
+    author : str
+        This is the name of the user who ran this processing pipeline.
+
+    machine : str
+        This is the name of the machine on which the processing pipeline was run.
+
+    processing_path : str
+        This is the path to the processing folder(s) relative to the root of the
+        CAPS dataset.
+
+    input_path : str
+        This is the path to the input dataset.
+    """
+
+    name: str
+    date: datetime.datetime
+    author: str
+    machine: str
+    processing_path: str
+    input_path: str
+
+    @classmethod
+    def from_values(cls, name: str, processing_path: str, input_path: str):
+        return cls(
+            name,
+            _get_current_timestamp(),
+            _get_username(),
+            _get_machine_name(),
+            processing_path,
+            input_path,
+        )
+
+    @classmethod
+    def from_dict(cls, values: dict):
+        return cls(
+            values["Name"],
+            datetime.datetime.fromisoformat(values["Date"]),
+            values["Author"],
+            values["Machine"],
+            values["ProcessingPath"],
+            values["InputPath"],
+        )
+
+    def write(self, to: IO[str]):
+        json.dump(converter.unstructure(self), to, indent=4)
+
+    def __str__(self):
+        return json.dumps(converter.unstructure(self))
+
+    @classmethod
+    def from_file(cls, json_file: Path):
+        with open(json_file, "r") as fp:
+            content = json.load(fp)
+        return converter.structure(content, CAPSProcessingDescription)
+
+
 @define
 class CAPSDatasetDescription:
     """Model representing a CAPS dataset description.
@@ -36,12 +162,19 @@ class CAPSDatasetDescription:
 
     caps_version : str
         The version number of the CAPS specifications used.
+
+    dataset_type : DatasetType
+        The dataset type.
+
+    processing : List of CAPSProcessingDescription
+        The list of processing pipelines that have been run.
     """
 
     name: str
     bids_version: str = BIDS_VERSION
     caps_version: str = CAPS_VERSION
     dataset_type: DatasetType = DatasetType.DERIVATIVE
+    processing: MutableSequence[CAPSProcessingDescription] = []
 
     def write(self, to: IO[str]):
         json.dump(converter.unstructure(self), to, indent=4)
@@ -49,36 +182,78 @@ def write(self, to: IO[str]):
     def __str__(self):
         return json.dumps(converter.unstructure(self))
 
+    def has_processing(self, processing_name: str) -> bool:
+        return any(
+            processing.name == processing_name for processing in self.processing
+        )
+
+    def get_processing(
+        self, processing_name: str
+    ) -> Optional[CAPSProcessingDescription]:
+        for processing in self.processing:
+            if processing.name == processing_name:
+                return processing
+        return None
+
+    def delete_processing(self, processing_name: str):
+        self.processing = [
+            processing
+            for processing in self.processing
+            if processing.name != processing_name
+        ]
+
+    def add_processing(
+        self,
+        processing_name: str,
+        processing_output_path: str,
+        processing_input_path: str,
+    ):
+        new_processing = CAPSProcessingDescription.from_values(
+            processing_name, processing_output_path, processing_input_path
+        )
+        if (existing_processing := self.get_processing(processing_name)) is not None:
+            warnings.warn(
+                f"The CAPS dataset '{self.name}' already has a processing named {processing_name}:\n"
+                f"{existing_processing}\nIt will be overwritten with the following:\n{new_processing}"
+            )
+            self.delete_processing(existing_processing.name)
+        self.processing.append(new_processing)
+
     @classmethod
     def from_values(
         cls,
-        name: str,
+        name: Optional[str] = None,
         bids_version: Optional[str] = None,
         caps_version: Optional[str] = None,
+        processing: Optional[List[CAPSProcessingDescription]] = None,
     ):
         return cls(
-            name,
+            name or _generate_random_name(),
             bids_version or BIDS_VERSION,
             caps_version or CAPS_VERSION,
             DatasetType.DERIVATIVE,
+            processing or [],
         )
 
     @classmethod
     def from_file(cls, json_file: Path):
-        parsed = json.loads(json_file.read_text())
-        try:
-            return cls(
-                parsed["Name"],
-                parsed["BidsVersion"],
-                parsed["CAPSVersion"],
-                DatasetType(parsed["DatasetType"]),
-            )
-        except KeyError:
-            raise ClinicaCAPSError(
-                f"CAPS dataset_description.json file {json_file} is not valid and "
-                "cannot be parsed as a CAPSDatasetDescription. "
-                "Please verify that the file is well formatted."
-            )
+        with open(json_file, "r") as fp:
+            content = json.load(fp)
+        return converter.structure(content, CAPSDatasetDescription)
+
+    @classmethod
+    def from_dict(cls, values: dict):
+        processing = []
+        if "Processing" in values:
+            processing = [
+                CAPSProcessingDescription.from_dict(proc)
+                for proc in values["Processing"]
+            ]
+        return cls(
+            values["Name"],
+            values["BIDSVersion"],
+            values["CAPSVersion"],
+            DatasetType(values["DatasetType"]),
+            processing,
+        )
 
     def is_compatible_with(self, other) -> bool:
         if self.bids_version != other.bids_version:
@@ -91,10 +266,10 @@ def is_compatible_with(self, other) -> bool:
 def _rename(name: str) -> str:
     """Rename attributes following the specification for the JSON file.
 
-    Basically pascal case with known acronyms such as BIDS fully capitalized.
+    Basically pascal case with known acronyms such as BIDS and CAPS fully capitalized.
     """
     return "".join(
-        word.upper() if word == "caps" else word.capitalize()
+        word.upper() if word in ("bids", "caps") else word.capitalize()
         for word in name.split("_")
     )
 
@@ -102,22 +277,63 @@ def _rename(name: str) -> str:
 
 # Register a JSON converter for the CAPS dataset description model.
 converter = make_converter()
+# Unstructuring hooks first
+converter.register_unstructure_hook(datetime.datetime, lambda dt: dt.isoformat())
+caps_processing_field_renaming = {
+    a.name: override(rename=_rename(a.name)) for a in fields(CAPSProcessingDescription)
+}
+caps_processing_field_renaming_unstructure_hook = make_dict_unstructure_fn(
+    CAPSProcessingDescription,
+    converter,
+    **caps_processing_field_renaming,
+)
+converter.register_unstructure_hook(
+    CAPSProcessingDescription,
+    caps_processing_field_renaming_unstructure_hook,
+)
+caps_dataset_description_field_renaming = {
+    a.name: override(rename=_rename(a.name)) for a in fields(CAPSDatasetDescription)
+}
+caps_dataset_field_renaming_unstructure_hook = make_dict_unstructure_fn(
+    CAPSDatasetDescription,
+    converter,
+    **caps_dataset_description_field_renaming,
+)
 converter.register_unstructure_hook(
     CAPSDatasetDescription,
-    make_dict_unstructure_fn(
-        CAPSDatasetDescription,
-        converter,
-        **{
-            a.name: override(rename=_rename(a.name))
-            for a in fields(CAPSDatasetDescription)
-        },
-    ),
+    caps_dataset_field_renaming_unstructure_hook,
+)
+
+# And structuring hooks
+converter.register_structure_hook(
+    datetime.datetime, lambda ts, _: datetime.datetime.fromisoformat(ts)
+)
+caps_processing_field_renaming_structure_hook = make_dict_structure_fn(
+    CAPSProcessingDescription,
+    converter,
+    **caps_processing_field_renaming,
+)
+converter.register_structure_hook(
+    CAPSProcessingDescription,
+    caps_processing_field_renaming_structure_hook,
+)
+caps_dataset_field_renaming_structure_hook = make_dict_structure_fn(
+    CAPSDatasetDescription,
+    converter,
+    **caps_dataset_description_field_renaming,
+)
+converter.register_structure_hook(
+    CAPSDatasetDescription,
+    caps_dataset_field_renaming_structure_hook,
 )
 
 
 def write_caps_dataset_description(
-    name: str,
-    caps_dir: Path,
+    input_dir: Path,
+    output_dir: Path,
+    processing_name: str,
+    processing_output_path: str,
+    dataset_name: Optional[str] = None,
     bids_version: Optional[str] = None,
     caps_version: Optional[str] = None,
 ) -> None:
@@ -125,11 +341,30 @@ def write_caps_dataset_description(
 
     Parameters
     ----------
-    name : str
-        The name of the CAPS dataset.
-
-    caps_dir : Path
-        The path to the CAPS dataset.
+    input_dir : Path
+        The path to the folder of the input dataset.
+        It can be a BIDS dataset or a CAPS dataset.
+
+    output_dir : Path
+        The path to the folder of the output dataset.
+        This has to be a CAPS dataset, and this is where
+        the requested dataset_description.json file will be written.
+
+    processing_name : str
+        The name of the processing performed. By default, Clinica pipelines
+        set this to the name of the pipeline, but any name is possible.
+
+    processing_output_path : str
+        The path to the subfolder(s) in which the results of the processing
+        will be stored, relative to the root of the CAPS dataset (defined as
+        output_dir). If there are multiple folders, use a glob pattern.
+        For example, for t1-linear: 'subjects/*/*/t1-linear'.
+
+    dataset_name : str, optional
+        The name of the CAPS dataset. If not specified, a random identifier will
+        be generated. If a dataset_description.json file already exists, the
+        existing name will be kept.
 
     bids_version : str, optional
         The version of the BIDS specifications used.
@@ -139,16 +374,25 @@ def write_caps_dataset_description(
         The version of the CAPS specifications used.
         By default, this will be set as the CAPS version currently
         supported by Clinica.
     """
-    new_desc = build_caps_dataset_description(
-        name, caps_dir, bids_version=bids_version, caps_version=caps_version
+    description = build_caps_dataset_description(
+        input_dir,
+        output_dir,
+        processing_name,
+        processing_output_path,
+        dataset_name=dataset_name,
+        bids_version=bids_version,
+        caps_version=caps_version,
     )
-    with open(caps_dir / "dataset_description.json", "w") as f:
-        new_desc.write(to=f)
+    with open(output_dir / "dataset_description.json", "w") as f:
+        description.write(to=f)
 
 
 def build_caps_dataset_description(
-    name: str,
-    caps_dir: Path,
+    input_dir: Path,
+    output_dir: Path,
+    processing_name: str,
+    processing_output_path: str,
+    dataset_name: Optional[str] = None,
     bids_version: Optional[str] = None,
     caps_version: Optional[str] = None,
 ) -> CAPSDatasetDescription:
@@ -156,11 +400,30 @@ def build_caps_dataset_description(
 
     Parameters
    ----------
-    name : str
-        The name of the CAPS dataset.
-
-    caps_dir : Path
-        The path to the CAPS dataset.
+    input_dir : Path
+        The path to the folder of the input dataset.
+        It can be a BIDS dataset or a CAPS dataset.
+
+    output_dir : Path
+        The path to the folder of the output dataset.
+        This has to be a CAPS dataset, and this is where
+        the requested dataset_description.json file will be written.
+
+    processing_name : str
+        The name of the processing performed. By default, Clinica pipelines
+        set this to the name of the pipeline, but any name is possible.
+
+    processing_output_path : str
+        The path to the subfolder(s) in which the results of the processing
+        will be stored, relative to the root of the CAPS dataset (defined as
+        output_dir). If there are multiple folders, use a glob pattern.
+        For example, for t1-linear: 'subjects/*/*/t1-linear'.
+
+    dataset_name : str, optional
+        The name of the CAPS dataset. If not specified, a random identifier will
+        be generated. If a dataset_description.json file already exists, the
+        existing name will be kept.
 
     bids_version : str, optional
         The version of the BIDS specifications used.
@@ -177,21 +440,53 @@
     """
     from clinica.utils.stream import cprint, log_and_raise
 
-    new_desc = CAPSDatasetDescription.from_values(name, bids_version, caps_version)
-    if (caps_dir / "dataset_description.json").exists():
+    bids_version_from_input_dir = None
+    try:
+        bids_version_from_input_dir = _get_bids_version(input_dir)
+    except ClinicaBIDSError:
+        warnings.warn(
+            f"Unable to retrieve the BIDS version from the input folder {input_dir}. "
+            f"Please verify your input dataset. Clinica will assume a BIDS version of {BIDS_VERSION}."
+        )
+    if (
+        bids_version is not None
+        and bids_version_from_input_dir is not None
+        and bids_version != bids_version_from_input_dir
+    ):
+        log_and_raise(
+            f"The input dataset {input_dir} has BIDS specifications following "
+            f"version {bids_version_from_input_dir}, while the BIDS specifications version "
+            f"asked for the CAPS creation is {bids_version}. "
+            "Please make sure the versions are the same before processing.",
+            ClinicaBIDSError,
+        )
+    new_desc = CAPSDatasetDescription.from_values(
+        dataset_name, bids_version or bids_version_from_input_dir, caps_version
+    )
+    if (output_dir / "dataset_description.json").exists():
         cprint(
-            f"The CAPS dataset {name} already contains a dataset_description.json file.",
+            (
+                f"The CAPS dataset '{dataset_name}', located at {output_dir}, already "
+                "contains a 'dataset_description.json' file."
+            ),
             lvl="info",
         )
         previous_desc = CAPSDatasetDescription.from_file(
-            caps_dir / "dataset_description.json"
+            output_dir / "dataset_description.json"
        )
         if not previous_desc.is_compatible_with(new_desc):
             msg = (
-                f"Impossible to write the dataset_description.json file in {caps_dir} "
+                f"Impossible to write the 'dataset_description.json' file in {output_dir} "
                 "because it already exists and it contains incompatible metadata."
             )
             log_and_raise(msg, ClinicaCAPSError)
         if previous_desc.name != new_desc.name:
-            new_desc.name = f"{previous_desc.name} + {new_desc.name}"
+            warnings.warn(
+                f"The existing CAPS dataset, located at {output_dir}, has a name '{previous_desc.name}' different "
+                f"from the new name '{new_desc.name}'. The old name will be kept."
+            )
+            new_desc.name = previous_desc.name
+        for processing in previous_desc.processing:
+            new_desc.processing.append(processing)
+    new_desc.add_processing(processing_name, processing_output_path, str(input_dir))
     return new_desc
diff --git a/test/unittests/pipelines/t1_linear/test_anat_linear_pipeline.py b/test/unittests/pipelines/t1_linear/test_anat_linear_pipeline.py
index ff3ca9947..e3344660c 100644
--- a/test/unittests/pipelines/t1_linear/test_anat_linear_pipeline.py
+++ b/test/unittests/pipelines/t1_linear/test_anat_linear_pipeline.py
@@ -62,7 +62,7 @@ def test_anat_linear_pipeline_write_caps_dataset_description(tmp_path):
 
     desc = CAPSDatasetDescription.from_file(caps / "dataset_description.json")
 
-    assert desc.name == "AnatLinear"
     assert desc.bids_version == "1.7.0"
     assert desc.caps_version == "1.0.0"
     assert desc.dataset_type == DatasetType.DERIVATIVE
+    assert desc.processing[0].name == "AnatLinear"
diff --git a/test/unittests/utils/test_caps.py b/test/unittests/utils/test_caps.py
index e0ee9b416..365eb26af 100644
--- a/test/unittests/utils/test_caps.py
+++ b/test/unittests/utils/test_caps.py
@@ -1,103 +1,353 @@
+import datetime
 import json
+from pathlib import Path
 
 import pytest
 
 
-def test_write_caps_dataset_description(tmp_path):
-    from clinica.utils.caps import write_caps_dataset_description
+def mock_processing_metadata(mocker):
+    """Processing metadata is specific to the user and machine.
 
-    (tmp_path / "caps").mkdir()
+    This mock makes sure tests are reproducible by setting:
+    - date to be 2024-08-06T16:30:00
+    - user name to be 'John Doe'
+    - machine name to be 'my machine'
+    """
+    mocker.patch(
+        "clinica.utils.caps._get_current_timestamp",
+        return_value=datetime.datetime(2024, 8, 6, 16, 30, 0),
+    )
+    mocker.patch("clinica.utils.caps._get_username", return_value="John Doe")
+    mocker.patch("clinica.utils.caps._get_machine_name", return_value="my machine")
+    return mocker
 
-    write_caps_dataset_description("foo", tmp_path / "caps")
-    files = [f for f in (tmp_path / "caps").iterdir()]
 
-    assert len(files) == 1
-    assert json.loads(files[0].read_text()) == {
+def test_caps_processing_description(tmp_path, mocker):
+    from clinica.utils.caps import CAPSProcessingDescription
+
+    mocker = mock_processing_metadata(mocker)
+    desc = CAPSProcessingDescription.from_values(
+        "foo", str(tmp_path / "output"), str(tmp_path / "input")
+    )
+
+    assert desc.name == "foo"
+    assert desc.date == datetime.datetime(2024, 8, 6, 16, 30)
+    assert desc.author == "John Doe"
+    assert desc.machine == "my machine"
+    assert desc.processing_path == str(tmp_path / "output")
+    assert desc.input_path == str(tmp_path / "input")
+    assert json.loads(str(desc)) == {
         "Name": "foo",
-        "BidsVersion": "1.7.0",
+        "Date": "2024-08-06T16:30:00",
+        "Author": "John Doe",
+        "Machine": "my machine",
+        "ProcessingPath": f"{tmp_path}/output",
+        "InputPath": f"{tmp_path}/input",
+    }
+    with open(tmp_path / "dataset_description.json", "w") as fp:
+        desc.write(fp)
+    desc2 = CAPSProcessingDescription.from_file(tmp_path / "dataset_description.json")
+    assert desc == desc2
+
+
+def test_caps_dataset_description(tmp_path, mocker):
+    from clinica.utils.caps import CAPSDatasetDescription
+
+    mocker = mock_processing_metadata(mocker)
+    mocker.patch(
+        "clinica.utils.caps._generate_random_name", return_value="my caps dataset"
+    )
+
+    desc = CAPSDatasetDescription.from_values()
+
+    assert desc.name == "my caps dataset"
+    assert desc.bids_version == "1.7.0"
+    assert desc.caps_version == "1.0.0"
+    assert desc.processing == []
+
+    desc.add_processing(
"processing-1", "subjects/*/*/processing-1", str(tmp_path / "bids") + ) + + assert len(desc.processing) == 1 + assert desc.has_processing("processing-1") + assert not desc.has_processing("processing-2") + proc = desc.get_processing("processing-1") + assert proc.name == "processing-1" + assert proc.date == datetime.datetime(2024, 8, 6, 16, 30, 0) + assert proc.author == "John Doe" + assert proc.machine == "my machine" + assert proc.processing_path == "subjects/*/*/processing-1" + assert proc.input_path == str(tmp_path / "bids") + assert json.loads(str(desc)) == { + "Name": "my caps dataset", + "BIDSVersion": "1.7.0", "CAPSVersion": "1.0.0", "DatasetType": "derivative", + "Processing": [ + { + "Name": "processing-1", + "Date": "2024-08-06T16:30:00", + "Author": "John Doe", + "Machine": "my machine", + "ProcessingPath": "subjects/*/*/processing-1", + "InputPath": f"{tmp_path}/bids", + } + ], } + desc.add_processing( + "processing-2", "subjects/*/*/processing-2", str(tmp_path / "bids") + ) + assert len(desc.processing) == 2 + assert json.loads(str(desc)) == { + "Name": "my caps dataset", + "BIDSVersion": "1.7.0", + "CAPSVersion": "1.0.0", + "DatasetType": "derivative", + "Processing": [ + { + "Name": "processing-1", + "Date": "2024-08-06T16:30:00", + "Author": "John Doe", + "Machine": "my machine", + "ProcessingPath": "subjects/*/*/processing-1", + "InputPath": f"{tmp_path}/bids", + }, + { + "Name": "processing-2", + "Date": "2024-08-06T16:30:00", + "Author": "John Doe", + "Machine": "my machine", + "ProcessingPath": "subjects/*/*/processing-2", + "InputPath": f"{tmp_path}/bids", + }, + ], + } + desc.delete_processing("processing-1") + assert len(desc.processing) == 1 + desc.delete_processing("processing-2") + assert len(desc.processing) == 0 -def test_write_caps_dataset_description_specify_bids_and_caps_versions(tmp_path): +def initialize_input_dir(folder: Path): + desc = {"Name": "Input dataset", "BIDSVersion": "1.7.0"} + folder.mkdir(exist_ok=True, parents=True) + with open(folder / "dataset_description.json", "w") as fp: + json.dump(desc, fp) + + +def test_write_caps_dataset_description(tmp_path, mocker): from clinica.utils.caps import write_caps_dataset_description + mocker = mock_processing_metadata(mocker) (tmp_path / "caps").mkdir() + initialize_input_dir(tmp_path / "bids") write_caps_dataset_description( - "foo", tmp_path / "caps", bids_version="foobar", caps_version="2.0.0" + tmp_path / "bids", + tmp_path / "caps", + "foo", + "subject/*/*/foo", + "my CAPS dataset", ) files = [f for f in (tmp_path / "caps").iterdir()] assert len(files) == 1 assert json.loads(files[0].read_text()) == { - "Name": "foo", - "BidsVersion": "foobar", - "CAPSVersion": "2.0.0", + "Name": "my CAPS dataset", + "BIDSVersion": "1.7.0", + "CAPSVersion": "1.0.0", "DatasetType": "derivative", + "Processing": [ + { + "Name": "foo", + "Date": "2024-08-06T16:30:00", + "Author": "John Doe", + "Machine": "my machine", + "ProcessingPath": "subject/*/*/foo", + "InputPath": f"{tmp_path}/bids", + } + ], } -def test_read_caps_dataset_description(tmp_path): +def test_write_caps_dataset_description_specify_bids_and_caps_versions(tmp_path): + from clinica.utils.caps import write_caps_dataset_description + from clinica.utils.exceptions import ClinicaBIDSError + + (tmp_path / "caps").mkdir() + initialize_input_dir(tmp_path / "bids") + + with pytest.raises( + ClinicaBIDSError, + match=( + f"The input dataset {tmp_path}/bids has BIDS specifications following version 1.7.0, " + "while the BIDS specifications version asked for the 
+            "while the BIDS specifications version asked for the CAPS creation is 1.18.23. "
+            "Please make sure the versions are the same before processing."
+        ),
+    ):
+        write_caps_dataset_description(
+            tmp_path / "bids",
+            tmp_path / "caps",
+            "foo",
+            "subjects/*/*/foo",
+            bids_version="1.18.23",
+            caps_version="2.0.0",
+        )
+
+
+def test_read_caps_dataset_description(tmp_path, mocker):
     from clinica.utils.caps import (
         CAPSDatasetDescription,
         DatasetType,
         write_caps_dataset_description,
     )
 
+    mocker = mock_processing_metadata(mocker)
+    initialize_input_dir(tmp_path / "bids")
     caps_dir = tmp_path / "caps"
     caps_dir.mkdir()
     write_caps_dataset_description(
-        "foo", caps_dir, bids_version="1.7.0", caps_version="1.0.0"
+        tmp_path / "bids",
+        caps_dir,
+        "foo",
+        "subject/*/*/foo",
+        "my CAPS dataset",
     )
+
     desc = CAPSDatasetDescription.from_file(caps_dir / "dataset_description.json")
 
-    assert desc.name == "foo"
+    assert desc.name == "my CAPS dataset"
     assert desc.bids_version == "1.7.0"
     assert desc.caps_version == "1.0.0"
     assert desc.dataset_type == DatasetType.DERIVATIVE
+    assert len(desc.processing) == 1
+    proc = desc.get_processing("foo")
+    assert proc.name == "foo"
+    assert proc.author == "John Doe"
+    assert proc.date == datetime.datetime(2024, 8, 6, 16, 30)
+    assert proc.machine == "my machine"
 
 
-def test_write_caps_dataset_description_error(tmp_path):
-    from clinica.utils.caps import (
-        CAPSDatasetDescription,
-        DatasetType,
-        write_caps_dataset_description,
-    )
-    from clinica.utils.exceptions import ClinicaCAPSError
+def test_write_caps_dataset_description_renaming_gives_warning(tmp_path):
+    from clinica.utils.caps import write_caps_dataset_description
 
     caps_dir = tmp_path / "caps"
     caps_dir.mkdir()
+    initialize_input_dir(tmp_path / "bids")
     write_caps_dataset_description(
-        "foo", caps_dir, bids_version="1.7.0", caps_version="1.0.0"
+        tmp_path / "bids",
+        tmp_path / "caps",
+        "foo",
+        "subject/*/*/foo",
+        "my CAPS dataset",
     )
-    # Re-writing the same description works
-    write_caps_dataset_description(
-        "foo", caps_dir, bids_version="1.7.0", caps_version="1.0.0"
+    with pytest.warns(UserWarning) as records:
+        write_caps_dataset_description(
+            tmp_path / "bids",
+            tmp_path / "caps",
+            "foo",
+            "subject/*/*/foo",
+            "my CAPS dataset 2",
+        )
+    assert len(records) == 2
+    assert records[0].message.args[0] == (
+        f"The existing CAPS dataset, located at {tmp_path}/caps, has a name 'my CAPS dataset' "
+        "different from the new name 'my CAPS dataset 2'. The old name will be kept."
     )
-    # Re-writing the same description with a different name works
-    write_caps_dataset_description(
-        "bar", caps_dir, bids_version="1.7.0", caps_version="1.0.0"
+    assert (
+        records[1]
+        .message.args[0]
+        .startswith(
+            "The CAPS dataset 'my CAPS dataset' already has a processing named foo"
+        )
     )
-
-    desc = CAPSDatasetDescription.from_file(caps_dir / "dataset_description.json")
-
-    assert desc.name == "foo + bar"
-    assert desc.bids_version == "1.7.0"
-    assert desc.caps_version == "1.0.0"
-    assert desc.dataset_type == DatasetType.DERIVATIVE
 
-    # But re-writing a different description raises an error
+
+def test_write_caps_dataset_description_version_mismatch_error(tmp_path):
+    from clinica.utils.caps import write_caps_dataset_description
+    from clinica.utils.exceptions import ClinicaCAPSError
+
+    caps_dir = tmp_path / "caps"
+    caps_dir.mkdir()
+    initialize_input_dir(tmp_path / "bids")
+
+    # Write a first processing named 'foo', with a CAPS version of 1.0.1
+    write_caps_dataset_description(
+        tmp_path / "bids",
+        tmp_path / "caps",
+        "foo",
+        "subject/*/*/foo",
+        "my CAPS dataset",
+        caps_version="1.0.1",
+    )
+    # Now, write a second processing, named 'bar', but with a CAPS version of 1.0.2
     with pytest.raises(
         ClinicaCAPSError,
         match=(
-            f"Impossible to write the dataset_description.json file in {caps_dir} "
+            f"Impossible to write the 'dataset_description.json' file in {tmp_path}/caps "
             "because it already exists and it contains incompatible metadata."
         ),
     ):
         write_caps_dataset_description(
-            "bar", caps_dir, bids_version="1.7.1", caps_version="1.0.0"
+            tmp_path / "bids",
+            tmp_path / "caps",
+            "bar",
+            "subject/*/*/bar",
+            "my CAPS dataset",
+            caps_version="1.0.2",
         )
+
+
+def test_write_caps_dataset_description_multiple_processing(tmp_path, mocker):
+    from clinica.utils.caps import write_caps_dataset_description
+
+    mocker = mock_processing_metadata(mocker)
+    caps_dir = tmp_path / "caps"
+    caps_dir.mkdir()
+    initialize_input_dir(tmp_path / "bids")
+
+    # Write a first processing named 'foo'
+    write_caps_dataset_description(
+        tmp_path / "bids",
+        caps_dir,
+        "foo",
+        "subject/*/*/foo",
+        "my CAPS dataset",
+    )
+    # Write a second processing, named 'bar'
+    write_caps_dataset_description(
+        tmp_path / "bids",
+        caps_dir,
+        "bar",
+        "subject/*/*/bar",
+        "my CAPS dataset",
+    )
+    files = [f for f in (tmp_path / "caps").iterdir()]
+    assert len(files) == 1
+    assert json.loads(files[0].read_text()) == {
+        "Name": "my CAPS dataset",
+        "BIDSVersion": "1.7.0",
+        "CAPSVersion": "1.0.0",
+        "DatasetType": "derivative",
+        "Processing": [
+            {
+                "Name": "foo",
+                "Date": "2024-08-06T16:30:00",
+                "Author": "John Doe",
+                "Machine": "my machine",
+                "ProcessingPath": "subject/*/*/foo",
+                "InputPath": f"{tmp_path}/bids",
+            },
+            {
+                "Name": "bar",
+                "Date": "2024-08-06T16:30:00",
+                "Author": "John Doe",
+                "Machine": "my machine",
+                "ProcessingPath": "subject/*/*/bar",
+                "InputPath": f"{tmp_path}/bids",
+            },
+        ],
+    }
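
Usage sketch (an editor's illustration, not part of the patch): the reworked write_caps_dataset_description takes the input dataset folder, the output CAPS folder, a processing name, and the processing output path. Assuming a BIDS input folder that already contains a minimal dataset_description.json (the paths below are hypothetical), a caller can write the CAPS description and read it back:

    import json
    from pathlib import Path

    from clinica.utils.caps import CAPSDatasetDescription, write_caps_dataset_description

    bids_dir = Path("/tmp/my_bids")  # hypothetical input folder
    caps_dir = Path("/tmp/my_caps")  # hypothetical output folder
    bids_dir.mkdir(parents=True, exist_ok=True)
    caps_dir.mkdir(parents=True, exist_ok=True)
    # Minimal BIDS metadata so that _get_bids_version() succeeds on the input.
    (bids_dir / "dataset_description.json").write_text(
        json.dumps({"Name": "My study", "BIDSVersion": "1.7.0"})
    )

    write_caps_dataset_description(
        bids_dir,
        caps_dir,
        "t1-linear",               # processing name
        "subjects/*/*/t1-linear",  # processing output path, relative to caps_dir
        dataset_name="My CAPS dataset",
    )

    desc = CAPSDatasetDescription.from_file(caps_dir / "dataset_description.json")
    assert desc.has_processing("t1-linear")
    print(desc.get_processing("t1-linear"))

Calling the function again with a different processing name appends a second entry to the "Processing" list; calling it again with the same name overwrites the entry and emits a UserWarning.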
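
A second sketch of the in-memory model and the module-level cattrs converter (again illustrative, and relying on cattrs' default Enum handling): snake_case attribute names are renamed by _rename to their JSON spelling, with the BIDS and CAPS acronyms fully capitalized:

    from clinica.utils.caps import CAPSDatasetDescription, converter

    desc = CAPSDatasetDescription.from_values("demo")  # versions fall back to defaults
    desc.add_processing("t1-linear", "subjects/*/*/t1-linear", "/tmp/my_bids")

    raw = converter.unstructure(desc)
    # JSON keys: Name, BIDSVersion, CAPSVersion, DatasetType, Processing
    print(sorted(raw))

    # Structuring the dict back restores an equal CAPSDatasetDescription instance.
    assert converter.structure(raw, CAPSDatasetDescription) == desc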