Skip to content

Commit

Permalink
Merge pull request #235 from ericpre/add_pixel_trigger_quantumdetector
Browse files Browse the repository at this point in the history
Add support for pixel trigger acquisition in quantumdetector reader
  • Loading branch information
ericpre authored Apr 1, 2024
2 parents a0dbed3 + 468482c commit 1becedd
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 33 deletions.
11 changes: 0 additions & 11 deletions doc/user_guide/supported_formats/quantumdetector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,6 @@ store a series of diffraction patterns from scanning transmission electron
diffraction measurements. It supports reading data from camera with one or
four quadrants.

If a ``hdr`` file with the same file name was saved along the ``mib`` file,
it will be used to infer the navigation shape of the providing that the option
"line trigger" was used for the acquisition. Alternatively, the navigation
shape can be specified as an argument:

.. code-block:: python
>>> from rsciio.quantumdetector import file_reader
>>> s_dict = file_reader("file.mib", navigation_shape=(256, 256))
API functions
^^^^^^^^^^^^^

Expand Down
87 changes: 72 additions & 15 deletions rsciio/quantumdetector/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import os
from pathlib import Path
import warnings

import dask.array as da
import numpy as np
Expand Down Expand Up @@ -130,14 +131,14 @@ def parse_file(self, path):
self.file_size = f.tell()
self.buffer = False
self.path = path
except: # pragma: no cover
except BaseException: # pragma: no cover
raise RuntimeError("File does not contain MIB header.")
elif isinstance(path, bytes):
try:
head = path[:384].decode().split(",")
self.file_size = len(path)
self.buffer = True
except: # pragma: no cover
except BaseException: # pragma: no cover
raise RuntimeError("Buffer does not contain MIB header.")
else: # pragma: no cover
raise TypeError("`path` must be a str or a buffer.")
Expand Down Expand Up @@ -218,7 +219,7 @@ def load_mib_data(
print_info : bool, default=False
If True, display information when loading the file.
return_mmap : bool
If True, return the py:func:`numpy.memmap` object. Default is True.
If True, return the :class:`numpy.memmap` object. Default is True.
Returns
-------
Expand Down Expand Up @@ -267,9 +268,9 @@ def load_mib_data(
# Reshape only when the slice from zeros
if first_frame == 0 and len(navigation_shape) > 1:
navigation_shape = (
navigation_shape[1],
frame_number_in_file // navigation_shape[1],
)
navigation_shape[0],
frame_number_in_file // navigation_shape[0],
)[::-1]
else:
navigation_shape = (number_of_frames_to_load,)
elif number_of_frames_to_load < frame_number:
Expand Down Expand Up @@ -326,13 +327,22 @@ def load_mib_data(
data = data["data"]
if not return_mmap:
if lazy:
data = da.from_array(data, chunks=chunks)
if isinstance(chunks, tuple) and len(chunks) > 2:
# Since the data is reshaped later on, we set only the
# signal dimension chunks here
_chunks = ("auto",) + chunks[-2:]
else:
_chunks = chunks
data = da.from_array(data, chunks=_chunks)
else:
data = np.array(data)

# remove navigation_dimension with value 1 before reshaping
navigation_shape = tuple(i for i in navigation_shape if i > 1)
data = data.reshape(navigation_shape + mib_prop.merlin_size)
if lazy and isinstance(chunks, tuple) and len(chunks) > 2:
# rechunk navigation space when chunking is specified as a tuple
data = data.rechunk(chunks)

if return_headers:
return data, headers
Expand Down Expand Up @@ -401,7 +411,7 @@ def parse_exposures(headers, max_index=10000):
from the headers. By default, reads only the first 10 000 frames.
>>> from rsciio.quantumdetector import load_mib_data, parse_exposures
>>> data, headers = load_mib_data(path, return_header=True, return_mmap=True)
>>> data, headers = load_mib_data(path, return_headers=True, return_mmap=True)
>>> exposures = parse_exposures(headers)
All frames can be parsed by using ``max_index=-1``:
Expand Down Expand Up @@ -485,6 +495,9 @@ def file_reader(
"""
Read a Quantum Detectors ``mib`` file.
If a ``hdr`` file with the same file name was saved along the ``mib`` file,
it will be used to read the metadata.
Parameters
----------
%s
Expand All @@ -503,6 +516,20 @@ def file_reader(
In case of interrupted acquisition, only the completed lines are read and
the incomplete line are discarded.
When the scanning shape (i. e. navigation shape) is not available from the
metadata (for example with acquisition using pixel trigger), the timestamps
will be used to guess the navigation shape.
Examples
--------
In case, the navigation shape can't read from the data itself (for example,
type of acquisition unsupported), the ``navigation_shape`` can be specified:
.. code-block:: python
>>> from rsciio.quantumdetector import file_reader
>>> s_dict = file_reader("file.mib", navigation_shape=(256, 256))
"""
mib_prop = MIBProperties()
mib_prop.parse_file(filename)
Expand All @@ -517,13 +544,43 @@ def file_reader(
hdr = None
_logger.warning("`hdr` file couldn't be found.")

if navigation_shape is None and hdr is not None:
# Use the hdr file to find the number of frames
navigation_shape = (
int(hdr["Frames per Trigger (Number)"]),
int(hdr["Frames in Acquisition (Number)"])
// int(hdr["Frames per Trigger (Number)"]),
)
frame_per_trigger = 1
headers = None
if navigation_shape is None:
if hdr is not None:
# Use the hdr file to find the number of frames
frame_per_trigger = int(hdr["Frames per Trigger (Number)"])
frames_number = int(hdr["Frames in Acquisition (Number)"])
else:
_, headers = load_mib_data(filename, return_headers=True)
frames_number = len(headers)

if frame_per_trigger == 1:
if headers is None:
_, headers = load_mib_data(filename, return_headers=True)
# Use parse_timestamps to find the number of frame per line
# we will get a difference of timestamps at the beginning of each line
with warnings.catch_warnings():
# Filter warning for converting timezone aware datetime
# The time zone is dropped
# Changed from `DeprecationWarning` to `UserWarning` in numpy 2.0
warnings.simplefilter("ignore")
times = np.array(parse_timestamps(headers)).astype(dtype="datetime64")

times_diff = np.diff(times).astype(float)
if len(times_diff) > 0:
# Substract the mean and take the first position above 0
indices = np.argwhere(times_diff - np.mean(times_diff) > 0)
if len(indices) > 0 and len(indices[0]) > 0:
frame_per_trigger = indices[0][0] + 1

if frames_number == 0:
# Some hdf files have the "Frames per Trigger (Number)": 0
# in this case, we don't reshape
# Possibly for "continuous and indefinite" acquisition
navigation_shape = None
else:
navigation_shape = (frame_per_trigger, frames_number // frame_per_trigger)

data = load_mib_data(
filename,
Expand Down
45 changes: 38 additions & 7 deletions rsciio/tests/test_quantumdetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
MIBProperties,
load_mib_data,
parse_exposures,
parse_hdr_file,
parse_timestamps,
)

Expand Down Expand Up @@ -120,7 +121,12 @@ def test_single_chip(fname, reshape):
def test_quad_chip(fname):
s = hs.load(TEST_DATA_DIR_UNZIPPED / fname)
if "9_Frame" in fname:
navigation_shape = (9,)
if "24_Rows_256" in fname:
# Unknow why the timestamps of this file are not consistent
# with others
navigation_shape = (3, 3)
else:
navigation_shape = (9,)
else:
navigation_shape = ()
assert s.data.shape == navigation_shape + (512, 512)
Expand All @@ -134,7 +140,9 @@ def test_quad_chip(fname):
assert axis.units == ""


@pytest.mark.parametrize("chunks", ("auto", (9, 128, 128), ("auto", 128, 128)))
@pytest.mark.parametrize(
"chunks", ("auto", (3, 3, 128, 128), ("auto", "auto", 128, 128))
)
def test_chunks(chunks):
fname = TEST_DATA_DIR_UNZIPPED / "Quad_9_Frame_CounterDepth_24_Rows_256.mib"
s = hs.load(fname, lazy=True, chunks=chunks)
Expand All @@ -159,7 +167,7 @@ def test_mib_properties_quad__repr__():
def test_interrupted_acquisition():
fname = TEST_DATA_DIR_UNZIPPED / "Single_9_Frame_CounterDepth_1_Rows_256.mib"
# There is only 9 frames, simulate interrupted acquisition using 10 lines
s = hs.load(fname, navigation_shape=(10, 2))
s = hs.load(fname, navigation_shape=(4, 3))
assert s.axes_manager.signal_shape == (256, 256)
assert s.axes_manager.navigation_shape == (4, 2)

Expand All @@ -180,11 +188,14 @@ def test_interrupted_acquisition_first_frame():
assert s.axes_manager.navigation_shape == (7,)


def test_non_square():
@pytest.mark.parametrize("navigation_shape", (None, (8,), (4, 2)))
def test_non_square(navigation_shape):
fname = TEST_DATA_DIR_UNZIPPED / "001_4x2_6bit.mib"
s = hs.load(fname, navigation_shape=(4, 2))
s = hs.load(fname, navigation_shape=navigation_shape)
assert s.axes_manager.signal_shape == (256, 256)
assert s.axes_manager.navigation_shape == (4, 2)
if navigation_shape is None:
navigation_shape = (4, 2)
assert s.axes_manager.navigation_shape == navigation_shape


def test_no_hdr():
Expand All @@ -193,7 +204,7 @@ def test_no_hdr():
shutil.copyfile(fname, fname2)
s = hs.load(fname2)
assert s.axes_manager.signal_shape == (256, 256)
assert s.axes_manager.navigation_shape == (8,)
assert s.axes_manager.navigation_shape == (4, 2)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -363,3 +374,23 @@ def test_load_save_cycle(tmp_path):
assert s.axes_manager.navigation_shape == s2.axes_manager.navigation_shape
assert s.axes_manager.signal_shape == s2.axes_manager.signal_shape
assert s.data.dtype == s2.data.dtype


def test_frames_in_acquisition_zero():
# Some hdr file have entry "Frames per Trigger (Number): 0"
# Possibly for "continuous and indefinite" acquisition
# Copy and edit a file with corresponding changes
base_fname = TEST_DATA_DIR_UNZIPPED / "Single_1_Frame_CounterDepth_6_Rows_256"
fname = f"{base_fname}_zero_frames_in_acquisition"
# Create test file using existing test file
shutil.copyfile(f"{base_fname}.mib", f"{fname}.mib")
hdf_dict = parse_hdr_file(f"{base_fname}.hdr")
hdf_dict["Frames in Acquisition (Number)"] = 0
with open(f"{fname}.hdr", "w") as f:
f.write("HDR\n")
for k, v in hdf_dict.items():
f.write(f"{k}:\t{v}\n")
f.write("End\t")

s = hs.load(f"{fname}.mib")
assert s.axes_manager.navigation_shape == ()
1 change: 1 addition & 0 deletions upcoming_changes/235.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:ref:`Quantum Detector <quantumdetector-format>` reader: fix setting chunks.
1 change: 1 addition & 0 deletions upcoming_changes/235.enhancements.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:ref:`Quantum Detector <quantumdetector-format>` reader: use timestamps to get navigation shape when the navigation shape is not available - for example, acquisition with pixel trigger or scan shape not in metadata.

0 comments on commit 1becedd

Please sign in to comment.