Source code for neuroconv.datainterfaces.ophys.scanimage.scanimageimaginginterfaces

import datetime
import json
import warnings
from pathlib import Path
from typing import Optional

import numpy as np
from dateutil.parser import parse as dateparse
from pydantic import FilePath, validate_call

from ..baseimagingextractorinterface import BaseImagingExtractorInterface
from ....utils import DeepDict


[docs] class ScanImageImagingInterface(BaseImagingExtractorInterface): """Interface for reading TIFF files produced via ScanImage software. This interface is designed to handle the structure of ScanImage TIFF files, which can contain multi-channel and both planar and volumetric data. It supports both single-file and multi-file datasets generated by ScanImage in various acquisition modes (grab, focus, loop). ScanImage is a software package for controlling laser scanning microscopes, particularly for two-photon and multi-photon imaging. The interface extracts imaging data and metadata from ScanImage TIFF files and converts them to NWB format. Key features: * Handles multi-channel data with channel selection * Supports volumetric (multi-plane) imaging data * Automatically detects and loads multi-file datasets based on ScanImage naming conventions * Extracts and provides access to ScanImage metadata * Efficiently retrieves frames using lazy loading * Handles flyback frames in volumetric data """ display_name = "ScanImage Imaging" associated_suffixes = (".tif", ".tiff") info = "Interface for ScanImage TIFF files." @validate_call def __init__( self, file_path: Optional[FilePath] = None, channel_name: Optional[str] = None, slice_sample: Optional[int] = None, plane_index: Optional[int] = None, file_paths: Optional[list[FilePath]] = None, interleave_slice_samples: Optional[bool] = None, plane_name: str | None = None, fallback_sampling_frequency: float | None = None, verbose: bool = False, ): """ Parameters ---------- file_path : FilePath, optional Path to the ScanImage TIFF file. If this is part of a multi-file series, this should be the first file. Either `file_path` or `file_paths` must be provided. channel_name : str, optional Name of the channel to extract (e.g., "Channel 1", "Channel 2"). - If None and only one channel is available, that channel will be used. - If None and multiple channels are available, an error will be raised. - Use `get_available_channels(file_path)` to see available channels before creating the interface. slice_sample : int, optional Controls how to handle multiple frames per slice in volumetric data: ScanImage data can contain multiple frames for a single plane. Use this to select a specific frame from each slice. if None, this will throw an error. Select a slice sample or set `interleave_slice_samples` to True to interleave all the slice samples as separate volumes/samples. Note that this will scramble the acquisition order of the frames. This parameter has no effect when frames_per_slice = 1. plane_index : int, optional Must be between ``0`` and ``num_planes-1``. Used to extract a specific plane from volumetric data. When provided: - The resulting extractor will be planar - Each sample will contain only data for the specified plane - This parameter has no effect on planar (non-volumetric) data. file_paths : list[Path | str], optional List of file paths to use. This is an escape value that can be used in case the automatic file detection doesn't work correctly and can be used to override the automatic file detection. This is useful when: - Automatic detection doesn't work correctly - You need to specify a custom subset of files - You need to control the exact order of files The file paths must be provided in the temporal order of the frames in the dataset. interleave_slice_samples : bool, optional Controls whether to interleave all slice samples as separate time points when frames_per_slice > 1: - If True: Interleaves all slice samples as separate time points, increasing the effective number of samples by frames_per_slice. This treats each slice_sample as a distinct sample. - If False: Requires a specific slice_sample to be provided when frames_per_slice > 1. - This parameter has no effect when ``frames_per_slice = 1`` or when ``slice_sample`` is provided. - Default is True for backward compatibility (will change to False after November 2025). plane_name : str, optional Deprecated. Use plane_index instead. Will be removed in or after November 2025. fallback_sampling_frequency : float, optional Deprecated. Will be removed in or after November 2025. verbose : bool, default: False If True, will print detailed information about the interface initialization process. """ file_paths = [Path(file_path)] if file_path else file_paths header_version = self.get_scanimage_version(file_path=file_paths[0]) if header_version not in [3, 4, 5]: raise ValueError( f"Unsupported ScanImage version {header_version}. Supported versions are 3, 4, and 5." f"Most likely this is a legacy version, use ScanImageLegacyImagingInterface instead." ) # Backward compatibility flag - will be set to False after November 2025 if interleave_slice_samples is None: interleave_slice_samples = True warnings.warn( "interleave_slice_samples currently set to True for backward compatibility. \n" "This will be set to False by default in or after November 2025." ) if plane_name is not None: warnings.warn( "The `plane_name` argument is deprecated and will be removed in or after November 2025. Use `plane_index` instead." ) plane_index = int(plane_name) if fallback_sampling_frequency is not None: warnings.warn( "The `fallback_sampling_frequency` argument is deprecated and will be removed in or after November 2025" ) self.channel_name = channel_name self.plane_index = plane_index super().__init__( file_path=file_path, channel_name=channel_name, file_paths=file_paths, plane_index=plane_index, slice_sample=slice_sample, interleave_slice_samples=interleave_slice_samples, verbose=verbose, ) # Make sure the timestamps are available, the extractor caches them times = self.imaging_extractor.get_times() self.imaging_extractor.set_times(times=times)
[docs] @classmethod def get_extractor_class(cls): from roiextractors import ScanImageImagingExtractor return ScanImageImagingExtractor
def _initialize_extractor(self, interface_kwargs: dict): self.extractor_kwargs = interface_kwargs.copy() self.extractor_kwargs.pop("verbose", None) self.extractor_kwargs.pop("photon_series_type", None) extractor_class = self.get_extractor_class() extractor_instance = extractor_class(**self.extractor_kwargs) return extractor_instance
[docs] def get_metadata(self) -> DeepDict: """ Get metadata for the ScanImage imaging data. Returns ------- DeepDict The metadata dictionary containing imaging metadata from the ScanImage files. This includes: - Session start time extracted from the ScanImage file - Device information for the microscope - Optical channel configuration - Imaging plane details including grid spacing and origin coordinates if available - Photon series metadata with scan line rate and other acquisition parameters """ metadata = super().get_metadata() session_start_time = self._get_session_start_time() if session_start_time: metadata["NWBFile"]["session_start_time"] = session_start_time # Extract ScanImage-specific metadata if hasattr(self.imaging_extractor, "_general_metadata"): # Add general metadata to a custom field scanimage_metadata = self.imaging_extractor._general_metadata # Update device information device_name = "Microscope" metadata["Ophys"]["Device"][0].update(name=device_name, description=f"Microscope controlled by ScanImage") channel_name_string = self.channel_name.replace(" ", "").capitalize() optical_channel_name = f"OpticalChannel{channel_name_string}" optical_channel_metadata = { "name": optical_channel_name, "description": "Optical channel from ScanImage acquisition", "emission_lambda": np.nan, } # Update imaging plane metadata imaging_plane_metadata = metadata["Ophys"]["ImagingPlane"][0] plane_index_string = f"Plane{self.plane_index}" if self.plane_index is not None else "" imaging_plane_name = f"ImagingPlane{channel_name_string}{plane_index_string}" imaging_plane_metadata.update( name=imaging_plane_name, device=device_name, imaging_rate=self.imaging_extractor.get_sampling_frequency(), description="Imaging plane from ScanImage acquisition", optical_channel=[optical_channel_metadata], ) # Update photon series metadata photon_series_key = self.photon_series_type # "TwoPhotonSeries" or "OnePhotonSeries" photon_series_metadata = metadata["Ophys"][photon_series_key][0] photon_series_name = f"{photon_series_key}{channel_name_string}{plane_index_string}" photon_series_metadata["imaging_plane"] = imaging_plane_name photon_series_metadata["name"] = photon_series_name photon_series_metadata["description"] = f"Imaging data acquired using ScanImage for {self.channel_name}" # Add additional metadata if available if "FrameData" in scanimage_metadata: frame_data = scanimage_metadata["FrameData"] # Calculate scan line rate from line period if available if "SI.hRoiManager.linePeriod" in frame_data: scan_line_rate = 1 / float(frame_data["SI.hRoiManager.linePeriod"]) photon_series_metadata.update(scan_line_rate=scan_line_rate) elif "SI.hScan2D.scannerFrequency" in frame_data: photon_series_metadata.update(scan_line_rate=frame_data["SI.hScan2D.scannerFrequency"]) # Add version information to device description if available if "SI.VERSION_MAJOR" in frame_data: version = f"{frame_data.get('SI.VERSION_MAJOR', '')}.{frame_data.get('SI.VERSION_MINOR', '')}.{frame_data.get('SI.VERSION_UPDATE', '')}" metadata["Ophys"]["Device"][0][ "description" ] = f"Microscope and acquisition data with ScanImage (version {version})" # Extract ROI metadata if available if "RoiGroups" in scanimage_metadata: roi_metadata = scanimage_metadata["RoiGroups"] # Extract grid spacing and origin coordinates from scanfields grid_spacing = None grid_spacing_unit = "n.a" origin_coords = None origin_coords_unit = "n.a" if "imagingRoiGroup" in roi_metadata and "rois" in roi_metadata["imagingRoiGroup"]: rois = roi_metadata["imagingRoiGroup"]["rois"] if isinstance(rois, dict) and "scanfields" in rois: scanfields = rois["scanfields"] if "sizeXY" in scanfields and "pixelResolutionXY" in scanfields: fov_size_in_um = np.array(scanfields["sizeXY"]) frame_dimension = np.array(scanfields["pixelResolutionXY"]) grid_spacing = fov_size_in_um / frame_dimension grid_spacing_unit = "micrometers" if "centerXY" in scanfields: origin_coords = scanfields["centerXY"] origin_coords_unit = "micrometers" # Update imaging plane metadata with grid spacing and origin coordinates if grid_spacing is not None: imaging_plane_metadata.update( grid_spacing=grid_spacing.tolist(), grid_spacing_unit=grid_spacing_unit ) if origin_coords is not None: imaging_plane_metadata.update(origin_coords=origin_coords, origin_coords_unit=origin_coords_unit) return metadata
def _get_session_start_time(self) -> datetime.datetime | None: """ Extract and parse the 'epoch' metadata from a ScanImage TIFF file as the session start time. This method opens the first frame of the TIFF file, extracts the 'epoch' field from the ImageDescription tag, and parses it into a datetime object. Returns ------- datetime Parsed datetime from the 'epoch' metadata. Raises ------ ValueError If 'epoch' metadata is not found in the file. """ from tifffile import TiffReader tiff_file_path = self.imaging_extractor.file_path with TiffReader(tiff_file_path) as tif: image_description = tif.pages[0].tags["ImageDescription"].value import re match = re.search(r"epoch\s*=\s*\[([^\]]+)\]", image_description) if not match: raise ValueError(f"'epoch' field not found in {tiff_file_path}") epoch_values = match.group(1).split() import warnings if len(epoch_values) != 6: warnings.warn( f"Expected 6 values in 'epoch' field, found {len(epoch_values)}: \n" f"Epoch field {epoch_values}." ) return None year, month, day, hour, minute, seconds = map(float, epoch_values) second_int = int(seconds) microsecond = int((seconds - second_int) * 1e6) return datetime.datetime(int(year), int(month), int(day), int(hour), int(minute), second_int, microsecond)
[docs] @staticmethod def get_scanimage_version(file_path: Path | str) -> int: """ Extract the ScanImage version from a BigTIFF file without validation. This method reads the binary header of the TIFF file to determine the ScanImage version that produced it. It supports ScanImage versions 3, 4, and 5. Parameters ---------- file_path : Path | str Path to the ScanImage TIFF file Returns ------- int ScanImage version number (3, 4, or 5) """ with open(file_path, "rb") as f: # Skip the TIFF header (16 bytes) and the Magic Number (4 bytes) f.seek(20) # Read ScanImage version (4 bytes) version_bytes = f.read(4) scanimage_version = int.from_bytes(version_bytes, byteorder="little") return scanimage_version
[docs] @staticmethod def get_available_channels(file_path: Path | str) -> list[str]: """ Get the channel names available in a ScanImage TIFF file. This static method extracts the channel names from a ScanImage TIFF file without needing to create an interface instance. This is useful for determining which channels are available before creating an interface. Parameters ---------- file_path : Path | str Path to the ScanImage TIFF file. Returns ------- list[str] List of channel names available in the file (e.g., ["Channel 1", "Channel 2"]). """ from roiextractors import ScanImageImagingExtractor return ScanImageImagingExtractor.get_available_channels(file_path=file_path)
[docs] @staticmethod def get_available_planes(file_path: Path | str) -> list[str]: """ Get the available plane names from a ScanImage TIFF file. This static method determines the number of planes (Z-slices) in a volumetric ScanImage dataset without needing to create an interface instance. This is useful for determining which planes are available before creating an interface. Parameters ---------- file_path : Path | str Path to the ScanImage TIFF file. Returns ------- list[str] List of plane names available in the file. For volumetric data, this will be a list of strings representing plane indices (e.g., ["0", "1", "2"]). """ from roiextractors import ScanImageImagingExtractor return ScanImageImagingExtractor.get_available_planes(file_path=file_path)
[docs] class ScanImageLegacyImagingInterface(BaseImagingExtractorInterface): """Interface for reading TIFF files produced via ScanImage v3.8.""" display_name = "ScanImage Imaging" associated_suffixes = (".tif",) info = "Interface for ScanImage v3.8 TIFF files."
[docs] @classmethod def get_source_schema(cls) -> dict: source_schema = super().get_source_schema() source_schema["properties"]["file_path"]["description"] = "Path to Tiff file." return source_schema
[docs] @classmethod def get_extractor_class(cls): from roiextractors import ScanImageLegacyImagingExtractor return ScanImageLegacyImagingExtractor
def _initialize_extractor(self, interface_kwargs: dict): self.extractor_kwargs = interface_kwargs.copy() self.extractor_kwargs.pop("fallback_sampling_frequency", None) self.extractor_kwargs["sampling_frequency"] = self.sampling_frequency extractor_class = self.get_extractor_class() extractor_instance = extractor_class(**self.extractor_kwargs) return extractor_instance @validate_call def __init__( self, file_path: FilePath, fallback_sampling_frequency: float | None = None, verbose: bool = False, ): """ DataInterface for reading Tiff files that are generated by ScanImage v3.8. This interface extracts the metadata from the exif of the tiff file. Parameters ---------- file_path: FilePath Path to tiff file. fallback_sampling_frequency: float, optional The sampling frequency can usually be extracted from the scanimage metadata in exif:ImageDescription:state.acq.frameRate. If not, use this. """ from roiextractors.extractors.tiffimagingextractors.scanimagetiff_utils import ( extract_extra_metadata, ) self.image_metadata = extract_extra_metadata(file_path=file_path) if "state.acq.frameRate" in self.image_metadata: sampling_frequency = float(self.image_metadata["state.acq.frameRate"]) elif "SI.hRoiManager.scanFrameRate" in self.image_metadata: sampling_frequency = float(self.image_metadata["SI.hRoiManager.scanFrameRate"]) else: assert_msg = ( "sampling frequency not found in image metadata, " "input the frequency using the argument `fallback_sampling_frequency`" ) assert fallback_sampling_frequency is not None, assert_msg sampling_frequency = fallback_sampling_frequency self.sampling_frequency = sampling_frequency super().__init__(file_path=file_path, fallback_sampling_frequency=fallback_sampling_frequency, verbose=verbose)
[docs] def get_metadata(self) -> DeepDict: """ Get metadata for the ScanImage imaging data. Returns ------- dict Dictionary containing metadata including session start time and device information specific to the ScanImage system. """ device_number = 0 # Imaging plane metadata is a list with metadata for each plane metadata = super().get_metadata() if "state.internal.triggerTimeString" in self.image_metadata: extracted_session_start_time = dateparse(self.image_metadata["state.internal.triggerTimeString"]) metadata["NWBFile"].update(session_start_time=extracted_session_start_time) # Extract many scan image properties and attach them as dic in the description ophys_metadata = metadata["Ophys"] two_photon_series_metadata = ophys_metadata["TwoPhotonSeries"][device_number] if self.image_metadata is not None: extracted_description = json.dumps(self.image_metadata) two_photon_series_metadata.update(description=extracted_description) return metadata
[docs] def get_scanimage_major_version(scanimage_metadata: dict) -> str: """ Determine the version of ScanImage that produced the TIFF file. Parameters ---------- scanimage_metadata : dict Dictionary of metadata extracted from a TIFF file produced via ScanImage. Returns ------- version: str The version of ScanImage that produced the TIFF file. Raises ------ ValueError If the ScanImage version could not be determined from metadata. """ if "SI.VERSION_MAJOR" in scanimage_metadata: return scanimage_metadata["SI.VERSION_MAJOR"] elif "state.software.version" in scanimage_metadata: return scanimage_metadata["state.software.version"] raise ValueError("ScanImage version could not be determined from metadata.")