"""DataInterfaces for SpikeGLX."""
import warnings
from pathlib import Path
import numpy as np
from pydantic import DirectoryPath, FilePath, validate_call
from .spikeglx_utils import (
fetch_stream_id_for_spikelgx_file,
get_device_metadata,
get_session_start_time,
)
from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ....utils import DeepDict, get_json_schema_from_method_signature
class SpikeGLXRecordingInterface(BaseRecordingExtractorInterface):
"""
Primary SpikeGLX interface for converting raw SpikeGLX data.
Uses the :py:func:`~spikeinterface.extractors.read_spikeglx` reader from SpikeInterface.
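
    A minimal usage sketch (the folder path and output file name below are placeholders;
    see the ``__init__`` parameters for the full set of options):

    >>> interface = SpikeGLXRecordingInterface(
    ...     folder_path="path/to/spikeglx_folder", stream_id="imec0.ap"
    ... )
    >>> metadata = interface.get_metadata()
    >>> interface.run_conversion(nwbfile_path="spikeglx_session.nwb", metadata=metadata)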
"""
display_name = "SpikeGLX Recording"
keywords = BaseRecordingExtractorInterface.keywords + ("Neuropixels",)
associated_suffixes = (".imec{probe_index}", ".ap", ".lf", ".meta", ".bin")
info = "Interface for SpikeGLX recording data."
@classmethod
def get_source_schema(cls) -> dict:
source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["x_pitch", "y_pitch"])
source_schema["properties"]["file_path"]["description"] = "Path to SpikeGLX ap.bin or lf.bin file."
return source_schema
def _initialize_extractor(self, interface_kwargs: dict):
"""Override to add stream_id and set folder_path."""
self.extractor_kwargs = interface_kwargs.copy()
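        # verbose and es_key are NeuroConv-level options that the SpikeInterface extractor
        # does not accept, so they are dropped before the kwargs are forwarded.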
self.extractor_kwargs.pop("verbose", None)
self.extractor_kwargs.pop("es_key", None)
self.extractor_kwargs["all_annotations"] = True
self.extractor_kwargs["folder_path"] = self.folder_path
self.extractor_kwargs["stream_id"] = self.stream_id
extractor_class = self.get_extractor_class()
extractor_instance = extractor_class(**self.extractor_kwargs)
return extractor_instance
@validate_call
def __init__(
self,
file_path: FilePath | None = None,
verbose: bool = False,
es_key: str | None = None,
folder_path: DirectoryPath | None = None,
stream_id: str | None = None,
):
"""
Parameters
----------
        folder_path : DirectoryPath
            Folder path containing the binary files of the SpikeGLX recording.
        stream_id : str, optional
            Stream ID of the SpikeGLX recording.
            Examples are 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc.
        file_path : FilePath, optional
            Path to a .bin file. Point to the .ap.bin file for SpikeGLXRecordingInterface and to the
            .lf.bin file for SpikeGLXLFPInterface. Deprecated; use folder_path and stream_id instead.
        verbose : bool, default: False
            Whether to output verbose text.
        es_key : str, optional
            The key to access the metadata of the ElectricalSeries.
"""
if stream_id == "nidq":
raise ValueError(
"SpikeGLXRecordingInterface is not designed to handle nidq files. Use SpikeGLXNIDQInterface instead"
)
if stream_id is not None and "SYNC" in stream_id:
raise ValueError(
"SpikeGLXRecordingInterface is not designed to handle the SYNC stream. Open an issue if you need this functionality."
)
if file_path is not None:
warnings.warn(
"file_path is deprecated and will be removed by the end of 2025. "
"The first argument of this interface will be `folder_path` afterwards. "
"Use folder_path and stream_id instead.",
DeprecationWarning,
stacklevel=2,
)
if file_path is not None and stream_id is None:
self.stream_id = fetch_stream_id_for_spikelgx_file(file_path)
self.folder_path = Path(file_path).parent
else:
self.stream_id = stream_id
self.folder_path = Path(folder_path)
super().__init__(
folder_path=folder_path,
verbose=verbose,
es_key=es_key,
)
signal_info_key = (0, self.stream_id) # Key format is (segment_index, stream_id)
self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key]
self.meta = self._signals_info_dict["meta"]
if es_key is None:
stream_kind = self._signals_info_dict["stream_kind"] # ap or lf
stream_kind_caps = stream_kind.upper()
device = self._signals_info_dict["device"].capitalize() # imec0, imec1, etc.
electrical_series_name = f"ElectricalSeries{stream_kind_caps}"
            # Append the probe name to the electrical series name when more than one probe is
            # enabled in the run or the probe count is undefined; `typeImEnabled` is present in
            # the meta of all production probes.
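            # e.g. "ElectricalSeriesAP" for a single-probe run, or "ElectricalSeriesAPImec0"
            # when more than one probe is enabled.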
self.probes_enabled_in_run = int(self.meta.get("typeImEnabled", 0))
if self.probes_enabled_in_run != 1:
electrical_series_name += f"{device}"
self.es_key = electrical_series_name
# Set electrode properties from probe information
probe = self.recording_extractor.get_probe()
channel_ids = self.recording_extractor.get_channel_ids()
# Should follow pattern 'Imec0', 'Imec1', etc.
probe_name = self.recording_extractor.stream_id[:5].capitalize()
# Set group_name property based on shank count
if probe.get_shank_count() > 1:
shank_ids = probe.shank_ids
self.recording_extractor.set_property(key="shank_ids", values=shank_ids)
group_name = [f"Neuropixels{probe_name}Shank{shank_id}" for shank_id in shank_ids]
else:
group_name = [f"Neuropixels{probe_name}"] * len(channel_ids)
self.recording_extractor.set_property(key="group_name", ids=channel_ids, values=group_name)
# Set contact geometry properties
contact_shapes = probe.contact_shapes
self.recording_extractor.set_property(key="contact_shapes", ids=channel_ids, values=contact_shapes)
# Set channel_name property for multi-stream deduplication
# For SpikeGLX, multiple streams (AP, LF) can record from the same electrodes
# We set channel_name to show all streams for each electrode (e.g., "AP0,LF0")
current_stream_kind = self._signals_info_dict["stream_kind"] # "ap" or "lf"
# Check if companion stream exists (AP has LF, LF has AP)
device = self._signals_info_dict["device"] # e.g., "imec0"
companion_stream_kind = "lf" if current_stream_kind == "ap" else "ap"
companion_stream_id = f"{device}.{companion_stream_kind}"
companion_key = (0, companion_stream_id)
signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict
has_companion_stream = companion_key in signals_info_dict
# Build channel names
channel_names = []
for channel_id in channel_ids:
# Extract channel number from channel_id (e.g., "imec0.ap#AP0" -> "0")
channel_number = channel_id.split("#")[-1][2:]
if has_companion_stream:
# Multi-stream: show both AP and LF (alphabetically sorted)
channel_name = f"AP{channel_number},LF{channel_number}"
else:
# Single stream: just use the current stream name
channel_name = f"{current_stream_kind.upper()}{channel_number}"
channel_names.append(channel_name)
self.recording_extractor.set_property(key="channel_name", ids=channel_ids, values=channel_names)
def get_original_timestamps(self) -> np.ndarray:
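        """
        Retrieve the original, unaltered timestamps for this stream.

        The raw data is re-read from ``source_data``, so any timestamp alignment previously
        applied to this interface is ignored. A list of arrays is returned when the recording
        has more than one segment.
        """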
new_recording = self._initialize_extractor(
self.source_data
) # TODO: add generic method for aliasing from NeuroConv signature to SI init
if self._number_of_segments == 1:
return new_recording.get_times()
else:
return [
new_recording.get_times(segment_index=segment_index)
for segment_index in range(self._number_of_segments)
]