From 7a6a7caa75c9986b2a4b94398dd015e61cc29109 Mon Sep 17 00:00:00 2001 From: Gensollen Date: Thu, 25 Jul 2024 08:49:54 +0200 Subject: [PATCH] [REF TEST] Refactor data handling (#1045) * Add error handling to caller of center_nifti_origin * start refactoring * continue refactoring and add tests * add test * refactor _create_merge_file_from_bids (BROKEN...) * better names and more tests * improve tests and code * refactor further * remove duplicate and broken tests * refactor pipeline handling * port changes from PR 1052 * port changes from PR 1055 * port changes from PR 1062 * fixes * fix broken test * some fixes and enhancements * fix broken exception --- .../converters/adni_to_bids/adni_utils.py | 1 + .../iotools/utils/data_handling/__init__.py | 21 + .../iotools/utils/data_handling/_centering.py | 666 ++++++++++++++++++ clinica/iotools/utils/data_handling/_files.py | 117 +++ .../iotools/utils/data_handling/_merging.py | 355 ++++++++++ .../iotools/utils/data_handling/_missing.py | 427 +++++++++++ clinica/iotools/utils/pipeline_handling.py | 567 +++++++++------ clinica/utils/exceptions.py | 16 +- clinica/utils/testing_utils.py | 47 +- .../iotools/utils/test_data_handling.py | 464 +++++++++++- .../iotools/utils/test_pipeline_handling.py | 295 +++++--- 11 files changed, 2624 insertions(+), 352 deletions(-) create mode 100644 clinica/iotools/utils/data_handling/__init__.py create mode 100644 clinica/iotools/utils/data_handling/_centering.py create mode 100644 clinica/iotools/utils/data_handling/_files.py create mode 100644 clinica/iotools/utils/data_handling/_merging.py create mode 100644 clinica/iotools/utils/data_handling/_missing.py diff --git a/clinica/iotools/converters/adni_to_bids/adni_utils.py b/clinica/iotools/converters/adni_to_bids/adni_utils.py index e42894f36..10dc2c2e6 100644 --- a/clinica/iotools/converters/adni_to_bids/adni_utils.py +++ b/clinica/iotools/converters/adni_to_bids/adni_utils.py @@ -929,6 +929,7 @@ def _create_file( ), lvl="error", ) + raise ValueError(error_msg) else: shutil.copy(image_path, output_image) diff --git a/clinica/iotools/utils/data_handling/__init__.py b/clinica/iotools/utils/data_handling/__init__.py new file mode 100644 index 000000000..cd997b9eb --- /dev/null +++ b/clinica/iotools/utils/data_handling/__init__.py @@ -0,0 +1,21 @@ +from ._centering import ( + center_all_nifti, + center_nifti_origin, + check_relative_volume_location_in_world_coordinate_system, + check_volume_location_in_world_coordinate_system, +) +from ._files import create_subs_sess_list, write_list_of_files +from ._merging import create_merge_file +from ._missing import compute_missing_mods, compute_missing_processing + +__all__ = [ + "create_merge_file", + "center_nifti_origin", + "center_all_nifti", + "compute_missing_mods", + "compute_missing_processing", + "create_subs_sess_list", + "write_list_of_files", + "check_volume_location_in_world_coordinate_system", + "check_relative_volume_location_in_world_coordinate_system", +] diff --git a/clinica/iotools/utils/data_handling/_centering.py b/clinica/iotools/utils/data_handling/_centering.py new file mode 100644 index 000000000..f0cf07dcc --- /dev/null +++ b/clinica/iotools/utils/data_handling/_centering.py @@ -0,0 +1,666 @@ +from os import PathLike +from pathlib import Path +from typing import Iterable, List, Optional, Tuple + +import nibabel as nib +import numpy as np +import pandas as pd + + +def center_nifti_origin( + input_image: PathLike, + output_image: PathLike, +) -> Tuple[PathLike, Optional[str]]: + """Put the origin of the coordinate system at the center of the image. + + Parameters + ---------- + input_image : PathLike + Path to the input image. + + output_image : PathLike + Path to the output image (where the result will be stored). + + Returns + ------- + PathLike : + The path of the output image created. + + str, optional : + The error message if the function failed. None if the function call was successful. + """ + from nibabel.filebasedimages import ImageFileError + + error_str = None + input_image = Path(input_image) + output_image = Path(output_image) + try: + img = nib.load(input_image) + except FileNotFoundError: + error_str = f"No such file {input_image}" + except ImageFileError: + error_str = f"File {input_image} could not be read" + except Exception as e: + error_str = f"File {input_image} could not be loaded with nibabel: {e}" + + if not error_str: + try: + canonical_img = nib.as_closest_canonical(img) + hd = canonical_img.header + qform = np.zeros((4, 4)) + for i in range(1, 4): + qform[i - 1, i - 1] = hd["pixdim"][i] + qform[i - 1, 3] = -1.0 * hd["pixdim"][i] * hd["dim"][i] / 2.0 + new_img = nib.Nifti1Image( + canonical_img.get_fdata(caching="unchanged"), affine=qform, header=hd + ) + # Without deleting already-existing file, nib.save causes a severe bug on Linux system + if output_image.is_file(): + output_image.unlink() + nib.save(new_img, output_image) + if not output_image.is_file(): + error_str = ( + f"NIfTI file created but Clinica could not save it to {output_image}. " + "Please check that the output folder has the correct permissions." + ) + except Exception as e: + error_str = f"File {input_image} could not be processed with nibabel: {e}" + + return output_image, error_str + + +def check_volume_location_in_world_coordinate_system( + nifti_list: List[PathLike], + bids_dir: PathLike, + modality: str = "t1w", + skip_question: bool = False, +) -> bool: + """Check if images are centered around the origin of the world coordinate. + + Parameters + ---------- + nifti_list : list of PathLike + List of path to nifti files. + + bids_dir : PathLike + Path to bids directory associated with this check. + + modality : str, optional + The modality of the image. Default='t1w'. + + skip_question : bool, optional + If True, assume answer is yes. Default=False. + + Returns + ------- + bool : + True if they are centered, False otherwise + + Warns + ------ + If volume is not centered on origin of the world coordinate system + + Notes + ----- + The NIfTI file list provided in argument are approximately centered around the origin of the world coordinates. + Otherwise, issues may arise with further processing such as SPM segmentation. + When not centered, we warn the user of the problem propose to exit clinica to run clinica iotools center-nifti + or to continue with the execution of the pipeline. + """ + import click + import numpy as np + + bids_dir = Path(bids_dir) + list_non_centered_files = [ + Path(file) for file in nifti_list if not _is_centered(Path(file)) + ] + if len(list_non_centered_files) == 0: + return True + centers = [ + _get_world_coordinate_of_center(file) for file in list_non_centered_files + ] + l2_norm = [np.linalg.norm(center, ord=2) for center in centers] + click.echo( + _build_warning_message( + list_non_centered_files, centers, l2_norm, bids_dir, modality + ) + ) + if not skip_question: + _ask_for_confirmation() + + return False + + +def _ask_for_confirmation() -> None: + import sys + + import click + + if not click.confirm("Do you still want to launch the pipeline?"): + click.echo("Clinica will now exit...") + sys.exit(0) + + +def _build_warning_message( + non_centered_files: List[Path], + centers: List[np.array], + l2_norm: List[np.ndarray], + bids_dir: Path, + modality: str, +) -> str: + warning_message = ( + f"It appears that {len(non_centered_files)} files " + "have a center way out of the origin of the world coordinate system. SPM has a high " + "probability to fail on these files (for co-registration or segmentation):\n\n" + ) + df = pd.DataFrame( + [ + (a, b, c) + for a, b, c in zip([f.name for f in non_centered_files], centers, l2_norm) + ], + columns=["File", "Coordinate of center", "Distance to origin"], + ) + warning_message += str(df) + cmd_line = f"`clinica iotools center-nifti {bids_dir.resolve()} {bids_dir.resolve()}_centered --modality {modality}`" + warning_message += ( + "\nIf you are trying to launch the t1-freesurfer pipeline, you can ignore this message " + "if you do not want to run the pet-surface pipeline afterward." + ) + warning_message += ( + "\nClinica provides a tool to counter this problem by replacing the center of the volume" + " at the origin of the world coordinates.\nUse the following command line to correct the " + f"header of the faulty NIFTI volumes in a new folder:\n{cmd_line}" + "You will find more information on the command by typing " + "clinica iotools center-nifti in the console." + ) + + return warning_message + + +def check_relative_volume_location_in_world_coordinate_system( + label_1: str, + nifti_list1: List[PathLike], + label_2: str, + nifti_list2: List[PathLike], + bids_dir: PathLike, + modality: str, + skip_question: bool = False, +): + """Check if the NIfTI file list `nifti_list1` and `nifti_list2` provided in argument are not too far apart, + otherwise coreg in SPM may fail. Norm between center of volumes of 2 files must be less than 80 mm. + + Parameters + ---------- + label_1 : str + Label of the first nifti_list1 files (used in potential warning message). + + nifti_list1 : list of PathLike + First list of files. + + label_2 : str + Label of the second nifti_list. + + nifti_list2 : list of PathLike + Second list of files, must be same length as `nifti_list1`. + + bids_dir : PathLike + BIDS directory (used in potential warning message). + + modality : str + String that must be used in argument of: + clinica iotools bids --modality + (used in potential warning message). + + skip_question : bool, optional + Disable prompts for user input (default is False) + """ + import warnings + + from clinica.utils.stream import cprint + + warning_message = _build_warning_message_relative_volume_location( + label_1, nifti_list1, label_2, nifti_list2, bids_dir, modality + ) + cprint(msg=warning_message, lvl="warning") + warnings.warn(warning_message) + if not skip_question: + _ask_for_confirmation() + + +def _build_warning_message_relative_volume_location( + label_1: str, + nifti_list1: List[PathLike], + label_2: str, + nifti_list2: List[PathLike], + bids_dir: PathLike, + modality: str, +) -> str: + bids_dir = Path(bids_dir) + file_couples = [(Path(f1), Path(f2)) for f1, f2 in zip(nifti_list1, nifti_list2)] + df = pd.DataFrame( + _get_problematic_pairs_with_l2_norm(file_couples), + columns=[label_1, label_2, "Relative distance"], + ) + if len(df) == 0: + return + warning_message = ( + f"It appears that {len(df)} pairs of files have an important relative offset. " + "SPM co-registration has a high probability to fail on these files:\n\n" + ) + warning_message += str(df) + warning_message += ( + "\nClinica provides a tool to counter this problem by replacing the center " + "of the volume at the origin of the world coordinates.\nUse the following " + "command line to correct the header of the faulty NIFTI volumes in a new folder:\n\n" + f"`clinica iotools center-nifti {bids_dir.resolve()} {bids_dir.resolve()}_centered --modality {modality}`\n\n" + "You will find more information on the command by typing `clinica iotools center-nifti` in the console." + ) + return warning_message + + +def _get_problematic_pairs_with_l2_norm( + file_couples: List[Tuple[Path, Path]], + threshold: float = 80.0, +) -> List[Tuple[str, str, float]]: + l2_norm = _compute_l2_norm(file_couples) + pairs_with_problems = [i for i, norm in enumerate(l2_norm) if norm > threshold] + + return [ + (file_couples[k][0].name, file_couples[k][1].name, l2_norm[k]) + for k in pairs_with_problems + ] + + +def _compute_l2_norm(file_couples: List[Tuple[Path, Path]]) -> List[float]: + center_coordinates = [ + (_get_world_coordinate_of_center(f[0]), _get_world_coordinate_of_center(f[1])) + for f in file_couples + ] + + return [np.linalg.norm(center[0] - center[1]) for center in center_coordinates] + + +def center_all_nifti( + bids_dir: PathLike, + output_dir: PathLike, + modalities: Optional[Iterable[str]] = None, + center_all_files: bool = False, +) -> List[Path]: + """Center all the NIfTI images of the input BIDS folder into the empty output_dir specified in argument. + + All the files from bids_dir are copied into output_dir, then all the NIfTI images found are replaced by their + centered version if their center is off the origin by more than 50 mm. + + Parameters + ---------- + bids_dir : PathLike + Path to the BIDS directory. + + output_dir : PathLike + Path to the output directory where the centered files will be written to. + + modalities : iterable of str, optional + Process these modalities only. Process all modalities otherwise. + + center_all_files: bool, default=False + Center files that may cause problem for SPM if set to False, all files otherwise. + + Returns + ------- + list of Path + Centered NIfTI files. + """ + from shutil import copy, copytree + + from clinica.utils.stream import cprint + + bids_dir, output_dir = _validate_bids_and_output_dir(bids_dir, output_dir) + for f in bids_dir.iterdir(): + if f.is_dir() and not (output_dir / f.name).is_dir(): + copytree(f, output_dir / f.name, copy_function=copy) + elif f.is_file() and not (output_dir / f.name).is_file(): + copy(f, output_dir / f.name) + nifti_files_filtered: List[Path] = [] + for f in output_dir.glob("**/*.nii*"): + if modalities and any(elem.lower() in f.name.lower() for elem in modalities): + nifti_files_filtered.append(f) + if not center_all_files: + nifti_files_filtered = [ + file for file in nifti_files_filtered if not _is_centered(file) + ] + + all_errors: List[str] = [] + for f in nifti_files_filtered: + cprint(msg=f"Handling file {f}", lvl="debug") + _, current_error = center_nifti_origin(f, f) + if current_error: + all_errors.append(current_error) + if len(all_errors) > 0: + raise RuntimeError( + f"Clinica encountered {len(all_errors)} error(s) while trying to center all NIfTI images.\n" + + "\n".join(all_errors) + ) + return nifti_files_filtered + + +def _validate_bids_and_output_dir( + bids_dir: PathLike, output_dir: PathLike +) -> Tuple[Path, Path]: + from clinica.utils.exceptions import ClinicaBIDSError + from clinica.utils.inputs import check_bids_folder + + bids_dir = Path(bids_dir) + output_dir = Path(output_dir) + if bids_dir == output_dir: + raise ClinicaBIDSError( + f"Input BIDS ({bids_dir}) and output ({output_dir}) directories must be different." + ) + check_bids_folder(bids_dir) + + return bids_dir, output_dir + + +def _is_centered(nii_volume: Path, threshold_l2: int = 50) -> bool: + """Checks if a NIfTI volume is centered on the origin of the world coordinate system. + + Parameters + --------- + nii_volume : Path + The path to the NIfTI volume to check. + + threshold_l2: int, optional + Maximum distance between the origin of the world coordinate system and the center of the volume to + be considered centered. The threshold where SPM segmentation stops working is around 100 mm + (it was determined empirically after several trials on a generated dataset), so default value + is 50mm in order to have a security margin, even when dealing with co-registered files afterward. + + Returns + ------- + bool : + True if the volume is centered, False otherwise. + + Notes + ------ + SPM has troubles to segment files if the center of the volume is not close from the origin of the world coordinate + system. A series of experiment have been conducted: we take a volume whose center is on the origin of the world + coordinate system. We add an offset using coordinates of affine matrix [0, 3], [1, 3], [2, 3] (or by modifying the + header['srow_x'][3], header['srow_y'][3], header['srow_z'][3], this is strictly equivalent). + It has been determined that volumes were still segmented with SPM when the L2 distance between origin and center of + the volume did not exceed 100 mm. Above this distance, either the volume is either not segmented (SPM error), or the + produced segmentation is wrong (not the shape of a brain anymore) + """ + center = _get_world_coordinate_of_center(nii_volume) + if center is None: + raise ValueError( + f"Unable to compute the world coordinates of center for image {nii_volume}." + "Please verify the image data and header." + ) + distance_from_origin = np.linalg.norm(center, ord=2) + + return distance_from_origin < threshold_l2 + + +def _get_world_coordinate_of_center(nii_volume: Path) -> Optional[np.ndarray]: + """Extract the world coordinates of the center of the image. + + Parameters + --------- + nii_volume : PathLike + The path to the nii volume. + + Returns + ------- + np.ndarray : + The coordinates in the world space. + + References + ------ + https://brainder.org/2012/09/23/the-nifti-file-format/ + """ + from nibabel.filebasedimages import ImageFileError + + from clinica.utils.exceptions import ClinicaException + from clinica.utils.stream import cprint + + if not nii_volume.is_file(): + raise ClinicaException( + f"The input {nii_volume} does not appear to be a path to a file." + ) + try: + orig_nifti = nib.load(nii_volume) + except ImageFileError: + cprint( + msg=f"File {nii_volume} could not be read by nibabel. Is it a valid NIfTI file ?", + lvl="warning", + ) + return None + + header = orig_nifti.header + if isinstance(header, nib.freesurfer.mghformat.MGHHeader): + return _scale_with_affine_transformation_matrix( + header["dims"][0:3] / 2, header, freesurfer_image=True + ) + try: + center_coordinates = _get_center_volume(header) + if header["qform_code"] > 0: + return _scale_with_rotation_matrix(center_coordinates, header) + if header["sform_code"] > 0: + return _scale_with_affine_transformation_matrix(center_coordinates, header) + if header["sform_code"] == 0: + return _scale_coordinates_by_pixdim(center_coordinates, header) + except KeyError as e: + raise ValueError( + f"Cannot get the world coordinates of the center of image {nii_volume}. " + f"This is most likely due to missing data in the header : {e}." + ) + return None + + +def _get_center_volume(header: nib.Nifti1Header) -> np.ndarray: + """Get the voxel coordinates of the center of the data, using header information. + + Parameters + ---------- + header: Nifti1Header + Image header which contains image metadata. + + Returns + ------- + ndarray : + Voxel coordinates of the center of the volume + """ + return np.array([x / 2 for x in header["dim"][1:4]]) + + +def _scale_with_affine_transformation_matrix( + coordinates_vol: np.ndarray, + header: nib.Nifti1Header, + freesurfer_image: bool = False, +) -> np.ndarray: + """Convert coordinates to world space using the affine transformation matrix. + + Parameters + ---------- + coordinates_vol : ndarray + Coordinate in the volume (raw data). + + header : Nifti1Header + Image header containing metadata. + The header must have sform_code > 0. + + freesurfer_image : bool, optional + Whether the image for which the scaling is computed was obtained with + Freesurfer (MGHImage) or not. + Default=False. + + Returns + ------- + ndarray + Coordinates in the world space + + Notes + ----- + This method is used when sform_code is larger than zero. + It relies on a full affine matrix, stored in the header in the fields srow_[x,y,y], + to map voxel to world coordinates. When a nifti file is created with raw data and affine=..., + this is this method that is used to decipher the voxel-to-world correspondence. + """ + homogeneous_coord = np.concatenate( + (np.array(coordinates_vol), np.array([1])), axis=0 + ) + affine = ( + header.get_affine() + if freesurfer_image + else _get_affine_transformation_matrix(header) + ) + + return np.dot(affine, homogeneous_coord)[0:3] + + +def _get_affine_transformation_matrix(header: nib.Nifti1Header) -> np.ndarray: + """Get affine transformation matrix. + + Parameters + ---------- + header : Nifti1Header + The image header for which to compute the affine transformation matrix. + + Returns + ------- + ndarray : + The computed affine transformation matrix + + References + ---------- + https://brainder.org/2012/09/23/the-nifti-file-format/ + """ + matrix = np.zeros((4, 4)) + matrix[0, 0] = header["srow_x"][0] + matrix[0, 1] = header["srow_x"][1] + matrix[0, 2] = header["srow_x"][2] + matrix[0, 3] = header["srow_x"][3] + matrix[1, 0] = header["srow_y"][0] + matrix[1, 1] = header["srow_y"][1] + matrix[1, 2] = header["srow_y"][2] + matrix[1, 3] = header["srow_y"][3] + matrix[2, 0] = header["srow_z"][0] + matrix[2, 1] = header["srow_z"][1] + matrix[2, 2] = header["srow_z"][2] + matrix[2, 3] = header["srow_z"][3] + matrix[3, 3] = 1 + + return matrix + + +def _scale_with_rotation_matrix( + coordinates_vol: np.ndarray, + header: nib.Nifti1Header, +) -> np.ndarray: + """Convert coordinates to world space using the rotation matrix. + + Parameters + ---------- + coordinates_vol : ndarray + Coordinates in the volume (raw data). + + header : Nifti1Header + Image header containing metadata. + The header must have a qform_code > 0. + + Returns + ------- + ndarray : + Coordinates in the world space. + + Notes + ----- + This method is used when short qform_code is larger than zero. + To get the coordinates, we multiply a rotation matrix (r_mat) by coordinates_vol, + then perform Hadamard with pixel dimension pixdim (like in method 1). + Then we add an offset (qoffset_x, qoffset_y, qoffset_z) + """ + from clinica.utils.stream import cprint + + q = header["pixdim"][0] + offset = np.array([header[f"qoffset_{x}"] for x in ("x", "y", "z")]) + if q not in (-1, 1): + cprint( + f"Pixdim of provided header was {q} while either -1 or 1 was expected. Using 1.", + lvl="warning", + ) + q = 1 + return ( + np.dot( + _get_rotation_matrix(header), + np.array([coordinates_vol[0], coordinates_vol[1], q * coordinates_vol[2]]), + ) + * np.array(header["pixdim"][1:4]) + + offset + ) + + +def _get_rotation_matrix(header: nib.Nifti1Header) -> np.ndarray: + """Get the rotation matrix from the provided image header. + + More information here: https://brainder.org/2012/09/23/the-nifti-file-format/ + + Parameters + ---------- + header : Nifti1Header + The header + + Returns + ------- + np.ndarray : + The rotation matrix. + """ + b = header["quatern_b"] + c = header["quatern_c"] + d = header["quatern_d"] + a = np.sqrt(1 - (b**2) - (c**2) - (d**2)) + rotation = np.zeros((3, 3)) + rotation[0, 0] = (a**2) + (b**2) - (c**2) - (d**2) + rotation[0, 1] = 2 * ((b * c) - (a * d)) + rotation[0, 2] = 2 * ((b * d) + (a * c)) + rotation[1, 0] = 2 * ((b * c) + (a * d)) + rotation[1, 1] = (a**2) + (c**2) - (b**2) - (d**2) + rotation[1, 2] = 2 * ((c * d) - (a * b)) + rotation[2, 0] = 2 * ((b * d) - (a * c)) + rotation[2, 1] = 2 * ((b * d) - (a * c)) + rotation[2, 2] = (a**2) + (d**2) - (b**2) - (c**2) + + return rotation + + +def _scale_coordinates_by_pixdim( + coordinates_vol: np.ndarray, + header: nib.Nifti1Header, +) -> np.ndarray: + """Convert coordinates to world space by scaling with pixel dimension. + + Parameters + ---------- + coordinates_vol: ndarray + Coordinates in the volume (raw data). + + header: Nifti1Header + Contains image metadata + + Returns + ------- + ndarray + Coordinates in the world space + + Notes + ----- + This method is for compatibility with analyze and is not supposed to be used as the main orientation method. + But it is used if sform_code = 0. The world coordinates are determined simply by scaling by the voxel size + by their dimension stored in pixdim. + + References + ---------- + https://brainder.org/2012/09/23/the-nifti-file-format/ + """ + return np.array(coordinates_vol) * np.array( + [header["pixdim"][1], header["pixdim"][2], header["pixdim"][3]] + ) diff --git a/clinica/iotools/utils/data_handling/_files.py b/clinica/iotools/utils/data_handling/_files.py new file mode 100644 index 000000000..6aa3ff3f9 --- /dev/null +++ b/clinica/iotools/utils/data_handling/_files.py @@ -0,0 +1,117 @@ +from os import PathLike +from pathlib import Path +from typing import List, Optional + +import pandas as pd + + +def create_subs_sess_list( + input_dir: PathLike, + output_dir: PathLike, + file_name: Optional[str] = None, + is_bids_dir: bool = True, + use_session_tsv: bool = False, +): + """Create the file subject_session_list.tsv that contains the list of the + visits for each subject for a BIDS or CAPS compliant dataset. + + Parameters + ---------- + input_dir : PathLike + Path to the BIDS or CAPS directory. + + output_dir : PathLike + Path to the output directory. + + file_name : str, optional + The name of the output file. + + is_bids_dir : bool, optional + Specify if input_dir is a BIDS directory or not (i.e. a CAPS directory). + Default=True. + + use_session_tsv : bool + Specify if the list uses the sessions listed in the sessions.tsv files. + Default=False. + """ + input_dir = Path(input_dir) + output_dir = Path(output_dir) + output_dir.mkdir(exist_ok=True) + path_to_search = input_dir if is_bids_dir else input_dir / "subjects" + txt = _create_subs_sess_list_as_text(path_to_search, use_session_tsv) + file_name = file_name or "subjects_sessions_list.tsv" + (output_dir / file_name).write_text(txt) + + +def _create_subs_sess_list_as_text(path_to_search: Path, use_session_tsv: bool) -> str: + txt = "participant_id\tsession_id\n" + subjects_paths = [f for f in path_to_search.glob("*sub-*")] + subjects_paths.sort() + if len(subjects_paths) == 0: + raise IOError("Dataset empty or not BIDS/CAPS compliant.") + for subject_path in subjects_paths: + if use_session_tsv: + txt += _create_session_list_for_subject_from_tsv(subject_path) + else: + sessions = [f for f in subject_path.glob("*ses-*")] + sessions.sort() + for ses_path in sorted(sessions): + txt += f"{subject_path.name}\t{ses_path.name}\n" + return txt + + +def _create_session_list_for_subject_from_tsv(subject_path: Path) -> str: + if not (subject_path / f"{subject_path.name}_sessions.tsv").exists(): + raise ValueError( + f"In dataset located at {subject_path.parent}, there is no session " + f"TSV file for subject {subject_path.name}. Consider setting the " + "argument `use_session_tsv` to False in order to rely on folder " + "names parsing." + ) + session_df = pd.read_csv( + subject_path / f"{subject_path.name}_sessions.tsv", sep="\t" + ) + session_df.dropna(how="all", inplace=True) + session_list = sorted(list(session_df["session_id"].to_numpy())) + return ( + "\n".join([f"{subject_path.name}\t{session}" for session in session_list]) + + "\n" + ) + + +def write_list_of_files(file_list: List[PathLike], output_file: PathLike) -> Path: + """Save `file_list` list of files into `output_file` text file. + + Parameters + ---------- + file_list : list of PathLike objects + List of path to files to write. + + output_file : PathLike + Path to the output txt file. + + Returns + ------- + output_file : PathLike + The path to the output file. + + Raises + ------ + TypeError : + If something else than a list was provided for file_list. + IOError : + If output_file already exists. + """ + output_file = Path(output_file) + if not isinstance(file_list, list): + raise TypeError( + f"`file_list` argument must be a list of paths. Instead {type(file_list)} was provided." + ) + if output_file.is_file(): + raise IOError(f"Output file {output_file} already exists.") + + with open(output_file, "w") as fp: + for created_file in file_list: + fp.write(f"{created_file}\n") + + return output_file diff --git a/clinica/iotools/utils/data_handling/_merging.py b/clinica/iotools/utils/data_handling/_merging.py new file mode 100644 index 000000000..d25a7ead6 --- /dev/null +++ b/clinica/iotools/utils/data_handling/_merging.py @@ -0,0 +1,355 @@ +from os import PathLike +from pathlib import Path +from typing import List, Optional, Tuple + +import numpy as np +import pandas as pd + + +def create_merge_file( + bids_dir: PathLike, + out_tsv: PathLike, + caps_dir: Optional[PathLike] = None, + tsv_file: Optional[PathLike] = None, + pipelines: Optional[List[str]] = None, + ignore_scan_files: bool = False, + ignore_sessions_files: bool = False, + **kwargs, +): + """Merge all the TSV files containing clinical data of a BIDS compliant + dataset and store the result inside a TSV file. + + Parameters + ---------- + bids_dir : PathLike + Path to the BIDS folder. + + out_tsv : PathLike + Path to the output tsv file. + + caps_dir : PathLike, optional + Path to the CAPS folder. + + tsv_file : PathLike, optional + Path to a TSV file containing the subjects with their sessions. + + pipelines : list of str, optional + When adding CAPS information, indicates the pipelines that will be merged. + + ignore_scan_files : bool, optional + If True the information related to scans is not read. + + ignore_sessions_files : bool, optional + If True the information related to sessions and scans is not read. + """ + from os import path + + from clinica.utils.stream import cprint + + bids_dir = Path(bids_dir) + caps_dir = _validate_caps_dir(caps_dir) + out_path = _validate_output_tsv_path(out_tsv) + participants_df, sub_ses_df = _get_participants_and_subjects_sessions_df( + bids_dir, tsv_file, ignore_sessions_files + ) + merged_df = _create_merge_file_from_bids( + bids_dir, sub_ses_df, participants_df, ignore_scan_files, ignore_sessions_files + ) + merged_df.to_csv(out_path, sep="\t", index=False) + cprint("End of BIDS information merge.", lvl="debug") + merged_df.reset_index(drop=True, inplace=True) + if caps_dir is not None: + merged_df, merged_summary_df = _add_data_to_merge_file_from_caps( + caps_dir, merged_df, pipelines, **kwargs + ) + summary_path = path.splitext(str(out_path))[0] + "_summary.tsv" + merged_summary_df.to_csv(summary_path, sep="\t", index=False) + merged_df.to_csv(out_path, sep="\t", index=False) + cprint("End of CAPS information merge.", lvl="debug") + + +def _validate_caps_dir(caps_dir: Optional[PathLike] = None) -> Optional[Path]: + if caps_dir is not None: + caps_dir = Path(caps_dir) + if not caps_dir.is_dir(): + raise IOError("The path to the CAPS directory is wrong") + return caps_dir + + +def _validate_output_tsv_path(out_path: PathLike) -> Path: + """Validate that provided file path is a TSV file. + + If folders do not exist, this function will create them. + If provided path is a directory, this function will return + a file named 'merge.tsv' within this directory. + """ + import warnings + + from clinica.utils.stream import cprint + + out_path = Path(out_path).resolve() + if out_path.is_dir(): + out_path = out_path / "merge.tsv" + elif "." not in out_path.name: + out_path = out_path.with_suffix(".tsv") + elif out_path.suffix != ".tsv": + raise TypeError("Output path extension must be tsv.") + if out_path.exists(): + msg = ( + f"Path to TSV file {out_path} already exists. The file will be overwritten." + ) + warnings.warn(msg) + cprint(msg=msg, lvl="warning") + out_dir = out_path.parent + if not out_dir.exists(): + out_dir.mkdir(parents=True) + return out_path + + +def _get_participants_and_subjects_sessions_df( + bids_dir: Path, + tsv_file: Optional[PathLike] = None, + ignore_sessions_files: bool = False, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + from clinica.utils.participant import get_subject_session_list + from clinica.utils.stream import cprint + + index_cols = ["participant_id", "session_id"] + subjects, sessions = get_subject_session_list( + bids_dir, + subject_session_file=tsv_file, + use_session_tsv=(not ignore_sessions_files), + ) + if (bids_dir / "participants.tsv").is_file(): + participants_df = pd.read_csv(bids_dir / "participants.tsv", sep="\t") + else: + participants_df = pd.DataFrame( + sorted(list(set(subjects))), columns=["participant_id"] + ) + sub_ses_df = pd.DataFrame( + [[subject, session] for subject, session in zip(subjects, sessions)], + columns=index_cols, + ) + try: + sub_ses_df.set_index(index_cols, inplace=True, verify_integrity=True) + except ValueError: + cprint( + "Found duplicate subject/session pair. Keeping first occurrence.", + lvl="warning", + ) + sub_ses_df = sub_ses_df.drop_duplicates(subset=index_cols) + sub_ses_df.set_index(index_cols, inplace=True) + + return participants_df, sub_ses_df + + +def _create_merge_file_from_bids( + bids_dir: Path, + sub_ses_df: pd.DataFrame, + participants_df: pd.DataFrame, + ignore_scan_files: bool = False, + ignore_sessions_files: bool = False, +) -> pd.DataFrame: + """Create a merge pandas dataframe for a given BIDS dataset.""" + df = pd.DataFrame(columns=participants_df.columns.values) + for subject, subject_df in sub_ses_df.groupby(level=0): + row_subject_data_only = _get_row_data_from_participant_df( + participants_df, subject + ) + if ignore_sessions_files: + rows_session_data_only = ( + _compute_session_rows_for_subject_without_session_files(subject_df) + ) + else: + rows_session_data_only = ( + _compute_session_rows_for_subject_with_session_files( + bids_dir, subject, subject_df, ignore_scan_files + ) + ) + for row_session_data_only in rows_session_data_only: + full_row = pd.concat([row_subject_data_only, row_session_data_only], axis=1) + df = pd.concat([df, full_row], axis=0) + return _post_process_merge_file_from_bids(df) + + +def _get_row_data_from_participant_df( + participants_df: pd.DataFrame, subject: str +) -> pd.DataFrame: + """Return, for a given subject, part of a merged dataframe row. + This part of the full row only contains information found in the participants dataframe. + In order to make a full row entry, it will be concatenated with information from the sessions and scans. + """ + from clinica.utils.stream import cprint + + row_subject_data = participants_df[participants_df["participant_id"] == subject] + row_subject_data.reset_index(inplace=True, drop=True) + if len(row_subject_data) == 0: + cprint( + msg=f"Participant {subject} does not exist in participants.tsv", + lvl="warning", + ) + row_subject_data = pd.DataFrame([[subject]], columns=["participant_id"]) + + return row_subject_data + + +def _compute_session_rows_for_subject_without_session_files( + subject_df: pd.DataFrame, +) -> List[pd.DataFrame]: + """Compute all rows with session information for a given subject. + This will be concatenated with information from the participants TSV file + to build a full dataframe row with all relevant information. + This is a default implementation when sessions and scans are ignored. + """ + return [ + pd.DataFrame([[session]], columns=["session_id"]) + for _, session in subject_df.index.values + ] + + +def _compute_session_rows_for_subject_with_session_files( + bids_dir: Path, + subject: str, + subject_df: pd.DataFrame, + ignore_scan_files: bool, +) -> List[pd.DataFrame]: + """Compute all rows with session information for a given subject. + This will be concatenated with information from the participants TSV file + to build a full dataframe row with all relevant information. + This function will try to incorporate data from the sessions and scans + if available. + """ + from clinica.utils.exceptions import ClinicaDatasetError + + rows = [] + sub_path = bids_dir / subject + sessions_df = pd.read_csv(sub_path / f"{subject}_sessions.tsv", sep="\t") + for _, session in subject_df.index.values: + row_session_df = sessions_df[sessions_df.session_id == session] + row_session_df.reset_index(inplace=True, drop=True) + if len(row_session_df) == 0: + raise ClinicaDatasetError( + f"The following sessions are not properly formatted : {sessions_df.loc[0, 'session_id']} / {session}" + ) + scan_path = bids_dir / subject / session / f"{subject}_{session}_scans.tsv" + row_scans_df = _get_row_scan_df(scan_path, ignore_scan_files) + row_df = pd.concat([row_session_df, row_scans_df], axis=1) + rows.append(row_df.loc[:, ~row_df.columns.duplicated(keep="last")]) + + return rows + + +def _get_row_scan_df(scan_path: Path, ignore_scan_files: bool) -> pd.DataFrame: + """Return a dataframe for all scans associated with a given subject and session. + The scan data come from the scan TSV file. If it doesn't exist, an empty dataframe + is returned. + """ + row_scans_df = pd.DataFrame() + if ignore_scan_files or not scan_path.is_file(): + return row_scans_df + scans_dict = dict() + scans_df = pd.read_csv(scan_path, sep="\t") + for idx in scans_df.index.values: + filepath = scans_df.loc[idx, "filename"] + if not filepath.endswith(".nii.gz"): + continue + filename = Path(filepath).name.split(".")[0] + modality = "_".join(filename.split("_")[2::]) + for col in scans_df.columns.values: + if col != "filename": + value = scans_df.loc[idx, col] + new_col_name = f"{modality}_{col}" + scans_dict.update({new_col_name: value}) + json_path = scan_path.parent / f"{filepath.split('.')[0]}.json" + scans_dict = _add_metadata_from_json(json_path, scans_dict, modality) + scans_dict = {str(key): str(value) for key, value in scans_dict.items()} + row_scans_df = pd.DataFrame(scans_dict, index=[0]) + + return row_scans_df + + +def _add_metadata_from_json(json_path: Path, scans_dict: dict, modality: str) -> dict: + """Add metadata from the provided JSON sidecar file to the scan dictionary.""" + import json + import warnings + + if json_path.exists(): + try: + with open(json_path, "r") as f: + json_dict = json.load(f) + except json.JSONDecodeError as e: + warnings.warn( + f"Couldn't parse the JSON file {json_path}. Ignoring metadata.\nJson error is:\n{e}" + ) + json_dict = {} + for key, value in json_dict.items(): + new_col_name = f"{modality}_{key}" + scans_dict.update({new_col_name: value}) + return scans_dict + + +def _post_process_merge_file_from_bids(merged_df: pd.DataFrame) -> pd.DataFrame: + """Put participant_id and session_id first, and round numbers.""" + col_list = merged_df.columns.values.tolist() + col_list.insert(0, col_list.pop(col_list.index("participant_id"))) + col_list.insert(1, col_list.pop(col_list.index("session_id"))) + merged_df = merged_df[col_list] + tmp = merged_df.select_dtypes(include=[np.number]) + merged_df.loc[:, tmp.columns] = np.round(tmp, 6) + + return merged_df + + +def _add_data_to_merge_file_from_caps( + caps_dir: Path, + merged_df: pd.DataFrame, + pipelines: Optional[List[str]] = None, + **kwargs, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + from clinica.utils.stream import cprint + + from ..pipeline_handling import ( + PipelineNameForMetricExtraction, + pipeline_metric_extractor_factory, + ) + + merged_summary_df = pd.DataFrame() + if "group_selection" in kwargs and kwargs["group_selection"] is None: + kwargs.pop("group_selection") + pipelines = pipelines or PipelineNameForMetricExtraction + for pipeline in pipelines: + metric_extractor = pipeline_metric_extractor_factory(pipeline) + cprint(f"Extracting from CAPS pipeline output: {pipeline}...", lvl="info") + merged_df, summary_df = metric_extractor(caps_dir, merged_df, **kwargs) + if summary_df is not None and not summary_df.empty: + merged_summary_df = pd.concat([merged_summary_df, summary_df]) + if summary_df is None or summary_df.empty: + cprint(f"{pipeline} outputs were not found in the CAPS folder.", lvl="info") + + return _post_process_merged_df_from_caps(merged_df, merged_summary_df) + + +def _post_process_merged_df_from_caps( + merged_df: pd.DataFrame, + merged_summary_df: pd.DataFrame, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + if len(merged_summary_df) == 0: + raise FileNotFoundError( + "No outputs were found for any pipeline in the CAPS folder. " + "The output only contains BIDS information." + ) + columns = merged_df.columns.values.tolist() + merged_summary_df.reset_index(inplace=True, drop=True) + for idx in merged_summary_df.index: + first_column_name = merged_summary_df.loc[idx, "first_column_name"] + last_column_name = merged_summary_df.loc[idx, "last_column_name"] + merged_summary_df.loc[idx, "first_column_index"] = columns.index( + first_column_name + ) + merged_summary_df.loc[idx, "last_column_index"] = columns.index( + last_column_name + ) + tmp = merged_df.select_dtypes(include=[np.number]) + merged_df.loc[:, tmp.columns] = np.round(tmp, 12) + + return merged_df, merged_summary_df diff --git a/clinica/iotools/utils/data_handling/_missing.py b/clinica/iotools/utils/data_handling/_missing.py new file mode 100644 index 000000000..8131df8cd --- /dev/null +++ b/clinica/iotools/utils/data_handling/_missing.py @@ -0,0 +1,427 @@ +from dataclasses import dataclass +from os import PathLike +from pathlib import Path +from typing import List + +import pandas as pd + + +@dataclass +class SubjectSession: + participant_id: str + session_id: str + session_path: Path + + @property + def sub_ses_id(self) -> str: + return f"{self.participant_id}_{self.session_id}" + + +def compute_missing_mods( + bids_dir: PathLike, out_dir: PathLike, output_prefix: str = "" +) -> None: + """Compute the list of missing modalities for each subject in a BIDS compliant dataset. + + Parameters + ---------- + bids_dir : PathLike + Path to the BIDS directory. + + out_dir : PathLike + Path to the output folder. + + output_prefix : str, optional + String that replaces the default prefix ('missing_mods_') + in the name of all the created output files. + Default = "". + """ + import os + from glob import glob + from os import path + from pathlib import Path + + import pandas as pd + + from ...converter_utils import ( + MissingModsTracker, + write_longitudinal_analysis, + write_statistics, + ) + + out_dir = Path(out_dir) + bids_dir = Path(bids_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + # Find all the modalities and sessions available for the input dataset + mods_and_sess = _find_mods_and_sess(bids_dir) + sessions_found = mods_and_sess["sessions"] + mods_and_sess.pop("sessions") + mods_avail_dict = mods_and_sess + mods_avail = [j for i in mods_avail_dict.values() for j in i] + cols_dataframe = mods_avail[:] + cols_dataframe.insert(0, "participant_id") + mmt = MissingModsTracker(sessions_found, mods_avail) + + out_file_name = "missing_mods_" if output_prefix == "" else output_prefix + "_" + + missing_mods_df = pd.DataFrame(columns=cols_dataframe) + row_to_append_df = pd.DataFrame(columns=cols_dataframe) + subjects_paths_lists = glob(path.join(bids_dir, "*sub-*")) + subjects_paths_lists.sort() + + if len(subjects_paths_lists) == 0: + raise IOError("No subjects found or dataset not BIDS compliant.") + # Check the modalities available for each session + for ses in sessions_found: + for sub_path in subjects_paths_lists: + mods_avail_bids = [] + subj_id = sub_path.split(os.sep)[-1] + row_to_append_df["participant_id"] = pd.Series(subj_id) + ses_path_avail = glob(path.join(sub_path, ses)) + if len(ses_path_avail) == 0: + mmt.increase_missing_ses(ses) + for mod in mods_avail: + row_to_append_df[mod] = pd.Series("0") + else: + ses_path = ses_path_avail[0] + mods_paths_folders = glob(path.join(ses_path, "*/")) + + for p in mods_paths_folders: + p = p[:-1] + mods_avail_bids.append(p.split("/").pop()) + + # Check if a modality folder is available and if is empty + if "func" in mods_avail_bids: + # Extract all the task available + for m in mods_avail_dict["func"]: + tokens = m.split("_") + task_name = tokens[1] + task_avail_list = glob( + path.join(ses_path, "func", "*" + task_name + "*") + ) + + if len(task_avail_list) == 0: + row_to_append_df[m] = pd.Series("0") + else: + row_to_append_df[m] = pd.Series("1") + # If the folder is not available but the modality is + # in the list of the available one mark it as missing + else: + if "func" in mods_avail_dict: + for m in mods_avail_dict["func"]: + row_to_append_df[m] = pd.Series("0") + mmt.add_missing_mod(ses, m) + + if "dwi" in mods_avail_bids: + row_to_append_df["dwi"] = pd.Series("1") + else: + if "dwi" in mods_avail: + row_to_append_df["dwi"] = pd.Series("0") + mmt.add_missing_mod(ses, "dwi") + + if "anat" in mods_avail_bids: + for m in mods_avail_dict["anat"]: + anat_aval_list = glob(path.join(ses_path, "anat", "*.nii.gz")) + anat_aval_list = [ + elem for elem in anat_aval_list if m.lower() in elem.lower() + ] + if len(anat_aval_list) > 0: + row_to_append_df[m] = pd.Series("1") + else: + row_to_append_df[m] = pd.Series("0") + mmt.add_missing_mod(ses, m) + else: + if "anat" in mods_avail_dict: + for m in mods_avail_dict["anat"]: + row_to_append_df[m] = pd.Series("0") + mmt.add_missing_mod(ses, m) + + if "fmap" in mods_avail_bids: + row_to_append_df["fmap"] = pd.Series("1") + else: + if "fmap" in mods_avail: + row_to_append_df["fmap"] = pd.Series("0") + mmt.add_missing_mod(ses, "fmap") + if "pet" in mods_avail_bids: + # Extract all the task available + for m in mods_avail_dict["pet"]: + tokens = m.split("_") + pet_acq = tokens[1] + acq_avail_list = glob( + path.join(ses_path, "pet", "*" + pet_acq + "*") + ) + + if len(acq_avail_list) == 0: + row_to_append_df[m] = pd.Series("0") + else: + row_to_append_df[m] = pd.Series("1") + # If the folder is not available but the modality is + # in the list of the available one mark it as missing + else: + if "pet" in mods_avail_dict: + for m in mods_avail_dict["pet"]: + row_to_append_df[m] = pd.Series("0") + mmt.add_missing_mod(ses, m) + + missing_mods_df = pd.concat([missing_mods_df, row_to_append_df]) + row_to_append_df = pd.DataFrame(columns=cols_dataframe) + + missing_mods_df = missing_mods_df[cols_dataframe] + missing_mods_df.to_csv( + path.join(out_dir, out_file_name + ses + ".tsv"), + sep="\t", + index=False, + encoding="utf-8", + ) + missing_mods_df = pd.DataFrame(columns=cols_dataframe) + + write_statistics( + out_dir / (out_file_name + "summary.txt"), + len(subjects_paths_lists), + sessions_found, + mmt, + ) + write_longitudinal_analysis( + out_dir / "analysis.txt", bids_dir, out_dir, sessions_found, out_file_name + ) + + +def compute_missing_processing( + bids_dir: PathLike, caps_dir: PathLike, out_file: PathLike +): + """Compute the list of missing processing for each subject in a CAPS compliant dataset. + + Parameters + ---------- + bids_dir : PathLike + Path to the BIDS directory. + + caps_dir : PathLike + Path to the CAPS directory. + + out_file : PathLike + Path to the output file (filename included). + """ + bids_dir = Path(bids_dir) + caps_dir = Path(caps_dir) + output_df = pd.DataFrame() + groups = _get_groups(caps_dir) + tracers = _get_pet_tracers(bids_dir) + + for subject_path in (caps_dir / "subjects").glob("sub-*"): + participant_id = subject_path.parent.name + for session_path in subject_path.glob("ses-*"): + session_id = session_path.parent.name + subject = SubjectSession(participant_id, session_id, session_path) + row = _compute_missing_processing_single_row(subject, groups, tracers) + output_df = pd.concat([output_df, row]) + output_df.sort_values(["participant_id", "session_id"], inplace=True) + output_df.to_csv(out_file, sep="\t", index=False) + + +def _get_groups(caps_dir: Path) -> List[str]: + if (caps_dir / "groups").exists(): + return [f.name for f in (caps_dir / "groups").iterdir()] + return [] + + +def _get_pet_tracers(bids_dir: Path) -> List[str]: + """Retrieve pet tracers available.""" + from clinica.utils.stream import cprint + + modalities = _find_mods_and_sess(bids_dir) + modalities.pop("sessions") + tracers = [ + j.split("_")[1] + for modality in modalities.values() + for j in modality + if "pet" in j + ] + cprint(f"Available tracers: {tracers}", lvl="info") + + return tracers + + +def _compute_missing_processing_single_row( + subject: SubjectSession, + groups: List[str], + tracers: List[str], +) -> pd.DataFrame: + """Compute a single row of the missing processing dataframe.""" + row = pd.DataFrame( + [[subject.participant_id, subject.session_id]], + columns=["participant_id", "session_id"], + ) + row = _compute_missing_processing_t1_volume(row, groups, subject) + for pipeline in ("t1-linear", "flair_linear"): + row[0, pipeline] = "1" if (subject.session_path / pipeline).exists() else "0" + row[0, "t1-freesurfer"] = ( + "1" + if (subject.session_path / "t1" / "freesurfer_cross_sectional").exists() + else "0" + ) + row = _compute_missing_processing_pet_volume( + row, groups, tracers, subject.session_path + ) + for tracer in tracers: + row.loc[0, f"pet-surface_{tracer}"] = ( + "1" + if len( + [ + f + for f in (subject.session_path / "pet" / "surface").glob( + f"*{tracer}*" + ) + ] + ) + > 0 + else "0" + ) + for tracer in tracers: + row.loc[0, f"pet-linear_{tracer}"] = ( + "1" + if len( + [f for f in (subject.session_path / "pet_linear").glob(f"*{tracer}*")] + ) + > 0 + else "0" + ) + return row + + +def _compute_missing_processing_t1_volume( + df: pd.DataFrame, + groups: List[str], + subject: SubjectSession, +) -> pd.DataFrame: + if (subject.session_path / "t1" / "spm" / "segmentation").exists(): + df.loc[0, "t1-volume-segmentation"] = "1" + for group in groups: + group_id = group.split("-")[-1] + filename = f"{subject.sub_ses_id}_T1w_target-{group_id}_transformation-forward_deformation.nii.gz" + if ( + subject.session_path / "t1" / "spm" / "dartel" / group / filename + ).exists(): + df.loc[0, f"t1-volume-register-dartel_{group}"] = "1" + pattern = f"{subject.sub_ses_id}_T1w_segm-*_space-Ixi549Space_modulated-*_probability.nii.gz" + dartel2mni = [ + f + for f in ( + subject.session_path / "t1" / "spm" / "dartel" / group + ).glob(pattern) + ] + if len(dartel2mni) > 0: + df.loc[0, f"t1-volume-dartel2mni_{group}"] = "1" + if ( + subject.session_path + / "t1" + / "spm" + / "dartel" + / group + / "atlas_statistics" + ).exists(): + df.loc[0, f"t1-volume-parcellation_{group}"] = "1" + else: + df.loc[0, f"t1-volume-parcellation_{group}"] = "0" + else: + df.loc[0, f"t1-volume-dartel2mni_{group}"] = "0" + df.loc[0, f"t1-volume-parcellation_{group}"] = "0" + else: + df.loc[0, f"t1-volume-register-dartel_{group}"] = "0" + df.loc[0, f"t1-volume-dartel2mni_{group}"] = "0" + df.loc[0, f"t1-volume-parcellation_{group}"] = "0" + else: + df.loc[0, "t1-volume-segmentation"] = "0" + for group in groups: + df.loc[0, f"t1-volume-register-dartel_{group}"] = "0" + df.loc[0, f"t1-volume-dartel2mni_{group}"] = "0" + df.loc[0, f"t1-volume-parcellation_{group}"] = "0" + return df + + +def _compute_missing_processing_pet_volume( + df: pd.DataFrame, + groups: List[str], + tracers: List[str], + session_path: Path, +) -> pd.DataFrame: + for group in groups: + for tracer in tracers: + pet_paths_pvc = [ + f + for f in (session_path / "pet" / "preprocessing" / group).glob( + f"*{tracer}*" + ) + if "pvc" in f + ] + pet_paths_no_pvc = [ + f + for f in (session_path / "pet" / "preprocessing" / group).glob( + f"*{tracer}*" + ) + if "pvc" not in f + ] + df.loc[0, f"pet-volume_{tracer}_{group}_pvc-True"] = ( + "1" if len(pet_paths_pvc) > 0 else "0" + ) + df.loc[0, f"pet-volume_{tracer}_{group}_pvc-False"] = ( + "1" if len(pet_paths_no_pvc) > 0 else "0" + ) + return df + + +def _find_mods_and_sess(bids_dir: Path) -> dict: + """Find all the modalities and sessions available for a given BIDS dataset. + + Parameters + ---------- + bids_dir : Path + The path to the BIDS dataset. + + Returns + ------- + mods_dict : dict + A dictionary that stores the sessions and modalities found and has the following structure. + { + 'sessions': ['ses-M000', 'ses-M018'], + 'fmap': ['fmap'], + 'anat': ['flair', 't1w'], + 'func': ['func_task-rest'], + 'dwi': ['dwi'] + } + """ + from collections import defaultdict + + mods_dict = defaultdict(set) + + for sub_path in bids_dir.glob("*sub-*"): + for session in sub_path.glob("*ses-*"): + mods_dict["sessions"].add(session.name) + for modality in session.glob("*/"): + if modality.name == "func": + for func_path in modality.glob("*bold.nii.gz"): + func_task = func_path.name.split("_")[2] + mods_dict["func"].add(f"func_{func_task}") + if modality.name == "dwi": + mods_dict["dwi"].add("dwi") + if modality.name == "fmap": + mods_dict["fmap"].add("fmap") + if modality.name == "pet": + for pet_path in modality.glob("*pet.nii.gz"): + pet_name = pet_path.name.split(".")[0] + pet_acq = pet_name.split("_")[2] + mods_dict["pet"].add(f"pet_{pet_acq}") + if modality.name == "anat": + for anat_file in modality.glob("*"): + if ".nii.gz" in anat_file.name: + anat_name = anat_file.name.replace(".nii.gz", "") + anat_ext = "nii.gz" + else: + anat_name = anat_file.stem + anat_ext = anat_file.suffix.lstrip(".") + if anat_ext != "json": + file_parts = anat_name.split("_") + anat_type = file_parts[-1].lower() + mods_dict["anat"].add(anat_type) + + return mods_dict diff --git a/clinica/iotools/utils/pipeline_handling.py b/clinica/iotools/utils/pipeline_handling.py index b1da2bc87..3361fcbf2 100644 --- a/clinica/iotools/utils/pipeline_handling.py +++ b/clinica/iotools/utils/pipeline_handling.py @@ -1,144 +1,44 @@ """Methods to find information in the different pipelines of Clinica.""" -import functools -import os -from os import PathLike +from enum import Enum +from functools import partial, reduce from pathlib import Path -from typing import List, Optional, Tuple +from typing import Callable, Iterable, List, Optional, Tuple, Union import pandas as pd +__all__ = [ + "PipelineNameForMetricExtraction", + "pipeline_metric_extractor_factory", +] -def _get_atlas_name(atlas_path: Path, pipeline: str) -> str: - """Helper function for _extract_metrics_from_pipeline.""" - if pipeline == "dwi_dti": - splitter = "_dwi_space-" - elif pipeline in ("t1_freesurfer_longitudinal", "t1-freesurfer"): - splitter = "_parcellation-" - elif pipeline in ("t1-volume", "pet-volume"): - splitter = "_space-" - else: - raise ValueError(f"Not supported pipeline {pipeline}.") - if splitter in atlas_path.stem: - try: - return atlas_path.stem.split(splitter)[-1].split("_")[0] - except Exception: - pass - raise ValueError( - f"Unable to infer the atlas name from {atlas_path} for pipeline {pipeline}." - ) - - -def _get_mod_path(ses_path: Path, pipeline: str) -> Optional[Path]: - """Helper function for _extract_metrics_from_pipeline. - Returns the path to the modality of interest depending - on the pipeline considered. - """ - if pipeline == "dwi_dti": - return ses_path / "dwi" / "dti_based_processing" / "atlas_statistics" - if pipeline == "t1_freesurfer_longitudinal": - mod_path = ses_path / "t1" - long_ids = list(mod_path.glob("long*")) - if len(long_ids) == 0: - return None - return ( - mod_path - / long_ids[0].name - / "freesurfer_longitudinal" - / "regional_measures" - ) - if pipeline == "t1-freesurfer": - return ses_path / "t1" / "freesurfer_cross_sectional" / "regional_measures" - if pipeline == "t1-volume": - return ses_path / "t1" / "spm" / "dartel" - if pipeline == "pet-volume": - return ses_path / "pet" / "preprocessing" - raise ValueError(f"Not supported pipeline {pipeline}.") - - -def _get_label_list( - atlas_path: Path, metric: str, pipeline: str, group: str -) -> List[str]: - """Helper function for _extract_metrics_from_pipeline. - Returns the list of labels to use in the session df depending on the - pipeline, the atlas, and the metric considered. - """ - from clinica.iotools.converter_utils import replace_sequence_chars - atlas_name = _get_atlas_name(atlas_path, pipeline) - atlas_df = pd.read_csv(atlas_path, sep="\t") - if pipeline == "t1-freesurfer": - return [ - f"t1-freesurfer_atlas-{atlas_name}_ROI-{replace_sequence_chars(roi_name)}_thickness" - for roi_name in atlas_df.label_name.values - ] - if pipeline in ("t1-volume", "pet-volume"): - additional_desc = "" - if "trc" in str(atlas_path): - tracer = str(atlas_path).split("_trc-")[1].split("_")[0] - additional_desc += f"_trc-{tracer}" - if "pvc-rbv" in str(atlas_path): - additional_desc += f"_pvc-rbv" - return [ - f"{pipeline}_{group}_atlas-{atlas_name}{additional_desc}_ROI-{replace_sequence_chars(roi_name)}_intensity" - for roi_name in atlas_df.label_name.values - ] - if pipeline == "dwi_dti": - prefix = "dwi-dti_" - metric = metric.rstrip("_statistics") - elif pipeline == "t1-freesurfer_longitudinal": - prefix = "t1-fs-long_" - else: - raise ValueError(f"Not supported pipeline {pipeline}.") - return [ - prefix + metric + "_atlas-" + atlas_name + "_" + x - for x in atlas_df.label_name.values - ] +class PipelineNameForMetricExtraction(str, Enum): + """Pipelines for which a metric extractor has been implemented.""" - -def _skip_atlas( - atlas_path: Path, - pipeline: str, - pvc_restriction: Optional[bool] = None, - tracers_selection: Optional[List[str]] = None, -) -> bool: - """Helper function for _extract_metrics_from_pipeline. - Returns whether the atlas provided through its path should be - skipped for the provided pipeline. - """ - if pipeline == "t1-freesurfer_longitudinal": - return "-wm_" in str(atlas_path) or "-ba_" in str(atlas_path.stem) - if pipeline in ("t1-volume", "pet-volume"): - skip = [] - if pvc_restriction is not None: - if pvc_restriction: - skip.append("pvc-rbv" not in str(atlas_path)) - else: - skip.append("pvc-rbv" in str(atlas_path)) - if tracers_selection: - skip.append( - all([tracer not in str(atlas_path) for tracer in tracers_selection]) - ) - return any(skip) - return False + T1_VOLUME = "t1-volume" + PET_VOLUME = "pet-volume" + T1_FREESURFER = "t1-freesurfer" + T1_FREESURFER_LONGI = "t1-freesurfer-longitudinal" + DWI_DTI = "dwi-dti" def _extract_metrics_from_pipeline( - caps_dir: PathLike, + caps_dir: Path, df: pd.DataFrame, - metrics: List[str], - pipeline: str, - atlas_selection: Optional[List[str]] = None, - group_selection: Optional[List[str]] = None, + metrics: Iterable[str], + pipeline: PipelineNameForMetricExtraction, + atlas_selection: Optional[Iterable[str]] = None, + group_selection: Optional[Iterable[str]] = None, pvc_restriction: Optional[bool] = None, - tracers_selection: Optional[List[str]] = None, + tracers_selection: Optional[Iterable[str]] = None, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Extract and merge the data of the provided pipeline into the merged dataframe already containing the BIDS information. Parameters ---------- - caps_dir : PathLike - Path to the CAPS directory. + caps_dir : Path + The path to the CAPS directory. df : pd.DataFrame DataFrame containing the BIDS information. @@ -147,7 +47,7 @@ def _extract_metrics_from_pipeline( metrics : list of str List of metrics to extract from the pipeline's caps folder. - pipeline : str + pipeline : PipelineNameForMetricExtraction Name of the pipeline for which to extract information. atlas_selection : list of str, optional @@ -176,9 +76,6 @@ def _extract_metrics_from_pipeline( summary_df : pd.DataFrame Summary DataFrame generated by function `generate_summary`. """ - from clinica.utils.stream import cprint - - caps_dir = Path(caps_dir) if df.index.names != ["participant_id", "session_id"]: try: df.set_index( @@ -186,129 +83,357 @@ def _extract_metrics_from_pipeline( ) except KeyError: raise KeyError("Fields `participant_id` and `session_id` are required.") - - ignore_groups = group_selection == [""] - if not ignore_groups: - if group_selection is None: - try: - group_selection = [f.name for f in (caps_dir / "groups").iterdir()] - except FileNotFoundError: - return df, None - else: - group_selection = [f"group-{group}" for group in group_selection] - subjects_dir = caps_dir / "subjects" - records = [] - for participant_id, session_id in df.index.values: - ses_path = subjects_dir / participant_id / session_id - mod_path = _get_mod_path(ses_path, pipeline) - records.append({"participant_id": participant_id, "session_id": session_id}) - if mod_path is None: - cprint( - f"Could not find a longitudinal dataset for participant {participant_id} {session_id}", - lvl="warning", - ) - continue - if mod_path.exists(): - for group in group_selection: - group_path = mod_path / group - if group_path.exists(): - for metric in metrics: - atlas_paths = sorted( - group_path.glob( - f"{participant_id}_{session_id}_*{metric}.tsv" - ) - ) - if len(atlas_paths) == 0: - atlas_paths = sorted( - (group_path / "atlas_statistics").glob( - f"{participant_id}_{session_id}_*{metric}.tsv" - ) - ) - for atlas_path in atlas_paths: - if metric == "segmentationVolumes": - from clinica.iotools.converter_utils import ( - replace_sequence_chars, - ) - - atlas_df = pd.read_csv(atlas_path, sep="\t") - label_list = [ - f"t1-freesurfer_segmentation-volumes_ROI-{replace_sequence_chars(roi_name)}_volume" - for roi_name in atlas_df.label_name.values - ] - values = atlas_df["label_value"].to_numpy() - for label, value in zip(label_list, values): - records[-1][label] = value - continue - if not _skip_atlas( - atlas_path, pipeline, pvc_restriction, tracers_selection - ): - atlas_name = _get_atlas_name(atlas_path, pipeline) - if atlas_path.exists(): - if not ( - atlas_selection - or ( - atlas_selection - and atlas_name in atlas_selection - ) - ): - atlas_df = pd.read_csv(atlas_path, sep="\t") - label_list = _get_label_list( - atlas_path, metric, pipeline, group - ) - key = ( - "label_value" - if "freesurfer" in pipeline - else "mean_scalar" - ) - values = atlas_df[key].to_numpy() - for label, value in zip(label_list, values): - records[-1][label] = value - pipeline_df = pd.DataFrame.from_records( - records, index=["participant_id", "session_id"] + group_selection = _check_group_selection(caps_dir, group_selection) + records = _get_records( + caps_dir, + df, + pipeline, + metrics, + group_selection, + atlas_selection, + pvc_restriction, + tracers_selection, + ) + if records: + pipeline_df = pd.DataFrame.from_records( + records, index=["participant_id", "session_id"] + ) + else: + pipeline_df = pd.DataFrame.from_records( + records, columns=["participant_id", "session_id"] + ) + pipeline_df.set_index( + ["participant_id", "session_id"], inplace=True, verify_integrity=True + ) + summary_df = _generate_summary( + pipeline_df, pipeline, ignore_groups=group_selection == [""] ) - summary_df = generate_summary(pipeline_df, pipeline, ignore_groups=ignore_groups) final_df = pd.concat([df, pipeline_df], axis=1) final_df.reset_index(inplace=True) return final_df, summary_df -dwi_dti_pipeline = functools.partial( +_extract_metrics_from_dwi_dti = partial( _extract_metrics_from_pipeline, metrics=["FA_statistics", "MD_statistics", "RD_statistics", "AD_statistics"], - pipeline="dwi_dti", + pipeline=PipelineNameForMetricExtraction.DWI_DTI, group_selection=[""], ) -t1_freesurfer_longitudinal_pipeline = functools.partial( +_extract_metrics_from_t1_freesurfer_longitudinal = partial( _extract_metrics_from_pipeline, metrics=["volume", "thickness", "segmentationVolumes"], - pipeline="t1_freesurfer_longitudinal", + pipeline=PipelineNameForMetricExtraction.T1_FREESURFER_LONGI, group_selection=[""], ) -t1_freesurfer_pipeline = functools.partial( +_extract_metrics_from_t1_freesurfer = partial( _extract_metrics_from_pipeline, metrics=["thickness", "segmentationVolumes"], - pipeline="t1-freesurfer", + pipeline=PipelineNameForMetricExtraction.T1_FREESURFER, group_selection=[""], ) -t1_volume_pipeline = functools.partial( +_extract_metrics_from_t1_volume = partial( _extract_metrics_from_pipeline, metrics=["statistics"], - pipeline="t1-volume", + pipeline=PipelineNameForMetricExtraction.T1_VOLUME, ) -pet_volume_pipeline = functools.partial( +_extract_metrics_from_pet_volume = partial( _extract_metrics_from_pipeline, metrics=["statistics"], - pipeline="pet-volume", + pipeline=PipelineNameForMetricExtraction.PET_VOLUME, ) -def generate_summary( +def pipeline_metric_extractor_factory( + name: Union[str, PipelineNameForMetricExtraction], +) -> Callable: + """Factory returning a metric extractor given its name.""" + name = PipelineNameForMetricExtraction(name) + if name == PipelineNameForMetricExtraction.T1_VOLUME: + return _extract_metrics_from_t1_volume + if name == PipelineNameForMetricExtraction.PET_VOLUME: + return _extract_metrics_from_pet_volume + if name == PipelineNameForMetricExtraction.T1_FREESURFER: + return _extract_metrics_from_t1_freesurfer + if name == PipelineNameForMetricExtraction.T1_FREESURFER_LONGI: + return _extract_metrics_from_t1_freesurfer_longitudinal + if name == PipelineNameForMetricExtraction.DWI_DTI: + return _extract_metrics_from_dwi_dti + + +def _check_group_selection( + caps_dir: Path, group_selection: Optional[Iterable[str]] = None +) -> Iterable[str]: + if group_selection == [""]: + return group_selection + if group_selection is None: + return [f.name for f in (caps_dir / "groups").iterdir()] + return [f"group-{group}" for group in group_selection] + + +def _get_records( + caps_dir: Path, + df: pd.DataFrame, + pipeline: PipelineNameForMetricExtraction, + metrics: Iterable[str], + group_selection: Iterable[str], + atlas_selection: Optional[Iterable[str]] = None, + pvc_restriction: Optional[bool] = None, + tracers_selection: Optional[Iterable[str]] = None, +) -> List[dict]: + """Returns a list of dictionaries corresponding to the dataframe rows of the pipeline dataframe.""" + from clinica.utils.stream import cprint + + records = [] + subjects_dir = caps_dir / "subjects" + for participant_id, session_id in df.index.values: + if ( + mod_path := _get_modality_path( + subjects_dir / participant_id / session_id, pipeline + ) + ) is None or not mod_path.exists(): + cprint( + f"Could not find a longitudinal dataset for participant {participant_id} {session_id}", + lvl="warning", + ) + continue + for group_path in ( + mod_path / group for group in group_selection if (mod_path / group).exists() + ): + records_of_group = [ + _get_single_record( + group_path, + metric, + participant_id, + session_id, + pipeline, + atlas_selection, + pvc_restriction, + tracers_selection, + ) + for metric in metrics + ] + records.append(reduce(lambda x, y: {**x, **y}, records_of_group)) + return records + + +def _get_modality_path( + ses_path: Path, pipeline: PipelineNameForMetricExtraction +) -> Optional[Path]: + """Returns the path to the modality of interest depending on the pipeline considered.""" + if pipeline == PipelineNameForMetricExtraction.DWI_DTI: + return ses_path / "dwi" / "dti_based_processing" / "atlas_statistics" + if pipeline == PipelineNameForMetricExtraction.T1_FREESURFER_LONGI: + mod_path = ses_path / "t1" + if len(long_ids := list(mod_path.glob("long*"))) == 0: + return None + return ( + mod_path + / long_ids[0].name + / "freesurfer_longitudinal" + / "regional_measures" + ) + if pipeline == PipelineNameForMetricExtraction.T1_FREESURFER: + return ses_path / "t1" / "freesurfer_cross_sectional" / "regional_measures" + if pipeline == PipelineNameForMetricExtraction.T1_VOLUME: + return ses_path / "t1" / "spm" / "dartel" + if pipeline == PipelineNameForMetricExtraction.PET_VOLUME: + return ses_path / "pet" / "preprocessing" + + +def _get_single_record( + group_path: Path, + metric: str, + participant_id: str, + session_id: str, + pipeline: PipelineNameForMetricExtraction, + atlas_selection: Optional[Iterable[str]] = None, + pvc_restriction: Optional[bool] = None, + tracers_selection: Optional[Iterable[str]] = None, +) -> dict: + """Get a single record (dataframe row) for a given participant, session, group, and metric.""" + atlases = [ + atlas_path + for atlas_path in _get_atlas_paths( + group_path, participant_id, session_id, metric + ) + if not _skip_atlas( + atlas_path, pipeline, pvc_restriction, tracers_selection, atlas_selection + ) + ] + return { + **{"participant_id": participant_id, "session_id": session_id}, + **reduce( + lambda a, b: {**a, **b}, + [ + _get_records_for_atlas(atlas, pipeline, metric, group_path.name) + for atlas in atlases + ], + ), + } + + +def _get_atlas_paths( + group_path: Path, participant_id: str, session_id: str, metric: str +) -> List[Path]: + """Returns a list of paths to atlases within the provided group folder and + matching the provided participant, session, and metric. + """ + if ( + len( + paths := sorted( + group_path.glob(f"{participant_id}_{session_id}_*{metric}.tsv") + ) + ) + != 0 + ): + return paths + return sorted( + (group_path / "atlas_statistics").glob( + f"{participant_id}_{session_id}_*{metric}.tsv" + ) + ) + + +def _skip_atlas( + atlas_path: Path, + pipeline: PipelineNameForMetricExtraction, + pvc_restriction: Optional[bool] = None, + tracers_selection: Optional[Iterable[str]] = None, + atlas_selection: Optional[Iterable[str]] = None, +) -> bool: + """Returns whether the atlas provided through its path should be skipped or not.""" + if not atlas_path.exists(): + return True + if _skip_atlas_based_on_pipeline( + atlas_path, pipeline, pvc_restriction, tracers_selection + ): + return True + return _skip_atlas_based_on_selection(atlas_path, pipeline, atlas_selection) + + +def _skip_atlas_based_on_pipeline( + atlas_path: Path, + pipeline: PipelineNameForMetricExtraction, + pvc_restriction: Optional[bool] = None, + tracers_selection: Optional[Iterable[str]] = None, +) -> bool: + """Returns True if the atlas provided through its path should be skipped based on the pipeline name.""" + if pipeline == PipelineNameForMetricExtraction.T1_FREESURFER_LONGI: + return "-wm_" in str(atlas_path) or "-ba_" in str(atlas_path.stem) + if pipeline in ( + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ): + skip = [] + if pvc_restriction is not None: + if pvc_restriction: + skip.append("pvc-rbv" not in str(atlas_path)) + else: + skip.append("pvc-rbv" in str(atlas_path)) + if tracers_selection: + skip.append( + all([tracer not in str(atlas_path) for tracer in tracers_selection]) + ) + return any(skip) + return False + + +def _skip_atlas_based_on_selection( + atlas_path: Path, + pipeline: PipelineNameForMetricExtraction, + atlas_selection: Optional[Iterable[str]] = None, +) -> bool: + """Returns True if the atlas provided through its path should be skipped based on the user-provided selection.""" + return ( + atlas_selection is not None + and _get_atlas_name(atlas_path, pipeline) not in atlas_selection + ) + + +def _get_records_for_atlas( + atlas_path: Path, + pipeline: PipelineNameForMetricExtraction, + metric: str, + group: str, +) -> dict: + atlas_df = pd.read_csv(atlas_path, sep="\t") + label_list = _get_label_list(atlas_path, metric, pipeline, group) + key = "label_value" if "freesurfer" in pipeline.value else "mean_scalar" + values = atlas_df[key].to_numpy() + return {label: value for label, value in zip(label_list, values)} + + +def _get_label_list( + atlas_path: Path, metric: str, pipeline: PipelineNameForMetricExtraction, group: str +) -> List[str]: + """Returns the list of labels to use in the session df depending on the + pipeline, the atlas, and the metric considered. + """ + from clinica.iotools.converter_utils import replace_sequence_chars + + atlas_df = pd.read_csv(atlas_path, sep="\t") + atlas_name = _get_atlas_name(atlas_path, pipeline) + if pipeline == PipelineNameForMetricExtraction.T1_FREESURFER: + return [ + f"t1-freesurfer_{atlas_name}_ROI-{replace_sequence_chars(roi_name)}_{'volume' if metric == 'segmentationVolumes' else metric}" + for roi_name in atlas_df.label_name.values + ] + if pipeline in ( + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ): + additional_desc = "" + if "trc" in str(atlas_path): + tracer = str(atlas_path).split("_trc-")[1].split("_")[0] + additional_desc += f"_trc-{tracer}" + if "pvc-rbv" in str(atlas_path): + additional_desc += f"_pvc-rbv" + return [ + f"{pipeline}_{group}_{atlas_name}{additional_desc}_ROI-{replace_sequence_chars(roi_name)}_intensity" + for roi_name in atlas_df.label_name.values + ] + if pipeline == PipelineNameForMetricExtraction.DWI_DTI: + prefix = "dwi-dti_" + metric = metric.rstrip("_statistics") + else: + prefix = "t1-fs-long_" + return [prefix + metric + atlas_name + "_" + x for x in atlas_df.label_name.values] + + +def _get_atlas_name(atlas_path: Path, pipeline: PipelineNameForMetricExtraction) -> str: + """Return the atlas name, inferred from the atlas_path, for a given pipeline.""" + if pipeline == PipelineNameForMetricExtraction.DWI_DTI: + return _infer_atlas_name("_dwi_space-", atlas_path) + if pipeline in ( + PipelineNameForMetricExtraction.T1_FREESURFER_LONGI, + PipelineNameForMetricExtraction.T1_FREESURFER, + ): + return _infer_atlas_name("_parcellation-", atlas_path) + if pipeline in ( + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ): + return _infer_atlas_name("_space-", atlas_path) + + +def _infer_atlas_name(splitter: str, atlas_path: Path) -> str: + try: + assert splitter in atlas_path.stem + return f"atlas-{atlas_path.stem.split(splitter)[-1].split('_')[0]}" + except Exception: + if "segmentationVolumes" in atlas_path.stem: + return "segmentation-volumes" + else: + raise ValueError(f"Unable to infer the atlas name from {atlas_path}.") + + +def _generate_summary( pipeline_df: pd.DataFrame, pipeline_name: str, ignore_groups: bool = False ): columns = [ @@ -390,11 +515,3 @@ def generate_summary( summary_df = summary_df.replace("_", "n/a") return summary_df - - -class DatasetError(Exception): - def __init__(self, name): - self.name = name - - def __str__(self): - return repr("Bad format for the sessions: " + self.name) diff --git a/clinica/utils/exceptions.py b/clinica/utils/exceptions.py index 6fda976e5..55e737375 100644 --- a/clinica/utils/exceptions.py +++ b/clinica/utils/exceptions.py @@ -13,20 +13,24 @@ class ClinicaEnvironmentVariableError(ClinicaException): """Something is wrong with an environment variable managed by Clinica.""" -class ClinicaBIDSError(ClinicaException): - """Base class for BIDS errors.""" +class ClinicaDatasetError(ClinicaException): + """Base class for errors related to Datasets.""" -class ClinicaCAPSError(ClinicaException): - """Base class for CAPS errors.""" +class ClinicaBIDSError(ClinicaDatasetError): + """Base class for BIDS dataset errors.""" + + +class ClinicaCAPSError(ClinicaDatasetError): + """Base class for CAPS dataset errors.""" class ClinicaParserError(ClinicaException): """Base class for parser errors.""" -class ClinicaXMLParserError(ClinicaException): - """Base class for parser errors.""" +class ClinicaXMLParserError(ClinicaParserError): + """Base class for XML parser errors.""" class ClinicaInconsistentDatasetError(ClinicaException): diff --git a/clinica/utils/testing_utils.py b/clinica/utils/testing_utils.py index af0342dd1..625e162f3 100644 --- a/clinica/utils/testing_utils.py +++ b/clinica/utils/testing_utils.py @@ -1,8 +1,9 @@ import json import os +import random from functools import partial from pathlib import Path -from typing import Callable, Optional +from typing import Callable, Dict, Optional, Tuple import nibabel as nib import numpy as np @@ -11,7 +12,13 @@ from clinica.pipelines.dwi.utils import DWIDataset -def build_bids_directory(directory: os.PathLike, subjects_sessions: dict) -> None: +def build_bids_directory( + directory: os.PathLike, + subjects_sessions: dict, + modalities: Optional[Dict[str, Tuple[str, ...]]] = None, + write_tsv_files: bool = False, + random_seed: int = 42, +) -> None: """Build a fake BIDS dataset at the specified location following the specified structure. @@ -23,32 +30,44 @@ def build_bids_directory(directory: os.PathLike, subjects_sessions: dict) -> Non subjects_sessions : Dict Dictionary containing the subjects and their associated sessions. + modalities : dict + Modalities to be created. + Notes ----- This function is a simple prototype for creating fake datasets for testing. It only adds (for now...) T1W nifti images for all subjects and sessions. """ + random.seed(random_seed) directory = Path(directory) - data_types = {"anat"} - suffixes = {"T1w", "flair"} - extensions = {"nii.gz"} + modalities = modalities or {"anat": ("T1w", "flair")} + extensions = ("nii.gz", "json") with open(directory / "dataset_description.json", "w") as fp: json.dump({"Name": "Example dataset", "BIDSVersion": "1.0.2"}, fp) for sub, sessions in subjects_sessions.items(): (directory / sub).mkdir() + if write_tsv_files: + (directory / sub / f"{sub}_sessions.tsv").write_text( + "\n".join(["session_id"] + sessions) + ) for ses in sessions: - (directory / sub / ses).mkdir() - for data_type in data_types: - (directory / sub / ses / data_type).mkdir() + session_path = directory / sub / ses + session_path.mkdir() + scans_tsv_text = "filename\tscan_id\n" + for modality, suffixes in modalities.items(): + (session_path / modality).mkdir() for suffix in suffixes: for extension in extensions: - ( - directory - / sub - / ses - / data_type + scan_path = ( + session_path + / modality / f"{sub}_{ses}_{suffix}.{extension}" - ).touch() + ) + scan_path.touch() + if write_tsv_files and "nii" in extension: + scans_tsv_text += f"{scan_path.relative_to(session_path)}\t{random.randint(1, 1000000)}\n" + if write_tsv_files and len(scans_tsv_text) > 0: + (session_path / f"{sub}_{ses}_scans.tsv").write_text(scans_tsv_text) def build_caps_directory(directory: os.PathLike, configuration: dict) -> None: diff --git a/test/unittests/iotools/utils/test_data_handling.py b/test/unittests/iotools/utils/test_data_handling.py index a10fcbf13..e7710e508 100644 --- a/test/unittests/iotools/utils/test_data_handling.py +++ b/test/unittests/iotools/utils/test_data_handling.py @@ -1,32 +1,140 @@ -import pytest +from pathlib import Path +import numpy as np +import pandas as pd +import pytest +from numpy.testing import assert_array_equal +from pandas.testing import assert_frame_equal -def test_vox_to_world_space_method_1(): - """Test function `vox_to_world_space_method_1`.""" - import numpy as np +def test_scale_coordinates_by_pixdim(): + """Test function `_scale_coordinates_by_pixdim`.""" from nibabel.nifti1 import Nifti1Header - from clinica.iotools.utils.data_handling import vox_to_world_space_method_1 + from clinica.iotools.utils.data_handling._centering import ( + _scale_coordinates_by_pixdim, + ) head = Nifti1Header() assert not head["qform_code"] > 0 assert head["sform_code"] == 0 assert np.all(head["pixdim"] == 1) center_coordinates = np.array([0.5] * 3) - np.testing.assert_array_equal( - vox_to_world_space_method_1(coordinates_vol=center_coordinates, header=head), + assert_array_equal( + _scale_coordinates_by_pixdim(coordinates_vol=center_coordinates, header=head), center_coordinates, ) head["pixdim"][1] = 2 - np.testing.assert_array_equal( - vox_to_world_space_method_1(coordinates_vol=center_coordinates, header=head), + assert_array_equal( + _scale_coordinates_by_pixdim(coordinates_vol=center_coordinates, header=head), np.array([1.0, 0.5, 0.5]), ) +def test_check_relative_volume_location_in_world_coordinate_system(tmp_path, mocker): + from clinica.iotools.utils.data_handling import ( + check_relative_volume_location_in_world_coordinate_system, + ) + + mocker.patch( + "clinica.iotools.utils.data_handling._centering._get_problematic_pairs_with_l2_norm", + return_value=[ + ("foo.nii.gz", "bar.nii.gz", 81.2), + ("baz.nii", "goo.nii", 113.3), + ], + ) + with pytest.warns( + UserWarning, + match=( + "It appears that 2 pairs of files have an important relative offset. " + "SPM co-registration has a high probability to fail on these files:\n\n" + " foo_label gloo_label Relative distance\n" + "0 foo.nii.gz bar.nii.gz 81.2\n" + "1 baz.nii goo.nii 113.3\n" + "Clinica provides a tool to counter this problem by replacing the center " + "of the volume at the origin of the world coordinates.\n" + "Use the following command line to correct the header of the faulty NIFTI " + "volumes in a new folder:\n\n`" + ), + ): + check_relative_volume_location_in_world_coordinate_system( + "foo_label", + [tmp_path / "foo.nii.gz", tmp_path / "bar.nii.gz", tmp_path / "baz.nii.gz"], + "gloo_label", + [ + tmp_path / "gloo" / "foo.nii.gz", + tmp_path / "gloo" / "bar.nii.gz", + tmp_path / "gloo" / "baz.nii.gz", + ], + tmp_path / "bids", + "T1W", + skip_question=True, + ) + + +def test_get_problematic_pairs_empty(tmp_path, mocker): + from clinica.iotools.utils.data_handling._centering import ( + _get_problematic_pairs_with_l2_norm, + ) + + mocker.patch( + "clinica.iotools.utils.data_handling._centering._get_world_coordinate_of_center", + return_value=np.zeros((3, 3)), + ) + assert ( + _get_problematic_pairs_with_l2_norm( + [ + (tmp_path / "foo.nii.gz", tmp_path / "baz.nii.gz"), + (tmp_path / "bar.nii.gz", tmp_path / "foo_bar.nii.gz"), + ] + ) + == [] + ) + + +def test_get_problematic_pairs(tmp_path, mocker): + from clinica.iotools.utils.data_handling._centering import ( + _get_problematic_pairs_with_l2_norm, + ) + + mocker.patch( + "clinica.iotools.utils.data_handling._centering._compute_l2_norm", + return_value=[81.0, 79.9], + ) + assert _get_problematic_pairs_with_l2_norm( + [ + (tmp_path / "foo.nii.gz", tmp_path / "baz.nii.gz"), + (tmp_path / "bar.nii.gz", tmp_path / "foo_bar.nii.gz"), + ] + ) == [("foo.nii.gz", "baz.nii.gz", 81.0)] + + +def test_build_warning_message(tmp_path): + from clinica.iotools.utils.data_handling._centering import _build_warning_message + + assert ( + "It appears that 3 files have a center way out of the origin of the world coordinate system. " + "SPM has a high probability to fail on these files (for co-registration or segmentation):\n\n" + " File Coordinate of center Distance to origin\n" + "0 foo.nii [1, 2, 3] 0.2\n" + "1 bar.nii [4, 5, 6] 12.7\n" + "2 baz.nii [7, 8, 9] 26.6\n" + "If you are trying to launch the t1-freesurfer pipeline, you can ignore this message if you " + "do not want to run the pet-surface pipeline afterward.\n" + "Clinica provides a tool to counter this problem by replacing the center of the volume at the " + "origin of the world coordinates.\nUse the following command line to correct the header of the " + "faulty NIFTI volumes in a new folder:\n" + ) in _build_warning_message( + [tmp_path / "foo.nii", tmp_path / "bar.nii", tmp_path / "baz.nii"], + [np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])], + [0.2, 12.7, 26.6], + tmp_path / "bids", + "T1W", + ) + + def test_validate_output_tsv_path(tmp_path): - from clinica.iotools.utils.data_handling import _validate_output_tsv_path + from clinica.iotools.utils.data_handling._merging import _validate_output_tsv_path (tmp_path / "foo.tsv").touch() @@ -41,7 +149,7 @@ def test_validate_output_tsv_path(tmp_path): def test_validate_output_tsv_path_error(tmp_path): - from clinica.iotools.utils.data_handling import _validate_output_tsv_path + from clinica.iotools.utils.data_handling._merging import _validate_output_tsv_path (tmp_path / "bar.txt").touch() @@ -50,3 +158,337 @@ def test_validate_output_tsv_path_error(tmp_path): match="Output path extension must be tsv.", ): _validate_output_tsv_path(tmp_path / "bar.txt") + + +def test_get_groups(tmp_path): + from clinica.iotools.utils.data_handling._missing import _get_groups + + assert _get_groups(tmp_path) == [] + (tmp_path / "groups").mkdir() + assert _get_groups(tmp_path) == [] + for group in ("foo", "bar", "baz"): + (tmp_path / "groups" / group).mkdir() + assert sorted(_get_groups(tmp_path)) == ["bar", "baz", "foo"] + + +def create_bids_dataset(folder: Path, write_tsv_files: bool = False) -> None: + from clinica.utils.testing_utils import build_bids_directory + + folder.mkdir() + build_bids_directory( + folder, + { + "sub-01": ["ses-M000", "ses-M006"], + "sub-02": ["ses-M000"], + "sub-03": ["ses-M000", "ses-M012", "ses-M024"], + }, + modalities={ + "anat": ("T1w", "flair"), + "pet": ("trc-18FFDG_pet",), + }, + write_tsv_files=write_tsv_files, + ) + + +def test_find_mods_and_sess(tmp_path): + from clinica.iotools.utils.data_handling._missing import _find_mods_and_sess + + create_bids_dataset(tmp_path / "bids") + ses_mod = _find_mods_and_sess(tmp_path / "bids") + assert len(ses_mod) == 3 + assert ses_mod["sessions"] == {"ses-M000", "ses-M006", "ses-M012", "ses-M024"} + assert ses_mod["anat"] == {"t1w", "flair"} + assert ses_mod["pet"] == {"pet_trc-18FFDG"} + + +@pytest.fixture +def expected_tsv_content() -> str: + return ( + "participant_id\tsession_id\n" + "sub-01\tses-M000\n" + "sub-01\tses-M006\n" + "sub-02\tses-M000\n" + "sub-03\tses-M000\n" + "sub-03\tses-M012\n" + "sub-03\tses-M024\n" + ) + + +def test_create_subs_sess_list_as_text(tmp_path, expected_tsv_content): + from clinica.iotools.utils.data_handling._files import ( + _create_subs_sess_list_as_text, + ) + + create_bids_dataset(tmp_path / "bids") + assert ( + _create_subs_sess_list_as_text(tmp_path / "bids", use_session_tsv=False) + == expected_tsv_content + ) + + +def test_create_subs_sess_list_as_text_using_tsv(tmp_path, expected_tsv_content): + from clinica.iotools.utils.data_handling._files import ( + _create_subs_sess_list_as_text, + ) + + create_bids_dataset(tmp_path / "bids", write_tsv_files=True) + assert ( + _create_subs_sess_list_as_text(tmp_path / "bids", use_session_tsv=True) + == expected_tsv_content + ) + + +def test_create_subs_sess_list_as_text_errors(tmp_path): + from clinica.iotools.utils.data_handling._files import ( + _create_subs_sess_list_as_text, + ) + + with pytest.raises(IOError, match="Dataset empty or not BIDS/CAPS compliant."): + _create_subs_sess_list_as_text(tmp_path, use_session_tsv=False) + create_bids_dataset(tmp_path / "bids") + with pytest.raises( + ValueError, match="there is no session TSV file for subject sub-01" + ): + _create_subs_sess_list_as_text(tmp_path / "bids", use_session_tsv=True) + + +@pytest.mark.parametrize("use_tsv", [True, False]) +def test_create_subs_sess_list(tmp_path, use_tsv, expected_tsv_content): + from clinica.iotools.utils.data_handling import create_subs_sess_list + + create_bids_dataset(tmp_path / "bids", write_tsv_files=True) + create_subs_sess_list( + tmp_path / "bids", + tmp_path / "output", + file_name="foo.tsv", + use_session_tsv=use_tsv, + ) + assert (tmp_path / "output" / "foo.tsv").exists() + assert (tmp_path / "output" / "foo.tsv").read_text() == expected_tsv_content + + +def test_write_list_of_files_errors(tmp_path): + from clinica.iotools.utils.data_handling import write_list_of_files + + with pytest.raises( + TypeError, + match="`file_list` argument must be a list of paths. Instead was provided.", + ): + write_list_of_files(10, tmp_path / "foo.txt") + (tmp_path / "foo.txt").touch() + with pytest.raises( + IOError, + match=f"Output file {tmp_path / 'foo.txt'} already exists.", + ): + write_list_of_files([], tmp_path / "foo.txt") + + +def test_write_list_of_files(tmp_path): + from clinica.iotools.utils.data_handling import write_list_of_files + + file_list = [ + tmp_path / "foo.csv", + tmp_path / "bar" / "baz.jpg", + tmp_path / "boo.nii", + ] + write_list_of_files(file_list, tmp_path / "foo.txt") + assert (tmp_path / "foo.txt").exists() + assert (tmp_path / "foo.txt").read_text() == "\n".join( + [str(f) for f in file_list] + ) + "\n" + + +def test_get_participants_and_subjects_sessions_df(tmp_path): + from clinica.iotools.utils.data_handling._merging import ( + _get_participants_and_subjects_sessions_df, + ) + + create_bids_dataset(tmp_path / "bids", write_tsv_files=True) + participants, sessions = _get_participants_and_subjects_sessions_df( + tmp_path / "bids" + ) + assert_frame_equal( + participants, pd.DataFrame({"participant_id": ["sub-01", "sub-02", "sub-03"]}) + ) + assert len(sessions) == 6 + + +@pytest.mark.parametrize("ignore_sessions", (True, False)) +def test_create_merge_file_from_bids_ignore_sessions_and_scans( + tmp_path, ignore_sessions +): + from clinica.iotools.utils.data_handling._merging import ( + _create_merge_file_from_bids, + _get_participants_and_subjects_sessions_df, + ) + + create_bids_dataset(tmp_path / "bids", write_tsv_files=True) + participants, sessions = _get_participants_and_subjects_sessions_df( + tmp_path / "bids" + ) + df = _create_merge_file_from_bids( + tmp_path / "bids", + sessions, + participants, + ignore_sessions_files=ignore_sessions, + ignore_scan_files=True, + ) + expected = pd.DataFrame( + { + "participant_id": [ + "sub-01", + "sub-01", + "sub-02", + "sub-03", + "sub-03", + "sub-03", + ], + "session_id": [ + "ses-M000", + "ses-M006", + "ses-M000", + "ses-M000", + "ses-M012", + "ses-M024", + ], + } + ) + assert_frame_equal(df.reset_index(drop=True), expected.reset_index(drop=True)) + + +def test_create_merge_file_from_bids(tmp_path): + from clinica.iotools.utils.data_handling._merging import ( + _create_merge_file_from_bids, + _get_participants_and_subjects_sessions_df, + ) + + create_bids_dataset(tmp_path / "bids", write_tsv_files=True) + participants, sessions = _get_participants_and_subjects_sessions_df( + tmp_path / "bids" + ) + df = _create_merge_file_from_bids( + tmp_path / "bids", + sessions, + participants, + ignore_sessions_files=False, + ignore_scan_files=False, + ) + expected = pd.DataFrame( + { + "participant_id": [ + "sub-01", + "sub-01", + "sub-02", + "sub-03", + "sub-03", + "sub-03", + ], + "session_id": [ + "ses-M000", + "ses-M006", + "ses-M000", + "ses-M000", + "ses-M012", + "ses-M024", + ], + "T1w_scan_id": ["670488", "777573", "234054", "107474", "935519", "619177"], + "flair_scan_id": [ + "116740", + "288390", + "146317", + "709571", + "571859", + "442418", + ], + "trc-18FFDG_pet_scan_id": [ + "26226", + "256788", + "772247", + "776647", + "91162", + "33327", + ], + } + ) + assert_frame_equal( + df.reset_index(drop=True), + expected.reset_index(drop=True), + check_dtype=False, + check_column_type=False, + ) + + +def test_post_process_merge_file_from_bids_column_reordering(): + from clinica.iotools.utils.data_handling._merging import ( + _post_process_merge_file_from_bids, + ) + + df = pd.DataFrame(columns=["foo", "session_id", "bar", "participant_id", "baz"]) + assert_frame_equal( + _post_process_merge_file_from_bids(df), + pd.DataFrame(columns=["participant_id", "session_id", "foo", "bar", "baz"]), + ) + + +def test_post_process_merge_file_from_bids_number_rounding(): + from clinica.iotools.utils.data_handling._merging import ( + _post_process_merge_file_from_bids, + ) + + df = pd.DataFrame( + { + "session_id": ["ses-M000", "ses-M006"], + "participant_id": ["sub-01", "sub-01"], + "foo": [1.0123456789, 100.987654321], + } + ) + expected = pd.DataFrame( + { + "participant_id": ["sub-01", "sub-01"], + "session_id": ["ses-M000", "ses-M006"], + "foo": [1.012346, 100.987654], + } + ) + assert_frame_equal(_post_process_merge_file_from_bids(df), expected) + + +def test_add_data_to_merge_file_from_caps_wrong_handler(tmp_path): + from clinica.iotools.utils.data_handling._merging import ( + _add_data_to_merge_file_from_caps, + ) + + with pytest.raises( + ValueError, + match="'foo' is not a valid PipelineNameForMetricExtraction", + ): + _add_data_to_merge_file_from_caps(tmp_path, pd.DataFrame(), pipelines=["foo"]) + + +def test_add_data_to_merge_file_from_caps_empty_pipeline(tmp_path): + from clinica.iotools.utils.data_handling._merging import ( + _add_data_to_merge_file_from_caps, + ) + + df = pd.DataFrame( + { + "participant_id": [ + "sub-01", + "sub-01", + "sub-02", + ], + "session_id": [ + "ses-M000", + "ses-M006", + "ses-M000", + ], + } + ) + (tmp_path / "groups").mkdir() + with pytest.raises( + FileNotFoundError, + match=( + "No outputs were found for any pipeline in the CAPS folder. " + "The output only contains BIDS information." + ), + ): + _add_data_to_merge_file_from_caps(tmp_path, df, pipelines=[]) diff --git a/test/unittests/iotools/utils/test_pipeline_handling.py b/test/unittests/iotools/utils/test_pipeline_handling.py index 1d07f6f68..f10332594 100644 --- a/test/unittests/iotools/utils/test_pipeline_handling.py +++ b/test/unittests/iotools/utils/test_pipeline_handling.py @@ -3,107 +3,168 @@ import pandas as pd import pytest +from pandas.testing import assert_frame_equal + +from clinica.iotools.utils.pipeline_handling import PipelineNameForMetricExtraction truth = operator.truth not_truth = operator.not_ -def test_get_atlas_name(tmp_path): - from clinica.iotools.utils.pipeline_handling import _get_atlas_name - - with pytest.raises( - ValueError, - match="Not supported pipeline foo", +@pytest.fixture +def atlas_path(tmp_path, pipeline: PipelineNameForMetricExtraction) -> Path: + if pipeline in ( + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, ): - _get_atlas_name(tmp_path, "foo") - atlas_path = ( - tmp_path - / "t1" - / "spm" - / "dartel" - / "group-foo" - / "atlas_statistics" - / "sub-01_ses-M000_T1w_space-AAL2_map-graymatter_statistics.tsv" - ) - assert _get_atlas_name(atlas_path, "t1-volume") == "AAL2" - with pytest.raises( - ValueError, - match="Unable to infer the atlas name", + filename = "sub-01_ses-M000_T1w_space-AAL2_map-graymatter_statistics.tsv" + return ( + tmp_path + / "t1" + / "spm" + / "dartel" + / "group-foo" + / "atlas_statistics" + / filename + ) + if pipeline == PipelineNameForMetricExtraction.DWI_DTI: + filename = "sub-01_ses-M000_dwi_space-foobar.tsv" + return tmp_path / "dwi" / filename + if pipeline in ( + PipelineNameForMetricExtraction.T1_FREESURFER_LONGI, + PipelineNameForMetricExtraction.T1_FREESURFER, + ): + filename = "sub-01_ses-M000_parcellation-foobaz.tsv" + return tmp_path / "freesurfer" / filename + + +@pytest.mark.parametrize( + "pipeline,name", + [ + (PipelineNameForMetricExtraction.T1_VOLUME, "AAL2"), + (PipelineNameForMetricExtraction.PET_VOLUME, "AAL2"), + (PipelineNameForMetricExtraction.DWI_DTI, "foobar"), + (PipelineNameForMetricExtraction.T1_FREESURFER_LONGI, "foobaz"), + (PipelineNameForMetricExtraction.T1_FREESURFER, "foobaz"), + ], +) +def test_get_atlas_name(atlas_path, pipeline, name): + from clinica.iotools.utils.pipeline_handling import _get_atlas_name # noqa + + assert _get_atlas_name(atlas_path, pipeline) == f"atlas-{name}" + + +def _get_wrong_atlas_path( + folder: Path, pipeline: PipelineNameForMetricExtraction +) -> Path: + if pipeline in ( + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, ): - _get_atlas_name(atlas_path, "dwi_dti") + filename = ( + "sub-01_ses-M000_T1w_spaaaacccceee-AAL2_map-graymatter_statistics.tsv" + ) + return ( + folder + / "t1" + / "spm" + / "dartel" + / "group-foo" + / "atlas_statistics" + / filename + ) + if pipeline == PipelineNameForMetricExtraction.DWI_DTI: + filename = "sub-01_ses-M000_dwiii_space-foobar.tsv" + return folder / "dwi" / filename -def test_get_mod_path_errors(tmp_path): - from clinica.iotools.utils.pipeline_handling import _get_mod_path +@pytest.mark.parametrize( + "pipeline", + [ + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + PipelineNameForMetricExtraction.DWI_DTI, + ], +) +def test_get_atlas_name_error(tmp_path, pipeline): + from clinica.iotools.utils.pipeline_handling import _get_atlas_name # noqa with pytest.raises( ValueError, - match="Not supported pipeline foo", + match="Unable to infer the atlas name", ): - _get_mod_path(tmp_path, "foo") + _get_atlas_name( + _get_wrong_atlas_path(tmp_path, pipeline), + PipelineNameForMetricExtraction.DWI_DTI, + ) @pytest.mark.parametrize( "pipeline,expected_path", [ - ("dwi_dti", ["dwi", "dti_based_processing", "atlas_statistics"]), - ("t1-freesurfer", ["t1", "freesurfer_cross_sectional", "regional_measures"]), - ("t1-volume", ["t1", "spm", "dartel"]), - ("pet-volume", ["pet", "preprocessing"]), + ( + PipelineNameForMetricExtraction.DWI_DTI, + ["dwi", "dti_based_processing", "atlas_statistics"], + ), + ( + PipelineNameForMetricExtraction.T1_FREESURFER, + ["t1", "freesurfer_cross_sectional", "regional_measures"], + ), + (PipelineNameForMetricExtraction.T1_VOLUME, ["t1", "spm", "dartel"]), + (PipelineNameForMetricExtraction.PET_VOLUME, ["pet", "preprocessing"]), ], ) -def test_get_mod_path(tmp_path, pipeline, expected_path): +def test_get_modality_path(tmp_path, pipeline, expected_path): from functools import reduce - from clinica.iotools.utils.pipeline_handling import _get_mod_path + from clinica.iotools.utils.pipeline_handling import _get_modality_path # noqa - assert _get_mod_path(tmp_path, pipeline) == reduce( + assert _get_modality_path(tmp_path, pipeline) == reduce( lambda x, y: x / y, [tmp_path] + expected_path ) -def test_get_mod_longitudinal(tmp_path): - from clinica.iotools.utils.pipeline_handling import _get_mod_path +def test_get_modality_path_longitudinal(tmp_path): + from clinica.iotools.utils.pipeline_handling import _get_modality_path # noqa - assert _get_mod_path(tmp_path, "t1_freesurfer_longitudinal") is None + assert ( + _get_modality_path( + tmp_path, PipelineNameForMetricExtraction.T1_FREESURFER_LONGI + ) + is None + ) (tmp_path / "t1").mkdir() (tmp_path / "t1" / "longfoo").mkdir() assert ( - _get_mod_path(tmp_path, "t1_freesurfer_longitudinal") + _get_modality_path( + tmp_path, PipelineNameForMetricExtraction.T1_FREESURFER_LONGI + ) == tmp_path / "t1" / "longfoo" / "freesurfer_longitudinal" / "regional_measures" ) -def test_get_label_list(tmp_path): - from clinica.iotools.utils.pipeline_handling import _get_label_list - - with pytest.raises( - ValueError, - match="Not supported pipeline bar", - ): - _get_label_list(tmp_path, "foo", "bar", "baz") - +@pytest.mark.parametrize("pipeline", PipelineNameForMetricExtraction) +def test_skip_atlas_non_existing_file(tmp_path, pipeline): + from clinica.iotools.utils.pipeline_handling import _skip_atlas # noqa -@pytest.mark.parametrize( - "pipeline", ["foo", "t1-freesurfer_longitudinal", "t1-volume", "pet-volume"] -) -def test_skip_atlas_default(tmp_path, pipeline): - from clinica.iotools.utils.pipeline_handling import _skip_atlas - - assert not _skip_atlas(tmp_path, pipeline) + # assert _skip_atlas(tmp_path, pipeline) + assert _skip_atlas(tmp_path / "foo.tsv", pipeline) @pytest.mark.parametrize("txt", ["-wm_", "-ba_"]) def test_skip_atlas_longitudinal(tmp_path, txt): - from clinica.iotools.utils.pipeline_handling import _skip_atlas + from clinica.iotools.utils.pipeline_handling import _skip_atlas # noqa (tmp_path / f"foo{txt}bar.tsv").touch() - assert _skip_atlas(tmp_path / f"foo{txt}bar.tsv", "t1-freesurfer_longitudinal") + assert _skip_atlas( + tmp_path / f"foo{txt}bar.tsv", + PipelineNameForMetricExtraction.T1_FREESURFER_LONGI, + ) @pytest.fixture def expected_operator_pvc(atlas_path, pvc_restriction): - if atlas_path == "stats.tsv": + if atlas_path == "sub-01_ses-M000_space-foobar_stats.tsv": if pvc_restriction: return truth return not_truth @@ -112,23 +173,39 @@ def expected_operator_pvc(atlas_path, pvc_restriction): return truth -@pytest.mark.parametrize("pipeline", ["t1-volume", "pet-volume"]) -@pytest.mark.parametrize("atlas_path", ["stats.tsv", "pvc-rbv_stats.tsv"]) +@pytest.mark.parametrize( + "pipeline", + [ + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ], +) +@pytest.mark.parametrize( + "atlas_path", + ["sub-01_ses-M000_space-foobar_stats.tsv", "pvc-rbv_space-foo_stats.tsv"], +) @pytest.mark.parametrize("pvc_restriction", [True, False]) def test_skip_atlas_volume_pvc( tmp_path, pipeline, atlas_path, pvc_restriction, expected_operator_pvc ): - from clinica.iotools.utils.pipeline_handling import _skip_atlas + from clinica.iotools.utils.pipeline_handling import _skip_atlas # noqa + (tmp_path / atlas_path).touch() assert expected_operator_pvc( _skip_atlas(tmp_path / atlas_path, pipeline, pvc_restriction=pvc_restriction) ) -@pytest.mark.parametrize("pipeline", ["t1-volume", "pet-volume"]) +@pytest.mark.parametrize( + "pipeline", + [ + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ], +) @pytest.mark.parametrize("tracers", [["fdg"], ["trc1", "fdg", "trc2"]]) def test_skip_atlas_volume_tracers(tmp_path, pipeline, tracers): - from clinica.iotools.utils.pipeline_handling import _skip_atlas + from clinica.iotools.utils.pipeline_handling import _skip_atlas # noqa assert _skip_atlas( tmp_path / "pvc-rbv_stats.tsv", pipeline, tracers_selection=tracers @@ -138,25 +215,39 @@ def test_skip_atlas_volume_tracers(tmp_path, pipeline, tracers): @pytest.fixture def expected_operator_volume(atlas_path, pvc_restriction): if pvc_restriction: - if atlas_path == "trc-fdg_stats.tsv": + if atlas_path == "sub-01_ses-M000_space-bar_trc-fdg_stats.tsv": return truth return not_truth - if pvc_restriction is False and atlas_path == "trc-fdg_pvc-rbv_stats.tsv": + if ( + pvc_restriction is False + and atlas_path == "sub-01_ses-M000_space-foo_trc-fdg_pvc-rbv_stats.tsv" + ): return truth return not_truth -@pytest.mark.parametrize("pipeline", ["t1-volume", "pet-volume"]) @pytest.mark.parametrize( - "atlas_path", ["trc-fdg_pvc-rbv_stats.tsv", "trc-fdg_stats.tsv"] + "pipeline", + [ + PipelineNameForMetricExtraction.T1_VOLUME, + PipelineNameForMetricExtraction.PET_VOLUME, + ], +) +@pytest.mark.parametrize( + "atlas_path", + [ + "sub-01_ses-M000_space-foo_trc-fdg_pvc-rbv_stats.tsv", + "sub-01_ses-M000_space-bar_trc-fdg_stats.tsv", + ], ) @pytest.mark.parametrize("tracers", [["fdg"], ["trc2", "fdg"]]) @pytest.mark.parametrize("pvc_restriction", [None, True, False]) def test_skip_atlas_volume( tmp_path, pipeline, atlas_path, tracers, pvc_restriction, expected_operator_volume ): - from clinica.iotools.utils.pipeline_handling import _skip_atlas + from clinica.iotools.utils.pipeline_handling import _skip_atlas # noqa + (tmp_path / atlas_path).touch() assert expected_operator_volume( _skip_atlas( tmp_path / atlas_path, @@ -168,34 +259,42 @@ def test_skip_atlas_volume( def test_extract_metrics_from_pipeline_errors(tmp_path): - from clinica.iotools.utils.pipeline_handling import _extract_metrics_from_pipeline + from clinica.iotools.utils.pipeline_handling import _extract_metrics_from_pipeline # noqa with pytest.raises( KeyError, match="Fields `participant_id` and `session_id` are required.", ): - _extract_metrics_from_pipeline(tmp_path, pd.DataFrame(), ["metrics"], "foo") - df = pd.DataFrame([["bar", "bar"]], columns=["participant_id", "session_id"]) + _extract_metrics_from_pipeline( + tmp_path, + pd.DataFrame(), + ["metrics"], + PipelineNameForMetricExtraction.DWI_DTI, + ) + + +def test_extract_metrics_from_pipeline_empty(tmp_path): + from clinica.iotools.utils.pipeline_handling import _extract_metrics_from_pipeline # noqa + (tmp_path / "groups").mkdir() - (tmp_path / "groups" / "UnitTest").mkdir() - with pytest.raises( - ValueError, - match="Not supported pipeline foo", - ): - _extract_metrics_from_pipeline(tmp_path, df, ["metrics"], "foo") + df = pd.DataFrame([["bar", "bar"]], columns=["participant_id", "session_id"]) + x, y = _extract_metrics_from_pipeline( + tmp_path, df, ["metrics"], PipelineNameForMetricExtraction.T1_VOLUME + ) + assert_frame_equal( + x, pd.DataFrame([["bar", "bar"]], columns=["participant_id", "session_id"]) + ) + assert y.empty def test_extract_metrics_from_pipeline(tmp_path): - from clinica.iotools.utils.pipeline_handling import _extract_metrics_from_pipeline + from clinica.iotools.utils.pipeline_handling import _extract_metrics_from_pipeline # noqa + (tmp_path / "groups" / "UnitTest").mkdir(parents=True) df = pd.DataFrame([["bar", "bar"]], columns=["participant_id", "session_id"]) - assert _extract_metrics_from_pipeline(tmp_path, df, ["metrics"], "foo") == ( - df, - None, + x, y = _extract_metrics_from_pipeline( + tmp_path, df, ["metrics"], PipelineNameForMetricExtraction.T1_VOLUME ) - (tmp_path / "groups").mkdir() - (tmp_path / "groups" / "UnitTest").mkdir() - x, y = _extract_metrics_from_pipeline(tmp_path, df, ["metrics"], "t1-volume") assert isinstance(x, pd.DataFrame) assert isinstance(y, pd.DataFrame) assert len(x) == 1 @@ -213,10 +312,12 @@ def test_extract_metrics_from_pipeline(tmp_path): } -def test_t1_freesurfer_pipeline_nothing_found(tmp_path): +def test_extract_metrics_from_t1_freesurfer_nothing_found(tmp_path): from pandas.testing import assert_frame_equal - from clinica.iotools.utils.pipeline_handling import t1_freesurfer_pipeline + from clinica.iotools.utils.pipeline_handling import ( + pipeline_metric_extractor_factory, + ) caps = tmp_path / "caps" caps.mkdir() @@ -230,9 +331,9 @@ def test_t1_freesurfer_pipeline_nothing_found(tmp_path): merged_df.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True ) - merged_df_with_t1_freesurfer_metrics, summary = t1_freesurfer_pipeline( - caps, merged_df - ) + merged_df_with_t1_freesurfer_metrics, summary = pipeline_metric_extractor_factory( + PipelineNameForMetricExtraction.T1_FREESURFER + )(caps, merged_df) merged_df_with_t1_freesurfer_metrics.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True ) @@ -241,11 +342,11 @@ def test_t1_freesurfer_pipeline_nothing_found(tmp_path): assert summary.empty -def test_t1_freesurfer_longitudinal_pipeline_nothing_found(tmp_path): +def test_extract_metrics_from_t1_freesurfer_longitudinal_nothing_found(tmp_path): from pandas.testing import assert_frame_equal from clinica.iotools.utils.pipeline_handling import ( - t1_freesurfer_longitudinal_pipeline, + pipeline_metric_extractor_factory, ) caps = tmp_path / "caps" @@ -260,9 +361,9 @@ def test_t1_freesurfer_longitudinal_pipeline_nothing_found(tmp_path): merged_df.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True ) - merged_df_with_t1_freesurfer_metrics, summary = t1_freesurfer_longitudinal_pipeline( - caps, merged_df - ) + merged_df_with_t1_freesurfer_metrics, summary = pipeline_metric_extractor_factory( + PipelineNameForMetricExtraction.T1_FREESURFER_LONGI + )(caps, merged_df) merged_df_with_t1_freesurfer_metrics.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True ) @@ -281,8 +382,10 @@ def write_fake_statistics(tsv_file: Path): fake.to_csv(tsv_file, sep="\t") -def test_t1_freesurfer_pipeline(tmp_path): - from clinica.iotools.utils.pipeline_handling import t1_freesurfer_pipeline +def test_extract_metrics_from_t1_freesurfer(tmp_path): + from clinica.iotools.utils.pipeline_handling import ( + pipeline_metric_extractor_factory, + ) caps = tmp_path / "caps" regional_measures_folder = ( @@ -314,9 +417,9 @@ def test_t1_freesurfer_pipeline(tmp_path): merged_df.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True ) - merged_df_with_t1_freesurfer_metrics, summary = t1_freesurfer_pipeline( - caps, merged_df - ) + merged_df_with_t1_freesurfer_metrics, summary = pipeline_metric_extractor_factory( + PipelineNameForMetricExtraction.T1_FREESURFER + )(caps, merged_df) merged_df_with_t1_freesurfer_metrics.set_index( ["participant_id", "session_id"], inplace=True, verify_integrity=True )