Source code for neuroconv.datainterfaces.ecephys.spikeglx.spikeglxnidqinterface

import warnings
from pathlib import Path

import numpy as np
from pydantic import ConfigDict, DirectoryPath, FilePath, validate_call
from pynwb import NWBFile

from .spikeglx_utils import get_session_start_time
from ....basedatainterface import BaseDataInterface
from ....tools.signal_processing import get_rising_frames_from_ttl
from ....utils import (
    DeepDict,
    get_json_schema_from_method_signature,
)


class SpikeGLXNIDQInterface(BaseDataInterface):
    """Primary data interface class for converting the NIDQ (auxiliary board) stream of a SpikeGLX recording."""

    # NOTE: the original docstring said "high-pass (ap)" — a copy-paste error from the AP
    # interface; everything in this class (stream_id="nidq", suffixes, descriptions) is NIDQ.

    display_name = "NIDQ Recording"
    keywords = ("Neuropixels", "nidq", "NIDQ", "SpikeGLX")
    associated_suffixes = (".nidq", ".meta", ".bin")
    info = "Interface for NIDQ board recording data."

    @classmethod
    def get_source_schema(cls) -> dict:
        """Compile the JSON source schema from the ``__init__`` signature and refine descriptions."""
        source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=[])
        source_schema["properties"]["file_path"]["description"] = "Path to SpikeGLX .nidq file."
        source_schema["properties"]["metadata_key"]["description"] = (
            "Key used to organize metadata in the metadata dictionary. This is especially useful "
            "when multiple NIDQ interfaces are used in the same conversion. The metadata_key is used "
            "to organize TimeSeries and Events metadata."
        )
        return source_schema

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def __init__(
        self,
        file_path: FilePath | None = None,
        verbose: bool = False,
        es_key: str = "ElectricalSeriesNIDQ",
        folder_path: DirectoryPath | None = None,
        metadata_key: str = "SpikeGLXNIDQ",
    ):
        """
        Read analog and digital channel data from the NIDQ board for the SpikeGLX recording.

        The NIDQ stream records both analog and digital (usually non-neural) signals.
        XD channels are converted to events directly. XA, MA and MD channels are all
        written together to a single TimeSeries at the moment.
        Note that the multiplexed channels MA and MD are written multiplexed at the moment.

        Parameters
        ----------
        folder_path : DirectoryPath
            Path to the folder containing the .nidq.bin file.
        file_path : FilePath
            Path to .nidq.bin file.
        verbose : bool, default: False
            Whether to output verbose text.
        es_key : str, default: "ElectricalSeriesNIDQ"
        metadata_key : str, default: "SpikeGLXNIDQ"
            Key used to organize metadata in the metadata dictionary. This is especially useful
            when multiple NIDQ interfaces are used in the same conversion. The metadata_key is
            used to organize TimeSeries and Events metadata.
        """
        if file_path is not None:
            warnings.warn(
                "file_path is deprecated and will be removed by the end of 2025. "
                "The first argument of this interface will be `folder_path` afterwards. "
                "Use folder_path and stream_id instead.",
                DeprecationWarning,
                stacklevel=2,
            )

        if file_path is None and folder_path is None:
            raise ValueError("Either 'file_path' or 'folder_path' must be provided.")

        # A deprecated file_path still works: its parent directory is the session folder.
        if file_path is not None:
            file_path = Path(file_path)
            self.folder_path = file_path.parent

        # folder_path, when given, takes precedence over the parent derived from file_path.
        if folder_path is not None:
            self.folder_path = Path(folder_path)

        # Deferred import: spikeinterface is only needed once the interface is instantiated.
        from spikeinterface.extractors.extractor_classes import (
            SpikeGLXRecordingExtractor,
        )

        self.recording_extractor = SpikeGLXRecordingExtractor(
            folder_path=self.folder_path,
            stream_id="nidq",
            all_annotations=True,
        )

        channel_ids = self.recording_extractor.get_channel_ids()

        # Analog channel signatures are "XA" (analog) and "MA" (multiplexed analog);
        # every remaining channel in the stream is treated as digital.
        self.analog_channel_ids = [ch for ch in channel_ids if "XA" in ch or "MA" in ch]
        self.has_analog_channels = len(self.analog_channel_ids) > 0
        self.has_digital_channels = len(self.analog_channel_ids) < len(channel_ids)

        if self.has_digital_channels:
            # ndx_events must be importable to later write LabeledEvents; fail early here.
            import ndx_events  # noqa: F401
            from spikeinterface.extractors.extractor_classes import (
                SpikeGLXEventExtractor,
            )

            self.event_extractor = SpikeGLXEventExtractor(folder_path=self.folder_path)

        self.metadata_key = metadata_key

        super().__init__(
            verbose=verbose,
            es_key=es_key,
            folder_path=self.folder_path,
            file_path=file_path,
        )

        signal_info_key = (0, "nidq")  # Key format is (segment_index, stream_id)
        self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key]
        self.meta = self._signals_info_dict["meta"]

    def _get_default_events_metadata(self) -> dict:
        """
        Returns default metadata for digital channel events.

        Single source of truth for default digital channel event metadata.
        Each call returns a new instance to prevent accidental mutation of global state.

        Returns
        -------
        dict
            Dictionary mapping channel IDs to their default metadata configurations.
        """
        default_metadata = {}
        if self.has_digital_channels:
            for channel_id in self.event_extractor.channel_ids:
                # Channel ids look like "<stream>#<name>"; keep only the name part.
                channel_name = channel_id.split("#")[-1]

                # Get extractor labels for this channel
                events_structure = self.event_extractor.get_events(channel_id=channel_id)
                raw_labels = events_structure["label"]

                # Build default labels_map from extractor (data value -> label string)
                if raw_labels.size > 0:
                    unique_labels = np.unique(raw_labels)
                    labels_map = {idx: str(label) for idx, label in enumerate(unique_labels)}
                else:
                    labels_map = {}

                default_metadata[channel_id] = {
                    "name": f"EventsNIDQDigitalChannel{channel_name}",
                    "description": f"On and Off Events from channel {channel_name}",
                    "labels_map": labels_map,
                }
        return default_metadata

    def get_metadata(self) -> DeepDict:
        """Build default metadata: session start time, NIDQ device, and TimeSeries/Events entries."""
        metadata = super().get_metadata()

        session_start_time = get_session_start_time(self.meta)
        if session_start_time:
            metadata["NWBFile"]["session_start_time"] = session_start_time

        # Device metadata
        device = dict(
            name="NIDQBoard",
            description="A NIDQ board used in conjunction with SpikeGLX.",
            manufacturer="National Instruments",
        )
        metadata["Devices"] = [device]

        # TimeSeries metadata for analog channels
        if self.has_analog_channels:
            if "TimeSeries" not in metadata:
                metadata["TimeSeries"] = {}

            # Try to get channel names, fall back to channel IDs if not available.
            # The "channel_names" property is aligned with the FULL channel id list, so each
            # analog id must be looked up by its position among all ids (the previous code
            # enumerated positions over the analog-only list, which is only correct when the
            # analog channels happen to come first in the stream).
            channel_names_property = self.recording_extractor.get_property(key="channel_names")
            if channel_names_property is not None:
                all_channel_ids = list(self.recording_extractor.get_channel_ids())
                channel_names = [
                    channel_names_property[all_channel_ids.index(channel_id)]
                    for channel_id in self.analog_channel_ids
                ]
            else:
                channel_names = list(self.analog_channel_ids)

            metadata["TimeSeries"][self.metadata_key] = {
                "name": "TimeSeriesNIDQ",
                "description": f"Analog data from the NIDQ board. Channels are {channel_names} in that order.",
            }

        # Events metadata for digital channels
        if self.has_digital_channels:
            if "Events" not in metadata:
                metadata["Events"] = {}
            metadata["Events"][self.metadata_key] = self._get_default_events_metadata()

        return metadata

    def get_channel_names(self) -> list[str]:
        """
        Get a list of channel names from the recording extractor.

        Returns
        -------
        list of str
            The names of all channels in the NIDQ recording.
        """
        return list(self.recording_extractor.get_channel_ids())

    def _get_digital_channel_config(self, channel_id: str, metadata: dict | None = None) -> dict:
        """
        Get configuration for a digital channel from metadata or use defaults.

        This method can be overridden in subclasses to provide custom channel configurations.
        For example, IBL-specific interfaces can override this to provide semantic channel names.

        Parameters
        ----------
        channel_id : str
            The channel ID (e.g., "nidq#XD0")
        metadata : dict | None, default: None
            The metadata dictionary that may contain custom channel configurations.
            If None or missing required entries, defaults are used.

        Returns
        -------
        dict
            Configuration dictionary with keys:
            - name: str, name for the LabeledEvents object
            - description: str, description of what the channel represents
            - labels_map: dict, maps data values (int) to semantic label strings
        """
        # Get default configuration
        default_events_metadata = self._get_default_events_metadata()
        default_config = default_events_metadata.get(channel_id, {})

        # Check if custom configuration exists in metadata (don't modify it)
        if metadata is not None:
            events_metadata = metadata.get("Events", {})
            interface_events = events_metadata.get(self.metadata_key, {})
            custom_config = interface_events.get(channel_id, {})
        else:
            custom_config = {}

        # Merge custom config with defaults (custom takes precedence)
        config = {
            "name": custom_config.get("name", default_config.get("name")),
            "description": custom_config.get("description", default_config.get("description")),
            "labels_map": custom_config.get("labels_map", default_config.get("labels_map")),
        }
        return config

    def add_to_nwbfile(
        self,
        nwbfile: NWBFile,
        metadata: dict | None = None,
        stub_test: bool = False,
        iterator_type: str | None = "v2",
        iterator_opts: dict | None = None,
        always_write_timestamps: bool = False,
    ):
        """
        Add NIDQ board data to an NWB file, including both analog and digital channels if present.

        Parameters
        ----------
        nwbfile : NWBFile
            The NWB file to which the NIDQ data will be added
        metadata : dict | None, default: None
            Metadata dictionary with device information. If None, uses default metadata
        stub_test : bool, default: False
            If True, only writes a small amount of data for testing
        iterator_type : str | None, default: "v2"
            Type of iterator to use for data streaming
        iterator_opts : dict | None, default: None
            Additional options for the iterator
        always_write_timestamps : bool, default: False
            If True, always writes timestamps instead of using sampling rate
        """
        from ....tools.spikeinterface import _stub_recording

        recording = self.recording_extractor
        if stub_test:
            recording = _stub_recording(recording=self.recording_extractor)

        metadata = metadata or self.get_metadata()

        # Add devices, skipping any already present in the file.
        device_metadata = metadata.get("Devices", [])
        for device in device_metadata:
            if device["name"] not in nwbfile.devices:
                nwbfile.create_device(**device)

        # Add analog and digital channels
        if self.has_analog_channels:
            self._add_analog_channels(
                nwbfile=nwbfile,
                recording=recording,
                iterator_type=iterator_type,
                iterator_opts=iterator_opts,
                always_write_timestamps=always_write_timestamps,
                metadata=metadata,
            )

        if self.has_digital_channels:
            self._add_digital_channels(nwbfile=nwbfile, metadata=metadata)

    def _add_analog_channels(
        self,
        nwbfile: NWBFile,
        recording,
        iterator_type: str | None,
        iterator_opts: dict | None,
        always_write_timestamps: bool,
        metadata: dict | None = None,
    ):
        """
        Add analog channels from the NIDQ board to the NWB file.

        Parameters
        ----------
        nwbfile : NWBFile
            The NWB file to add the analog channels to
        recording : BaseRecording
            The recording extractor containing the analog channels
        iterator_type : str | None
            Type of iterator to use for data streaming
        iterator_opts : dict | None
            Additional options for the iterator
        always_write_timestamps : bool
            If True, always writes timestamps instead of using sampling rate
        metadata : dict | None, default: None
            Metadata dictionary with TimeSeries information
        """
        from ....tools.spikeinterface import add_recording_as_time_series_to_nwbfile

        analog_recorder = recording.select_channels(channel_ids=self.analog_channel_ids)

        # Create default metadata if not provided
        if metadata is None:
            metadata = self.get_metadata()

        # Ensure TimeSeries metadata exists for this interface
        if "TimeSeries" not in metadata:
            metadata["TimeSeries"] = {}
        if self.metadata_key not in metadata["TimeSeries"]:
            channel_names = analog_recorder.get_property(key="channel_names")
            metadata["TimeSeries"][self.metadata_key] = {
                "name": "TimeSeriesNIDQ",
                "description": f"Analog data from the NIDQ board. Channels are {channel_names} in that order.",
            }

        # Use add_recording_as_time_series_to_nwbfile to add the time series
        add_recording_as_time_series_to_nwbfile(
            recording=analog_recorder,
            nwbfile=nwbfile,
            metadata=metadata,
            iterator_type=iterator_type,
            iterator_opts=iterator_opts,
            always_write_timestamps=always_write_timestamps,
            metadata_key=self.metadata_key,
        )

    def _add_digital_channels(self, nwbfile: NWBFile, metadata: dict | None = None):
        """
        Add digital channels from the NIDQ board to the NWB file as events.

        Parameters
        ----------
        nwbfile : NWBFile
            The NWB file to add the digital channels to
        metadata : dict | None, default: None
            Metadata dictionary that may contain custom channel configurations.
            If None or missing required entries, defaults from get_metadata() are used.
        """
        from ndx_events import LabeledEvents

        event_channels = self.event_extractor.channel_ids
        for channel_id in event_channels:
            events_structure = self.event_extractor.get_events(channel_id=channel_id)
            timestamps = events_structure["time"]
            raw_labels = events_structure["label"]

            # Some channels have no events
            if timestamps.size > 0:
                # Timestamps are not ordered, the ones for off are first and then the ones for on
                ordered_indices = np.argsort(timestamps)
                ordered_timestamps = timestamps[ordered_indices]
                ordered_raw_labels = raw_labels[ordered_indices]

                # Get configuration for this channel
                # _get_digital_channel_config will check metadata first, then fall back to defaults
                config = self._get_digital_channel_config(channel_id, metadata)

                # Get labels_map: {data_value: label_string}
                labels_map = config["labels_map"]

                # Build reverse mapping from extractor labels to data values
                # First, get unique extractor labels and map them to consecutive integers
                unique_raw_labels = np.unique(raw_labels)
                extractor_label_to_value = {str(label): idx for idx, label in enumerate(unique_raw_labels)}

                # Map ordered raw labels to data values
                data = [extractor_label_to_value[str(label)] for label in ordered_raw_labels]

                # Fill missing mappings with default labels from extractor
                # This ensures all data values have a corresponding label
                num_unique_values = len(unique_raw_labels)
                complete_labels_map = dict(labels_map)  # Copy user-provided mappings
                default_events_metadata = self._get_default_events_metadata()
                default_labels_map = default_events_metadata.get(channel_id, {}).get("labels_map", {})
                for data_value in range(num_unique_values):
                    if data_value not in complete_labels_map:
                        # Fill with default if available, otherwise use generic label
                        complete_labels_map[data_value] = default_labels_map.get(data_value, f"unknown_{data_value}")

                # Derive labels list from complete labels_map for LabeledEvents
                # Sort by data value to ensure correct ordering
                sorted_items = sorted(complete_labels_map.items())
                labels_list = [label for _, label in sorted_items]

                labeled_events = LabeledEvents(
                    name=config["name"],
                    description=config["description"],
                    timestamps=ordered_timestamps,
                    data=data,
                    labels=labels_list,
                )
                nwbfile.add_acquisition(labeled_events)

    def get_event_times_from_ttl(self, channel_name: str) -> np.ndarray:
        """
        Return the start of event times from the rising part of TTL pulses on one of the NIDQ channels.

        Parameters
        ----------
        channel_name : str
            Name of the channel in the .nidq.bin file.

        Returns
        -------
        rising_times : numpy.ndarray
            The times of the rising TTL pulses.
        """
        # TODO: consider RAM cost of these operations and implement safer buffering version
        rising_frames = get_rising_frames_from_ttl(
            trace=self.recording_extractor.get_traces(channel_ids=[channel_name])
        )

        nidq_timestamps = self.recording_extractor.get_times()
        rising_times = nidq_timestamps[rising_frames]

        return rising_times