@task
@annotate(
    {
        "return": {
            "dataset_description": Optional[dict],
            "participant_ids": list[str],
            "session_ids": Optional[list[str]],
        }
    }
)
def read_bids_dataset(dataset_path: PathLike):
    """Read the top-level structure of a BIDS dataset.

    Parameters
    ----------
    dataset_path : PathLike
        Path to the root of the BIDS dataset.

    Returns
    -------
    dataset_description : dict, optional
        Parsed contents of ``dataset_description.json``, or None when the
        file does not exist.
    participant_ids : list of str
        Participant labels (``sub-*`` directory names), sorted. For a
        multi-session dataset there is one entry per (participant, session)
        visit, so labels may repeat.
    session_ids : list of str, optional
        Session labels (``ses-*``) aligned with ``participant_ids``, or None
        for a single-session dataset.
    """
    dataset_path = Path(dataset_path).resolve()
    description_file = dataset_path / "dataset_description.json"
    # BUG FIX: json.load expects a file object; read_text() returns a str,
    # so json.loads must be used here.
    dataset_description = (
        json.loads(description_file.read_text()) if description_file.exists() else None
    )

    # A dataset is multi-session when any subject directory contains a
    # ses-* child (replaces the original next()/StopIteration probe).
    multi_sessions = any(dataset_path.glob("*/ses-*"))

    if multi_sessions:
        # Sort for determinism, consistent with the single-session branch
        # (glob order is filesystem-dependent). Also avoids the original's
        # ValueError from unpacking zip(*()) when no sub-*/ses-* visit exists.
        visits = sorted(dataset_path.glob("sub-*/ses-*"))
        # .parts is path-separator agnostic; the original split the relative
        # path string on "/" which breaks on Windows.
        participant_ids = [visit.relative_to(dataset_path).parts[0] for visit in visits]
        session_ids = [visit.relative_to(dataset_path).parts[1] for visit in visits]
    else:
        visits = sorted(dataset_path.glob("sub-*"))
        participant_ids = [str(visit.relative_to(dataset_path)) for visit in visits]
        session_ids = None

    return dataset_description, participant_ids, session_ids
@task
@annotate({"return": {"files": list[Path]}})
def read_bids_files(
    dataset_path: PathLike,
    participant_ids: Sequence[str] | None = None,
    session_ids: Sequence[str] | None = None,
    datatype: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
):
    """Collect files from a BIDS dataset matching a query.

    Parameters
    ----------
    dataset_path : PathLike
        Path to the root of the BIDS dataset.
    participant_ids : sequence of str, optional
        Participant labels to restrict the search to. When given together
        with ``session_ids``, the two sequences are paired element-wise.
    session_ids : sequence of str, optional
        Session labels aligned with ``participant_ids``.
    datatype : str, optional
        BIDS datatype directory (e.g. ``anat``); any datatype when omitted.
    suffix : str, optional
        BIDS suffix (e.g. ``T1w``); any suffix when omitted.
    extension : str, optional
        File extension without the leading dot; any extension when omitted.

    Returns
    -------
    files : list of Path
        Matching file paths, sorted within each visit/participant group.
    """
    dataset_path = Path(dataset_path).resolve()
    # Fall back to glob wildcards for unspecified query parts.
    datatype = datatype or "*"
    suffix = suffix or "*"
    extension = extension or "*"
    files = []

    if all([participant_ids, session_ids]):
        # BUG FIX: the original called zip([participant_ids, session_ids]),
        # which iterates over the two sequences themselves as 1-tuples and
        # fails to unpack; the sequences must be zipped in lockstep.
        for participant_id, session_id in zip(participant_ids, session_ids):
            dir_pattern = f"{participant_id}/{session_id}/{datatype}"
            name_pattern = f"{participant_id}_{session_id}*_{suffix}.{extension}"
            file_pattern = f"{dir_pattern}/{name_pattern}"
            files += sorted(dataset_path.glob(file_pattern))
    elif participant_ids:
        for participant_id in participant_ids:
            dir_pattern = f"{participant_id}/**/{datatype}"
            name_pattern = f"{participant_id}*_{suffix}.{extension}"
            file_pattern = f"{dir_pattern}/{name_pattern}"
            files += sorted(dataset_path.glob(file_pattern))
    else:
        dir_pattern = f"**/{datatype}"
        name_pattern = f"*_{suffix}.{extension}"
        file_pattern = f"{dir_pattern}/{name_pattern}"
        files += sorted(dataset_path.glob(file_pattern))

    return files
def read_bids(output_queries: dict, **kwargs):
    """Build a workflow reading a BIDS dataset and the files of each query.

    Parameters
    ----------
    output_queries : dict
        Maps an output name to the keyword arguments of a
        ``read_bids_files`` query (``datatype``, ``suffix``, ``extension``).
    **kwargs
        Extra keyword arguments forwarded to the ``Workflow`` constructor.

    Returns
    -------
    Workflow
        A workflow exposing ``dataset_description``, ``participant_ids``,
        ``session_ids`` and one file-list output per query.
    """
    workflow = Workflow(name="read_bids", input_spec=["dataset_path"], **kwargs)

    workflow.add(
        read_bids_dataset(
            name="read_bids_dataset", dataset_path=workflow.lzin.dataset_path
        )
    )
    dataset_node = workflow.read_bids_dataset

    # Always expose the dataset-level outputs.
    connections = {
        "dataset_description": dataset_node.lzout.dataset_description,
        "participant_ids": dataset_node.lzout.participant_ids,
        "session_ids": dataset_node.lzout.session_ids,
    }

    # One file-reading node per query, wired to the dataset node's outputs.
    for output_name, bids_query in output_queries.items():
        file_node = read_bids_files(
            name=f"read_{output_name}",
            dataset_path=workflow.lzin.dataset_path,
            participant_ids=dataset_node.lzout.participant_ids,
            session_ids=dataset_node.lzout.session_ids,
            **bids_query,
        )
        workflow.add(file_node)
        connections[output_name] = file_node.lzout.files

    workflow.set_output(connections=connections)
    return workflow