Support digital channels on NIDQ interface and use TimeSeries instead of ElectricalSeries for analog channels (#1152)

Co-authored-by: Ben Dichter <[email protected]>
h-mayorquin and bendichter authored Dec 11, 2024
1 parent 1032e15 commit 5356263
Showing 11 changed files with 327 additions and 218 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -9,6 +9,8 @@
* Make `NWBMetaDataEncoder` public again [PR #1142](https://github.com/catalystneuro/neuroconv/pull/1142)
* Fix a bug where data in `DeepLabCutInterface` failed to write when `ndx-pose` was not imported. [#1144](https://github.com/catalystneuro/neuroconv/pull/1144)
* `SpikeGLXConverterPipe` converter now accepts multi-probe structures with multi-trigger and does not assume a specific folder structure [#1150](https://github.com/catalystneuro/neuroconv/pull/1150)
* `SpikeGLXNIDQInterface` is no longer written as an ElectricalSeries [#1152](https://github.com/catalystneuro/neuroconv/pull/1152)


## Features
* Propagate the `unit_electrode_indices` argument from the spikeinterface tools to `BaseSortingExtractorInterface`. This allows users to map units to the electrode table when adding sorting data [PR #1124](https://github.com/catalystneuro/neuroconv/pull/1124)
@@ -17,7 +19,10 @@
* `SpikeGLXRecordingInterface` now also accepts `folder_path` making its behavior equivalent to SpikeInterface [#1150](https://github.com/catalystneuro/neuroconv/pull/1150)
* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)
* Added the `deploy_neuroconv_batch_job` helper function for deploying NeuroConv AWS Batch jobs. [PR #1086](https://github.com/catalystneuro/neuroconv/pull/1086)
* YAML specification files now accept an outer keyword `upload_to_dandiset="< six-digit ID >"` to automatically upload the produced NWB files to the DANDI archive [PR #1089](https://github.com/catalystneuro/neuroconv/pull/1089)
* `SpikeGLXNIDQInterface` now handles digital demuxed channels (`XD0`) [#1152](https://github.com/catalystneuro/neuroconv/pull/1152)

## Improvements
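For context on what the two `SpikeGLXNIDQInterface` CHANGELOG entries above mean downstream (this sketch is not part of the commit): a minimal usage example, assuming a SpikeGLX run folder containing `.nidq.bin`/`.nidq.meta` files. The folder path and output file name are placeholders.

from neuroconv.datainterfaces import SpikeGLXNIDQInterface

# Placeholder path: any SpikeGLX run folder with .nidq.bin/.nidq.meta files.
interface = SpikeGLXNIDQInterface(folder_path="path/to/Noise4Sam_g0")
metadata = interface.get_metadata()  # session start time is read from the .meta file

# After this commit, analog NIDQ channels (XA/MA) land in acquisition as a
# TimeSeries named "TimeSeriesNIDQ" rather than an ElectricalSeries, and
# digital channels (e.g. XD0) are written as ndx-events LabeledEvents.
interface.run_conversion(nwbfile_path="nidq_example.nwb", metadata=metadata)
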
2 changes: 1 addition & 1 deletion docs/conversion_examples_gallery/recording/spikeglx.rst
@@ -24,7 +24,7 @@ We can easily convert all data stored in the native SpikeGLX folder structure to
>>>
>>> folder_path = f"{ECEPHY_DATA_PATH}/spikeglx/Noise4Sam_g0"
>>> converter = SpikeGLXConverterPipe(folder_path=folder_path)
>>>
Source data is valid!
>>> # Extract what metadata we can from the source files
>>> metadata = converter.get_metadata()
>>> # For data provenance we add the time zone information to the conversion
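As a companion to the doctest above, a sketch of how the converter exposes the NIDQ stream alongside the probe streams. It assumes the same example dataset; the stream names in the comment are illustrative.

from zoneinfo import ZoneInfo

from neuroconv.converters import SpikeGLXConverterPipe

converter = SpikeGLXConverterPipe(folder_path="path/to/Noise4Sam_g0")
# One interface per stream; "nidq" appears when the folder holds NIDQ files.
print(list(converter.data_interface_objects))  # e.g. ['imec0.ap', 'imec0.lf', 'nidq']

metadata = converter.get_metadata()
# For data provenance, attach time zone information to the session start time.
session_start_time = metadata["NWBFile"]["session_start_time"]
metadata["NWBFile"]["session_start_time"] = session_start_time.replace(tzinfo=ZoneInfo("US/Pacific"))
converter.run_conversion(nwbfile_path="spikeglx_session.nwb", metadata=metadata)
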
src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxdatainterface.py
@@ -66,18 +66,22 @@ def __init__(
Folder path containing the binary files of the SpikeGLX recording.
stream_id: str, optional
Stream ID of the SpikeGLX recording.
Examples are 'nidq', 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc.
Examples are 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc.
file_path : FilePathType
Path to .bin file. Point to .ap.bin for SpikeGLXRecordingInterface and .lf.bin for SpikeGLXLFPInterface.
verbose : bool, default: True
Whether to output verbose text.
es_key : str, the key to access the metadata of the ElectricalSeries.
"""

if stream_id == "nidq":
raise ValueError(
"SpikeGLXRecordingInterface is not designed to handle nidq files. Use SpikeGLXNIDQInterface instead"
)

if file_path is not None and stream_id is None:
self.stream_id = fetch_stream_id_for_spikelgx_file(file_path)
self.folder_path = Path(file_path).parent

else:
self.stream_id = stream_id
self.folder_path = Path(folder_path)
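The diff above adds a guard that rejects the NIDQ stream in the recording interface. A short sketch of the resulting behavior, using a placeholder folder path:

from neuroconv.datainterfaces import SpikeGLXNIDQInterface, SpikeGLXRecordingInterface

try:
    # The recording interface now fails fast on the NIDQ stream.
    SpikeGLXRecordingInterface(folder_path="path/to/Noise4Sam_g0", stream_id="nidq")
except ValueError as error:
    print(error)  # "SpikeGLXRecordingInterface is not designed to handle nidq files. ..."

# The NIDQ stream gets its own dedicated interface instead.
interface = SpikeGLXNIDQInterface(folder_path="path/to/Noise4Sam_g0")
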
271 changes: 237 additions & 34 deletions src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py
@@ -1,39 +1,36 @@
import warnings
from pathlib import Path
from typing import Optional
from typing import Literal, Optional

import numpy as np
from pydantic import ConfigDict, DirectoryPath, FilePath, validate_call
from pynwb import NWBFile
from pynwb.base import TimeSeries

from .spikeglx_utils import get_session_start_time
from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ....basedatainterface import BaseDataInterface
from ....tools.signal_processing import get_rising_frames_from_ttl
from ....utils import get_json_schema_from_method_signature
from ....tools.spikeinterface.spikeinterface import _recording_traces_to_hdmf_iterator
from ....utils import (
calculate_regular_series_rate,
get_json_schema_from_method_signature,
)


class SpikeGLXNIDQInterface(BaseRecordingExtractorInterface):
class SpikeGLXNIDQInterface(BaseDataInterface):
"""Primary data interface class for converting the high-pass (ap) SpikeGLX format."""

display_name = "NIDQ Recording"
keywords = BaseRecordingExtractorInterface.keywords + ("Neuropixels",)
keywords = ("Neuropixels", "nidq", "NIDQ", "SpikeGLX")
associated_suffixes = (".nidq", ".meta", ".bin")
info = "Interface for NIDQ board recording data."

ExtractorName = "SpikeGLXRecordingExtractor"
stream_id = "nidq"

@classmethod
def get_source_schema(cls) -> dict:
source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["x_pitch", "y_pitch"])
source_schema["properties"]["file_path"]["description"] = "Path to SpikeGLX .nidq file."
return source_schema

def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict:

extractor_kwargs = source_data.copy()
extractor_kwargs["folder_path"] = self.folder_path
extractor_kwargs["stream_id"] = self.stream_id
return extractor_kwargs

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def __init__(
self,
@@ -56,12 +53,18 @@ def __init__(
Path to .nidq.bin file.
verbose : bool, default: True
Whether to output verbose text.
load_sync_channel : bool, default: False
Whether to load the last channel in the stream, which is typically used for synchronization.
If True, then the probe is not loaded.
es_key : str, default: "ElectricalSeriesNIDQ"
"""

if load_sync_channel:

warnings.warn(
"The 'load_sync_channel' parameter is deprecated and will be removed in June 2025. "
"The sync channel data is only available the raw files of spikeglx`.",
DeprecationWarning,
stacklevel=2,
)

if file_path is None and folder_path is None:
raise ValueError("Either 'file_path' or 'folder_path' must be provided.")

@@ -72,18 +75,36 @@
if folder_path is not None:
self.folder_path = Path(folder_path)

from spikeinterface.extractors import SpikeGLXRecordingExtractor

self.recording_extractor = SpikeGLXRecordingExtractor(
folder_path=self.folder_path,
stream_id="nidq",
all_annotations=True,
)

channel_ids = self.recording_extractor.get_channel_ids()
analog_channel_signatures = ["XA", "MA"]
self.analog_channel_ids = [ch for ch in channel_ids if "XA" in ch or "MA" in ch]
self.has_analog_channels = len(self.analog_channel_ids) > 0
self.has_digital_channels = len(self.analog_channel_ids) < len(channel_ids)
if self.has_digital_channels:
import ndx_events # noqa: F401
from spikeinterface.extractors import SpikeGLXEventExtractor

self.event_extractor = SpikeGLXEventExtractor(folder_path=self.folder_path)

super().__init__(
verbose=verbose,
load_sync_channel=load_sync_channel,
es_key=es_key,
folder_path=self.folder_path,
file_path=file_path,
)
self.source_data.update(file_path=str(file_path))

self.recording_extractor.set_property(
key="group_name", values=["NIDQChannelGroup"] * self.recording_extractor.get_num_channels()
)
self.subset_channels = None

signal_info_key = (0, self.stream_id) # Key format is (segment_index, stream_id)
signal_info_key = (0, "nidq") # Key format is (segment_index, stream_id)
self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key]
self.meta = self._signals_info_dict["meta"]

@@ -101,24 +122,206 @@ def get_metadata(self) -> dict:
manufacturer="National Instruments",
)

# Add groups metadata
metadata["Ecephys"]["Device"] = [device]
metadata["Devices"] = [device]

metadata["Ecephys"]["ElectrodeGroup"][0].update(
name="NIDQChannelGroup", description="A group representing the NIDQ channels.", device=device["name"]
)
metadata["Ecephys"]["Electrodes"] = [
dict(name="group_name", description="Name of the ElectrodeGroup this electrode is a part of."),
]
metadata["Ecephys"]["ElectricalSeriesNIDQ"][
"description"
] = "Raw acquisition traces from the NIDQ (.nidq.bin) channels."
return metadata

def get_channel_names(self) -> list[str]:
"""Return a list of channel names as set in the recording extractor."""
return list(self.recording_extractor.get_channel_ids())

def add_to_nwbfile(
self,
nwbfile: NWBFile,
metadata: Optional[dict] = None,
stub_test: bool = False,
starting_time: Optional[float] = None,
write_as: Literal["raw", "lfp", "processed"] = "raw",
write_electrical_series: bool = True,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
always_write_timestamps: bool = False,
):
"""
Add NIDQ board data to an NWB file, including both analog and digital channels if present.
Parameters
----------
nwbfile : NWBFile
The NWB file to which the NIDQ data will be added
metadata : Optional[dict], default: None
Metadata dictionary with device information. If None, uses default metadata
stub_test : bool, default: False
If True, only writes a small amount of data for testing
starting_time : Optional[float], default: None
DEPRECATED: Will be removed in June 2025. Starting time offset for the TimeSeries
write_as : Literal["raw", "lfp", "processed"], default: "raw"
DEPRECATED: Will be removed in June 2025. Specifies how to write the data
write_electrical_series : bool, default: True
DEPRECATED: Will be removed in June 2025. Whether to write electrical series data
iterator_type : Optional[str], default: "v2"
Type of iterator to use for data streaming
iterator_opts : Optional[dict], default: None
Additional options for the iterator
always_write_timestamps : bool, default: False
If True, always writes timestamps instead of using sampling rate
"""

if starting_time is not None:
warnings.warn(
"The 'starting_time' parameter is deprecated and will be removed in June 2025. "
"Use the time alignment methods for modifying the starting time or timestamps "
"of the data if needed: "
"https://neuroconv.readthedocs.io/en/main/user_guide/temporal_alignment.html",
DeprecationWarning,
stacklevel=2,
)

if write_as != "raw":
warnings.warn(
"The 'write_as' parameter is deprecated and will be removed in June 2025. "
"NIDQ should always be written in the acquisition module of NWB. "
"Writing data as LFP or processed data is not supported.",
DeprecationWarning,
stacklevel=2,
)

if write_electrical_series is not True:
warnings.warn(
"The 'write_electrical_series' parameter is deprecated and will be removed in June 2025. "
"The option to skip the addition of the data is no longer supported. "
"This option was used in ElectricalSeries to write the electrode and electrode group "
"metadata without the raw data.",
DeprecationWarning,
stacklevel=2,
)

if stub_test or self.subset_channels is not None:
recording = self.subset_recording(stub_test=stub_test)
else:
recording = self.recording_extractor

if metadata is None:
metadata = self.get_metadata()

# Add devices
device_metadata = metadata.get("Devices", [])
for device in device_metadata:
if device["name"] not in nwbfile.devices:
nwbfile.create_device(**device)

# Add analog and digital channels
if self.has_analog_channels:
self._add_analog_channels(
nwbfile=nwbfile,
recording=recording,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
always_write_timestamps=always_write_timestamps,
)

if self.has_digital_channels:
self._add_digital_channels(nwbfile=nwbfile)

def _add_analog_channels(
self,
nwbfile: NWBFile,
recording,
iterator_type: Optional[str],
iterator_opts: Optional[dict],
always_write_timestamps: bool,
):
"""
Add analog channels from the NIDQ board to the NWB file.
Parameters
----------
nwbfile : NWBFile
The NWB file to add the analog channels to
recording : BaseRecording
The recording extractor containing the analog channels
iterator_type : Optional[str]
Type of iterator to use for data streaming
iterator_opts : Optional[dict]
Additional options for the iterator
always_write_timestamps : bool
If True, always writes timestamps instead of using sampling rate
"""
analog_recorder = recording.select_channels(channel_ids=self.analog_channel_ids)
channel_names = analog_recorder.get_property(key="channel_names")
segment_index = 0
analog_data_iterator = _recording_traces_to_hdmf_iterator(
recording=analog_recorder,
segment_index=segment_index,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)

name = "TimeSeriesNIDQ"
description = f"Analog data from the NIDQ board. Channels are {channel_names} in that order."
time_series_kwargs = dict(name=name, data=analog_data_iterator, unit="a.u.", description=description)

if always_write_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
shifted_timestamps = timestamps
time_series_kwargs.update(timestamps=shifted_timestamps)
else:
recording_has_timestamps = recording.has_time_vector(segment_index=segment_index)
if recording_has_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
rate = calculate_regular_series_rate(series=timestamps)
recording_t_start = timestamps[0]
else:
rate = recording.get_sampling_frequency()
recording_t_start = recording._recording_segments[segment_index].t_start or 0

if rate:
starting_time = float(recording_t_start)
time_series_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency())
else:
shifted_timestamps = timestamps
time_series_kwargs.update(timestamps=shifted_timestamps)

time_series = TimeSeries(**time_series_kwargs)
nwbfile.add_acquisition(time_series)

def _add_digital_channels(self, nwbfile: NWBFile):
"""
Add digital channels from the NIDQ board to the NWB file as events.
Parameters
----------
nwbfile : NWBFile
The NWB file to add the digital channels to
"""
from ndx_events import LabeledEvents

event_channels = self.event_extractor.channel_ids
for channel_id in event_channels:
events_structure = self.event_extractor.get_events(channel_id=channel_id)
timestamps = events_structure["time"]
labels = events_structure["label"]

# Some channels have no events
if timestamps.size > 0:

# Timestamps are not ordered, the ones for off are first and then the ones for on
ordered_indices = np.argsort(timestamps)
ordered_timestamps = timestamps[ordered_indices]
ordered_labels = labels[ordered_indices]

unique_labels = np.unique(ordered_labels)
label_to_index = {label: index for index, label in enumerate(unique_labels)}
data = [label_to_index[label] for label in ordered_labels]

channel_name = channel_id.split("#")[-1]
description = f"On and Off Events from channel {channel_name}"
name = f"EventsNIDQDigitalChannel{channel_name}"
labeled_events = LabeledEvents(
name=name, description=description, timestamps=ordered_timestamps, data=data, labels=unique_labels
)
nwbfile.add_acquisition(labeled_events)

def get_event_times_from_ttl(self, channel_name: str) -> np.ndarray:
"""
Return the start of event times from the rising part of TTL pulses on one of the NIDQ channels.
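To make the re-ordering logic in `_add_digital_channels` concrete: a self-contained toy example (synthetic data, not taken from the extractor) of how the off/on event streams are merged into the `data`, `labels`, and `timestamps` arrays that `LabeledEvents` expects.

import numpy as np

# SpikeGLXEventExtractor returns all "off" timestamps before the "on" ones,
# so the streams must be interleaved by time before building LabeledEvents.
timestamps = np.array([0.5, 1.5, 0.0, 1.0])  # two off events, then two on events
labels = np.array(["off", "off", "on", "on"])

order = np.argsort(timestamps)
ordered_timestamps = timestamps[order]  # [0.0, 0.5, 1.0, 1.5]
ordered_labels = labels[order]          # ['on', 'off', 'on', 'off']

unique_labels = np.unique(ordered_labels)  # ['off', 'on']
label_to_index = {label: index for index, label in enumerate(unique_labels)}
data = [label_to_index[label] for label in ordered_labels]  # [1, 0, 1, 0]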