diff --git a/clinica/pipelines/anatomical/freesurfer/atlas/pipeline.py b/clinica/pipelines/anatomical/freesurfer/atlas/pipeline.py index 18c236765..29d756b25 100644 --- a/clinica/pipelines/anatomical/freesurfer/atlas/pipeline.py +++ b/clinica/pipelines/anatomical/freesurfer/atlas/pipeline.py @@ -51,7 +51,12 @@ def get_to_process_with_atlases( get_processed_images, ) from clinica.utils.filemanip import extract_image_ids - from clinica.utils.input_files import T1_FS_DESTRIEUX + from clinica.utils.image import HemiSphere + from clinica.utils.input_files import ( + Parcellation, + get_t1_freesurfer_segmentation, + get_t1_freesurfer_statistics, + ) from clinica.utils.inputs import clinica_file_reader part_ids, sess_ids, list_long_id = grab_image_ids_from_caps_directory( @@ -66,13 +71,6 @@ def get_to_process_with_atlases( if caps_directory.is_dir(): for atlas in atlas_list: - atlas_info = dict( - { - "pattern": f"t1/freesurfer_cross_sectional/sub-*_ses-*/stats/rh.{atlas}.stats", - "description": f"{atlas}-based segmentation", - "needed_pipeline": "t1-freesurfer", - } - ) t1_freesurfer_longitudinal_output = get_processed_images( caps_directory, part_ids, sess_ids, list_long_id ) @@ -87,13 +85,13 @@ def get_to_process_with_atlases( subjects, sessions, caps_directory, - T1_FS_DESTRIEUX, + get_t1_freesurfer_segmentation(Parcellation.DESTRIEUX), ) t1_freesurfer_files, _ = clinica_file_reader( subjects, sessions, caps_directory, - atlas_info, + get_t1_freesurfer_statistics(atlas, HemiSphere.RIGHT), ) image_ids = extract_image_ids(t1_freesurfer_files) image_ids_2 = extract_image_ids(t1_freesurfer_output) diff --git a/clinica/pipelines/anatomical/freesurfer/longitudinal/correction/pipeline.py b/clinica/pipelines/anatomical/freesurfer/longitudinal/correction/pipeline.py index 3fa7114da..c2072ff62 100644 --- a/clinica/pipelines/anatomical/freesurfer/longitudinal/correction/pipeline.py +++ b/clinica/pipelines/anatomical/freesurfer/longitudinal/correction/pipeline.py @@ -59,7 +59,11 @@ def _build_input_node(self): save_part_sess_long_ids_to_tsv, ) from clinica.utils.exceptions import ClinicaException - from clinica.utils.input_files import T1_FS_DESTRIEUX, T1_FS_T_DESTRIEUX + from clinica.utils.input_files import ( + Parcellation, + get_t1_freesurfer_segmentation, + get_t1_freesurfer_template, + ) from clinica.utils.inputs import ( clinica_file_reader, format_clinica_file_reader_errors, @@ -119,19 +123,22 @@ def _build_input_node(self): ) = extract_subject_session_longitudinal_ids_from_filename( to_process_ids ) - + pattern_segmentation = get_t1_freesurfer_segmentation(Parcellation.DESTRIEUX) _, errors_destrieux = clinica_file_reader( - self.subjects, self.sessions, self.caps_directory, T1_FS_DESTRIEUX + self.subjects, self.sessions, self.caps_directory, pattern_segmentation ) + pattern_template = get_t1_freesurfer_template(Parcellation.DESTRIEUX) _, errors_t_destrieux = clinica_file_reader( - self.subjects, list_long_id, self.caps_directory, T1_FS_T_DESTRIEUX + self.subjects, list_long_id, self.caps_directory, pattern_template ) all_errors = [errors_destrieux, errors_t_destrieux] if any(all_errors): message = "Clinica faced errors while trying to read files in your CAPS directory.\n" - for error, info in zip(all_errors, [T1_FS_DESTRIEUX, T1_FS_T_DESTRIEUX]): - message += format_clinica_file_reader_errors(error, info) + for error, pattern in zip( + all_errors, [pattern_segmentation, pattern_template] + ): + message += format_clinica_file_reader_errors(error, pattern) raise ClinicaException(message) save_part_sess_long_ids_to_tsv( diff --git a/clinica/pipelines/anatomical/freesurfer/longitudinal/template/pipeline.py b/clinica/pipelines/anatomical/freesurfer/longitudinal/template/pipeline.py index a1d5b34e8..240e4e821 100644 --- a/clinica/pipelines/anatomical/freesurfer/longitudinal/template/pipeline.py +++ b/clinica/pipelines/anatomical/freesurfer/longitudinal/template/pipeline.py @@ -2,6 +2,11 @@ from typing import List from clinica.pipelines.engine import Pipeline +from clinica.utils.input_files import ( + Parcellation, + get_t1_freesurfer_segmentation, + get_t1_freesurfer_template, +) class T1FreeSurferTemplate(Pipeline): @@ -17,7 +22,6 @@ def get_processed_images( ) -> List[str]: import re - from clinica.utils.input_files import T1_FS_T_DESTRIEUX from clinica.utils.inputs import clinica_file_reader from clinica.utils.longitudinal import get_long_id from clinica.utils.participant import get_unique_subjects @@ -28,11 +32,13 @@ def get_processed_images( list_long_id = [ get_long_id(list_session_ids) for list_session_ids in list_list_session_ids ] - image_ids: List[str] = [] if caps_directory.is_dir(): t1_freesurfer_files, _ = clinica_file_reader( - list_participant_id, list_long_id, caps_directory, T1_FS_T_DESTRIEUX + list_participant_id, + list_long_id, + caps_directory, + get_t1_freesurfer_template(Parcellation.DESTRIEUX), ) image_ids = [ re.search(r"(sub-[a-zA-Z0-9]+)_(long-[a-zA-Z0-9]+)", file).group() @@ -88,9 +94,7 @@ def _build_input_node(self): from clinica.pipelines.anatomical.freesurfer.longitudinal.utils import ( save_part_sess_long_ids_to_tsv, ) - from clinica.utils.exceptions import ClinicaCAPSError, ClinicaException from clinica.utils.filemanip import extract_subjects_sessions_from_filename - from clinica.utils.input_files import T1_FS_DESTRIEUX from clinica.utils.inputs import clinica_file_filter from clinica.utils.longitudinal import ( get_long_id, @@ -149,11 +153,12 @@ def _build_input_node(self): self.subjects, self.sessions = extract_subjects_sessions_from_filename( to_process_ids ) - _, self.subjects, self.sessions = clinica_file_filter( - self.subjects, self.sessions, self.caps_directory, T1_FS_DESTRIEUX + self.subjects, + self.sessions, + self.caps_directory, + get_t1_freesurfer_segmentation(Parcellation.DESTRIEUX), ) - long_ids = get_participants_long_id(self.subjects, self.sessions) save_part_sess_long_ids_to_tsv( self.subjects, self.sessions, long_ids, self.base_dir / self.name diff --git a/clinica/pipelines/anatomical/freesurfer/t1/pipeline.py b/clinica/pipelines/anatomical/freesurfer/t1/pipeline.py index f69d529d1..bbb8dc5bf 100644 --- a/clinica/pipelines/anatomical/freesurfer/t1/pipeline.py +++ b/clinica/pipelines/anatomical/freesurfer/t1/pipeline.py @@ -21,13 +21,19 @@ def get_processed_images( caps_directory: Path, subjects: List[str], sessions: List[str] ) -> List[str]: from clinica.utils.filemanip import extract_image_ids - from clinica.utils.input_files import T1_FS_DESTRIEUX + from clinica.utils.input_files import ( + Parcellation, + get_t1_freesurfer_segmentation, + ) from clinica.utils.inputs import clinica_file_reader image_ids: List[str] = [] if caps_directory.is_dir(): t1_freesurfer_files, _ = clinica_file_reader( - subjects, sessions, caps_directory, T1_FS_DESTRIEUX + subjects, + sessions, + caps_directory, + get_t1_freesurfer_segmentation(Parcellation.DESTRIEUX), ) image_ids = extract_image_ids(t1_freesurfer_files) return image_ids @@ -96,7 +102,7 @@ def _build_input_node(self): extract_subjects_sessions_from_filename, save_participants_sessions, ) - from clinica.utils.input_files import T1W_NII + from clinica.utils.input_files import get_t1w_mri from clinica.utils.inputs import clinica_file_filter from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process @@ -131,9 +137,8 @@ def _build_input_node(self): ) t1w_files, self.subjects, self.sessions = clinica_file_filter( - self.subjects, self.sessions, self.bids_directory, T1W_NII + self.subjects, self.sessions, self.bids_directory, get_t1w_mri() ) - if not t1w_files: raise ClinicaException("Empty dataset or already processed") diff --git a/clinica/pipelines/dwi/connectome/pipeline.py b/clinica/pipelines/dwi/connectome/pipeline.py index 38c0691b5..fea4abe8a 100644 --- a/clinica/pipelines/dwi/connectome/pipeline.py +++ b/clinica/pipelines/dwi/connectome/pipeline.py @@ -4,6 +4,7 @@ from nipype import config from clinica.pipelines.engine import Pipeline +from clinica.utils.input_files import QueryPattern cfg = dict(execution={"parameterize_dirs": False}) config.update_config(cfg) @@ -54,6 +55,35 @@ def get_output_fields(self) -> List[str]: """ return ["response", "fod", "tracts", "nodes", "connectomes"] + @staticmethod + def _get_input_patterns() -> list[QueryPattern]: + from clinica.utils.input_files import ( + DWIFileType, + Parcellation, + get_dwi_preprocessed_brainmask, + get_dwi_preprocessed_file, + get_t1_freesurfer_extracted_brain, + get_t1_freesurfer_segmentation, + get_t1_freesurfer_segmentation_white_matter, + ) + + patterns = [get_t1_freesurfer_segmentation_white_matter()] + patterns.extend( + [ + get_t1_freesurfer_segmentation(p) + for p in (Parcellation.DESIKAN, Parcellation.DESTRIEUX) + ] + ) + patterns.append(get_t1_freesurfer_extracted_brain()) + patterns.extend( + [ + get_dwi_preprocessed_file(file_type) + for file_type in (DWIFileType.NII, DWIFileType.BVEC, DWIFileType.BVAL) + ] + ) + patterns.append(get_dwi_preprocessed_brainmask()) + return patterns + def _build_input_node(self): """Build and connect an input node to the pipeline.""" import re @@ -61,33 +91,18 @@ def _build_input_node(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - import clinica.utils.input_files as input_files from clinica.utils.exceptions import ClinicaCAPSError from clinica.utils.filemanip import save_participants_sessions from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process - # Read CAPS files list_caps_files = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - # Inputs from t1-freesurfer pipeline - input_files.T1_FS_WM, # list_caps_files[0] - input_files.T1_FS_DESIKAN, # list_caps_files[1] - input_files.T1_FS_DESTRIEUX, # list_caps_files[2] - input_files.T1_FS_BRAIN, # list_caps_files[3] - # Inputs from dwi-preprocessing pipeline - input_files.DWI_PREPROC_NII, # list_caps_files[4] - input_files.DWI_PREPROC_BRAINMASK, # list_caps_files[5] - input_files.DWI_PREPROC_BVEC, # list_caps_files[6] - input_files.DWI_PREPROC_BVAL, # list_caps_files[7] - ], - raise_exception=True, + self._get_input_patterns(), ) - # Check space of DWI dataset dwi_file_spaces = [ re.search( @@ -110,7 +125,7 @@ def _build_input_node(self): ] list_grad_fsl = [ - (bvec, bval) for bvec, bval in zip(list_caps_files[6], list_caps_files[7]) + (bvec, bval) for bvec, bval in zip(list_caps_files[5], list_caps_files[6]) ] # Save subjects to process in //participants.tsv @@ -133,7 +148,7 @@ def _build_input_node(self): ("wm_mask_file", list_caps_files[0]), ("t1_brain_file", list_caps_files[3]), ("dwi_file", list_caps_files[4]), - ("dwi_brainmask_file", list_caps_files[5]), + ("dwi_brainmask_file", list_caps_files[7]), ("grad_fsl", list_grad_fsl), ("atlas_files", list_atlas_files), ], @@ -161,7 +176,7 @@ def _build_input_node(self): iterables=[ ("wm_mask_file", list_caps_files[0]), ("dwi_file", list_caps_files[4]), - ("dwi_brainmask_file", list_caps_files[5]), + ("dwi_brainmask_file", list_caps_files[7]), ("grad_fsl", list_grad_fsl), ("atlas_files", list_atlas_files), ], diff --git a/clinica/pipelines/dwi/dti/pipeline.py b/clinica/pipelines/dwi/dti/pipeline.py index f400e53c2..d3bb148b5 100644 --- a/clinica/pipelines/dwi/dti/pipeline.py +++ b/clinica/pipelines/dwi/dti/pipeline.py @@ -67,22 +67,26 @@ def _build_input_node(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - import clinica.utils.input_files as input_files from clinica.utils.filemanip import save_participants_sessions + from clinica.utils.input_files import ( + DWIFileType, + get_dwi_preprocessed_brainmask, + get_dwi_preprocessed_file, + ) from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process + patterns = [ + get_dwi_preprocessed_file(file_type) + for file_type in (DWIFileType.NII, DWIFileType.BVEC, DWIFileType.BVAL) + ] + patterns.append(get_dwi_preprocessed_brainmask()) list_caps_files = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - input_files.DWI_PREPROC_NII, - input_files.DWI_PREPROC_BVEC, - input_files.DWI_PREPROC_BVAL, - input_files.DWI_PREPROC_BRAINMASK, - ], + patterns, raise_exception=True, ) diff --git a/clinica/pipelines/dwi/preprocessing/fmap/pipeline.py b/clinica/pipelines/dwi/preprocessing/fmap/pipeline.py index b40fb2d52..c9faf7571 100644 --- a/clinica/pipelines/dwi/preprocessing/fmap/pipeline.py +++ b/clinica/pipelines/dwi/preprocessing/fmap/pipeline.py @@ -4,6 +4,13 @@ from nipype import config from clinica.pipelines.dwi.preprocessing.engine import DWIPreprocessingPipeline +from clinica.utils.input_files import ( + DWIFileType, + get_dwi_file, + get_dwi_fmap_magnitude1_file, + get_dwi_fmap_phasediff_file, + get_dwi_preprocessed_file, +) # Use hash instead of parameters for iterables folder names # Otherwise path will be too long and generate OSError @@ -30,13 +37,15 @@ def get_processed_images( caps_directory: Path, subjects: List[str], sessions: List[str] ) -> List[str]: from clinica.utils.filemanip import extract_image_ids - from clinica.utils.input_files import DWI_PREPROC_NII from clinica.utils.inputs import clinica_file_reader image_ids: List[str] = [] if caps_directory.is_dir(): preproc_files, _ = clinica_file_reader( - subjects, sessions, caps_directory, DWI_PREPROC_NII + subjects, + sessions, + caps_directory, + get_dwi_preprocessed_file(DWIFileType.NII), ) image_ids = extract_image_ids(preproc_files) return image_ids @@ -97,32 +106,31 @@ def _build_input_node(self): import nipype.pipeline.engine as npe from clinica.utils.filemanip import save_participants_sessions - from clinica.utils.input_files import ( - DWI_BVAL, - DWI_BVEC, - DWI_JSON, - DWI_NII, - FMAP_MAGNITUDE1_NII, - FMAP_PHASEDIFF_JSON, - FMAP_PHASEDIFF_NII, - ) from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process + patterns = [ + get_dwi_file(file_type) + for file_type in ( + DWIFileType.NII, + DWIFileType.BVEC, + DWIFileType.BVAL, + DWIFileType.JSON, + ) + ] + patterns.append(get_dwi_fmap_magnitude1_file(DWIFileType.NII)) + patterns.extend( + [ + get_dwi_fmap_phasediff_file(file_type) + for file_type in (DWIFileType.NII, DWIFileType.JSON) + ] + ) list_bids_files = clinica_list_of_files_reader( self.subjects, self.sessions, self.bids_directory, - [ - DWI_NII, - DWI_BVEC, - DWI_BVAL, - DWI_JSON, - FMAP_MAGNITUDE1_NII, - FMAP_PHASEDIFF_NII, - FMAP_PHASEDIFF_JSON, - ], + patterns, raise_exception=True, ) save_participants_sessions( @@ -131,9 +139,7 @@ def _build_input_node(self): if len(self.subjects): print_images_to_process(self.subjects, self.sessions) cprint( - f"List available in {self.base_dir / self.name / 'participants.tsv'}" - ) - cprint( + f"List available in {self.base_dir / self.name / 'participants.tsv'}\n" "Computational time will depend of the number of volumes in your DWI dataset and the use of CUDA." ) diff --git a/clinica/pipelines/dwi/preprocessing/t1/pipeline.py b/clinica/pipelines/dwi/preprocessing/t1/pipeline.py index 0882cf714..197b5c885 100644 --- a/clinica/pipelines/dwi/preprocessing/t1/pipeline.py +++ b/clinica/pipelines/dwi/preprocessing/t1/pipeline.py @@ -4,6 +4,13 @@ from nipype import config from clinica.pipelines.dwi.preprocessing.engine import DWIPreprocessingPipeline +from clinica.utils.input_files import ( + DWIFileType, + get_dwi_file, + get_dwi_preprocessed_file, + get_t1w_mri, +) +from clinica.utils.inputs import clinica_file_reader cfg = dict(execution={"parameterize_dirs": False}) config.update_config(cfg) @@ -34,13 +41,14 @@ def get_processed_images( caps_directory: Path, subjects: List[str], sessions: List[str] ) -> List[str]: from clinica.utils.filemanip import extract_image_ids - from clinica.utils.input_files import DWI_PREPROC_NII - from clinica.utils.inputs import clinica_file_reader image_ids: List[str] = [] if caps_directory.is_dir(): preproc_files, _ = clinica_file_reader( - subjects, sessions, caps_directory, DWI_PREPROC_NII + subjects, + sessions, + caps_directory, + get_dwi_preprocessed_file(DWIFileType.NII), ) image_ids = extract_image_ids(preproc_files) return image_ids @@ -90,21 +98,20 @@ def filter_qc(self) -> tuple[list[str], list[str]]: from clinica.pipelines.dwi.preprocessing.utils import check_dwi_volume from clinica.pipelines.dwi.utils import DWIDataset from clinica.utils.bids import BIDSFileName - from clinica.utils.input_files import ( - DWI_BVAL, - DWI_BVEC, - DWI_NII, - ) from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint subjects = [] sessions = [] + patterns = [ + get_dwi_file(file_type) + for file_type in (DWIFileType.NII, DWIFileType.BVEC, DWIFileType.BVAL) + ] list_bids_files = clinica_list_of_files_reader( self.subjects, self.sessions, self.bids_directory, - [DWI_NII, DWI_BVEC, DWI_BVAL], + patterns, raise_exception=True, ) for dwi_image_file, b_vectors_file, b_values_file in zip(*list_bids_files): @@ -134,22 +141,25 @@ def _build_input_node(self): import nipype.pipeline.engine as npe from clinica.utils.filemanip import save_participants_sessions - from clinica.utils.input_files import ( - DWI_BVAL, - DWI_BVEC, - DWI_JSON, - DWI_NII, - T1W_NII, - ) from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process + patterns = [ + get_dwi_file(file_type) + for file_type in ( + DWIFileType.NII, + DWIFileType.JSON, + DWIFileType.BVEC, + DWIFileType.BVAL, + ) + ] + patterns.insert(0, get_t1w_mri()) list_bids_files = clinica_list_of_files_reader( self.subjects, self.sessions, self.bids_directory, - [T1W_NII, DWI_NII, DWI_JSON, DWI_BVEC, DWI_BVAL], + patterns, raise_exception=True, ) diff --git a/clinica/pipelines/machine_learning/input.py b/clinica/pipelines/machine_learning/input.py index dfeeb2eb5..d0e1edbb4 100644 --- a/clinica/pipelines/machine_learning/input.py +++ b/clinica/pipelines/machine_learning/input.py @@ -150,42 +150,38 @@ def get_images(self): """ Returns: a list of filenames """ - from clinica.utils.input_files import pet_volume_normalized_suvr_pet + from clinica.utils.group import GroupID, GroupLabel + from clinica.utils.input_files import ( + get_pet_volume_normalized_suvr, + get_t1_volume_template_tpm_in_mni, + ) from clinica.utils.inputs import clinica_file_reader + from clinica.utils.spm import SPMTissue if self._images is not None: return self._images + group_id = GroupID.from_label(GroupLabel(self._input_params["group_label"])) if self._input_params["image_type"] == "T1w": - if self._input_params["fwhm"] == 0: - fwhm_key_value = "" - else: - fwhm_key_value = f"_fwhm-{self._input_params['fwhm']}mm" - - self._images = [ - path.join( - self._input_params["caps_directory"], - "subjects", - self._subjects[i], - self._sessions[i], - "t1", - "spm", - "dartel", - f"group-{self._input_params['group_label']}", - f"{self._subjects[i]}_{self._sessions[i]}_T1w" - f"_segm-graymatter_space-Ixi549Space_modulated-{self._input_params['modulated']}{fwhm_key_value}_probability.nii.gz", - ) - for i in range(len(self._subjects)) - ] - + pattern = get_t1_volume_template_tpm_in_mni( + group_id=group_id, + tissue=SPMTissue.GRAY_MATTER, + modulation=self._input_params["modulated"], + fwhm=self._input_params["fwhm"], + ) + self._images, _ = clinica_file_reader( + self._subjects, + self._sessions, + self._input_params["caps_directory"], + pattern, + ) for image in self._images: if not path.exists(image): raise Exception("File %s doesn't exists." % image) - elif self._input_params["image_type"] == "PET": - caps_files_information = pet_volume_normalized_suvr_pet( - acq_label=self._input_params["acq_label"], - group_label=self._input_params["group_label"], + pattern = get_pet_volume_normalized_suvr( + tracer=self._input_params["acq_label"], + group_id=group_id, suvr_reference_region=self._input_params["suvr_reference_region"], use_brainmasked_image=True, use_pvc_data=self._input_params["use_pvc_data"], @@ -195,7 +191,7 @@ def get_images(self): self._subjects, self._sessions, self._input_params["caps_directory"], - caps_files_information, + pattern, ) else: raise ValueError( diff --git a/clinica/pipelines/machine_learning_spatial_svm/spatial_svm_pipeline.py b/clinica/pipelines/machine_learning_spatial_svm/spatial_svm_pipeline.py index 70b4db0f0..e66ffab88 100644 --- a/clinica/pipelines/machine_learning_spatial_svm/spatial_svm_pipeline.py +++ b/clinica/pipelines/machine_learning_spatial_svm/spatial_svm_pipeline.py @@ -1,6 +1,11 @@ from typing import List from clinica.pipelines.engine import GroupPipeline +from clinica.utils.input_files import ( + get_pet_volume_normalized_suvr, + get_t1_volume_group_template, + get_t1_volume_template_tpm_in_mni, +) class SpatialSVM(GroupPipeline): @@ -49,21 +54,16 @@ def get_output_fields(self) -> List[str]: def _build_input_node(self): """Build and connect an input node to the pipeline.""" - import os - import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe from clinica.utils.exceptions import ClinicaCAPSError, ClinicaException - from clinica.utils.input_files import ( - pet_volume_normalized_suvr_pet, - t1_volume_final_group_template, - ) from clinica.utils.inputs import ( clinica_file_reader, clinica_group_reader, format_clinica_file_reader_errors, ) + from clinica.utils.spm import SPMTissue from clinica.utils.ux import print_groups_in_caps_directory if not self.group_directory.exists(): @@ -80,19 +80,10 @@ def _build_input_node(self): ), ) all_errors = [] - if self.parameters["orig_input_data_ml"] == "t1-volume": - caps_files_information = { - "pattern": os.path.join( - "t1", - "spm", - "dartel", - str(self.group_id), - "*_T1w_segm-graymatter_space-Ixi549Space_modulated-on_probability.nii.gz", - ), - "description": "graymatter tissue segmented in T1w MRI in Ixi549 space", - "needed_pipeline": "t1-volume-tissue-segmentation", - } + pattern = get_t1_volume_template_tpm_in_mni( + self.group_id, SPMTissue.GRAY_MATTER, modulation=True + ) elif self.parameters["orig_input_data_ml"] == "pet-volume": if not ( self.parameters["acq_label"] @@ -104,8 +95,9 @@ def _build_input_node(self): f"- suvr_reference_region: {self.parameters['suvr_reference_region']}\n" f"- use_pvc_data: {self.parameters['use_pvc_data']}\n" ) - caps_files_information = pet_volume_normalized_suvr_pet( - acq_label=self.parameters["acq_label"], + pattern = get_pet_volume_normalized_suvr( + tracer=self.parameters["acq_label"], + group_id=self.group_id, suvr_reference_region=self.parameters["suvr_reference_region"], use_brainmasked_image=False, use_pvc_data=self.parameters["use_pvc_data"], @@ -115,27 +107,17 @@ def _build_input_node(self): raise ValueError( f"Image type {self.parameters['orig_input_data_ml']} unknown." ) - input_image, caps_error = clinica_file_reader( - self.subjects, - self.sessions, - self.caps_directory, - caps_files_information, + self.subjects, self.sessions, self.caps_directory, pattern ) if caps_error: - all_errors.append( - format_clinica_file_reader_errors(caps_error, caps_files_information) - ) - + all_errors.append(format_clinica_file_reader_errors(caps_error, pattern)) try: dartel_input = clinica_group_reader( - self.caps_directory, - t1_volume_final_group_template(self.group_label), + self.caps_directory, get_t1_volume_group_template(self.group_id) ) except ClinicaException as e: all_errors.append(e) - - # Raise all errors if some happened if any(all_errors): error_message = "Clinica faced errors while trying to read files in your CAPS directories.\n" for msg in all_errors: diff --git a/clinica/pipelines/pet/engine.py b/clinica/pipelines/pet/engine.py index 3a638c2f3..1be5834f2 100644 --- a/clinica/pipelines/pet/engine.py +++ b/clinica/pipelines/pet/engine.py @@ -1,4 +1,5 @@ from clinica.pipelines.engine import Pipeline +from clinica.utils.input_files import QueryPattern from clinica.utils.pet import ReconstructionMethod, Tracer from clinica.utils.stream import log_and_raise @@ -22,10 +23,10 @@ def _check_pipeline_parameters(self) -> None: else: self.parameters["reconstruction_method"] = None - def _get_pet_scans_query(self) -> dict: + def _get_pet_scans_query(self) -> QueryPattern: """Return the query to retrieve PET scans.""" - from clinica.utils.input_files import bids_pet_nii + from clinica.utils.input_files import get_pet_nifti - return bids_pet_nii( + return get_pet_nifti( self.parameters["acq_label"], self.parameters["reconstruction_method"] ) diff --git a/clinica/pipelines/pet/linear/pipeline.py b/clinica/pipelines/pet/linear/pipeline.py index 7c3856f32..835a6a5c6 100644 --- a/clinica/pipelines/pet/linear/pipeline.py +++ b/clinica/pipelines/pet/linear/pipeline.py @@ -5,6 +5,7 @@ from nipype import config from clinica.pipelines.pet.engine import PETPipeline +from clinica.utils.input_files import get_t1w_linear cfg = dict(execution={"parameterize_dirs": False}) config.update_config(cfg) @@ -60,17 +61,12 @@ def _build_input_node(self): import nipype.pipeline.engine as npe from clinica.pipelines.pet.utils import get_suvr_mask - from clinica.utils.exceptions import ( - ClinicaBIDSError, - ClinicaCAPSError, - ClinicaException, - ) + from clinica.utils.exceptions import ClinicaBIDSError, ClinicaCAPSError from clinica.utils.image import get_mni_template from clinica.utils.input_files import ( - T1W_LINEAR, - T1W_LINEAR_CROPPED, - T1W_NII, - T1W_TO_MNI_TRANSFORM, + get_t1w_linear, + get_t1w_mri, + get_t1w_to_mni_transform, ) from clinica.utils.inputs import ( clinica_file_reader, @@ -95,22 +91,16 @@ def _build_input_node(self): pet_errors, self._get_pet_scans_query() ) ) - - # T1w file: + pattern = get_t1w_mri() t1w_files, t1w_errors = clinica_file_reader( - self.subjects, self.sessions, self.bids_directory, T1W_NII + self.subjects, self.sessions, self.bids_directory, pattern ) if t1w_errors: raise ClinicaBIDSError( - format_clinica_file_reader_errors(t1w_errors, T1W_NII) + format_clinica_file_reader_errors(t1w_errors, pattern) ) - - # Inputs from t1-linear pipeline - # T1w images registered - t1w_linear_file_pattern = ( - T1W_LINEAR - if self.parameters.get("uncropped_image", False) - else T1W_LINEAR_CROPPED + t1w_linear_file_pattern = get_t1w_linear( + cropped=not self.parameters.get("uncropped_image", False) ) t1w_linear_files, t1w_linear_errors = clinica_file_reader( self.subjects, self.sessions, self.caps_directory, t1w_linear_file_pattern @@ -121,17 +111,14 @@ def _build_input_node(self): t1w_linear_errors, t1w_linear_file_pattern ) ) - # Transformation files from T1w files to MNI: + pattern = get_t1w_to_mni_transform() t1w_to_mni_transformation_files, t1w_to_mni_errors = clinica_file_reader( - self.subjects, self.sessions, self.caps_directory, T1W_TO_MNI_TRANSFORM + self.subjects, self.sessions, self.caps_directory, pattern ) if t1w_to_mni_errors: raise ClinicaCAPSError( - format_clinica_file_reader_errors( - t1w_to_mni_errors, T1W_TO_MNI_TRANSFORM - ) + format_clinica_file_reader_errors(t1w_to_mni_errors, pattern) ) - if len(self.subjects): print_images_to_process(self.subjects, self.sessions) cprint("The pipeline will last approximately 3 minutes per image.") diff --git a/clinica/pipelines/pet/volume/pipeline.py b/clinica/pipelines/pet/volume/pipeline.py index bceb8ae2e..9cad71d4e 100644 --- a/clinica/pipelines/pet/volume/pipeline.py +++ b/clinica/pipelines/pet/volume/pipeline.py @@ -1,9 +1,15 @@ -from typing import List, Optional +from typing import List from nipype import config from clinica.pipelines.engine import GroupPipeline from clinica.pipelines.pet.engine import PETPipeline +from clinica.utils.input_files import ( + get_t1_volume_deformation_to_template, + get_t1_volume_group_template, + get_t1_volume_tpm, + get_t1w_mri, +) # Use hash instead of parameters for iterables folder names # Otherwise path will be too long and generate OSError @@ -88,13 +94,6 @@ def _build_input_node(self): from clinica.pipelines.pet.utils import get_suvr_mask, read_psf_information from clinica.utils.exceptions import ClinicaException from clinica.utils.filemanip import save_participants_sessions - from clinica.utils.input_files import ( - T1W_NII, - t1_volume_deformation_to_template, - t1_volume_final_group_template, - t1_volume_native_tpm, - t1_volume_native_tpm_in_mni, - ) from clinica.utils.inputs import ( clinica_file_reader, clinica_group_reader, @@ -126,7 +125,6 @@ def _build_input_node(self): # PET from BIDS directory # Native T1w-MRI - try: pet_bids, t1w_bids = clinica_list_of_files_reader( self.subjects, @@ -134,7 +132,7 @@ def _build_input_node(self): self.bids_directory, [ self._get_pet_scans_query(), - T1W_NII, + get_t1w_mri(), ], ) except ClinicaException as e: @@ -147,7 +145,7 @@ def _build_input_node(self): self.sessions, self.caps_directory, [ - t1_volume_native_tpm_in_mni(tissue_number, False) + get_t1_volume_tpm(tissue_number, modulation=False, mni_space=True) for tissue_number in self.parameters["mask_tissues"] ], ) @@ -162,25 +160,22 @@ def _build_input_node(self): all_errors += e # Flowfields + pattern = get_t1_volume_deformation_to_template(self.group_id) flowfields_caps, flowfields_errors = clinica_file_reader( self.subjects, self.sessions, self.caps_directory, - t1_volume_deformation_to_template(self.group_label), + pattern, ) if flowfields_errors: all_errors.append( - format_clinica_file_reader_errors( - flowfields_errors, - t1_volume_deformation_to_template(self.group_label), - ) + format_clinica_file_reader_errors(flowfields_errors, pattern) ) # Dartel Template try: final_template = clinica_group_reader( - self.caps_directory, - t1_volume_final_group_template(self.group_label), + self.caps_directory, get_t1_volume_group_template(self.group_id) ) except ClinicaException as e: all_errors.append(e) @@ -200,14 +195,15 @@ def _build_input_node(self): if self.parameters["apply_pvc"]: # pvc tissues input try: + patterns = [ + get_t1_volume_tpm(tissue_number, modulation=False, mni_space=False) + for tissue_number in self.parameters["pvc_mask_tissues"] + ] pvc_tissues_input = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - t1_volume_native_tpm(tissue_number) - for tissue_number in self.parameters["pvc_mask_tissues"] - ], + patterns, ) pvc_tissues_input_final = [] for subject_tissue_list in zip(*pvc_tissues_input): diff --git a/clinica/pipelines/pet_surface/pet_surface_pipeline.py b/clinica/pipelines/pet_surface/pet_surface_pipeline.py index 54046a037..d040dc9eb 100644 --- a/clinica/pipelines/pet_surface/pet_surface_pipeline.py +++ b/clinica/pipelines/pet_surface/pet_surface_pipeline.py @@ -1,6 +1,8 @@ from typing import List from clinica.pipelines.pet.engine import PETPipeline +from clinica.utils.image import HemiSphere +from clinica.utils.input_files import Parcellation class PetSurface(PETPipeline): @@ -72,11 +74,15 @@ def _build_input_node_longitudinal(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - import clinica.utils.input_files as input_files from clinica.iotools.utils.data_handling import ( check_relative_volume_location_in_world_coordinate_system, ) from clinica.utils.exceptions import ClinicaException + from clinica.utils.input_files import ( + get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu, + get_t1_freesurfer_longitudinal_parcellation, + get_t1_freesurfer_longitudinal_white_matter_surface, + ) from clinica.utils.inputs import ( clinica_file_reader, clinica_list_of_files_reader, @@ -100,8 +106,28 @@ def _build_input_node_longitudinal(self): self._get_pet_scans_query(), ) if pet_errors: - all_errors.append(format_clinica_file_reader_errors(pet_errors)) + all_errors.append( + format_clinica_file_reader_errors( + pet_errors, self._get_pet_scans_query() + ) + ) + patterns = [ + get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu() + ] + patterns.extend( + [ + get_t1_freesurfer_longitudinal_white_matter_surface(hemisphere) + for hemisphere in (HemiSphere.RIGHT, HemiSphere.LEFT) + ] + ) + patterns.extend( + [ + get_t1_freesurfer_longitudinal_parcellation(hemisphere, parcellation) + for parcellation in (Parcellation.DESTRIEUX, Parcellation.DESIKAN) + for hemisphere in (HemiSphere.LEFT, HemiSphere.RIGHT) + ] + ) try: ( read_parameters_node.inputs.orig_nu, @@ -115,17 +141,8 @@ def _build_input_node_longitudinal(self): self.subjects, self.sessions, self.caps_directory, - [ - input_files.T1_FS_LONG_ORIG_NU, - input_files.T1_FS_LONG_SURF_R, - input_files.T1_FS_LONG_SURF_L, - input_files.T1_FS_LONG_DESTRIEUX_PARC_L, - input_files.T1_FS_LONG_DESTRIEUX_PARC_R, - input_files.T1_FS_LONG_DESIKAN_PARC_L, - input_files.T1_FS_LONG_DESIKAN_PARC_R, - ], + patterns, ) - except ClinicaException as e: all_errors.append(e) @@ -164,11 +181,15 @@ def _build_input_node_cross_sectional(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - import clinica.utils.input_files as input_files from clinica.iotools.utils.data_handling import ( check_relative_volume_location_in_world_coordinate_system, ) from clinica.utils.exceptions import ClinicaException + from clinica.utils.input_files import ( + get_t1_freesurfer_intensity_normalized_volume_after_nu, + get_t1_freesurfer_parcellation, + get_t1_freesurfer_white_matter_surface, + ) from clinica.utils.inputs import ( clinica_file_reader, clinica_list_of_files_reader, @@ -193,6 +214,20 @@ def _build_input_node_cross_sectional(self): if pet_errors: all_errors.append(format_clinica_file_reader_errors(pet_errors)) + patterns = [get_t1_freesurfer_intensity_normalized_volume_after_nu()] + patterns.extend( + [ + get_t1_freesurfer_white_matter_surface(hemisphere) + for hemisphere in (HemiSphere.RIGHT, HemiSphere.LEFT) + ] + ) + patterns.extend( + [ + get_t1_freesurfer_parcellation(hemisphere, parcellation) + for parcellation in (Parcellation.DESTRIEUX, Parcellation.DESIKAN) + for hemisphere in (HemiSphere.LEFT, HemiSphere.RIGHT) + ] + ) try: ( read_parameters_node.inputs.orig_nu, @@ -206,15 +241,7 @@ def _build_input_node_cross_sectional(self): self.subjects, self.sessions, self.caps_directory, - [ - input_files.T1_FS_ORIG_NU, - input_files.T1_FS_WM_SURF_R, - input_files.T1_FS_WM_SURF_L, - input_files.T1_FS_DESTRIEUX_PARC_L, - input_files.T1_FS_DESTRIEUX_PARC_R, - input_files.T1_FS_DESIKAN_PARC_L, - input_files.T1_FS_DESIKAN_PARC_R, - ], + patterns, ) except ClinicaException as e: all_errors.append(e) diff --git a/clinica/pipelines/statistics_surface/pipeline.py b/clinica/pipelines/statistics_surface/pipeline.py index 66626190d..5523aff24 100644 --- a/clinica/pipelines/statistics_surface/pipeline.py +++ b/clinica/pipelines/statistics_surface/pipeline.py @@ -110,6 +110,7 @@ def get_output_fields(self) -> List[str]: def _build_input_node(self): """Build and connect an input node to the pipeline.""" from clinica.utils.exceptions import ClinicaException + from clinica.utils.input_files import QueryPattern from clinica.utils.inputs import clinica_list_of_files_reader # Check if already present in CAPS @@ -118,35 +119,36 @@ def _build_input_node(self): # Note(AR): if the user wants to compare Cortical Thickness measure with PET measure # using the group_id, Clinica won't allow it. # TODO: Modify this behaviour - group_folder = self.caps_directory / "groups" / str(self.group_id) - if group_folder.exists(): + if self.group_directory.exists(): raise ClinicaException( - f"Group label {self.group_label} already exists (found in {group_folder})." + f"Group label {self.group_label} already exists (found in {self.group_directory})." "Please choose another one or delete the existing folder and " "also the working directory and rerun the pipeline." ) # Check input files - surface_query = [] + patterns: list[QueryPattern] = [] # clinica_files_reader expects regexp to start at subjects/ so sub-*/ses-*/ is removed here fwhm = str(self.parameters["full_width_at_half_maximum"]) for direction, hemi in zip(["left", "right"], ["lh", "rh"]): cut_pattern = "sub-*/ses-*/" query = {"subject": "sub-*", "session": "ses-*", "hemi": hemi, "fwhm": fwhm} pattern_hemisphere = self.parameters["custom_file"] % query - surface_based_info = { - "pattern": pattern_hemisphere[ - pattern_hemisphere.find(cut_pattern) + len(cut_pattern) : - ], - "description": f"surface-based features on {direction} hemisphere at FWHM = {fwhm}", - } - surface_query.append(surface_based_info) + patterns.append( + QueryPattern( + pattern_hemisphere[ + pattern_hemisphere.find(cut_pattern) + len(cut_pattern) : + ], + f"surface-based features on {direction} hemisphere at FWHM = {fwhm}", + "", + ) + ) try: clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - surface_query, + patterns, ) except ClinicaException as e: raise RuntimeError(e) diff --git a/clinica/pipelines/statistics_volume/statistics_volume_pipeline.py b/clinica/pipelines/statistics_volume/statistics_volume_pipeline.py index ab72d5e7d..92c6f7ca3 100644 --- a/clinica/pipelines/statistics_volume/statistics_volume_pipeline.py +++ b/clinica/pipelines/statistics_volume/statistics_volume_pipeline.py @@ -1,6 +1,7 @@ from typing import List from clinica.pipelines.engine import GroupPipeline +from clinica.utils.input_files import QueryPattern from clinica.utils.pet import SUVRReferenceRegion, Tracer @@ -93,14 +94,20 @@ def _build_input_node(self): import nipype.pipeline.engine as npe from clinica.utils.exceptions import ClinicaException + from clinica.utils.group import GroupID, GroupLabel from clinica.utils.input_files import ( - pet_volume_normalized_suvr_pet, - t1_volume_template_tpm_in_mni, + get_pet_volume_normalized_suvr, + get_t1_volume_template_tpm_in_mni, ) from clinica.utils.inputs import clinica_file_filter from clinica.utils.stream import cprint from clinica.utils.ux import print_begin_image, print_images_to_process + dartel_group_id = None + if self.parameters["group_label_dartel"] != "*": + dartel_group_id = GroupID.from_label( + GroupLabel(self.parameters["group_label_dartel"]) + ) if self.parameters["orig_input_data_volume"] == "pet-volume": if not ( self.parameters["acq_label"] @@ -114,9 +121,9 @@ def _build_input_node(self): ) self.parameters["measure_label"] = self.parameters["acq_label"].value - information_dict = pet_volume_normalized_suvr_pet( - acq_label=self.parameters["acq_label"], - group_label=self.parameters["group_label_dartel"], + pattern = get_pet_volume_normalized_suvr( + tracer=self.parameters["acq_label"], + group_id=dartel_group_id, suvr_reference_region=self.parameters["suvr_reference_region"], use_brainmasked_image=True, use_pvc_data=self.parameters["use_pvc_data"], @@ -124,13 +131,12 @@ def _build_input_node(self): ) elif self.parameters["orig_input_data_volume"] == "t1-volume": self.parameters["measure_label"] = "graymatter" - information_dict = t1_volume_template_tpm_in_mni( - group_label=self.parameters["group_label_dartel"], - tissue_number=1, + pattern = get_t1_volume_template_tpm_in_mni( + group_id=dartel_group_id, + tissue=1, modulation=True, fwhm=self.parameters["full_width_at_half_maximum"], ) - elif self.parameters["orig_input_data_volume"] == "custom-pipeline": if not self.parameters["custom_file"]: raise ClinicaException( @@ -138,17 +144,16 @@ def _build_input_node(self): ) # If custom file are grabbed, information of fwhm is irrelevant and should not appear on final filenames self.parameters["full_width_at_half_maximum"] = None - information_dict = { - "pattern": self.parameters["custom_file"], - "description": "custom file provided by user", - } + pattern = QueryPattern( + self.parameters["custom_file"], "custom file provided by user", "" + ) else: raise ValueError( f"Input data {self.parameters['orig_input_data_volume']} unknown." ) input_files, self.subjects, self.sessions = clinica_file_filter( - self.subjects, self.sessions, self.caps_directory, information_dict + self.subjects, self.sessions, self.caps_directory, pattern ) read_parameters_node = npe.Node( diff --git a/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py b/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py index e201ab4fc..379b1c0cd 100644 --- a/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py +++ b/clinica/pipelines/statistics_volume_correction/statistics_volume_correction_pipeline.py @@ -43,15 +43,14 @@ def _build_input_node(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe + from clinica.utils.input_files import QueryPattern from clinica.utils.inputs import clinica_group_reader t_map = clinica_group_reader( self.caps_directory, - { - "pattern": self.parameters["t_map"] + "*", - "description": "statistics t map", - "needed_pipeline": "statistics-volume", - }, + QueryPattern( + self.parameters["t_map"] + "*", "statistics t map", "statistics-volume" + ), ) read_parameters_node = npe.Node( diff --git a/clinica/pipelines/t1_linear/anat_linear_pipeline.py b/clinica/pipelines/t1_linear/anat_linear_pipeline.py index daa0c714d..08fd09a21 100644 --- a/clinica/pipelines/t1_linear/anat_linear_pipeline.py +++ b/clinica/pipelines/t1_linear/anat_linear_pipeline.py @@ -72,7 +72,7 @@ def get_processed_images( caps_directory: Path, subjects: List[str], sessions: List[str] ) -> List[str]: from clinica.utils.filemanip import extract_image_ids - from clinica.utils.input_files import T1W_LINEAR_CROPPED + from clinica.utils.input_files import get_t1w_linear from clinica.utils.inputs import clinica_file_reader image_ids: List[str] = [] @@ -81,7 +81,7 @@ def get_processed_images( subjects, sessions, caps_directory, - T1W_LINEAR_CROPPED, + get_t1w_linear(cropped=True), ) image_ids = extract_image_ids(cropped_files) return image_ids @@ -119,10 +119,9 @@ def _build_input_node(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - from clinica.utils.exceptions import ClinicaBIDSError, ClinicaException from clinica.utils.filemanip import extract_subjects_sessions_from_filename from clinica.utils.image import get_mni_template - from clinica.utils.input_files import T1W_NII, Flair_T2W_NII + from clinica.utils.input_files import get_t1w_mri, get_t2w_mri from clinica.utils.inputs import clinica_file_filter from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process @@ -155,10 +154,9 @@ def _build_input_node(self): # Inputs from anat/ folder # ======================== # anat image file: - query = T1W_NII if self.name == "t1-linear" else Flair_T2W_NII - + pattern = get_t1w_mri() if self.name == "t1-linear" else get_t2w_mri() anat_files, filtered_subjects, filtered_sessions = clinica_file_filter( - self.subjects, self.sessions, self.bids_directory, query + self.subjects, self.sessions, self.bids_directory, pattern ) self.subjects = filtered_subjects self.sessions = filtered_sessions diff --git a/clinica/pipelines/t1_volume_create_dartel/t1_volume_create_dartel_pipeline.py b/clinica/pipelines/t1_volume_create_dartel/t1_volume_create_dartel_pipeline.py index 1e5589f7b..169cc77a1 100644 --- a/clinica/pipelines/t1_volume_create_dartel/t1_volume_create_dartel_pipeline.py +++ b/clinica/pipelines/t1_volume_create_dartel/t1_volume_create_dartel_pipeline.py @@ -46,7 +46,7 @@ def _build_input_node(self): import nipype.pipeline.engine as npe from clinica.utils.exceptions import ClinicaException - from clinica.utils.input_files import t1_volume_dartel_input_tissue + from clinica.utils.input_files import get_t1_volume_dartel_input_tissue from clinica.utils.inputs import clinica_list_of_files_reader from clinica.utils.stream import cprint from clinica.utils.ux import ( @@ -82,16 +82,16 @@ def _build_input_node(self): fields=self.get_input_fields(), mandatory_inputs=True ), ) - + patterns = [ + get_t1_volume_dartel_input_tissue(tissue_number) + for tissue_number in self.parameters["dartel_tissues"] + ] try: d_input = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - t1_volume_dartel_input_tissue(tissue_number) - for tissue_number in self.parameters["dartel_tissues"] - ], + patterns, ) # d_input is a list of size len(self.parameters['dartel_tissues']) # Each element of this list is a list of size len(self.subjects) diff --git a/clinica/pipelines/t1_volume_dartel2mni/t1_volume_dartel2mni_pipeline.py b/clinica/pipelines/t1_volume_dartel2mni/t1_volume_dartel2mni_pipeline.py index 3bb22fc72..91573398b 100644 --- a/clinica/pipelines/t1_volume_dartel2mni/t1_volume_dartel2mni_pipeline.py +++ b/clinica/pipelines/t1_volume_dartel2mni/t1_volume_dartel2mni_pipeline.py @@ -48,9 +48,9 @@ def _build_input_node(self): from clinica.utils.exceptions import ClinicaCAPSError, ClinicaException from clinica.utils.input_files import ( - t1_volume_deformation_to_template, - t1_volume_final_group_template, - t1_volume_native_tpm, + get_t1_volume_deformation_to_template, + get_t1_volume_group_template, + get_t1_volume_tpm, ) from clinica.utils.inputs import ( clinica_file_reader, @@ -70,7 +70,6 @@ def _build_input_node(self): f"Group {self.group_label} does not exist. " "Did you run t1-volume or t1-volume-create-dartel pipeline?" ) - all_errors = [] read_input_node = npe.Node( name="LoadingCLIArguments", @@ -78,18 +77,18 @@ def _build_input_node(self): fields=self.get_input_fields(), mandatory_inputs=True ), ) - # Segmented Tissues # ================= + patterns = [ + get_t1_volume_tpm(tissue_number, modulation=False, mni_space=False) + for tissue_number in self.parameters["tissues"] + ] try: tissues_input = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - t1_volume_native_tpm(tissue_number) - for tissue_number in self.parameters["tissues"] - ], + patterns, ) # Tissues_input has a length of len(self.parameters['mask_tissues']). Each of these elements has a size of # len(self.subjects). We want the opposite : a list of size len(self.subjects) whose elements have a size of @@ -103,25 +102,26 @@ def _build_input_node(self): # Flow Fields # =========== + pattern = get_t1_volume_deformation_to_template(self.group_id) read_input_node.inputs.flowfield_files, flowfield_errors = clinica_file_reader( self.subjects, self.sessions, self.caps_directory, - t1_volume_deformation_to_template(self.group_label), + pattern, ) if flowfield_errors: - all_errors.append(format_clinica_file_reader_errors(flowfield_errors)) + all_errors.append( + format_clinica_file_reader_errors(flowfield_errors, pattern) + ) # Dartel Template # ================ try: read_input_node.inputs.template_file = clinica_group_reader( - self.caps_directory, - t1_volume_final_group_template(self.group_label), + self.caps_directory, get_t1_volume_group_template(self.group_id) ) except ClinicaException as e: all_errors.append(e) - if any(all_errors): error_message = "Clinica faced error(s) while trying to read files in your CAPS/BIDS directories.\n" for msg in all_errors: diff --git a/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py b/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py index 56cf6edf8..c816f7dc9 100644 --- a/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py +++ b/clinica/pipelines/t1_volume_parcellation/t1_volume_parcellation_pipeline.py @@ -46,9 +46,9 @@ def _build_input_node(self): import nipype.interfaces.utility as nutil import nipype.pipeline.engine as npe - from clinica.utils.exceptions import ClinicaCAPSError, ClinicaException - from clinica.utils.input_files import t1_volume_template_tpm_in_mni - from clinica.utils.inputs import clinica_file_filter, clinica_file_reader + from clinica.utils.exceptions import ClinicaException + from clinica.utils.input_files import get_t1_volume_template_tpm_in_mni + from clinica.utils.inputs import clinica_file_filter from clinica.utils.stream import cprint from clinica.utils.ux import ( print_groups_in_caps_directory, @@ -61,16 +61,16 @@ def _build_input_node(self): f"Group {self.group_label} does not exist. " "Did you run t1-volume or t1-volume-create-dartel pipeline?" ) - + pattern = get_t1_volume_template_tpm_in_mni( + group_id=self.group_id, + tissue=1, + modulation=self.parameters["modulate"], + ) gm_mni, self.subjects, self.sessions = clinica_file_filter( self.subjects, self.sessions, self.caps_directory, - t1_volume_template_tpm_in_mni( - group_label=self.group_label, - tissue_number=1, - modulation=self.parameters["modulate"], - ), + pattern, ) read_parameters_node = npe.Node( diff --git a/clinica/pipelines/t1_volume_register_dartel/t1_volume_register_dartel_pipeline.py b/clinica/pipelines/t1_volume_register_dartel/t1_volume_register_dartel_pipeline.py index 0b4955669..2152ffea5 100644 --- a/clinica/pipelines/t1_volume_register_dartel/t1_volume_register_dartel_pipeline.py +++ b/clinica/pipelines/t1_volume_register_dartel/t1_volume_register_dartel_pipeline.py @@ -45,11 +45,10 @@ def _build_input_node(self): from clinica.utils.exceptions import ClinicaCAPSError, ClinicaException from clinica.utils.input_files import ( - t1_volume_dartel_input_tissue, - t1_volume_i_th_iteration_group_template, + get_t1_volume_dartel_input_tissue, + get_t1_volume_i_th_iteration_group_template, ) from clinica.utils.inputs import ( - clinica_file_reader, clinica_group_reader, clinica_list_of_files_reader, ) @@ -61,20 +60,19 @@ def _build_input_node(self): fields=self.get_input_fields(), mandatory_inputs=True ), ) - all_errors = [] - # Dartel Input Tissues # ==================== + patterns = [ + get_t1_volume_dartel_input_tissue(tissue_number) + for tissue_number in self.parameters["tissues"] + ] try: d_input = clinica_list_of_files_reader( self.subjects, self.sessions, self.caps_directory, - [ - t1_volume_dartel_input_tissue(tissue_number) - for tissue_number in self.parameters["tissues"] - ], + patterns, ) read_input_node.inputs.dartel_input_images = d_input except ClinicaException as e: @@ -83,17 +81,16 @@ def _build_input_node(self): # Dartel Templates # ================ dartel_iter_templates = [] - for i in range(1, 7): + for pattern in [ + get_t1_volume_i_th_iteration_group_template(self.group_id, i) + for i in range(1, 7) + ]: try: - current_iter = clinica_group_reader( - self.caps_directory, - t1_volume_i_th_iteration_group_template(str(self.group_label), i), + dartel_iter_templates.append( + clinica_group_reader(self.caps_directory, pattern) ) - - dartel_iter_templates.append(current_iter) except ClinicaException as e: all_errors.append(e) - if any(all_errors): error_message = "Clinica faced error(s) while trying to read files in your CAPS/BIDS directories.\n" for msg in all_errors: diff --git a/clinica/pipelines/t1_volume_tissue_segmentation/t1_volume_tissue_segmentation_pipeline.py b/clinica/pipelines/t1_volume_tissue_segmentation/t1_volume_tissue_segmentation_pipeline.py index 46f9f1c1c..62390780d 100644 --- a/clinica/pipelines/t1_volume_tissue_segmentation/t1_volume_tissue_segmentation_pipeline.py +++ b/clinica/pipelines/t1_volume_tissue_segmentation/t1_volume_tissue_segmentation_pipeline.py @@ -75,8 +75,7 @@ def _build_input_node(self): from clinica.iotools.utils.data_handling import ( check_volume_location_in_world_coordinate_system, ) - from clinica.utils.exceptions import ClinicaBIDSError, ClinicaException - from clinica.utils.input_files import T1W_NII + from clinica.utils.input_files import get_t1w_mri from clinica.utils.inputs import clinica_file_filter from clinica.utils.stream import cprint from clinica.utils.ux import print_images_to_process @@ -85,7 +84,7 @@ def _build_input_node(self): # ======================== # T1w file: t1w_files, subjects, sessions = clinica_file_filter( - self.subjects, self.sessions, self.bids_directory, T1W_NII + self.subjects, self.sessions, self.bids_directory, get_t1w_mri() ) self.subjects = subjects self.sessions = sessions diff --git a/clinica/pydra/query.py b/clinica/pydra/query.py index 8e08ee66b..a8c5a397f 100644 --- a/clinica/pydra/query.py +++ b/clinica/pydra/query.py @@ -1,6 +1,8 @@ import abc from typing import Callable, Dict, Optional +from clinica.utils.input_files import QueryPattern + class Query: """Base Query class. @@ -85,10 +87,15 @@ def format_query(self, input_query: Optional[Dict] = None) -> Dict: if not input_query: return formatted_query for k, q in self.parse_query(input_query).items(): - if isinstance(q, dict): + if isinstance(q, QueryPattern): + formatted_query[k] = q + elif isinstance(q, dict): formatted_query[k] = {**self.default_query(k), **q} elif isinstance(q, list): - formatted_query[k] = [{**self.default_query(k), **qq} for qq in q] + if isinstance(q[0], QueryPattern): + formatted_query[k] = q + else: + formatted_query[k] = [{**self.default_query(k), **qq} for qq in q] else: raise TypeError( f"Unexpected type {type(q)} for query {q}." @@ -198,8 +205,11 @@ def parse_query(self, query: Dict) -> Dict: parsed_query = {} for label, params in query.items(): query_maker = self._query_maker(label) - formatted_query = query_maker(**params) - if len(formatted_query) > 0: + if (formatted_query := query_maker(**params)) is not None: + # if isinstance(formatted_query, QueryPattern): + # parsed_query[label] = formatted_query.to_dict() + # else: + # parsed_query[label] = [q.to_dict() for q in formatted_query] parsed_query[label] = formatted_query return parsed_query @@ -219,7 +229,7 @@ def _query_maker(self, label: str) -> Callable: If the label does not match any entry, a default maker which return an empty dict for any passed parameters is returned. """ - return self._query_makers.get(label, lambda **kwargs: {}) + return self._query_makers.get(label, lambda **kwargs: None) class CAPSFileQuery(CAPSQuery): @@ -252,31 +262,34 @@ class CAPSFileQuery(CAPSQuery): } """ + from functools import partial + from clinica.utils.input_files import ( - custom_pipeline, - pet_volume_normalized_suvr_pet, - t1_volume_dartel_input_tissue, - t1_volume_deformation_to_template, - t1_volume_native_tpm, - t1_volume_native_tpm_in_mni, - t1_volume_template_tpm_in_mni, + get_pet_volume_normalized_suvr, + get_t1_volume_dartel_input_tissue, + get_t1_volume_deformation_to_template, + get_t1_volume_template_tpm_in_mni, + get_t1_volume_tpm, + get_t1w_to_mni_transform, ) - def t1w_to_mni_transform(): - from clinica.utils.input_files import T1W_TO_MNI_TRANSFORM - - return T1W_TO_MNI_TRANSFORM - _query_makers = { - "tissues": t1_volume_native_tpm, - "mask_tissues": t1_volume_native_tpm_in_mni, - "flow_fields": t1_volume_deformation_to_template, - "pvc_mask_tissues": t1_volume_native_tpm, - "dartel_input_tissue": t1_volume_dartel_input_tissue, - "t1w_to_mni": t1w_to_mni_transform, - "pet_volume": pet_volume_normalized_suvr_pet, - "t1_volume": t1_volume_template_tpm_in_mni, - "custom_pipeline": custom_pipeline, + "tissues": partial( + get_t1_volume_tpm, + mni_space=False, + modulation=False, + ), + "mask_tissues": partial(get_t1_volume_tpm, mni_space=True), + "flow_fields": get_t1_volume_deformation_to_template, + "pvc_mask_tissues": partial( + get_t1_volume_tpm, + mni_space=False, + modulation=False, + ), + "dartel_input_tissue": get_t1_volume_dartel_input_tissue, + "t1w_to_mni": get_t1w_to_mni_transform, + "pet_volume": get_pet_volume_normalized_suvr, + "t1_volume": get_t1_volume_template_tpm_in_mni, } @@ -304,15 +317,13 @@ class CAPSGroupQuery(CAPSQuery): """ from clinica.utils.input_files import ( - custom_group, - t1_volume_final_group_template, - t1_volume_i_th_iteration_group_template, + get_t1_volume_group_template, + get_t1_volume_i_th_iteration_group_template, ) _query_makers = { - "dartel_template": t1_volume_final_group_template, - "dartel_iteration_templates": t1_volume_i_th_iteration_group_template, - "t_map": custom_group, + "dartel_template": get_t1_volume_group_template, + "dartel_iteration_templates": get_t1_volume_i_th_iteration_group_template, } diff --git a/clinica/utils/input_files.py b/clinica/utils/input_files.py index f3d925116..ff18b6bae 100644 --- a/clinica/utils/input_files.py +++ b/clinica/utils/input_files.py @@ -4,198 +4,350 @@ """ import functools +from collections import UserString from collections.abc import Iterable +from dataclasses import dataclass +from enum import Enum +from pathlib import Path from typing import Optional, Union from clinica.utils.dwi import DTIBasedMeasure +from clinica.utils.group import GroupID +from clinica.utils.image import HemiSphere from clinica.utils.pet import ReconstructionMethod, SUVRReferenceRegion, Tracer -# BIDS - -T1W_NII = {"pattern": "sub-*_ses-*_t1w.nii*", "description": "T1w MRI"} -Flair_T2W_NII = {"pattern": "sub-*_ses-*_flair.nii*", "description": "FLAIR T2w MRI"} - -# T1-FreeSurfer - -T1_FS_WM = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/wm.seg.mgz", - "description": "segmentation of white matter (mri/wm.seg.mgz).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_BRAIN = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/brain.mgz", - "description": " extracted brain from T1w MRI (mri/brain.mgz).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_ORIG_NU = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/orig_nu.mgz", - "description": "intensity normalized volume generated after correction for" - " non-uniformity in FreeSurfer (mri/orig_nu.mgz).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_LONG_ORIG_NU = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/mri/orig_nu.mgz", - "description": "intensity normalized volume generated after correction for non-uniformity in FreeSurfer (orig_nu.mgz) in longitudinal", - "needed_pipeline": "t1-freesurfer and t1-freesurfer longitudinal", -} - -T1_FS_WM_SURF_R = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/surf/rh.white", - "description": "right white matter/gray matter border surface (rh.white).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_LONG_SURF_R = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/surf/rh.white", - "description": "right white matter/gray matter border surface (rh.white) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer longitudinal", -} - -T1_FS_LONG_SURF_L = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/surf/lh.white", - "description": "left white matter/gray matter border surface (lh.white) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer longitudinal", -} - -T1_FS_WM_SURF_L = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/surf/lh.white", - "description": "left white matter/gray matter border surface (lh.white).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_DESTRIEUX = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/aparc.a2009s+aseg.mgz", - "description": "Destrieux-based segmentation (mri/aparc.a2009s+aseg.mgz).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_DESTRIEUX_PARC_L = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/label/lh.aparc.a2009s.annot", - "description": "left hemisphere surface-based Destrieux parcellation (label/lh.aparc.a2009s.annot).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_LONG_DESTRIEUX_PARC_L = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/label/lh.aparc.a2009s.annot", - "description": "left hemisphere surface-based Destrieux parcellation (label/lh.aparc.a2009s.annot) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer longitudinal", -} - -T1_FS_LONG_DESTRIEUX_PARC_R = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/label/rh.aparc.a2009s.annot", - "description": "right hemisphere surface-based Destrieux parcellation (label/rh.aparc.a2009s.annot) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer longitudinal", -} - -T1_FS_DESTRIEUX_PARC_R = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/label/rh.aparc.a2009s.annot", - "description": "right hemisphere surface-based Destrieux parcellation (label/rh.aparc.a2009s.annot).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_DESIKAN = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/aparc+aseg.mgz", - "description": "Desikan-based segmentation (mri/aparc.a2009s+aseg.mgz).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_DESIKAN_PARC_L = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/label/lh.aparc.annot", - "description": "left hemisphere surface-based Desikan parcellation (label/lh.aparc.annot).", - "needed_pipeline": "t1-freesurfer", -} - -T1_FS_DESIKAN_PARC_R = { - "pattern": "t1/freesurfer_cross_sectional/sub-*_ses-*/label/rh.aparc.annot", - "description": "right hemisphere surface-based Desikan parcellation (label/rh.aparc.annot).", - "needed_pipeline": "t1-freesurfer", -} - -# T1-FreeSurfer-Template -T1_FS_T_DESTRIEUX = { - "pattern": "freesurfer_unbiased_template/sub-*_long-*/mri/aparc.a2009s+aseg.mgz", - "description": "Destrieux-based segmentation (mri/aparc.a2009s+aseg.mgz) from unbiased template.", - "needed_pipeline": "t1-freesurfer-longitudinal or t1-freesurfer-template", -} - -# T1-FreeSurfer-Longitudinal-Correction -T1_FS_LONG_DESIKAN_PARC_L = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/label/lh.aparc.annot", - "description": "left hemisphere surface-based Desikan parcellation (label/lh.aparc.annot) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer-longitudinal", -} - -T1_FS_LONG_DESIKAN_PARC_R = { - "pattern": "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/label/rh.aparc.annot", - "description": "right hemisphere surface-based Desikan parcellation (label/rh.aparc.annot) generated with t1-freesurfer-longitudinal.", - "needed_pipeline": "t1-freesurfer and t1-freesurfer-longitudinal", -} - -T1W_LINEAR = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_T1w.nii.gz", - "description": "T1w image registered in MNI152NLin2009cSym space using t1-linear pipeline", - "needed_pipeline": "t1-linear", -} - -T2W_LINEAR = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_T2w.nii.gz", - "description": "T2w image registered in MNI152NLin2009cSym space using t2-linear pipeline", - "needed_pipeline": "t2-linear", -} - -FLAIR_T2W_LINEAR = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_flair.nii.gz", - "description": "T2w image registered in MNI152NLin2009cSym space using t2-linear pipeline", - "needed_pipeline": "flair-linear", -} - -T1W_LINEAR_CROPPED = { - "pattern": "*space-MNI152NLin2009cSym_desc-Crop_res-1x1x1_T1w.nii.gz", - "description": "T1W Image registered using t1-linear and cropped " - "(matrix size 169×208×179, 1 mm isotropic voxels)", - "needed_pipeline": "t1-linear", -} - -T2W_LINEAR_CROPPED = { - "pattern": "*space-MNI152NLin2009cSym_desc-Crop_res-1x1x1_T2w.nii.gz", - "description": "T2W Image registered using t2-linear and cropped " - "(matrix size 169×208×179, 1 mm isotropic voxels)", - "needed_pipeline": "t2-linear", -} - -FLAIR_T2W_LINEAR_CROPPED = { - "pattern": "*space-MNI152NLin2009cSym_desc-Crop_res-1x1x1_flair.nii.gz", - "description": "T2W Image registered using t2-linear and cropped " - "(matrix size 169×208×179, 1 mm isotropic voxels)", - "needed_pipeline": "flair-linear", -} - -T1W_EXTENSIVE = { - "pattern": "*space-Ixi549Space_desc-SkullStripped_T1w.nii.gz", - "description": "T1w image skull-stripped registered in Ixi549Space space using clinicaDL preprocessing pipeline", - "needed_pipeline": "t1-extensive", -} - -T1W_TO_MNI_TRANSFORM = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_affine.mat", - "description": "Transformation matrix from T1W image to MNI space using t1-linear pipeline", - "needed_pipeline": "t1-linear", -} - -T2W_TO_MNI_TRANSFROM = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_affine.mat", - "description": "Transformation matrix from T2W image to MNI space using t2-linear pipeline", - "needed_pipeline": "t2-linear", -} - -FLAIR_T2W_TO_MNI_TRANSFROM = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_affine.mat", - "description": "Transformation matrix from T2W image to MNI space using t2-linear pipeline", - "needed_pipeline": "flair-linear", -} +from .spm import SPMTissue, get_spm_tissue_from_index + +__all__ = [ + "DWIFileType", + "Parcellation", + "QueryPattern", + "get_t1w_mri", + "get_t2w_mri", + "get_t1_freesurfer_segmentation_white_matter", + "get_t1_freesurfer_extracted_brain", + "get_t1_freesurfer_intensity_normalized_volume_after_nu", + "get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu", + "get_t1w_to_mni_transform", + "get_dwi_file", + "get_dwi_preprocessed_file", + "get_dwi_preprocessed_brainmask", + "get_dwi_fmap_phasediff_file", + "get_dwi_fmap_magnitude1_file", + "get_t1w_linear", + "get_t1_freesurfer_white_matter_surface", + "get_t1_freesurfer_longitudinal_white_matter_surface", + "get_t1_freesurfer_segmentation", + "get_t1_freesurfer_statistics", + "get_t1_freesurfer_parcellation", + "get_t1_freesurfer_template", + "get_t1_freesurfer_longitudinal_parcellation", + "get_t1_volume_tpm", + "get_t1_volume_dartel_input_tissue", + "get_t1_volume_template_tpm_in_mni", + "get_t1_volume_deformation_to_template", + "get_t1_volume_i_th_iteration_group_template", + "get_t1_volume_group_template", + "get_dwi_dti", + "get_pet_nifti", + "get_pet_volume_normalized_suvr", + "get_pet_linear_nifti", +] + + +@dataclass +class QueryPattern: + """Represents a pattern to be used by the clinica_file_reader to query some specific files. + + Attributes + ---------- + pattern : str + The actual pattern string to be used to match file names. + + description : str + A plain text description of the files the pattern matches. + + needed_pipeline : str + The pipelines that should have been run in order to have the requested files. + TODO: Improve this to be an iterable of PipelineName objects. + The difficult part is that some pattern have combinations with AND and OR. + """ + + pattern: str + description: str + needed_pipeline: str + + def __post_init__(self): + if len(self.pattern) == 0: + raise ValueError("Pattern cannot be empty.") + if self.pattern[0] == "/": + raise ValueError( + "pattern argument cannot start with char: / (does not work in os.path.join function). " + "If you want to indicate the exact name of the file, use the format " + "directory_name/filename.extension or filename.extension in the pattern argument." + ) + + def to_dict(self) -> dict: + return { + "pattern": self.pattern, + "description": self.description, + "needed_pipeline": self.needed_pipeline, + } + + +class Parcellation(str, Enum): + """The possible atlas names used for deriving parcellations and segmentations.""" + + DESIKAN = "Desikan" + DESTRIEUX = "Destrieux" + + +class DWIFileType(str, Enum): + NII = "nii" + JSON = "json" + BVEC = "bvec" + BVAL = "bval" + + +def get_t1w_mri() -> QueryPattern: + """Get T1W MRI in BIDS.""" + return QueryPattern("sub-*_ses-*_t1w.nii*", "T1w MRI", "") + + +def get_t2w_mri() -> QueryPattern: + """Get T2W FLAIR MRI in BIDS.""" + return QueryPattern("sub-*_ses-*_flair.nii*", "FLAIR T2w MRI", "") + + +def get_t1_freesurfer_segmentation_white_matter() -> QueryPattern: + """GET Freesurfer segmentation of white matter.""" + return QueryPattern( + "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/wm.seg.mgz", + "segmentation of white matter (mri/wm.seg.mgz).", + "t1-freesurfer", + ) + + +def get_t1_freesurfer_extracted_brain() -> QueryPattern: + return QueryPattern( + "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/brain.mgz", + "extracted brain from T1w MRI (mri/brain.mgz).", + "t1-freesurfer", + ) + + +def get_t1_freesurfer_intensity_normalized_volume_after_nu() -> QueryPattern: + return QueryPattern( + "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/orig_nu.mgz", + ( + "intensity normalized volume generated after correction for " + "non-uniformity in FreeSurfer (mri/orig_nu.mgz)." + ), + "t1-freesurfer", + ) + + +def get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu() -> ( + QueryPattern +): + return QueryPattern( + "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/mri/orig_nu.mgz", + ( + "intensity normalized volume generated after correction for " + "non-uniformity in FreeSurfer (orig_nu.mgz) in longitudinal" + ), + "t1-freesurfer and t1-freesurfer longitudinal", + ) + + +def get_t1w_to_mni_transform() -> QueryPattern: + return QueryPattern( + "*space-MNI152NLin2009cSym_res-1x1x1_affine.mat", + "Transformation matrix from T1W image to MNI space using t1-linear pipeline", + "t1-linear", + ) + + +def get_dwi_file(filetype: Union[str, DWIFileType]) -> QueryPattern: + """Return the query to get DWI files (nii, json, bvec, bval).""" + filetype = DWIFileType(filetype) + return QueryPattern( + f"dwi/sub-*_ses-*_dwi.{filetype.value}*", f"DWI {filetype.value} files.", "" + ) + + +def get_dwi_preprocessed_file(filetype: Union[str, DWIFileType]) -> QueryPattern: + filetype = DWIFileType(filetype) + return QueryPattern( + f"dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.{filetype.value}*", + f"preprocessed {filetype.value} files", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ) + + +def get_dwi_preprocessed_brainmask() -> QueryPattern: + return QueryPattern( + "dwi/preprocessing/sub-*_ses-*_space-*_brainmask.nii*", + "b0 brainmask", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ) + + +def get_dwi_fmap_phasediff_file(filetype: Union[str, DWIFileType]) -> QueryPattern: + filetype = DWIFileType(filetype) + return QueryPattern( + f"fmap/sub-*_ses-*_phasediff.{filetype.value}*", + f"phasediff {filetype.value} file", + "", + ) + + +def get_dwi_fmap_magnitude1_file(filetype: Union[str, DWIFileType]) -> QueryPattern: + filetype = DWIFileType(filetype) + return QueryPattern( + f"fmap/sub-*_ses-*_magnitude1.{filetype.value}*", + f"magnitude1 {filetype.value} file", + "", + ) + + +def get_t1w_linear(cropped: bool) -> QueryPattern: + return QueryPattern( + f"*space-MNI152NLin2009cSym{'_desc-Crop' if cropped else ''}_res-1x1x1_T1w.nii.gz", + ( + "T1w image registered in MNI152NLin2009cSym space " + f"{'and cropped (matrix size 169×208×179) ' if cropped else ''} " + "using t1-linear pipeline" + ), + "t1-linear", + ) + + +def get_t1_freesurfer_white_matter_surface( + hemisphere: Union[str, HemiSphere], +) -> QueryPattern: + """Return the pattern to query white matter border surface files from the Freesurfer output. + + Parameters + ---------- + hemisphere : str or HemiSphere + The hemisphere for which to get the surface. + + Returns + ------- + Query : + The query to use with a file reader. + """ + hemisphere = HemiSphere(hemisphere) + return QueryPattern( + f"t1/freesurfer_cross_sectional/sub-*_ses-*/surf/{hemisphere.value}.white", + ( + f"{'right' if hemisphere == HemiSphere.RIGHT else 'left'} white matter/gray " + f"matter border surface ({hemisphere.value}.white)." + ), + "t1-freesurfer", + ) + + +def get_t1_freesurfer_longitudinal_white_matter_surface( + hemisphere: Union[str, HemiSphere], +) -> QueryPattern: + """Return the query to get white matter border surface files from the Freesurfer longitudinal output. + + Parameters + ---------- + hemisphere : str or HemiSphere + The hemisphere for which to get the surface. + + Returns + ------- + QueryPattern : + The pattern to use with a file reader. + """ + hemisphere = HemiSphere(hemisphere) + return QueryPattern( + f"t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/surf/{hemisphere.value}.white", + ( + f"{'right' if hemisphere == HemiSphere.RIGHT else 'left'} white matter/gray matter border " + f"surface ({hemisphere.value}.white) generated with t1-freesurfer-longitudinal." + ), + "t1-freesurfer and t1-freesurfer longitudinal", + ) + + +def _get_annot_file_name(hemisphere: HemiSphere, parcellation: Parcellation) -> str: + if parcellation == Parcellation.DESIKAN: + return f"{hemisphere.value}.aparc.annot" + if parcellation == Parcellation.DESTRIEUX: + return f"{hemisphere.value}.aparc.a2009s.annot" + + +def get_t1_freesurfer_segmentation(parcellation: Parcellation) -> QueryPattern: + parcellation = Parcellation(parcellation) + filename = ( + f"aparc{'.a2009s' if parcellation == Parcellation.DESTRIEUX else ''}+aseg.mgz" + ) + return QueryPattern( + f"t1/freesurfer_cross_sectional/sub-*_ses-*/mri/{filename}", + f"{parcellation.value}-based segmentation (mri/{filename}).", + "t1-freesurfer", + ) + + +def get_t1_freesurfer_statistics( + atlas: str, hemisphere: Union[str, HemiSphere] +) -> QueryPattern: + hemisphere = HemiSphere(hemisphere) + return QueryPattern( + f"t1/freesurfer_cross_sectional/sub-*_ses-*/stats/{hemisphere.value}.{atlas}.stats", + f"{atlas}-based segmentation", + "t1-freesurfer", + ) + + +def get_t1_freesurfer_parcellation( + hemisphere: Union[str, HemiSphere], + parcellation: Union[str, Parcellation], +) -> QueryPattern: + hemisphere = HemiSphere(hemisphere) + parcellation = Parcellation(parcellation) + return QueryPattern( + f"t1/freesurfer_cross_sectional/sub-*_ses-*/label/{_get_annot_file_name(hemisphere, parcellation)}", + ( + f"{'left' if hemisphere == HemiSphere.LEFT else 'right'} hemisphere surface-based " + f"{parcellation.value} parcellation (label/{_get_annot_file_name(hemisphere, parcellation)})." + ), + "t1-freesurfer", + ) + + +def get_t1_freesurfer_template(parcellation: Union[str, Parcellation]) -> QueryPattern: + parcellation = Parcellation(parcellation) + filename = ( + f"aparc{'.a2009s' if parcellation == Parcellation.DESTRIEUX else ''}+aseg.mgz" + ) + return QueryPattern( + f"freesurfer_unbiased_template/sub-*_long-*/mri/{filename}", + f"{parcellation.value}-based segmentation (mri/{filename}) from unbiased template.", + "t1-freesurfer-longitudinal or t1-freesurfer-template", + ) + + +def get_t1_freesurfer_longitudinal_parcellation( + hemisphere: Union[str, HemiSphere], + parcellation: Union[str, Parcellation], +) -> QueryPattern: + hemisphere = HemiSphere(hemisphere) + parcellation = Parcellation(parcellation) + return QueryPattern( + f"t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/label/{_get_annot_file_name(hemisphere, parcellation)}", + ( + f"{'left' if hemisphere == HemiSphere.LEFT else 'right'} hemisphere surface-based " + f"{parcellation.value} parcellation (label/{_get_annot_file_name(hemisphere, parcellation)}) " + "generated with t1-freesurfer-longitudinal." + ), + "t1-freesurfer and t1-freesurfer-longitudinal", + ) def aggregator(func): @@ -296,12 +448,12 @@ def wrapper_aggregator(*args, **kwargs): arg_sizes = [ len(arg) for arg in args - if (isinstance(arg, Iterable) and not isinstance(arg, str)) + if (isinstance(arg, Iterable) and not isinstance(arg, (str, UserString))) ] arg_sizes += [ len(arg) for k, arg in kwargs.items() - if (isinstance(arg, Iterable) and not isinstance(arg, str)) + if (isinstance(arg, Iterable) and not isinstance(arg, (str, UserString))) ] # If iterable args/kwargs have different lengths, raise @@ -316,7 +468,9 @@ def wrapper_aggregator(*args, **kwargs): arg_size = arg_sizes[0] new_args = [] for arg in args: - if not (isinstance(arg, Iterable) and not isinstance(arg, str)): + if not ( + isinstance(arg, Iterable) and not isinstance(arg, (str, UserString)) + ): new_args.append((arg,) * arg_size) else: new_args.append(arg) @@ -325,7 +479,9 @@ def wrapper_aggregator(*args, **kwargs): new_kwargs = [{} for _ in range(arg_size)] for k, arg in kwargs.items(): for i in range(len(new_kwargs)): - if not (isinstance(arg, Iterable) and not isinstance(arg, str)): + if not ( + isinstance(arg, Iterable) and not isinstance(arg, (str, UserString)) + ): new_kwargs[i][k] = arg else: new_kwargs[i][k] = arg[i] @@ -341,78 +497,65 @@ def wrapper_aggregator(*args, **kwargs): @aggregator -def t1_volume_native_tpm(tissue_number: int) -> dict: - from pathlib import Path - - from .spm import get_spm_tissue_from_index - +def get_t1_volume_tpm( + tissue_number: int, modulation: bool, mni_space: bool +) -> QueryPattern: tissue = get_spm_tissue_from_index(tissue_number) - return { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "native_space" - / f"*_*_T1w_segm-{tissue.value}_probability.nii*", - "description": f"Tissue probability map {tissue.value} in native space", - "needed_pipeline": "t1-volume-tissue-segmentation", - } - - -@aggregator -def t1_volume_dartel_input_tissue(tissue_number: int) -> dict: - from pathlib import Path - - from .spm import get_spm_tissue_from_index - - tissue = get_spm_tissue_from_index(tissue_number) - return { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "dartel_input" - / f"*_*_T1w_segm-{tissue.value}_dartelinput.nii*", - "description": f"Dartel input for tissue probability map {tissue.value} from T1w MRI", - "needed_pipeline": "t1-volume-tissue-segmentation", - } + description = f"Tissue probability map {tissue.value} " + pattern_modulation = "" + space = "" + if mni_space: + pattern_modulation = f"_modulated-{'on' if modulation else 'off'}" + space = "_space-Ixi549Space" + description += f"based on native MRI in MNI space (Ixi549) {'with' if modulation else 'without'} modulation." + else: + description += "in native space" + + return QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / f"{'normalized' if mni_space else 'native'}_space" + / f"*_*_T1w_segm-{tissue.value}{space}{pattern_modulation}_probability.nii*" + ), + description, + "t1-volume-tissue-segmentation", + ) @aggregator -def t1_volume_native_tpm_in_mni(tissue_number: int, modulation: bool) -> dict: - from pathlib import Path - - from .spm import get_spm_tissue_from_index - +def get_t1_volume_dartel_input_tissue(tissue_number: int) -> QueryPattern: tissue = get_spm_tissue_from_index(tissue_number) - pattern_modulation = "on" if modulation else "off" - description_modulation = "with" if modulation else "without" - - return { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "normalized_space" - / f"*_*_T1w_segm-{tissue.value}_space-Ixi549Space_modulated-{pattern_modulation}_probability.nii*", - "description": ( - f"Tissue probability map {tissue.value} based on " - f"native MRI in MNI space (Ixi549) {description_modulation} modulation." + return QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / "dartel_input" + / f"*_*_T1w_segm-{tissue.value}_dartelinput.nii*" ), - "needed_pipeline": "t1-volume-tissue-segmentation", - } - - -def t1_volume_template_tpm_in_mni( - group_label: str, tissue_number: int, modulation: bool, fwhm: Optional[int] = None -) -> dict: - """Build the dictionary required by clinica_file_reader to get the tissue + f"Dartel input for tissue probability map {tissue.value} from T1w MRI", + "t1-volume-tissue-segmentation", + ) + + +def get_t1_volume_template_tpm_in_mni( + group_id: GroupID, + tissue: Union[int, SPMTissue], + modulation: bool, + fwhm: Optional[int] = None, +) -> QueryPattern: + """Build the pattern required by clinica_file_reader to get the tissue probability maps based on group template in MNI space. Parameters ---------- - group_label : str - Label used for the group of interest. + group_id : GroupID + The ID for the group of interest. - tissue_number : int - An integer defining the tissue of interest. + tissue : int or SPMTissue + Either the tissue of interest, or an integer defining the tissue of interest. modulation : {"on", "off"} Whether modulation is on or off. @@ -422,136 +565,74 @@ def t1_volume_template_tpm_in_mni( Returns ------- - dict : - Information dict to be passed to clinica_file_reader. + QueryPattern : + Pattern to be passed to clinica_file_reader. """ - from pathlib import Path - from .spm import get_spm_tissue_from_index - tissue = get_spm_tissue_from_index(tissue_number) + tissue = get_spm_tissue_from_index(tissue) pattern_modulation = "on" if modulation else "off" description_modulation = "with" if modulation else "without" - fwhm_key_value = f"_fwhm-{fwhm}mm" if fwhm else "" - fwhm_description = f"with {fwhm}mm smoothing" if fwhm else "with no smoothing" - - return { - "pattern": Path("t1") - / "spm" - / "dartel" - / f"group-{group_label}" - / f"*_T1w_segm-{tissue.value}_space-Ixi549Space_modulated-{pattern_modulation}{fwhm_key_value}_probability.nii*", - "description": ( - f"Tissue probability map {tissue.value} based on {group_label} template in MNI space " + fwhm_key_value = "" + fwhm_description = "with no smoothing" + if fwhm is not None and fwhm != 0: + fwhm_key_value = f"_fwhm-{fwhm}mm" + fwhm_description = f"with {fwhm}mm smoothing" + + return QueryPattern( + str( + Path("t1") + / "spm" + / "dartel" + / str(group_id) + / f"*_T1w_segm-{tissue.value}_space-Ixi549Space_modulated-{pattern_modulation}{fwhm_key_value}_probability.nii*" + ), + ( + f"Tissue probability map {tissue.value} based on {group_id.label} template in MNI space " f"(Ixi549) {description_modulation} modulation and {fwhm_description}." ), - "needed_pipeline": "t1-volume", - } - - -def t1_volume_deformation_to_template(group_label): - from pathlib import Path - - information = { - "pattern": Path("t1") - / "spm" - / "dartel" - / f"group-{group_label}" - / f"sub-*_ses-*_T1w_target-{group_label}_transformation-forward_deformation.nii*", - "description": f"Deformation from native space to group template {group_label} space.", - "needed_pipeline": "t1-volume-create-dartel", - } - return information + "t1-volume", + ) + + +def get_t1_volume_deformation_to_template(group_id: GroupID) -> QueryPattern: + return QueryPattern( + str( + Path("t1") + / "spm" + / "dartel" + / str(group_id) + / f"sub-*_ses-*_T1w_target-{group_id.label}_transformation-forward_deformation.nii*" + ), + f"Deformation from native space to group template {group_id.label} space.", + "t1-volume-create-dartel", + ) @aggregator -def t1_volume_i_th_iteration_group_template(group_label, i): - from pathlib import Path - - information = { - "pattern": Path(f"group-{group_label}") - / "t1" - / f"group-{group_label}_iteration-{i}_template.nii*", - "description": f"Iteration #{i} of Dartel template {group_label}", - "needed_pipeline": "t1-volume or t1-volume-create-dartel", - } - return information - - -def t1_volume_final_group_template(group_label): - from pathlib import Path - - information = { - "pattern": Path(f"group-{group_label}") - / "t1" - / f"group-{group_label}_template.nii*", - "description": f"T1w template file of group {group_label}", - "needed_pipeline": "t1-volume or t1-volume-create-dartel", - } - return information - - -def custom_group(pattern, description): - information = {"pattern": pattern, "description": description} - return information - - -""" DWI """ - -# BIDS - -DWI_NII = {"pattern": "dwi/sub-*_ses-*_dwi.nii*", "description": "DWI NIfTI"} - -DWI_JSON = {"pattern": "dwi/sub-*_ses-*_dwi.json", "description": "DWI JSON file"} - -DWI_BVAL = {"pattern": "dwi/sub-*_ses-*_dwi.bval", "description": "bval files"} - -DWI_BVEC = {"pattern": "dwi/*_dwi.bvec", "description": "bvec files"} - -FMAP_PHASEDIFF_JSON = { - "pattern": "fmap/sub-*_ses-*_phasediff.json", - "description": "phasediff JSON file", -} - -FMAP_PHASEDIFF_NII = { - "pattern": "fmap/sub-*_ses-*_phasediff.nii*", - "description": "phasediff NIfTI volume", -} - -FMAP_MAGNITUDE1_NII = { - "pattern": "fmap/sub-*_ses-*_magnitude1.nii*", - "description": "magnitude1 file", -} - -# CAPS - -DWI_PREPROC_NII = { - "pattern": "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.nii*", - "description": "preprocessed DWI", - "needed_pipeline": "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", -} - -DWI_PREPROC_BRAINMASK = { - "pattern": "dwi/preprocessing/sub-*_ses-*_space-*_brainmask.nii*", - "description": "b0 brainmask", - "needed_pipeline": "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", -} - -DWI_PREPROC_BVEC = { - "pattern": "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.bvec", - "description": "preprocessed bvec", - "needed_pipeline": "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", -} - -DWI_PREPROC_BVAL = { - "pattern": "dwi/preprocessing/*_space-*_desc-preproc_dwi.bval", - "description": "preprocessed bval", - "needed_pipeline": "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", -} - - -def dwi_dti(measure: Union[str, DTIBasedMeasure], space: Optional[str] = None) -> dict: - """Return the query dict required to capture DWI DTI images. +def get_t1_volume_i_th_iteration_group_template( + group_id: GroupID, i: int +) -> QueryPattern: + return QueryPattern( + str(Path(str(group_id)) / "t1" / f"{group_id}_iteration-{i}_template.nii*"), + f"Iteration #{i} of Dartel template {group_id.label}", + "t1-volume or t1-volume-create-dartel", + ) + + +def get_t1_volume_group_template(group_id: GroupID) -> QueryPattern: + return QueryPattern( + str(Path(str(group_id)) / "t1" / f"{group_id}_template.nii*"), + f"T1w template file of group {group_id.label}", + "t1-volume or t1-volume-create-dartel", + ) + + +def get_dwi_dti( + measure: Union[str, DTIBasedMeasure], + space: Optional[str] = None, +) -> QueryPattern: + """Return the query pattern required to capture DWI DTI images. Parameters ---------- @@ -564,29 +645,24 @@ def dwi_dti(measure: Union[str, DTIBasedMeasure], space: Optional[str] = None) - Returns ------- - dict : - The query dictionary to get DWI DTI images. + QueryPattern : + The query pattern to get DWI DTI images. """ measure = DTIBasedMeasure(measure) space = space or "*" - return { - "pattern": f"dwi/dti_based_processing/*/*_space-{space}_{measure.value}.nii.gz", - "description": f"DTI-based {measure.value} in space {space}.", - "needed_pipeline": "dwi_dti", - } - - -""" PET """ + return QueryPattern( + f"dwi/dti_based_processing/*/*_space-{space}_{measure.value}.nii.gz", + f"DTI-based {measure.value} in space {space}.", + "dwi_dti", + ) -# BIDS - -def bids_pet_nii( +def get_pet_nifti( tracer: Optional[Union[str, Tracer]] = None, reconstruction: Optional[Union[str, ReconstructionMethod]] = None, -) -> dict: - """Return the query dict required to capture PET scans. +) -> QueryPattern: + """Return the query pattern required to capture PET scans. Parameters ---------- @@ -604,11 +680,9 @@ def bids_pet_nii( Returns ------- - dict : - The query dictionary to get PET scans. + QueryPattern : + The query pattern to get PET scans. """ - from pathlib import Path - description = f"PET data" trc = "" if tracer is not None: @@ -621,42 +695,50 @@ def bids_pet_nii( rec = f"_rec-{reconstruction.value}" description += f" and reconstruction method {reconstruction.value}" - return { - "pattern": Path("pet") / f"*{trc}{rec}_pet.nii*", - "description": description, - } - + return QueryPattern( + str(Path("pet") / f"*{trc}{rec}_pet.nii*"), + description, + "", + ) -# PET-Volume - -def pet_volume_normalized_suvr_pet( - acq_label: Union[str, Tracer], - group_label: str, - suvr_reference_region: Union[str, SUVRReferenceRegion], - use_brainmasked_image: bool, - use_pvc_data: bool, +def get_pet_volume_normalized_suvr( + tracer: Optional[Union[str, Tracer]] = None, + group_id: Optional[GroupID] = None, + suvr_reference_region: Optional[Union[str, SUVRReferenceRegion]] = None, + use_brainmasked_image: bool = False, + use_pvc_data: bool = False, fwhm: int = 0, -) -> dict: - from pathlib import Path - - acq_label = Tracer(acq_label) - region = SUVRReferenceRegion(suvr_reference_region) - +) -> QueryPattern: + group_description = "any available template found" + if group_id: + group_description = f"{group_id.label} DARTEL template" + else: + group_id = "*" + tracer_key_value = "" + tracer_description = "" + if tracer: + tracer = Tracer(tracer) + tracer_key_value = f"_trc-{tracer.value}" + tracer_description = f" of {tracer.value}-PET" + suvr_key_value = "" + region_description = "" + if suvr_reference_region: + region = SUVRReferenceRegion(suvr_reference_region) + suvr_key_value = f"_suvr-{region.value}" + region_description = f" (using {region.value} region)" if use_brainmasked_image: mask_key_value = "_mask-brain" mask_description = "brain-masked" else: mask_key_value = "" mask_description = "full" - if use_pvc_data: pvc_key_value = "_pvc-rbv" pvc_description = "using RBV method for PVC" else: pvc_key_value = "" pvc_description = "without PVC" - if fwhm: fwhm_key_value = f"_fwhm-{fwhm}mm" fwhm_description = f"with {fwhm}mm smoothing" @@ -664,47 +746,35 @@ def pet_volume_normalized_suvr_pet( fwhm_key_value = "" fwhm_description = "with no smoothing" - suvr_key_value = f"_suvr-{region.value}" - - information = { - "pattern": Path("pet") - / "preprocessing" - / f"group-{group_label}" - / f"*_trc-{acq_label.value}_pet_space-Ixi549Space{pvc_key_value}{suvr_key_value}{mask_key_value}{fwhm_key_value}_pet.nii*", - "description": ( - f"{mask_description} SUVR map (using {region.value} region) of {acq_label.value}-PET " - f"{pvc_description} and {fwhm_description} in Ixi549Space space based on {group_label} DARTEL template" + return QueryPattern( + str( + Path("pet") + / "preprocessing" + / str(group_id) + / f"*{tracer_key_value}_pet_space-Ixi549Space{pvc_key_value}{suvr_key_value}{mask_key_value}{fwhm_key_value}_pet.nii*" ), - "needed_pipeline": "pet-volume", - } - return information + ( + f"{mask_description} SUVR map{region_description}{tracer_description} " + f"{pvc_description} and {fwhm_description} in Ixi549Space space based on {group_description}" + ), + "pet-volume", + ) -def pet_linear_nii( - acq_label: Union[str, Tracer], +def get_pet_linear_nifti( + tracer: Union[str, Tracer], suvr_reference_region: Union[str, SUVRReferenceRegion], uncropped_image: bool, -) -> dict: - from pathlib import Path - - acq_label = Tracer(acq_label) +) -> QueryPattern: + tracer = Tracer(tracer) region = SUVRReferenceRegion(suvr_reference_region) + description = "" if uncropped_image else "_desc-Crop" - if uncropped_image: - description = "" - else: - description = "_desc-Crop" - - information = { - "pattern": Path("pet_linear") - / f"*_trc-{acq_label.value}_pet_space-MNI152NLin2009cSym{description}_res-1x1x1_suvr-{region.value}_pet.nii.gz", - "description": "", - "needed_pipeline": "pet-linear", - } - return information - - -# CUSTOM -def custom_pipeline(pattern, description): - information = {"pattern": pattern, "description": description} - return information + return QueryPattern( + str( + Path("pet_linear") + / f"*_trc-{tracer.value}_pet_space-MNI152NLin2009cSym{description}_res-1x1x1_suvr-{region.value}_pet.nii.gz" + ), + "PET nifti image obtained with pet-linear", + "pet-linear", + ) diff --git a/clinica/utils/inputs.py b/clinica/utils/inputs.py index 47a75c0dd..abb67a0c3 100644 --- a/clinica/utils/inputs.py +++ b/clinica/utils/inputs.py @@ -6,7 +6,26 @@ from enum import Enum from functools import partial from pathlib import Path -from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union +from typing import Callable, Iterable, List, Optional, Sequence, Tuple, Union + +from .input_files import QueryPattern + +__all__ = [ + "DatasetType", + "RemoteFileStructure", + "InvalidSubjectSession", + "determine_caps_or_bids", + "check_bids_folder", + "check_caps_folder", + "clinica_file_filter", + "format_clinica_file_reader_errors", + "clinica_file_reader", + "clinica_list_of_files_reader", + "clinica_group_reader", + "compute_sha256_hash", + "fetch_file", + "get_file_from_server", +] class DatasetType(str, Enum): @@ -20,7 +39,9 @@ class DatasetType(str, Enum): InvalidSubjectSession = namedtuple("InvalidSubjectSession", ["subject", "session"]) -def insensitive_glob(pattern_glob: str, recursive: Optional[bool] = False) -> List[str]: +def _insensitive_glob( + pattern_glob: str, recursive: Optional[bool] = False +) -> List[str]: """This function is the glob.glob() function that is insensitive to the case. Parameters @@ -271,14 +292,14 @@ def check_caps_folder(caps_directory: Union[str, os.PathLike]) -> None: raise ClinicaCAPSError(error_string) -def find_images_path( +def _find_images_path( input_directory: os.PathLike, subject: str, session: str, errors: List[InvalidSubjectSession], valid_paths: List[str], is_bids: bool, - pattern: str, + pattern: QueryPattern, ) -> None: """Appends the resulting path corresponding to subject, session and pattern in valid_paths. If an error is encountered, its (subject,session) couple is added to the list `errors`. @@ -311,7 +332,7 @@ def find_images_path( True if `input_dir` is a BIDS folder, False if `input_dir` is a CAPS folder. - pattern : str + pattern : QueryPattern Define the pattern of the final file. """ from clinica.utils.stream import cprint @@ -322,8 +343,8 @@ def find_images_path( else: origin_pattern = input_directory / "subjects" / subject / session - current_pattern = origin_pattern / "**" / pattern - current_glob_found = insensitive_glob(str(current_pattern), recursive=True) + current_pattern = origin_pattern / "**" / pattern.pattern + current_glob_found = _insensitive_glob(str(current_pattern), recursive=True) if len(current_glob_found) > 1: # If we have more than one file at this point, there are two possibilities: # - there is a problem somewhere which made us catch too many files @@ -429,7 +450,7 @@ def _get_entities(files: List[Path], common_suffix: str) -> dict: def _check_common_properties_of_files( - files: List[Path], + files: Iterable[Path], property_name: str, property_extractor: Callable, ) -> str: @@ -437,7 +458,7 @@ def _check_common_properties_of_files( Parameters ---------- - files : List of Paths + files : Iterable of Paths List of file paths for which to verify common property. property_name : str @@ -501,7 +522,7 @@ def _get_suffix(filename: Path) -> str: ) -def _select_run(files: List[str]) -> str: +def _select_run(files: Sequence[str]) -> str: import numpy as np runs = [int(_get_run_number(f)) for f in files] @@ -517,68 +538,19 @@ def _get_run_number(filename: str) -> str: raise ValueError(f"Filename {filename} should contain one and only one run entity.") -def _check_information(information: Dict) -> None: - if not isinstance(information, (dict, list)): - raise TypeError( - "A dict or list of dicts must be provided for the argument 'information'" - ) - - if isinstance(information, list): - for item in information: - if not all(elem in item for elem in ["pattern", "description"]): - raise ValueError( - "'information' must contain the keys 'pattern' and 'description'" - ) - - if not all( - elem in ["pattern", "description", "needed_pipeline"] - for elem in item.keys() - ): - raise ValueError( - "'information' can only contain the keys 'pattern', 'description' and 'needed_pipeline'" - ) - - if isinstance(item["pattern"], str) and item["pattern"][0] == "/": - raise ValueError( - "pattern argument cannot start with char: / (does not work in os.path.join function). " - "If you want to indicate the exact name of the file, use the format " - "directory_name/filename.extension or filename.extension in the pattern argument." - ) - else: - if not all(elem in information for elem in ["pattern", "description"]): - raise ValueError( - "'information' must contain the keys 'pattern' and 'description'" - ) - - if not all( - elem in ["pattern", "description", "needed_pipeline"] - for elem in information.keys() - ): - raise ValueError( - "'information' can only contain the keys 'pattern', 'description' and 'needed_pipeline'" - ) - - if isinstance(information["pattern"], str) and information["pattern"][0] == "/": - raise ValueError( - "pattern argument cannot start with char: / (does not work in os.path.join function). " - "If you want to indicate the exact name of the file, use the format " - "directory_name/filename.extension or filename.extension in the pattern argument." - ) - - def clinica_file_filter( subjects: List[str], sessions: List[str], input_directory: Path, - information: Dict, + pattern: QueryPattern, n_procs: int = 1, ) -> Tuple[List[str], List[str], List[str]]: from clinica.utils.stream import cprint files, errors = clinica_file_reader( - subjects, sessions, input_directory, information, n_procs + subjects, sessions, input_directory, pattern, n_procs ) - cprint(format_clinica_file_reader_errors(errors, information), "warning") + cprint(format_clinica_file_reader_errors(errors, pattern), "warning") filtered_subjects, filtered_sessions = _remove_sub_ses_from_list( subjects, sessions, errors ) @@ -586,16 +558,16 @@ def clinica_file_filter( def format_clinica_file_reader_errors( - errors: Iterable[InvalidSubjectSession], information: Dict + errors: Sequence[InvalidSubjectSession], pattern: QueryPattern ) -> str: message = ( f"Clinica encountered {len(errors)} " - f"problem(s) while getting {information['description']}:\n" + f"problem(s) while getting {pattern.description}:\n" ) - if "needed_pipeline" in information and information["needed_pipeline"]: + if pattern.needed_pipeline != "": message += ( "Please note that the following clinica pipeline(s) must " - f"have run to obtain these files: {information['needed_pipeline']}\n" + f"have run to obtain these files: {pattern.needed_pipeline}\n" ) if errors: message += "".join(f"\t* ({err.subject} | {err.session})\n" for err in errors) @@ -630,10 +602,10 @@ def _remove_sub_ses_from_list( # todo : generalize def clinica_file_reader( - subjects: Iterable[str], - sessions: Iterable[str], + subjects: Sequence[str], + sessions: Sequence[str], input_directory: os.PathLike, - information: Dict, + pattern: QueryPattern, n_procs: int = 1, ) -> Tuple[List[str], List[InvalidSubjectSession]]: """Read files in BIDS or CAPS directory based on participant ID(s). @@ -642,23 +614,17 @@ def clinica_file_reader( Parameters ---------- - subjects : List[str] + subjects : Sequence of str List of subjects. - sessions : List[str] + sessions : Sequence of str List of sessions. Must be same size as `subjects` and must correspond. input_directory : PathLike Path to the BIDS or CAPS directory to read from. - information : Dict - Dictionary containing all the relevant information to look for the files. - The possible keys are: - - - `pattern`: Required. Define the pattern of the final file. - - `description`: Required. String to describe what the file is. - - `needed_pipeline` : Optional. String describing the pipeline(s) - needed to obtain the related file. + pattern : QueryPattern + Pattern to query files. n_procs : int, optional Number of cores used to fetch files in parallel. @@ -751,21 +717,12 @@ def clinica_file_reader( """ input_directory = Path(input_directory) - _check_information(information) - pattern = information["pattern"] - is_bids = determine_caps_or_bids(input_directory) - if is_bids: - check_bids_folder(input_directory) - else: - check_caps_folder(input_directory) - + (check_bids_folder if is_bids else check_caps_folder)(input_directory) if len(subjects) != len(sessions): raise ValueError("Subjects and sessions must have the same length.") - if len(subjects) == 0: return [], [] - file_reader = _read_files_parallel if n_procs > 1 else _read_files_sequential return file_reader( input_directory, @@ -782,7 +739,7 @@ def _read_files_parallel( subjects: Iterable[str], sessions: Iterable[str], is_bids: bool, - pattern: str, + pattern: QueryPattern, n_procs: int, ) -> Tuple[List[str], List[InvalidSubjectSession]]: from multiprocessing import Manager @@ -793,7 +750,7 @@ def _read_files_parallel( shared_results = manager.list() shared_errors_encountered = manager.list() Parallel(n_jobs=n_procs)( - delayed(find_images_path)( + delayed(_find_images_path)( input_directory, sub, ses, @@ -814,22 +771,22 @@ def _read_files_sequential( subjects: Iterable[str], sessions: Iterable[str], is_bids: bool, - pattern: str, + pattern: QueryPattern, **kwargs, ) -> Tuple[List[str], List[InvalidSubjectSession]]: errors_encountered, results = [], [] for sub, ses in zip(subjects, sessions): - find_images_path( + _find_images_path( input_directory, sub, ses, errors_encountered, results, is_bids, pattern ) return results, errors_encountered def clinica_list_of_files_reader( - participant_ids: List[str], - session_ids: List[str], + participant_ids: Sequence[str], + session_ids: Sequence[str], bids_or_caps_directory: os.PathLike, - list_information: List[Dict], + patterns: Iterable[QueryPattern], raise_exception: Optional[bool] = True, ) -> List[List[str]]: """Read list of BIDS or CAPS files. @@ -839,19 +796,19 @@ def clinica_list_of_files_reader( Parameters ---------- - participant_ids : List[str] + participant_ids : Sequence of str List of participant IDs. Example: ['sub-CLNC01', 'sub-CLNC01', 'sub-CLNC02'] - session_ids : List[str] + session_ids : Sequence of str List of sessions ID associated to `participant_ids` Example: ['ses-M00', 'ses-M18', 'ses-M00'] bids_or_caps_directory : PathLike Path to the BIDS of CAPS directory to read from. - list_information : List[Dict] - List of information dictionaries described in `clinica_file_reader`. + patterns : Iterable of QueryPattern + List of query patterns. raise_exception : bool, optional Raise Exception or not. Defaults to True. @@ -865,19 +822,19 @@ def clinica_list_of_files_reader( all_errors = [] list_found_files = [] - for info_file in list_information: + for pattern in patterns: files, errors = clinica_file_reader( participant_ids, session_ids, bids_or_caps_directory, - info_file, + pattern, ) all_errors.append(errors) list_found_files.append([] if errors else files) if any(all_errors) and raise_exception: error_message = "Clinica faced error(s) while trying to read files in your BIDS or CAPS directory.\n" - for error, info in zip(all_errors, list_information): + for error, info in zip(all_errors, patterns): error_message += format_clinica_file_reader_errors(error, info) raise ClinicaBIDSError(error_message) @@ -886,8 +843,8 @@ def clinica_list_of_files_reader( def clinica_group_reader( caps_directory: os.PathLike, - information: Dict, - raise_exception: Optional[bool] = True, + pattern: QueryPattern, + raise_exception: bool = True, ) -> str: """Read files from CAPS directory based on group ID(s). @@ -899,14 +856,8 @@ def clinica_group_reader( caps_directory : PathLike Path to the input CAPS directory. - information : Dict - Dictionary containing all the relevant information to look for the files. - The possible keys are: - - - `pattern`: Required. Define the pattern of the final file. - - `description`: Required. String to describe what the file is. - - `needed_pipeline` : Optional. String describing the pipeline(s) - needed to obtain the related file. + pattern : QueryPattern + Query pattern to be used. raise_exception : bool, optional If True, an exception is raised if errors happen. @@ -923,32 +874,29 @@ def clinica_group_reader( ClinicaCAPSError : If no file is found, or more than 1 files are found. """ - _check_information(information) - pattern = information["pattern"] caps_directory = Path(caps_directory) check_caps_folder(caps_directory) - - current_pattern = caps_directory / "**" / pattern - found_files = insensitive_glob(str(current_pattern), recursive=True) + current_pattern = caps_directory / "**" / pattern.pattern + found_files = _insensitive_glob(str(current_pattern), recursive=True) # Since we are returning found_files[0], force raising even if raise_exception is False # Otherwise we'll get an uninformative IndexError... if (len(found_files) == 0) or (len(found_files) > 1 and raise_exception is True): - _format_and_raise_group_reader_errors(caps_directory, found_files, information) + _format_and_raise_group_reader_errors(caps_directory, found_files, pattern) return found_files[0] def _format_and_raise_group_reader_errors( caps_directory: os.PathLike, - found_files: List, - information: Dict, + found_files: Sequence[str], + pattern: QueryPattern, ) -> None: # todo : TEST from clinica.utils.exceptions import ClinicaCAPSError error_string = ( - f"Clinica encountered a problem while getting {information['description']}. " + f"Clinica encountered a problem while getting {pattern.description}. " ) if len(found_files) == 0: error_string += "No file was found" @@ -959,7 +907,7 @@ def _format_and_raise_group_reader_errors( error_string += ( f"\n\tCAPS directory: {caps_directory}\n" "Please note that the following clinica pipeline(s) must have run to obtain these files: " - f"{information['needed_pipeline']}\n" + f"{pattern.needed_pipeline}\n" ) raise ClinicaCAPSError(error_string) diff --git a/clinica/utils/spm.py b/clinica/utils/spm.py index 8255b86fc..08e8f1e03 100644 --- a/clinica/utils/spm.py +++ b/clinica/utils/spm.py @@ -3,6 +3,7 @@ from enum import Enum from os import PathLike from pathlib import Path +from typing import Union __all__ = [ "SPMTissue", @@ -23,7 +24,9 @@ class SPMTissue(str, Enum): BACKGROUND = "background" -def get_spm_tissue_from_index(index: int) -> SPMTissue: +def get_spm_tissue_from_index(index: Union[int, SPMTissue]) -> SPMTissue: + if isinstance(index, SPMTissue): + return index if index == 1: return SPMTissue.GRAY_MATTER if index == 2: diff --git a/test/unittests/pydra/test_interfaces.py b/test/unittests/pydra/test_interfaces.py index f879f6a02..38f5625f6 100644 --- a/test/unittests/pydra/test_interfaces.py +++ b/test/unittests/pydra/test_interfaces.py @@ -60,6 +60,7 @@ def test_caps_reader_instantiation(tmp_path, query, grabber, name): def test_caps_reader(tmp_path): from clinica.pydra.engine_utils import run from clinica.pydra.interfaces import caps_reader + from clinica.utils.group import GroupID structure = { "groups": ["UnitTest"], @@ -73,7 +74,7 @@ def test_caps_reader(tmp_path): query = CAPSFileQuery( { "mask_tissues": {"tissue_number": (1, 2, 3), "modulation": False}, - "flow_fields": {"group_label": "UnitTest"}, + "flow_fields": {"group_id": GroupID("group-UnitTest")}, "pvc_mask_tissues": {"tissue_number": (1, 2, 3)}, "dartel_template": {"group_label": "UnitTest"}, } @@ -98,7 +99,7 @@ def test_caps_reader(tmp_path): "sub-01_ses-M00_T1w_target-UnitTest_transformation-forward_deformation.nii.gz", "sub-03_ses-M00_T1w_target-UnitTest_transformation-forward_deformation.nii.gz", } - query = CAPSGroupQuery({"dartel_template": {"group_label": "UnitTest"}}) + query = CAPSGroupQuery({"dartel_template": {"group_id": GroupID("group-UnitTest")}}) task = caps_reader(query, tmp_path) results = run(task) assert hasattr(results.output, "dartel_template") diff --git a/test/unittests/pydra/test_query.py b/test/unittests/pydra/test_query.py index f04122b2e..84041c58d 100644 --- a/test/unittests/pydra/test_query.py +++ b/test/unittests/pydra/test_query.py @@ -3,6 +3,8 @@ import pytest from clinica.pydra.query import BIDSQuery, CAPSFileQuery, CAPSGroupQuery, Query +from clinica.utils.group import GroupID +from clinica.utils.input_files import QueryPattern def test_query(): @@ -40,76 +42,86 @@ def test_caps_file_query(): assert len(q) == 1 assert q.query == { "mask_tissues": [ - { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "normalized_space" - / "*_*_T1w_segm-graymatter_space-Ixi549Space_modulated-off_probability.nii*", - "description": "Tissue probability map graymatter based on native MRI in MNI space (Ixi549) without modulation.", - "needed_pipeline": "t1-volume-tissue-segmentation", - }, - { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "normalized_space" - / "*_*_T1w_segm-whitematter_space-Ixi549Space_modulated-off_probability.nii*", - "description": "Tissue probability map whitematter based on native MRI in MNI space (Ixi549) without modulation.", - "needed_pipeline": "t1-volume-tissue-segmentation", - }, + QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / "normalized_space" + / "*_*_T1w_segm-graymatter_space-Ixi549Space_modulated-off_probability.nii*" + ), + "Tissue probability map graymatter based on native MRI in MNI space (Ixi549) without modulation.", + "t1-volume-tissue-segmentation", + ), + QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / "normalized_space" + / "*_*_T1w_segm-whitematter_space-Ixi549Space_modulated-off_probability.nii*" + ), + "Tissue probability map whitematter based on native MRI in MNI space (Ixi549) without modulation.", + "t1-volume-tissue-segmentation", + ), ] } q = CAPSFileQuery( { "mask_tissues": {"tissue_number": (1, 2), "modulation": False}, - "flow_fields": {"group_label": "UnitTest"}, + "flow_fields": {"group_id": GroupID("group-UnitTest")}, } ) assert len(q) == 2 assert q.query == { "mask_tissues": [ - { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "normalized_space" - / "*_*_T1w_segm-graymatter_space-Ixi549Space_modulated-off_probability.nii*", - "description": "Tissue probability map graymatter based on native MRI in MNI space (Ixi549) without modulation.", - "needed_pipeline": "t1-volume-tissue-segmentation", - }, - { - "pattern": Path("t1") - / "spm" - / "segmentation" - / "normalized_space" - / "*_*_T1w_segm-whitematter_space-Ixi549Space_modulated-off_probability.nii*", - "description": "Tissue probability map whitematter based on native MRI in MNI space (Ixi549) without modulation.", - "needed_pipeline": "t1-volume-tissue-segmentation", - }, + QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / "normalized_space" + / "*_*_T1w_segm-graymatter_space-Ixi549Space_modulated-off_probability.nii*" + ), + "Tissue probability map graymatter based on native MRI in MNI space (Ixi549) without modulation.", + "t1-volume-tissue-segmentation", + ), + QueryPattern( + str( + Path("t1") + / "spm" + / "segmentation" + / "normalized_space" + / "*_*_T1w_segm-whitematter_space-Ixi549Space_modulated-off_probability.nii*" + ), + "Tissue probability map whitematter based on native MRI in MNI space (Ixi549) without modulation.", + "t1-volume-tissue-segmentation", + ), ], - "flow_fields": { - "pattern": Path("t1") - / "spm" - / "dartel" - / "group-UnitTest" - / "sub-*_ses-*_T1w_target-UnitTest_transformation-forward_deformation.nii*", - "description": "Deformation from native space to group template UnitTest space.", - "needed_pipeline": "t1-volume-create-dartel", - }, + "flow_fields": QueryPattern( + str( + Path("t1") + / "spm" + / "dartel" + / "group-UnitTest" + / "sub-*_ses-*_T1w_target-UnitTest_transformation-forward_deformation.nii*" + ), + "Deformation from native space to group template UnitTest space.", + "t1-volume-create-dartel", + ), } def test_caps_group_query(): - q = CAPSGroupQuery({"dartel_template": {"group_label": "UnitTest"}}) + q = CAPSGroupQuery({"dartel_template": {"group_id": GroupID("group-UnitTest")}}) assert len(q) == 1 assert q.query == { - "dartel_template": { - "pattern": Path("group-UnitTest") / "t1" / "group-UnitTest_template.nii*", - "description": "T1w template file of group UnitTest", - "needed_pipeline": "t1-volume or t1-volume-create-dartel", - } + "dartel_template": QueryPattern( + str(Path("group-UnitTest") / "t1" / "group-UnitTest_template.nii*"), + "T1w template file of group UnitTest", + "t1-volume or t1-volume-create-dartel", + ) } diff --git a/test/unittests/utils/test_input_files.py b/test/unittests/utils/test_input_files.py index 1625ca53e..307b1b93f 100644 --- a/test/unittests/utils/test_input_files.py +++ b/test/unittests/utils/test_input_files.py @@ -3,6 +3,7 @@ import pytest from clinica.utils.dwi import DTIBasedMeasure +from clinica.utils.input_files import QueryPattern from clinica.utils.pet import ReconstructionMethod, Tracer @@ -40,50 +41,228 @@ def toy_func_3(x, y=2, z=3): toy_func_3((1, 2, 3), z=(4, 5)) +def test_get_t1w_mri(): + from clinica.utils.input_files import get_t1w_mri + + pattern = get_t1w_mri() + + assert pattern.pattern == "sub-*_ses-*_t1w.nii*" + assert pattern.description == "T1w MRI" + assert pattern.needed_pipeline == "" + + +def test_get_t2w_mri(): + from clinica.utils.input_files import get_t2w_mri + + pattern = get_t2w_mri() + + assert pattern.pattern == "sub-*_ses-*_flair.nii*" + assert pattern.description == "FLAIR T2w MRI" + assert pattern.needed_pipeline == "" + + +def test_get_t1_freesurfer_segmentation_white_matter(): + from clinica.utils.input_files import get_t1_freesurfer_segmentation_white_matter + + pattern = get_t1_freesurfer_segmentation_white_matter() + + assert pattern.pattern == "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/wm.seg.mgz" + assert pattern.description == "segmentation of white matter (mri/wm.seg.mgz)." + assert pattern.needed_pipeline == "t1-freesurfer" + + +def test_get_t1_freesurfer_extracted_brain(): + from clinica.utils.input_files import get_t1_freesurfer_extracted_brain + + pattern = get_t1_freesurfer_extracted_brain() + + assert pattern.pattern == "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/brain.mgz" + assert pattern.description == "extracted brain from T1w MRI (mri/brain.mgz)." + assert pattern.needed_pipeline == "t1-freesurfer" + + +def test_get_t1_freesurfer_intensity_normalized_volume_after_nu(): + from clinica.utils.input_files import ( + get_t1_freesurfer_intensity_normalized_volume_after_nu, + ) + + pattern = get_t1_freesurfer_intensity_normalized_volume_after_nu() + + assert ( + pattern.pattern == "t1/freesurfer_cross_sectional/sub-*_ses-*/mri/orig_nu.mgz" + ) + assert pattern.description == ( + "intensity normalized volume generated after correction for" + " non-uniformity in FreeSurfer (mri/orig_nu.mgz)." + ) + assert pattern.needed_pipeline == "t1-freesurfer" + + +def test_get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu(): + from clinica.utils.input_files import ( + get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu, + ) + + pattern = get_t1_freesurfer_longitudinal_intensity_normalized_volume_after_nu() + + assert ( + pattern.pattern + == "t1/long-*/freesurfer_longitudinal/sub-*_ses-*.long.sub-*_*/mri/orig_nu.mgz" + ) + assert pattern.description == ( + "intensity normalized volume generated after correction for non-uniformity " + "in FreeSurfer (orig_nu.mgz) in longitudinal" + ) + assert pattern.needed_pipeline == "t1-freesurfer and t1-freesurfer longitudinal" + + +def test_get_t1w_to_mni_transform(): + from clinica.utils.input_files import get_t1w_to_mni_transform + + pattern = get_t1w_to_mni_transform() + + assert pattern.pattern == "*space-MNI152NLin2009cSym_res-1x1x1_affine.mat" + assert ( + pattern.description + == "Transformation matrix from T1W image to MNI space using t1-linear pipeline" + ) + assert pattern.needed_pipeline == "t1-linear" + + +def test_get_dwi_preprocessed_brainmask(): + from clinica.utils.input_files import get_dwi_preprocessed_brainmask + + pattern = get_dwi_preprocessed_brainmask() + + assert pattern.pattern == "dwi/preprocessing/sub-*_ses-*_space-*_brainmask.nii*" + assert pattern.description == "b0 brainmask" + assert ( + pattern.needed_pipeline + == "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap" + ) + + +@pytest.mark.parametrize( + "filetype,expected_pattern,expected_description,expected_pipelines", + [ + ("nii", "dwi/sub-*_ses-*_dwi.nii*", "DWI nii files.", ""), + ("json", "dwi/sub-*_ses-*_dwi.json*", "DWI json files.", ""), + ("bvec", "dwi/sub-*_ses-*_dwi.bvec*", "DWI bvec files.", ""), + ("bval", "dwi/sub-*_ses-*_dwi.bval*", "DWI bval files.", ""), + ], +) +def test_get_dwi_file( + filetype: str, + expected_pattern: str, + expected_description: str, + expected_pipelines: str, +): + from clinica.utils.input_files import get_dwi_file + + query = get_dwi_file(filetype) + + assert query.pattern == expected_pattern + assert query.description == expected_description + assert query.needed_pipeline == expected_pipelines + + +@pytest.mark.parametrize( + "filetype,expected_pattern,expected_description,expected_pipelines", + [ + ( + "nii", + "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.nii*", + "preprocessed nii files", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ), + ( + "json", + "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.json*", + "preprocessed json files", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ), + ( + "bvec", + "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.bvec*", + "preprocessed bvec files", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ), + ( + "bval", + "dwi/preprocessing/sub-*_ses-*_space-*_desc-preproc_dwi.bval*", + "preprocessed bval files", + "dwi-preprocessing-using-t1 or dwi-preprocessing-using-fieldmap", + ), + ], +) +def test_get_dwi_preprocessed_file( + filetype: str, + expected_pattern: str, + expected_description: str, + expected_pipelines: str, +): + from clinica.utils.input_files import get_dwi_preprocessed_file + + query = get_dwi_preprocessed_file(filetype) + + assert query.pattern == expected_pattern + assert query.description == expected_description + assert query.needed_pipeline == expected_pipelines + + def test_bids_pet_nii_empty(): - from clinica.utils.input_files import bids_pet_nii + from clinica.utils.input_files import get_pet_nifti - assert bids_pet_nii() == { - "pattern": Path("pet") / "*_pet.nii*", - "description": "PET data", - } + query = get_pet_nifti() + + assert query.pattern == str(Path("pet") / "*_pet.nii*") + assert query.description == "PET data" @pytest.fixture -def expected_bids_pet_query(tracer, reconstruction): - return { - "pattern": Path("pet") - / f"*_trc-{tracer.value}_rec-{reconstruction.value}_pet.nii*", - "description": f"PET data with {tracer.value} tracer and reconstruction method {reconstruction.value}", - } +def expected_bids_pet_query( + tracer: Tracer, reconstruction: ReconstructionMethod +) -> QueryPattern: + return QueryPattern( + str(Path("pet") / f"*_trc-{tracer.value}_rec-{reconstruction.value}_pet.nii*"), + f"PET data with {tracer.value} tracer and reconstruction method {reconstruction.value}", + "", + ) @pytest.mark.parametrize("tracer", Tracer) @pytest.mark.parametrize("reconstruction", ReconstructionMethod) -def test_bids_pet_nii(tracer, reconstruction, expected_bids_pet_query): - from clinica.utils.input_files import bids_pet_nii +def test_bids_pet_nii( + tracer: Tracer, + reconstruction: ReconstructionMethod, + expected_bids_pet_query: QueryPattern, +): + from clinica.utils.input_files import get_pet_nifti - assert bids_pet_nii(tracer, reconstruction) == expected_bids_pet_query + assert get_pet_nifti(tracer, reconstruction) == expected_bids_pet_query @pytest.mark.parametrize("dti_measure", DTIBasedMeasure) @pytest.mark.parametrize("space", [None, "*", "T1w"]) def test_dwi_dti_query(dti_measure, space): - from clinica.utils.input_files import dwi_dti + from clinica.utils.input_files import get_dwi_dti space = space or "*" - assert dwi_dti(dti_measure, space=space) == { - "pattern": f"dwi/dti_based_processing/*/*_space-{space}_{dti_measure.value}.nii.gz", - "description": f"DTI-based {dti_measure.value} in space {space}.", - "needed_pipeline": "dwi_dti", - } + query = get_dwi_dti(dti_measure, space=space) + + assert ( + query.pattern + == f"dwi/dti_based_processing/*/*_space-{space}_{dti_measure.value}.nii.gz" + ) + assert query.description == f"DTI-based {dti_measure.value} in space {space}." + assert query.needed_pipeline == "dwi_dti" def test_dwi_dti_query_error(): - from clinica.utils.input_files import dwi_dti + from clinica.utils.input_files import get_dwi_dti with pytest.raises( ValueError, match="'foo' is not a valid DTIBasedMeasure", ): - dwi_dti("foo") + get_dwi_dti("foo") diff --git a/test/unittests/utils/test_utils_inputs.py b/test/unittests/utils/test_utils_inputs.py index 6dbebc647..27489ea40 100644 --- a/test/unittests/utils/test_utils_inputs.py +++ b/test/unittests/utils/test_utils_inputs.py @@ -5,7 +5,9 @@ import pytest +from clinica.utils.dwi import DTIBasedMeasure from clinica.utils.exceptions import ClinicaBIDSError, ClinicaCAPSError +from clinica.utils.input_files import QueryPattern from clinica.utils.inputs import DatasetType, InvalidSubjectSession from clinica.utils.testing_utils import ( build_bids_directory, @@ -40,7 +42,7 @@ def test_remove_sub_ses_from_list( input_subjects, input_sessions, to_remove, expected_subjects, expected_sessions ): - from clinica.utils.inputs import _remove_sub_ses_from_list + from clinica.utils.inputs import _remove_sub_ses_from_list # noqa result_subjects, result_sessions = _remove_sub_ses_from_list( input_subjects, input_sessions, to_remove @@ -50,14 +52,14 @@ def test_remove_sub_ses_from_list( def test_get_parent_path(tmp_path): - from clinica.utils.inputs import _get_parent_path + from clinica.utils.inputs import _get_parent_path # noqa assert _get_parent_path(tmp_path / "bids" / "foo.txt") == str(tmp_path / "bids") @pytest.mark.parametrize("extension", [".txt", ".tar.gz", ".nii.gz", ".foo.bar.baz"]) def test_get_extension(tmp_path, extension): - from clinica.utils.inputs import _get_extension + from clinica.utils.inputs import _get_extension # noqa assert _get_extension(tmp_path / "bids" / f"foo{extension}") == extension @@ -72,7 +74,7 @@ def test_get_extension(tmp_path, extension): ], ) def test_get_suffix(tmp_path, filename, expected_suffix): - from clinica.utils.inputs import _get_suffix + from clinica.utils.inputs import _get_suffix # noqa assert _get_suffix(tmp_path / "bids" / filename) == expected_suffix @@ -86,13 +88,13 @@ def test_get_suffix(tmp_path, filename, expected_suffix): ], ) def test_get_run_number(tmp_path, filename, expected_run_number): - from clinica.utils.inputs import _get_run_number + from clinica.utils.inputs import _get_run_number # noqa assert _get_run_number(str(tmp_path / "bids" / filename)) == expected_run_number def test_select_run(tmp_path): - from clinica.utils.inputs import _select_run + from clinica.utils.inputs import _select_run # noqa files = [ str(tmp_path / "bids" / "foo_run-01.txt"), @@ -104,7 +106,7 @@ def test_select_run(tmp_path): def test_check_common_properties_of_files(tmp_path): - from clinica.utils.inputs import _check_common_properties_of_files + from clinica.utils.inputs import _check_common_properties_of_files # noqa files = [ tmp_path / "bids" / "foo_bar_baz.foo.bar", @@ -128,7 +130,7 @@ def first_entity_dummy_property_extractor(filename: Path) -> str: def test_check_common_properties_of_files_error(tmp_path): - from clinica.utils.inputs import _check_common_properties_of_files + from clinica.utils.inputs import _check_common_properties_of_files # noqa files = [ tmp_path / "bids" / "sub-01_ses-M000_pet.nii.gz", @@ -151,7 +153,7 @@ def first_letter_dummy_property_extractor(filename: Path) -> str: def test_get_entities(tmp_path): - from clinica.utils.inputs import _get_entities + from clinica.utils.inputs import _get_entities # noqa files = [ tmp_path / "bids" / "sub-01_ses-M000_run-01_pet.nii.gz", @@ -219,7 +221,7 @@ def test_get_entities(tmp_path): ), ) def test_are_not_multiple_runs(files): - from clinica.utils.inputs import _are_multiple_runs + from clinica.utils.inputs import _are_multiple_runs # noqa assert not _are_multiple_runs(files) @@ -244,13 +246,13 @@ def test_are_not_multiple_runs(files): ], ) def test_are_multiple_runs(files): - from clinica.utils.inputs import _are_multiple_runs + from clinica.utils.inputs import _are_multiple_runs # noqa assert _are_multiple_runs(files) def test_insensitive_glob(tmp_path): - from clinica.utils.inputs import insensitive_glob + from clinica.utils.inputs import _insensitive_glob # noqa files = [ "foo.py", @@ -266,12 +268,12 @@ def test_insensitive_glob(tmp_path): for file in files: d = tmp_path / file d.mkdir() - python_files = insensitive_glob(str(tmp_path / "*.py")) + python_files = _insensitive_glob(str(tmp_path / "*.py")) assert set([Path(f).name for f in python_files]) == {"foo.py", "bAZ.py", "Fooo.PY"} - text_files = insensitive_glob(str(tmp_path / "*.txt")) + text_files = _insensitive_glob(str(tmp_path / "*.txt")) assert set([Path(f).name for f in text_files]) == {"Bar.txt"} - assert len(insensitive_glob(str(tmp_path / "*.json"))) == 0 - all_python_files = insensitive_glob(str(tmp_path / "**/*.py"), recursive=True) + assert len(_insensitive_glob(str(tmp_path / "*.json"))) == 0 + all_python_files = _insensitive_glob(str(tmp_path / "**/*.py"), recursive=True) assert set([Path(f).name for f in all_python_files]) == { "foo.py", "bAZ.py", @@ -301,7 +303,7 @@ def test_determine_caps_or_bids(tmp_path): @pytest.mark.parametrize("folder_type", DatasetType) def test_validate_folder_existence(folder_type): - from clinica.utils.inputs import _validate_folder_existence + from clinica.utils.inputs import _validate_folder_existence # noqa with pytest.raises( TypeError, @@ -413,13 +415,19 @@ def test_check_caps_folder(tmp_path): def test_find_images_path_error_no_file(tmp_path): """Test function `find_images_path`.""" - from clinica.utils.inputs import find_images_path + from clinica.utils.inputs import _find_images_path # noqa (tmp_path / "sub-01" / "ses-M00" / "anat").mkdir(parents=True) errors, results = [], [] - find_images_path( - tmp_path, "sub-01", "ses-M00", errors, results, True, "sub-*_ses-*_t1w.nii*" + _find_images_path( + tmp_path, + "sub-01", + "ses-M00", + errors, + results, + True, + QueryPattern("sub-*_ses-*_t1w.nii*", "", ""), ) assert len(results) == 0 @@ -429,7 +437,7 @@ def test_find_images_path_error_no_file(tmp_path): def test_find_images_path_error_more_than_one_file(tmp_path): """Test function `find_images_path`.""" - from clinica.utils.inputs import find_images_path + from clinica.utils.inputs import _find_images_path # noqa errors, results = [], [] (tmp_path / "sub-01" / "ses-M00" / "anat" / "sub-01_ses-M00_T1w.nii.gz").mkdir( @@ -439,8 +447,14 @@ def test_find_images_path_error_more_than_one_file(tmp_path): tmp_path / "sub-01" / "ses-M00" / "anat" / "sub-01_ses-M00_foo-bar_T1w.nii.gz" ).mkdir(parents=True) - find_images_path( - tmp_path, "sub-01", "ses-M00", errors, results, True, "sub-*_ses-*_t1w.nii*" + _find_images_path( + tmp_path, + "sub-01", + "ses-M00", + errors, + results, + True, + QueryPattern("sub-*_ses-*_t1w.nii*", "", ""), ) assert len(results) == 0 @@ -450,15 +464,21 @@ def test_find_images_path_error_more_than_one_file(tmp_path): def test_find_images_path(tmp_path): """Test function `find_images_path`.""" - from clinica.utils.inputs import find_images_path + from clinica.utils.inputs import _find_images_path # noqa (tmp_path / "sub-01" / "ses-M00" / "anat" / "sub-01_ses-M00_T1w.nii.gz").mkdir( parents=True ) errors, results = [], [] - find_images_path( - tmp_path, "sub-01", "ses-M00", errors, results, True, "sub-*_ses-*_t1w.nii*" + _find_images_path( + tmp_path, + "sub-01", + "ses-M00", + errors, + results, + True, + QueryPattern("sub-*_ses-*_t1w.nii*", "", ""), ) assert len(results) == 1 @@ -469,7 +489,7 @@ def test_find_images_path(tmp_path): def test_find_images_path_multiple_runs(tmp_path): - from clinica.utils.inputs import find_images_path + from clinica.utils.inputs import _find_images_path # noqa errors, results = [], [] ( @@ -487,8 +507,14 @@ def test_find_images_path_multiple_runs(tmp_path): / "sub-01_ses-M06_run-02_foo-bar_T1w.nii.gz" ).mkdir(parents=True) - find_images_path( - tmp_path, "sub-01", "ses-M06", errors, results, True, "sub-*_ses-*_t1w.nii*" + _find_images_path( + tmp_path, + "sub-01", + "ses-M06", + errors, + results, + True, + QueryPattern("sub-*_ses-*_t1w.nii*", "", ""), ) assert len(results) == 1 @@ -498,64 +524,35 @@ def test_find_images_path_multiple_runs(tmp_path): ) -def test_check_information(): - """Test utility function `_check_information`.""" - from clinica.utils.inputs import _check_information - - with pytest.raises( - TypeError, - match="A dict or list of dicts must be provided for the argument 'information'", - ): - _check_information(42) # noqa - - with pytest.raises( - ValueError, - match="'information' must contain the keys 'pattern' and 'description'", - ): - _check_information({}) - - with pytest.raises( - ValueError, - match="'information' can only contain the keys 'pattern', 'description' and 'needed_pipeline'", - ): - _check_information({"pattern": "foo", "description": "bar", "foo": "bar"}) - - with pytest.raises( - ValueError, - match="pattern argument cannot start with", - ): - _check_information({"pattern": "/foo", "description": "bar"}) - - def test_format_errors(): """Test utility function `_format_errors`.""" from clinica.utils.inputs import format_clinica_file_reader_errors - information = {"description": "foo bar baz"} + pattern = QueryPattern("*", "foo bar baz", "") assert ( - format_clinica_file_reader_errors([], information) + format_clinica_file_reader_errors([], pattern) == "Clinica encountered 0 problem(s) while getting foo bar baz:\n" ) - information["needed_pipeline"] = ["pipeline_1", "pipeline_3"] - assert format_clinica_file_reader_errors([], information) == ( + pattern = QueryPattern("*", "foo bar baz", "pipeline_1 and pipeline_3") + assert format_clinica_file_reader_errors([], pattern) == ( "Clinica encountered 0 problem(s) while getting foo bar baz:\n" "Please note that the following clinica pipeline(s) must have " - "run to obtain these files: ['pipeline_1', 'pipeline_3']\n" + "run to obtain these files: pipeline_1 and pipeline_3\n" ) errors = [ InvalidSubjectSession("sub1", "ses1"), InvalidSubjectSession("sub2", "ses1"), InvalidSubjectSession("sub3", "ses1"), ] - assert format_clinica_file_reader_errors(errors, information) == ( + assert format_clinica_file_reader_errors(errors, pattern) == ( "Clinica encountered 3 problem(s) while getting foo bar baz:\n" "Please note that the following clinica pipeline(s) must have " - "run to obtain these files: ['pipeline_1', 'pipeline_3']\n" + "run to obtain these files: pipeline_1 and pipeline_3\n" "\t* (sub1 | ses1)\n\t* (sub2 | ses1)\n\t* (sub3 | ses1)\n" "Clinica could not identify which file to use (missing or too many) for these sessions. They will not be processed." ) - information.pop("needed_pipeline") - assert format_clinica_file_reader_errors(errors, information) == ( + pattern = QueryPattern("*", "foo bar baz", "") + assert format_clinica_file_reader_errors(errors, pattern) == ( "Clinica encountered 3 problem(s) while getting foo bar baz:\n" "\t* (sub1 | ses1)\n\t* (sub2 | ses1)\n\t* (sub3 | ses1)\n" "Clinica could not identify which file to use (missing or too many) for these sessions. They will not be processed." @@ -574,13 +571,11 @@ def test_clinica_file_reader_bids_directory(tmp_path, data_type): } build_bids_directory(tmp_path, config) - - desc = "T1w MRI" if data_type == "T1w" else "FLAIR T2w MRI" - information = { - "pattern": f"sub-*_ses-*_{data_type}.nii*", - "description": desc, - } - + pattern = QueryPattern( + f"sub-*_ses-*_{data_type}.nii*", + "T1w MRI" if data_type == "T1w" else "FLAIR T2w MRI", + "", + ) with pytest.raises( ValueError, match="Subjects and sessions must have the same length.", @@ -589,12 +584,12 @@ def test_clinica_file_reader_bids_directory(tmp_path, data_type): ["sub-02"], ["ses-M00", "ses-M06"], tmp_path, - information, + pattern, n_procs=1, ) - assert clinica_file_reader([], [], tmp_path, information, n_procs=1) == ([], []) + assert clinica_file_reader([], [], tmp_path, pattern, n_procs=1) == ([], []) results, errors = clinica_file_reader( - ["sub-01"], ["ses-M00"], tmp_path, information, n_procs=1 + ["sub-01"], ["ses-M00"], tmp_path, pattern, n_procs=1 ) assert len(results) == 1 assert Path(results[0]).relative_to(tmp_path) == Path( @@ -606,7 +601,7 @@ def test_clinica_file_reader_bids_directory(tmp_path, data_type): ["sub-01", "sub-02", "sub-02", "sub-06"], ["ses-M00", "ses-M00", "ses-M06", "ses-M00"], tmp_path, - information, + pattern, n_procs=4, ) assert len(results) == 4 @@ -620,7 +615,7 @@ def test_clinica_file_reader_bids_directory(tmp_path, data_type): / f"sub-01_ses-M00_foo-bar_{data_type}.nii.gz" ).mkdir() results, errors = clinica_file_reader( - ["sub-01"], ["ses-M00"], tmp_path, information, n_procs=1 + ["sub-01"], ["ses-M00"], tmp_path, pattern, n_procs=1 ) assert len(results) == 0 assert errors == [InvalidSubjectSession("sub-01", "ses-M00")] @@ -641,12 +636,11 @@ def test_clinica_file_reader_caps_directory(tmp_path): build_caps_directory(tmp_path, config) - information = { - "pattern": "*space-MNI152NLin2009cSym_res-1x1x1_T1w.nii.gz", - "description": "T1w image registered in MNI152NLin2009cSym space using t1-linear pipeline", - "needed_pipeline": "t1-linear", - } - + pattern = QueryPattern( + "*space-MNI152NLin2009cSym_res-1x1x1_T1w.nii.gz", + "T1w image registered in MNI152NLin2009cSym space using t1-linear pipeline", + "t1-linear", + ) with pytest.raises( ValueError, match="Subjects and sessions must have the same length.", @@ -655,14 +649,14 @@ def test_clinica_file_reader_caps_directory(tmp_path): ["sub-01"], ["ses-M00", "ses-M06"], tmp_path, - information, + pattern, n_procs=1, ) - assert clinica_file_reader([], [], tmp_path, information, n_procs=1) == ([], []) + assert clinica_file_reader([], [], tmp_path, pattern, n_procs=1) == ([], []) results, errors = clinica_file_reader( - ["sub-01"], ["ses-M00"], tmp_path, information, n_procs=1 + ["sub-01"], ["ses-M00"], tmp_path, pattern, n_procs=1 ) assert len(results) == 1 assert not errors @@ -671,7 +665,7 @@ def test_clinica_file_reader_caps_directory(tmp_path): ["sub-01", "sub-02", "sub-02", "sub-06"], ["ses-M00", "ses-M00", "ses-M06", "ses-M00"], tmp_path, - information, + pattern, n_procs=4, ) assert len(results) == 4 @@ -686,25 +680,25 @@ def test_clinica_file_reader_caps_directory(tmp_path): / "sub-01_ses-M00_foo-bar_T1w_space-MNI152NLin2009cSym_res-1x1x1_T1w.nii.gz" ).mkdir() results, errors = clinica_file_reader( - ["sub-01"], ["ses-M00"], tmp_path, information, n_procs=1 + ["sub-01"], ["ses-M00"], tmp_path, pattern, n_procs=1 ) assert len(results) == 0 assert errors == [InvalidSubjectSession("sub-01", "ses-M00")] def test_clinica_file_reader_dwi_dti_error(tmp_path): - from clinica.utils.input_files import dwi_dti + from clinica.utils.input_files import get_dwi_dti from clinica.utils.inputs import clinica_file_reader # todo : should be tested by check_caps_folder instead ? - query = dwi_dti("FA", space="T1w") + query = get_dwi_dti(measure=DTIBasedMeasure.FRACTIONAL_ANISOTROPY, space="T1w") with pytest.raises(ClinicaCAPSError): clinica_file_reader(["sub-01"], ["ses-M000"], tmp_path, query) def test_clinica_file_reader_dwi_dti(tmp_path): from clinica.utils.dwi import DTIBasedMeasure - from clinica.utils.input_files import dwi_dti + from clinica.utils.input_files import get_dwi_dti from clinica.utils.inputs import clinica_file_reader, clinica_list_of_files_reader dti_folder = ( @@ -724,11 +718,11 @@ def test_clinica_file_reader_dwi_dti(tmp_path): ) for measure in DTIBasedMeasure: (dti_folder / f"sub-01_ses-M000_space-T1w_{measure.value}.nii.gz").touch() - query = dwi_dti("FA", space="T1w") + query = get_dwi_dti(DTIBasedMeasure.FRACTIONAL_ANISOTROPY, space="T1w") found_files, _ = clinica_file_reader(["sub-01"], ["ses-M000"], tmp_path, query) assert found_files == [str(dti_folder / "sub-01_ses-M000_space-T1w_FA.nii.gz")] - queries = [dwi_dti(measure) for measure in DTIBasedMeasure] + queries = [get_dwi_dti(measure) for measure in DTIBasedMeasure] found_files = clinica_list_of_files_reader( ["sub-01"], ["ses-M000"], tmp_path, queries, raise_exception=True ) @@ -753,22 +747,15 @@ def test_clinica_list_of_files_reader(tmp_path): build_bids_directory(tmp_path, config) - information = [ - { - "pattern": "sub-*_ses-*_t1w.nii*", - "description": "T1w MRI", - }, - { - "pattern": "sub-*_ses-*_flair.nii*", - "description": "FLAIR T2w MRI", - }, + patterns = [ + QueryPattern("sub-*_ses-*_t1w.nii*", "T1w MRI", ""), + QueryPattern("sub-*_ses-*_flair.nii*", "FLAIR T2w MRI", ""), ] - results = clinica_list_of_files_reader( ["sub-02", "sub-06", "sub-02"], ["ses-M00", "ses-M00", "ses-M06"], tmp_path, - information, + patterns, raise_exception=True, ) assert len(results) == 2 @@ -789,14 +776,14 @@ def test_clinica_list_of_files_reader(tmp_path): ["sub-02", "sub-06", "sub-02"], ["ses-M00", "ses-M00", "ses-M06"], tmp_path, - information, + patterns, raise_exception=True, ) results = clinica_list_of_files_reader( ["sub-02", "sub-06", "sub-02"], ["ses-M00", "ses-M00", "ses-M06"], tmp_path, - information, + patterns, raise_exception=False, ) @@ -815,19 +802,22 @@ def test_clinica_group_reader(tmp_path): } build_caps_directory(tmp_path, config) group_label = "UnitTest" - information = { - "pattern": os.path.join( + pattern = QueryPattern( + os.path.join( f"group-{group_label}", "t1", f"group-{group_label}_template.nii*" ), - "description": f"T1w template file of group {group_label}", - "needed_pipeline": "t1-volume or t1-volume-create-dartel", - } + f"T1w template file of group {group_label}", + "t1-volume or t1-volume-create-dartel", + ) with pytest.raises( ClinicaCAPSError, - match="Clinica encountered a problem while getting T1w template file of group UnitTest. No file was found", + match=( + "Clinica encountered a problem while getting T1w " + "template file of group UnitTest. No file was found" + ), ): for raise_exception in [True, False]: - clinica_group_reader(tmp_path, information, raise_exception=raise_exception) + clinica_group_reader(tmp_path, pattern, raise_exception=raise_exception) (tmp_path / "groups").mkdir() (tmp_path / "groups" / f"group-{group_label}").mkdir() (tmp_path / "groups" / f"group-{group_label}" / "t1").mkdir() @@ -838,7 +828,7 @@ def test_clinica_group_reader(tmp_path): / "t1" / f"group-{group_label}_template.nii.gz" ).mkdir() - result = clinica_group_reader(tmp_path, information, raise_exception=True) + result = clinica_group_reader(tmp_path, pattern, raise_exception=True) assert Path(result).relative_to(tmp_path) == Path( "groups/group-UnitTest/t1/group-UnitTest_template.nii.gz" ) @@ -853,6 +843,6 @@ def test_clinica_group_reader(tmp_path): ClinicaCAPSError, match="Clinica encountered a problem while getting T1w template file of group UnitTest. 2 files were found", ): - clinica_group_reader(tmp_path, information, raise_exception=True) - result = clinica_group_reader(tmp_path, information, raise_exception=False) + clinica_group_reader(tmp_path, pattern, raise_exception=True) + result = clinica_group_reader(tmp_path, pattern, raise_exception=False) assert Path(result).stem == "group-UnitTest_template.nii"