"""Motor imagery datasets from the BEETL competition (Dataset A and Dataset B).

Both datasets are distributed as zip archives hosted on figshare. The archives
are fetched with ``pooch``, unpacked next to the downloaded file, and the
per-trial ``.npy`` arrays found in each subject's ``training`` folder are
stitched into one continuous ``mne.io.RawArray`` with one annotation per trial.
"""

# Registration: ``moabb/datasets/__init__.py`` must import these classes with
#     from .beetl import beetlA, beetlB
# placed above the section of that file marked "Call this last", so that the
# dataset list is populated with both classes.

import logging
import os
from pathlib import Path

import mne
import numpy as np
import pooch

import moabb.datasets.download as dl

from .base import BaseDataset
from .download import get_dataset_path


LOGGER = logging.getLogger(__name__)
BASE_URL = "https://ndownloader.figshare.com/files/"

LEADERBOARD_ARTICLE_ID = 14839650
FINAL_EVALUATION_ARTICLE_ID = 16586213
FINAL_LABEL_TXT_ARTICLE_ID = 21602622

# Competition phase -> name of the extracted archive folder. The folder name
# appears twice in the on-disk layout: once as the extraction directory chosen
# in ``_fetch_beetl_files`` and once as the directory the readers expect inside
# the archive.
_PHASE_DIRS = {"leaderboard": "leaderboardMI", "final": "finalMI"}


def _check_phase(phase):
    """Raise ``ValueError`` unless ``phase`` is 'leaderboard' or 'final'."""
    if phase not in _PHASE_DIRS:
        raise ValueError("Phase must be either 'leaderboard' or 'final'")


def _subject_dir(base_path, phase, subject):
    """Return the folder holding the files of ``subject`` for ``phase``."""
    phase_dir = _PHASE_DIRS[phase]
    return Path(base_path) / phase_dir / phase_dir / f"S{subject}"


def _fetch_beetl_files(path, code, force_update=False):
    """Download and unzip the BEETL archives, returning the storage folder.

    Parameters
    ----------
    path : None | str
        Location passed on to ``get_dataset_path``.
    code : str
        Dataset code used to build the ``MNE-<code>-data`` folder name.
    force_update : bool
        When True, ``pooch.retrieve`` is called even for files already on disk.

    Returns
    -------
    pathlib.Path
        Folder containing the downloaded archives and their extracted content.
    """
    root = get_dataset_path("BEETL", path)
    base_path = Path(root) / f"MNE-{code:s}-data"
    base_path.mkdir(parents=True, exist_ok=True)

    # NOTE(review): both articles are fetched whatever the requested phase,
    # and the figshare file listing is queried on every call (network access
    # is needed even when everything is already on disk) -- confirm whether
    # restricting this to the requested phase is acceptable.
    for article_id in (LEADERBOARD_ARTICLE_ID, FINAL_EVALUATION_ARTICLE_ID):
        file_list = dl.fs_get_file_list(article_id)
        # As used below: hashes are looked up by file id, ids by file name.
        hash_by_id = dl.fs_get_file_hash(file_list)
        id_by_name = dl.fs_get_file_id(file_list)

        for file_name, file_id in id_by_name.items():
            if (base_path / file_name).exists() and not force_update:
                continue
            pooch.retrieve(
                url=BASE_URL + file_id,
                known_hash=hash_by_id[file_id],
                fname=file_name,
                path=base_path,
                # Extract into a folder named after the archive (no extension).
                processor=pooch.Unzip(extract_dir=os.path.splitext(file_name)[0]),
                downloader=pooch.HTTPDownloader(progressbar=True),
            )

    return base_path


def _trials_to_raw(data, labels, ch_names, sfreq, event_id):
    """Concatenate epoched trials into one annotated ``RawArray``.

    Parameters
    ----------
    data : array-like
        Trials stacked on the first axis; each trial is indexed as
        (channel, sample).
    labels : array-like
        One integer class code per trial.
    ch_names : list of str
        Channel names, one per row of a trial.
    sfreq : float
        Sampling frequency in Hz.
    event_id : dict
        Mapping from event name to integer code.

    Returns
    -------
    mne.io.RawArray
        Continuous recording with one annotation at the first sample of
        every trial.

    Raises
    ------
    ValueError
        If ``data`` is not 3-dimensional or the number of labels differs from
        the number of trials.
    """
    data = np.asarray(data)
    # Labels are cast so the events array is integer-typed whatever dtype the
    # label file was saved with.
    labels = np.asarray(labels).astype(int).ravel()

    if data.ndim != 3:
        raise ValueError(
            f"Expected trials as a 3-dimensional array, got shape {data.shape}"
        )
    if len(labels) != len(data):
        raise ValueError(
            f"Got {len(labels)} labels for {len(data)} trials; they must match"
        )

    n_trials, n_samples = len(labels), data.shape[-1]
    # Trials are laid end to end, so trial i starts at sample i * n_samples.
    onsets = np.arange(n_trials) * n_samples
    events = np.column_stack((onsets, np.zeros(n_trials, dtype=int), labels))

    info = mne.create_info(ch_names=list(ch_names), sfreq=sfreq, ch_types="eeg")
    # hstack over the trial axis joins the (channel, sample) blocks in time.
    raw = mne.io.RawArray(np.hstack(data), info)

    event_desc = {int(code): name for name, code in event_id.items()}
    raw.set_annotations(
        mne.annotations_from_events(
            events=events, event_desc=event_desc, sfreq=sfreq
        )
    )
    return raw


