"""Collection of helper functions related to configuration of datasets dependent on backend."""
from pathlib import Path
from typing import Generator, Literal
import h5py
import numpy as np
import zarr
from hdmf import Container
from hdmf.data_utils import DataIO
from hdmf.utils import get_data_shape
from hdmf_zarr import NWBZarrIO
from pynwb import NWBHDF5IO, NWBFile, get_manager
from pynwb.base import DynamicTable, TimeSeriesReferenceVectorData
from pynwb.file import NWBContainer
from ._configuration_models import DATASET_IO_CONFIGURATIONS
from ._configuration_models._base_dataset_io import DatasetIOConfiguration
def _get_io_mode(io: NWBHDF5IO | NWBZarrIO) -> str:
"""NWBHDF5IO and NWBZarrIO have different ways of storing the io mode (e.g. "r", "a", "w") they used on a path."""
if isinstance(io, NWBHDF5IO):
return io.mode
elif isinstance(io, NWBZarrIO):
return io._ZarrIO__mode
def _is_dataset_written_to_file(
candidate_dataset: h5py.Dataset | zarr.Array,
backend: Literal["hdf5", "zarr"],
existing_file_path: str | None,
) -> bool:
"""
    Determine whether the candidate dataset has already been written to the file on disk.
    Such objects should be skipped by `get_default_dataset_io_configurations` when working in append mode.
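    Illustrative doctest-style sketch; the file and internal dataset paths below are hypothetical.
    >>> import h5py
    >>> file = h5py.File("existing_file.nwb", mode="r")  # hypothetical file being appended to
    >>> _is_dataset_written_to_file(
    ...     candidate_dataset=file["acquisition/TestTimeSeries/data"],  # hypothetical dataset within that file
    ...     backend="hdf5",
    ...     existing_file_path="existing_file.nwb",
    ... )
    True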
"""
if existing_file_path is None:
return False
normalized_existing = Path(existing_file_path).resolve()
return (
isinstance(candidate_dataset, h5py.Dataset) # If the source data is an HDF5 Dataset
and backend == "hdf5"
and Path(candidate_dataset.file.filename).resolve()
== normalized_existing # If the source HDF5 Dataset is the appending NWBFile
) or (
isinstance(candidate_dataset, zarr.Array) # If the source data is a Zarr Array
and backend == "zarr"
and Path(candidate_dataset.store.path).resolve()
== normalized_existing # If the source Zarr 'file' is the appending NWBFile
)
def get_default_dataset_io_configurations(
nwbfile: NWBFile,
backend: None | Literal["hdf5", "zarr"] = None, # None for auto-detect from append mode, otherwise required
) -> Generator[DatasetIOConfiguration, None, None]:
"""
Generate DatasetIOConfiguration objects for wrapping NWB file objects with a specific backend.
    This function automatically detects all objects in an NWB file that can be wrapped in an hdmf.DataIO.
    If the NWB file was read in append mode, the backend can be auto-detected from the existing file.
    Otherwise, `backend` must be specified explicitly.
Parameters
----------
nwbfile : pynwb.NWBFile
An in-memory NWBFile object, either generated from the base class or read from an existing file of any backend.
backend : "hdf5" or "zarr"
Which backend format type you would like to use in configuring each dataset's compression methods and options.
Yields
------
DatasetIOConfiguration
        A summary of each detected object that can be wrapped in an hdmf.DataIO.
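    Examples
    --------
    Minimal usage sketch; the file path below is hypothetical and the printed output depends on the file's contents.
    >>> from pynwb import NWBHDF5IO
    >>> io = NWBHDF5IO(path="example_file.nwb", mode="r")  # hypothetical path
    >>> nwbfile = io.read()
    >>> for dataset_io_configuration in get_default_dataset_io_configurations(nwbfile=nwbfile, backend="hdf5"):
    ...     print(dataset_io_configuration)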
"""
    if backend is None and nwbfile.read_io is None:
        raise ValueError(
            "Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` was not "
            "read from an existing file!"
        )
    if backend is None and nwbfile.read_io is not None and _get_io_mode(io=nwbfile.read_io) not in ("r+", "a"):
        raise ValueError(
            "Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` was not "
            "read in append mode ('r+' or 'a'), since the backend cannot be auto-detected in that case!"
        )
    detected_backend = None
    existing_file_path = None
    if isinstance(nwbfile.read_io, NWBHDF5IO) and _get_io_mode(io=nwbfile.read_io) in ("r+", "a"):
        detected_backend = "hdf5"
        existing_file_path = nwbfile.read_io.source
    elif isinstance(nwbfile.read_io, NWBZarrIO) and _get_io_mode(io=nwbfile.read_io) in ("r+", "a"):
        detected_backend = "zarr"
        existing_file_path = nwbfile.read_io.source
    backend = backend or detected_backend
    if detected_backend is not None and detected_backend != backend:
        raise ValueError(
            f"Detected backend '{detected_backend}' for appending file, but specified `backend` "
            f"({backend}) does not match! Set `backend=None` or remove the keyword argument to allow it to auto-detect."
        )
    # Look up the configuration class only after `backend` has been validated or auto-detected.
    DatasetIOConfigurationClass = DATASET_IO_CONFIGURATIONS[backend]
known_dataset_fields = ("data", "timestamps")
manager = get_manager()
builder = manager.build(nwbfile, export=True)
# export = True ensures that the builder is created fresh (as opposed to a cached version),
# which is essential to make sure that all of the datasets are properly represented.
for neurodata_object in nwbfile.objects.values():
if isinstance(neurodata_object, DynamicTable):
dynamic_table = neurodata_object # For readability
for column in dynamic_table.columns:
candidate_dataset = column.data # VectorData object
# noinspection PyTypeChecker
if _is_dataset_written_to_file(
candidate_dataset=candidate_dataset,
backend=backend,
existing_file_path=existing_file_path,
):
continue # Skip
# Skip over columns that are already wrapped in DataIO
if isinstance(candidate_dataset, DataIO):
continue # Skip
# Skip over columns whose values are links, such as the 'group' of an ElectrodesTable
if any(isinstance(value, Container) for value in candidate_dataset):
continue # Skip
                # Skip columns whose values are a reference type
if isinstance(column, TimeSeriesReferenceVectorData):
continue
                dataset_name = "data"
                # Skip datasets with any zero-length axes
full_shape = get_data_shape(data=candidate_dataset)
if any(axis_length == 0 for axis_length in full_shape):
continue
dataset_io_configuration = DatasetIOConfigurationClass.from_neurodata_object_with_defaults(
neurodata_object=column, dataset_name=dataset_name, builder=builder
)
yield dataset_io_configuration
elif isinstance(neurodata_object, NWBContainer):
for known_dataset_field in known_dataset_fields:
# Skip optional fields that aren't present
if known_dataset_field not in neurodata_object.fields:
continue
candidate_dataset = getattr(neurodata_object, known_dataset_field)
# Skip if already written to file
# noinspection PyTypeChecker
if _is_dataset_written_to_file(
candidate_dataset=candidate_dataset, backend=backend, existing_file_path=existing_file_path
):
continue
# Skip over datasets that are already wrapped in DataIO
if isinstance(candidate_dataset, DataIO):
continue
                # Skip the edge case of an in-memory ImageSeries with external mode; its data field is present but is an empty array
if isinstance(candidate_dataset, np.ndarray) and candidate_dataset.size == 0:
continue
# Skip datasets with any zero-length axes
full_shape = get_data_shape(data=candidate_dataset)
if any(axis_length == 0 for axis_length in full_shape):
continue
dataset_io_configuration = DatasetIOConfigurationClass.from_neurodata_object_with_defaults(
neurodata_object=neurodata_object, dataset_name=known_dataset_field, builder=builder
)
yield dataset_io_configuration
def get_existing_dataset_io_configurations(nwbfile: NWBFile) -> Generator[DatasetIOConfiguration, None, None]:
"""
    Generate DatasetIOConfiguration objects reflecting the existing on-disk configuration of each dataset in an NWB file.
Parameters
----------
nwbfile : pynwb.NWBFile
An NWBFile object that has been read from an existing file with an existing backend configuration.
backend : "hdf5" or "zarr"
Which backend format type you would like to use in configuring each dataset's compression methods and options.
Yields
------
DatasetIOConfiguration
A configuration object for each dataset in the NWB file.
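    Examples
    --------
    Minimal usage sketch; the file path below is hypothetical. The backend is inferred from the IO used to read the file.
    >>> from pynwb import NWBHDF5IO
    >>> io = NWBHDF5IO(path="example_file.nwb", mode="r")  # hypothetical path
    >>> nwbfile = io.read()
    >>> for dataset_io_configuration in get_existing_dataset_io_configurations(nwbfile=nwbfile):
    ...     print(dataset_io_configuration)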
"""
if nwbfile.read_io is None:
raise ValueError("nwbfile must be read from an existing file!")
backend = None
if isinstance(nwbfile.read_io, NWBHDF5IO):
backend = "hdf5"
elif isinstance(nwbfile.read_io, NWBZarrIO):
backend = "zarr"
DatasetIOConfigurationClass = DATASET_IO_CONFIGURATIONS[backend]
known_dataset_fields = ("data", "timestamps")
for neurodata_object in nwbfile.objects.values():
if isinstance(neurodata_object, DynamicTable):
dynamic_table = neurodata_object # For readability
for column in dynamic_table.columns:
candidate_dataset = column.data # VectorData object
# Skip over columns whose values are links, such as the 'group' of an ElectrodesTable
if any(isinstance(value, Container) for value in candidate_dataset):
continue # Skip
                # Skip columns whose values are a reference type
if isinstance(column, TimeSeriesReferenceVectorData):
continue
                dataset_name = "data"
                # Skip datasets with any zero-length axes
full_shape = get_data_shape(data=candidate_dataset)
if any(axis_length == 0 for axis_length in full_shape):
continue
dataset_io_configuration = DatasetIOConfigurationClass.from_neurodata_object_with_existing(
neurodata_object=column,
dataset_name=dataset_name,
)
yield dataset_io_configuration
elif isinstance(neurodata_object, NWBContainer):
for known_dataset_field in known_dataset_fields:
# Skip optional fields that aren't present
if known_dataset_field not in neurodata_object.fields:
continue
candidate_dataset = getattr(neurodata_object, known_dataset_field)
# Skip datasets with any zero-length axes
full_shape = get_data_shape(data=candidate_dataset)
if any(axis_length == 0 for axis_length in full_shape):
continue
dataset_io_configuration = DatasetIOConfigurationClass.from_neurodata_object_with_existing(
neurodata_object=neurodata_object,
dataset_name=known_dataset_field,
)
yield dataset_io_configuration