diff --git a/clinica/pipelines/engine.py b/clinica/pipelines/engine.py index 5583137ca..d055bd449 100644 --- a/clinica/pipelines/engine.py +++ b/clinica/pipelines/engine.py @@ -5,7 +5,7 @@ import abc from pathlib import Path -from typing import List, Optional, Tuple +from typing import Iterable, List, Optional, Tuple import click from nipype.interfaces.utility import IdentityInterface @@ -49,14 +49,15 @@ def func_wrapper(self, *args, **kwargs): def _detect_cross_sectional_and_longitudinal_subjects( - all_subs: List[str], bids_dir: Path + subjects: Iterable[str], bids_dir: Path ) -> Tuple[List[str], List[str]]: - """This function detects whether a subject has a longitudinal or cross sectional structure. - If it has a mixed structure, it will be considered cross sectionnal. + """Detect whether a subject's folder has a longitudinal or cross-sectional structure. + + If it has a mixed structure, it will be considered cross-sectionnal. Parameters ---------- - all_subs: list of str + subjects: Iterable of str List containing all the names of the subjects. bids_dir: Path @@ -64,10 +65,10 @@ def _detect_cross_sectional_and_longitudinal_subjects( Returns ------- - cross_subj: list of str + cross_sectional_subjects: list of str List of all the subject detected with a cross_sectional organisation. - long_subj: list of str + longitudinal_subjects: list of str List of all the subject detected with a longitudinal organisation. Examples @@ -77,24 +78,18 @@ def _detect_cross_sectional_and_longitudinal_subjects( │   └── anat └── sub-02 └── ses-M000 - >>> _detect_cross_sectional_and_longitudinal_subjects(["sub-01", "sub-02"], /Users/name.surname/BIDS) + >>> _detect_cross_sectional_and_longitudinal_subjects(["sub-01", "sub-02"], Path("/Users/name.surname/BIDS")) (["sub-01"], ["sub-02]) """ - from os import listdir - from os.path import isdir, join - - cross_subj = [] - long_subj = [] - for sub in all_subs: - folder_list = [ - f for f in listdir(join(bids_dir, sub)) if isdir(join(bids_dir, sub, f)) - ] - - if not all([fold.startswith("ses-") for fold in folder_list]): - cross_subj.append(sub) + cross_sectional_subjects = [] + longitudinal_subjects = [] + for subject in subjects: + folders = [f.name for f in (bids_dir / subject).iterdir() if f.is_dir()] + if not all([folder.startswith("ses-") for folder in folders]): + cross_sectional_subjects.append(subject) else: - long_subj.append(sub) - return cross_subj, long_subj + longitudinal_subjects.append(subject) + return cross_sectional_subjects, longitudinal_subjects SYMBOLS = { @@ -192,12 +187,12 @@ def _add_session_label(filename: str) -> str: Examples -------- - >>> _add_ses("sub-ADNI001_scans.tsv") + >>> _add_session_label("sub-ADNI001_scans.tsv") sub-ADNI001_ses-M000_scans.tsv No modification done if filename already has a session: - >>> _add_ses("sub-023a_ses-M012_T1w.nii.gz") + >>> _add_session_label("sub-023a_ses-M012_T1w.nii.gz") sub-023a_ses-M012_T1w.nii.gz Parameters @@ -211,13 +206,6 @@ def _add_session_label(filename: str) -> str: """ import re - # If filename contains ses-..., returns the original filename - # Regex explication: - # ^ start of string - # ([a-zA-Z0-9]*) matches any number of characters from a to z, - # A to Z, 0 to 9, and store it in group(1) - # (?!ses-[a-zA-Z0-9]) do not match if there is already a 'ses-' - # (.*) catches the rest of the string m = re.search(r"(^sub-[a-zA-Z0-9]*)_(?!ses-[a-zA-Z0-9])(.*)", filename) try: return m.group(1) + "_ses-M000_" + m.group(2) @@ -225,38 +213,37 @@ def _add_session_label(filename: str) -> str: return filename -def _copy2_add_session_label(src: Path, dst: Path) -> Path: - """Calls copy2 function from shutil, but modifies the filename of the - copied files if they match the regex template described in the - _add_session_label() function +def _copy_and_add_session_label(src: str, dst: str) -> str: + """Copy src to dst, but modifies the filename of the copied files if they + match the regex template described in the _add_session_label() function. Parameters ---------- - src : Path + src : str The path to the file that needs to be copied. - dst : Path + dst : str The original destination for the copied file. Returns ------- - Path : - copy2 with modified filename. + str : + Copy with modified filename. """ from shutil import copy2 - return copy2(src, dst / _add_session_label(src.name)) + filename = Path(src).name + + return copy2(src, str(Path(dst).parent / _add_session_label(filename))) def _convert_cross_sectional( bids_in: Path, bids_out: Path, - cross_subjects: List[str], - long_subjects: List[str], -) -> None: - """ - This function converts a cross-sectional-bids dataset into a - longitudinal clinica-compliant dataset + cross_subjects: Iterable[str], + long_subjects: Iterable[str], +): + """Convert a cross-sectional-bids dataset into a longitudinal clinica-compliant dataset. Parameters ---------- @@ -266,21 +253,30 @@ def _convert_cross_sectional( bids_out : Path The path to the converted longitudinal bids dataset. - cross_subjects : list of str - List of subjects in cross-sectional form (they need some adjustment). + cross_subjects : Iterable of str + Subjects in cross-sectional form (they need some adjustment). - long_subjects : list of str - List of subjects in longitudinal form (they just need to be copied). + long_subjects : Iterable of str + Subjects in longitudinal form (they just need to be copied). """ if not bids_out.exists(): bids_out.mkdir() + _copy_metadata_files(bids_in, bids_out, ("dataset_description.json", ".bidsignore")) _convert(bids_in, bids_out, cross_subjects, cross_sectional=True) _convert(bids_in, bids_out, long_subjects, cross_sectional=False) +def _copy_metadata_files(bids_in: Path, bids_out: Path, files_to_copy: Iterable[str]): + from shutil import copy2 + + for file in files_to_copy: + if (bids_in / file).is_file(): + copy2(bids_in / file, bids_out / file) + + def _convert( - bids_in: Path, bids_out: Path, subjects: List[str], cross_sectional: bool -) -> None: + bids_in: Path, bids_out: Path, subjects: Iterable[str], cross_sectional: bool +): for subject in subjects: output_folder = ( bids_out / subject / "ses-M000" if cross_sectional else bids_out / subject @@ -294,7 +290,7 @@ def _convert( copy_func(bids_in / subject / file_to_copy, output_folder) -def _copy_cross_sectional(file_to_copy: Path, output_folder: Path) -> None: +def _copy_cross_sectional(file_to_copy: Path, output_folder: Path): from shutil import copy2, copytree if not (output_folder / file_to_copy.name).exists(): @@ -302,7 +298,7 @@ def _copy_cross_sectional(file_to_copy: Path, output_folder: Path) -> None: copytree( file_to_copy, output_folder / file_to_copy.name, - copy_function=_copy2_add_session_label, + copy_function=_copy_and_add_session_label, ) elif file_to_copy.is_file(): new_filename_wo_ses = _add_session_label(file_to_copy.name) @@ -319,13 +315,6 @@ def _copy_longitudinal(file_to_copy: Path, output_folder: Path) -> None: copy2(file_to_copy, output_folder) -def _check_cross_subj(cross_subj: list) -> None: - from clinica.utils.exceptions import ClinicaInconsistentDatasetError - - if len(cross_subj) > 0: - raise ClinicaInconsistentDatasetError(cross_subj) - - class Pipeline(Workflow): """Clinica Pipeline class. @@ -428,7 +417,6 @@ def __init__( from tempfile import mkdtemp from clinica.utils.inputs import check_bids_folder, check_caps_folder - from clinica.utils.participant import get_subject_session_list self._is_built: bool = False self._overwrite_caps: bool = overwrite_caps @@ -445,7 +433,10 @@ def __init__( / "info.json" ) self._info: dict = {} - + self._subjects: Optional[List[str]] = None + self._sessions: Optional[List[str]] = None + self._input_node = None + self._output_node = None if base_dir: self.base_dir = Path(base_dir).absolute() self._base_dir_was_specified = True @@ -462,29 +453,35 @@ def __init__( f"The {self._name} pipeline does not contain " "BIDS nor CAPS directory at the initialization." ) - check_caps_folder(self._caps_directory) - input_dir = self._caps_directory - is_bids_dir = False + self.is_bids_dir = False else: check_bids_folder(self._bids_directory) - input_dir = self._bids_directory - is_bids_dir = True + self.is_bids_dir = True + self._compute_subjects_and_sessions() + self._init_nodes() + + def _compute_subjects_and_sessions(self): + from clinica.utils.participant import get_subject_session_list + self._subjects, self._sessions = get_subject_session_list( - input_dir, - subject_session_file=tsv_file, - is_bids_dir=is_bids_dir, + self.input_dir, + subject_session_file=self.tsv_file, + is_bids_dir=self.is_bids_dir, use_session_tsv=False, - tsv_dir=base_dir, + tsv_dir=self.base_dir, ) self._subjects, self._sessions = self.filter_qc() - self._input_node = None - self._output_node = None - self._init_nodes() def filter_qc(self) -> tuple[list[str], list[str]]: return self._subjects, self._sessions + @property + def input_dir(self) -> Path: + if self.is_bids_dir: + return self._bids_directory + return self._caps_directory + @property def base_dir_was_specified(self) -> bool: return self._base_dir_was_specified @@ -687,10 +684,9 @@ def run( from clinica.utils.stream import cprint from clinica.utils.ux import print_failed_images + self._handle_cross_sectional_dataset() if not self.is_built: self.build() - self._check_not_cross_sectional() - if not bypass_check: self._check_size() plugin_args = self._update_parallelize_info(plugin_args) @@ -907,51 +903,73 @@ def _update_parallelize_info(self, plugin_args: Optional[dict]) -> dict: return plugin_args - def _check_not_cross_sectional(self): - """ - This function checks if the dataset is longitudinal. + def _handle_cross_sectional_dataset(self): + """Check if the dataset is longitudinal or cross-sectional. - If it is cross-sectional, clinica proposes to convert it in a clinica compliant form. - - author: Arnaud Marcoux + If it is cross-sectional, propose to convert it to a longitudinal layout. """ - import sys - - from clinica.utils.exceptions import ClinicaInconsistentDatasetError - from clinica.utils.stream import cprint - if self.bids_directory is None: return - all_subs = [ + subjects = [ f.name for f in self.bids_directory.iterdir() if (self.bids_directory / f).is_dir() and f.name.startswith("sub-") ] - cross_subj, long_subj = _detect_cross_sectional_and_longitudinal_subjects( - all_subs, self.bids_directory + ( + cross_sectional_subjects, + longitudinal_subjects, + ) = _detect_cross_sectional_and_longitudinal_subjects( + subjects, self.bids_directory ) - try: - _check_cross_subj(cross_subj) - except ClinicaInconsistentDatasetError as e: - cprint(e, lvl="warning") - proposed_bids = ( - self.bids_directory / f"{self.bids_directory.name}_clinica_compliant" + if cross_sectional_subjects: + self._convert_to_longitudinal_if_user_agrees( + cross_sectional_subjects, longitudinal_subjects ) - if not click.confirm( - "Do you want to proceed with the conversion in another folder? " - "(Your original BIDS folder will not be modified " - f"and the folder {proposed_bids} will be created.)" - ): - click.echo("Clinica will now exit...") - sys.exit() - else: - cprint("Converting cross-sectional dataset into longitudinal...") - _convert_cross_sectional( - self.bids_directory, proposed_bids, cross_subj, long_subj - ) - cprint( - f"Conversion succeeded. Your clinica-compliant dataset is located here: {proposed_bids}" - ) + + def _convert_to_longitudinal_if_user_agrees( + self, + cross_sectional_subjects: List[str], + longitudinal_subjects: List[str], + ): + import sys + + from clinica.utils.stream import cprint + + cprint( + ( + f"The following subjects of the input dataset {self.bids_directory} seem to " + "have a cross-sectional layout which is not supported by Clinica:\n" + + "\n- ".join(cross_sectional_subjects) + ), + lvl="warning", + ) + proposed_bids = ( + self.bids_directory.parent / f"{self.bids_directory.name}_clinica_compliant" + ) + if not click.confirm( + "Do you want to proceed with the conversion in another folder? " + "(Your original BIDS folder will not be modified " + f"and the folder {proposed_bids} will be created.)" + ): + click.echo( + "Clinica does not support cross-sectional layout for BIDS dataset input. " + "To run the pipeline, please provide a dataset in longitudinal format or accept " + "the automatic conversion. Clinica will now exit." + ) + sys.exit() + cprint("Converting cross-sectional dataset into longitudinal...") + _convert_cross_sectional( + self.bids_directory, + proposed_bids, + cross_sectional_subjects, + longitudinal_subjects, + ) + cprint( + f"Conversion succeeded. Your clinica-compliant dataset is located here: {proposed_bids}. " + "The pipeline will run using this new dataset as input." + ) + self._bids_directory = proposed_bids + self._compute_subjects_and_sessions() @abc.abstractmethod def _build_core_nodes(self): diff --git a/test/unittests/pipelines/test_nipype_engine.py b/test/unittests/pipelines/test_nipype_engine.py index cc970da50..929d41f58 100644 --- a/test/unittests/pipelines/test_nipype_engine.py +++ b/test/unittests/pipelines/test_nipype_engine.py @@ -19,12 +19,50 @@ def test_detect_cross_sectional_and_longitudinal_subjects( _detect_cross_sectional_and_longitudinal_subjects, ) - bids_dir = tmp_path / Path("bids") - dirs = [ - bids_dir / Path(f"{subs[i]}") / Path(f"{directory_content[i]}") - for i in range(0, len(subs)) + bids_dir = tmp_path / "bids" + directories = [ + bids_dir / f"{subject}" / f"{directory_content[i]}" + for i, subject in enumerate(subs) ] - for dir in dirs: - if not dir.is_dir(): - os.makedirs(dir) + for directory in directories: + if not directory.is_dir(): + directory.mkdir(parents=True) + assert _detect_cross_sectional_and_longitudinal_subjects(subs, bids_dir) == expected + + +def _build_cross_sectional_bids(folder: Path): + folder.mkdir(exist_ok=True, parents=True) + for filename in ("dataset_description.json", ".bidsignore", "foo.txt"): + (folder / filename).touch() + for subject in ("sub-01", "sub-02"): + (folder / subject / "anat").mkdir(parents=True, exist_ok=True) + for extension in (".nii.gz", ".json"): + (folder / subject / "anat" / f"{subject}_T1w{extension}").touch() + + +def test_convert_cross_sectional(tmp_path): + from clinica.pipelines.engine import _convert_cross_sectional + + bids_in = tmp_path / "bids_in" + _build_cross_sectional_bids(bids_in) + bids_out = tmp_path / "bids_out" + bids_out.mkdir() + + _convert_cross_sectional(bids_in, bids_out, ("sub-01", "sub-02"), ()) + + assert set([f.name for f in bids_out.iterdir()]) == { + "dataset_description.json", + ".bidsignore", + "sub-01", + "sub-02", + } + for subject in ("sub-01", "sub-02"): + for extension in (".nii.gz", ".json"): + assert ( + bids_out + / subject + / "ses-M000" + / "anat" + / f"{subject}_ses-M000_T1w{extension}" + ).exists()