class beetlA(BaseDataset):
    """Motor Imagery dataset from BEETL Competition - Dataset A.

    Dataset A contains data from subjects with 500 Hz sampling rate and 63 EEG
    channels. In the leaderboard phase, this includes subjects 1-2, while in
    the final phase it includes subjects 1-3.

    Motor imagery tasks include:
        - Rest (label 0)
        - Left hand (label 1)
        - Right hand (label 2)
        - Feet (label 3)

    References
    ----------
    .. [1] Original dataset: https://www.kaggle.com/competitions/beetl
    """

    def __init__(self, phase="final"):
        """Initialize BEETL Dataset A.

        Parameters
        ----------
        phase : str
            Either "leaderboard" (subjects 1-2) or "final" (subjects 1-3)

        Raises
        ------
        ValueError
            If ``phase`` is neither "leaderboard" nor "final".
        """
        _check_phase(phase)

        self.phase = phase
        subjects = list(range(1, 3)) if phase == "leaderboard" else list(range(1, 4))

        # Channel setup
        self.ch_names = [
            "Fp1", "Fz", "F3", "F7", "FT9", "FC5", "FC1", "C3", "T7", "TP9",
            "CP5", "CP1", "Pz", "P3", "P7", "O1", "Oz", "O2", "P4", "P8",
            "TP10", "CP6", "CP2", "C4", "T8", "FT10", "FC6", "FC2", "F4",
            "F8", "Fp2", "AF7", "AF3", "AFz", "F1", "F5", "FT7", "FC3",
            "FCz", "C1", "C5", "TP7", "CP3", "P1", "P5", "PO7", "PO3",
            "POz", "PO4", "PO8", "P6", "P2", "CPz", "CP4", "TP8", "C6",
            "C2", "FC4", "FT8", "F6", "F2", "AF4", "AF8",
        ]  # fmt: skip
        self.sfreq = 500

        # NOTE(review): beetlA and beetlB are both registered with
        # code="beetl"; if the framework keys caches or results on ``code``
        # the two datasets would collide -- confirm and rename if needed.
        super().__init__(
            subjects=subjects,
            sessions_per_subject=1,  # Data is concatenated into one session
            events=dict(rest=0, left_hand=1, right_hand=2, feet=3),
            code="beetl",
            interval=[0, 4],  # 4s trial window
            paradigm="imagery",
        )

    def _get_single_subject_data(self, subject):
        """Return data for a single subject.

        All available ``race<N>`` training files (N = 1..5) are loaded and
        concatenated, in race order, into a single run.

        Returns
        -------
        dict
            ``{"0": {"0": raw}}``: one session holding one run.

        Raises
        ------
        FileNotFoundError
            If no complete data/label pair exists for the subject.
        """
        file_paths = self.data_path(subject)
        training_dir = _subject_dir(file_paths[0], self.phase, subject) / "training"

        data_list = []
        labels_list = []
        for race in range(1, 6):
            data_file = training_dir / f"race{race}_padsData.npy"
            label_file = training_dir / f"race{race}_padsLabel.npy"
            # A race is skipped unless both files are present.
            if not (data_file.exists() and label_file.exists()):
                continue
            # SECURITY: allow_pickle=True lets numpy unpickle arbitrary
            # objects from the downloaded file. Kept as in the original
            # loader; TODO confirm whether the files are plain arrays so
            # that the flag can be dropped.
            data_list.append(np.load(data_file, allow_pickle=True))
            labels_list.append(np.load(label_file, allow_pickle=True))

        if not data_list:
            # Without this guard np.concatenate fails with an unrelated-looking
            # "need at least one array to concatenate" error.
            raise FileNotFoundError(
                f"No training files found for subject {subject} in {training_dir}"
            )

        raw = _trials_to_raw(
            np.concatenate(data_list),
            np.concatenate(labels_list),
            self.ch_names,
            self.sfreq,
            self.event_id,
        )
        return {"0": {"0": raw}}

    def data_path(
        self, subject, path=None, force_update=False, update_path=None, verbose=None
    ):
        """Return path to the data files.

        Parameters
        ----------
        subject : int
            Subject number; must be in ``self.subject_list``.
        path : None | str
            Location passed on to ``get_dataset_path``.
        force_update : bool
            Call the downloader even for files already on disk.
        update_path, verbose
            Accepted for signature compatibility; not used here.

        Returns
        -------
        list of str
            Single-element list with the folder containing the archives.

        Raises
        ------
        ValueError
            If ``subject`` is not part of this dataset.
        """
        if subject not in self.subject_list:
            raise ValueError(f"Subject {subject} not in {self.subject_list}")

        base_path = _fetch_beetl_files(path, self.code, force_update)
        return [str(base_path)]


