Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add tasks for reading BIDS files and dataset #1141

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clinica/pydra/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .parsers import parse_bids_file
from .readers import read_bids, read_bids_dataset, read_bids_files
from .templates import (
download_mni_template_2009a,
download_mni_template_2009c,
Expand Down
123 changes: 123 additions & 0 deletions clinica/pydra/tasks/readers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import annotations

import json
from os import PathLike
from pathlib import Path
from typing import Optional, Sequence

from pydra.engine import Workflow
from pydra.mark import annotate, task

__all__ = ["read_bids", "read_bids_dataset", "read_bids_files"]


@task
@annotate(
    {
        "return": {
            "dataset_description": Optional[dict],
            "participant_ids": list[str],
            "session_ids": Optional[list[str]],
        }
    }
)
def read_bids_dataset(dataset_path: PathLike):
    """Read top-level metadata from a BIDS dataset.

    Parameters
    ----------
    dataset_path : PathLike
        Path to the root of the BIDS dataset.

    Returns
    -------
    dataset_description : Optional[dict]
        Parsed content of ``dataset_description.json``, or None if absent.
    participant_ids : list[str]
        Sorted ``sub-*`` directory names found in the dataset.
    session_ids : Optional[list[str]]
        ``ses-*`` directory names aligned with ``participant_ids`` for
        multi-session datasets, or None for single-session datasets.
    """
    dataset_path = Path(dataset_path).resolve()
    description_file = dataset_path / "dataset_description.json"
    # read_text() returns a string, so json.loads (not json.load) is required.
    dataset_description = (
        json.loads(description_file.read_text()) if description_file.exists() else None
    )

    # The dataset is multi-session if any "ses-*" subdirectory exists.
    multi_sessions = next(dataset_path.glob("*/ses-*"), None) is not None

    if multi_sessions:
        # Use Path.parts rather than splitting on "/" so the code is
        # portable across platforms with different path separators.
        # Sorting keeps the output deterministic and consistent with the
        # single-session branch below.
        visits = sorted(
            visit.relative_to(dataset_path).parts
            for visit in dataset_path.glob("sub-*/ses-*")
        )
        participant_ids = [participant_id for participant_id, _ in visits]
        session_ids = [session_id for _, session_id in visits]
    else:
        participant_ids = sorted(
            visit.name for visit in dataset_path.glob("sub-*")
        )
        session_ids = None

    return dataset_description, participant_ids, session_ids


@task
@annotate({"return": {"files": list[Path]}})
def read_bids_files(
    dataset_path: PathLike,
    participant_ids: Sequence[str] | None = None,
    session_ids: Sequence[str] | None = None,
    datatype: str | None = None,
    suffix: str | None = None,
    extension: str | None = None,
):
    """Collect BIDS files matching the given entity filters.

    Parameters
    ----------
    dataset_path : PathLike
        Path to the root of the BIDS dataset.
    participant_ids : Sequence[str], optional
        Participant labels (e.g. ``sub-01``) to restrict the search to.
    session_ids : Sequence[str], optional
        Session labels (e.g. ``ses-M00``) aligned with ``participant_ids``.
    datatype : str, optional
        BIDS datatype directory (e.g. ``anat``); any datatype if omitted.
    suffix : str, optional
        BIDS suffix (e.g. ``T1w``); any suffix if omitted.
    extension : str, optional
        File extension without the leading dot; any extension if omitted.

    Returns
    -------
    files : list[Path]
        Sorted matching file paths, grouped per participant/session.
    """
    dataset_path = Path(dataset_path).resolve()
    # Fall back to glob wildcards for any unspecified entity.
    datatype = datatype or "*"
    suffix = suffix or "*"
    extension = extension or "*"
    files = []

    if all([participant_ids, session_ids]):
        # zip the two sequences pairwise (the original wrapped both in a
        # single list, which made the tuple unpacking below fail).
        for participant_id, session_id in zip(participant_ids, session_ids):
            dir_pattern = f"{participant_id}/{session_id}/{datatype}"
            name_pattern = f"{participant_id}_{session_id}*_{suffix}.{extension}"
            file_pattern = f"{dir_pattern}/{name_pattern}"
            files += sorted(dataset_path.glob(file_pattern))
    elif participant_ids:
        for participant_id in participant_ids:
            dir_pattern = f"{participant_id}/**/{datatype}"
            name_pattern = f"{participant_id}*_{suffix}.{extension}"
            file_pattern = f"{dir_pattern}/{name_pattern}"
            files += sorted(dataset_path.glob(file_pattern))
    else:
        dir_pattern = f"**/{datatype}"
        name_pattern = f"*_{suffix}.{extension}"
        file_pattern = f"{dir_pattern}/{name_pattern}"
        files += sorted(dataset_path.glob(file_pattern))

    return files


def read_bids(output_queries: dict, **kwargs) -> Workflow:
    """Build a workflow that reads a BIDS dataset and its queried files.

    Parameters
    ----------
    output_queries : dict
        Mapping from output name to a BIDS query (keyword arguments for
        :func:`read_bids_files`); one reader task is added per entry.
    **kwargs
        Extra keyword arguments forwarded to the :class:`Workflow`.

    Returns
    -------
    Workflow
        Workflow exposing the dataset description, participant/session ids,
        and one file-list output per query.
    """
    workflow = Workflow(name="read_bids", input_spec=["dataset_path"], **kwargs)

    # First task: scan the dataset for its description and visit structure.
    workflow.add(
        read_bids_dataset(
            name="read_bids_dataset", dataset_path=workflow.lzin.dataset_path
        )
    )
    dataset_task = workflow.read_bids_dataset

    outputs = {
        "dataset_description": dataset_task.lzout.dataset_description,
        "participant_ids": dataset_task.lzout.participant_ids,
        "session_ids": dataset_task.lzout.session_ids,
    }

    # One file-reader task per requested query, each wired to the ids
    # produced by the dataset-scanning task above.
    for output_name, bids_query in output_queries.items():
        reader = read_bids_files(
            name=f"read_{output_name}",
            dataset_path=workflow.lzin.dataset_path,
            participant_ids=dataset_task.lzout.participant_ids,
            session_ids=dataset_task.lzout.session_ids,
            **bids_query,
        )
        workflow.add(reader)
        outputs[output_name] = reader.lzout.files

    workflow.set_output(connections=outputs)

    return workflow
2 changes: 2 additions & 0 deletions clinica/pydra/tasks/templates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from pathlib import PurePath

from pydra.mark import annotate, task
Expand Down
Loading