class beetlB(BaseDataset):
    """Motor Imagery dataset from BEETL Competition - Dataset B.

    Dataset B contains data from subjects with 200 Hz sampling rate and 32 EEG
    channels. In the leaderboard phase, this includes subjects 3-5, while in
    the final phase it includes subjects 4-5.

    Motor imagery tasks include:
        - Left hand (label 0)
        - Right hand (label 1)
        - Feet (label 2)
        - Rest (label 3)

    References
    ----------
    .. [1] Original dataset: https://www.kaggle.com/competitions/beetl
    """

    def __init__(self, phase="final"):
        """Initialize BEETL Dataset B.

        Parameters
        ----------
        phase : str
            Either "leaderboard" (subjects 3-5) or "final" (subjects 4-5)

        Raises
        ------
        ValueError
            If ``phase`` is neither "leaderboard" nor "final".
        """
        _check_phase(phase)

        self.phase = phase
        subjects = list(range(3, 6)) if phase == "leaderboard" else list(range(4, 6))

        # NOTE(review): same code="beetl" as beetlA -- see the note there;
        # confirm the two datasets do not collide on ``code``.
        super().__init__(
            subjects=subjects,
            sessions_per_subject=1,  # Data is concatenated into one session
            events=dict(left_hand=0, right_hand=1, feet=2, rest=3),
            code="beetl",
            interval=[0, 4],  # 4s trial window
            paradigm="imagery",
        )

    def _get_single_subject_data(self, subject):
        """Return data for a single subject.

        Returns
        -------
        dict
            ``{"0": {"0": raw}}``: one session holding one run.
        """
        file_paths = self.data_path(subject)

        # Channel setup
        ch_names = [
            "Fp1", "Fp2", "F3", "Fz", "F4", "FC5", "FC1", "FC2", "FC6",
            "C5", "C3", "C1", "Cz", "C2", "C4", "C6", "CP5", "CP3", "CP1",
            "CPz", "CP2", "CP4", "CP6", "P7", "P5", "P3", "P1", "Pz", "P2",
            "P4", "P6", "P8",
        ]  # fmt: skip
        sfreq = 200

        training_dir = _subject_dir(file_paths[0], self.phase, subject) / "training"

        # SECURITY: allow_pickle=True lets numpy unpickle arbitrary objects
        # from the downloaded file. Kept as in the original loader; TODO
        # confirm whether the files are plain arrays so it can be dropped.
        data = np.load(training_dir / f"training_s{subject}X.npy", allow_pickle=True)
        labels = np.load(training_dir / f"training_s{subject}y.npy", allow_pickle=True)

        raw = _trials_to_raw(data, labels, ch_names, sfreq, self.event_id)
        return {"0": {"0": raw}}

    def data_path(
        self, subject, path=None, force_update=False, update_path=None, verbose=None
    ):
        """Return path to the data files.

        The archives are stored in the same ``MNE-<code>-data`` folder that
        ``beetlA`` uses, so the two datasets share one download.

        Parameters
        ----------
        subject : int
            Subject number; must be in ``self.subject_list``.
        path : None | str
            Location passed on to ``get_dataset_path``.
        force_update : bool
            Call the downloader even for files already on disk.
        update_path, verbose
            Accepted for signature compatibility; not used here.

        Returns
        -------
        list of str
            Single-element list with the folder containing the archives.

        Raises
        ------
        ValueError
            If ``subject`` is not part of this dataset.
        """
        if subject not in self.subject_list:
            raise ValueError(f"Subject {subject} not in {self.subject_list}")

        base_path = _fetch_beetl_files(path, self.code, force_update)
        return [str(base_path)]