Skip to content

Commit

Permalink
image_tiff_tfs: generalized logic to read in all TFS/FEI metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
atomprobe-tc committed Dec 11, 2023
1 parent 981e3c2 commit df72956
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 54 deletions.
95 changes: 52 additions & 43 deletions pynxtools/dataconverter/readers/em/subparsers/image_tiff_tfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

from pynxtools.dataconverter.readers.em.subparsers.image_tiff import TiffSubParser
from pynxtools.dataconverter.readers.em.subparsers.image_tiff_tfs_cfg import \
tfs_section_names, tfs_section_details
TiffTfsConcepts, TiffTfsToNeXusCfg, get_fei_parent_concepts, get_fei_childs
from pynxtools.dataconverter.readers.em.utils.image_utils import \
sort_tuple, if_str_represents_float
sort_ascendingly_by_second_argument, if_str_represents_float


class TfsTiffSubParser(TiffSubParser):
Expand Down Expand Up @@ -71,54 +71,63 @@ def get_metadata(self):
# self.tags = {TAGS[key] : fp.tag[key] for key in fp.tag_v2}
# for key, val in self.tags.items():
# print(f"{key}, {val}")
tfs_section_offsets = {}
tfs_parent_concepts = get_fei_parent_concepts()
tfs_parent_concepts_byte_offset = {}
for concept in tfs_parent_concepts:
tfs_parent_concepts_byte_offset[concept] = None
with open(self.file_path, 'rb', 0) as fp:
s = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
for section_name in tfs_section_names:
pos = s.find(bytes(section_name, "utf8")) # != -1
tfs_section_offsets[section_name] = pos
print(tfs_section_offsets)
for concept in tfs_parent_concepts:
pos = s.find(bytes(f"[{concept}]", "utf8")) # != -1
if pos != -1:
tfs_parent_concepts_byte_offset[concept] = pos
else:
raise ValueError(f"Expected block with metadata for concept [{concept}] were not found !")
print(tfs_parent_concepts_byte_offset)

# define search offsets
tpl = []
for key, value in tfs_section_offsets.items():
tpl.append((key, value))
tpl = sort_tuple(tpl)
print(tpl)
sequence = [] # decide I/O order in which metadata for childs of parent concepts will be read
for key, value in tfs_parent_concepts_byte_offset.items():
if value is not None:
sequence.append((key, value))
# tuple of parent_concept name and byte offset
sequence = sort_ascendingly_by_second_argument(sequence)
print(sequence)

# exemplar parsing of specific TFS section content into a dict
# here for section_name == "[System]":
pos_s = None
pos_e = None
for idx in np.arange(0, len(tpl)):
if tpl[idx][0] != "[System]":
continue
idx = 0
for parent, byte_offset in sequence:
pos_s = byte_offset
pos_e = None
if idx < len(sequence) - 1:
pos_e = sequence[idx + 1][1]
else:
pos_s = tpl[idx][1]
if idx <= len(tpl) - 1:
pos_e = tpl[idx + 1][1]
break
print(f"Search for [System] in between byte offsets {pos_s} and {pos_e}")
if pos_s is None or pos_e is None:
raise ValueError(f"Search for [System] was unsuccessful !")
pos_e = np.iinfo(np.uint64).max
idx += 1
if pos_s is None or pos_e is None:
raise ValueError(f"Definition of byte boundaries for reading childs of [{parent}] was unsuccessful !")
print(f"Search for [{parent}] in between byte offsets {pos_s} and {pos_e}")

# fish metadata of e.g. the system section
for term in tfs_section_details["[System]"]:
s.seek(pos_s, 0)
pos = s.find(bytes(term, "utf8"))
if pos < pos_e: # check if pos_e is None
s.seek(pos, 0)
value = f"{s.readline().strip().decode('utf8').replace(f'{term}=', '')}"
if value != "":
if if_str_represents_float(value) is True:
self.tfs[f"system/{term}"] = np.float64(value)
elif value.isdigit() is True:
self.tfs[f"system/{term}"] = np.int64(value)
# fish metadata of e.g. the system section
for term in get_fei_childs(parent):
s.seek(pos_s, 0)
pos = s.find(bytes(f"{term}=", "utf8"))
if pos < pos_e: # check if pos_e is None
s.seek(pos, 0)
value = f"{s.readline().strip().decode('utf8').replace(f'{term}=', '')}"
self.tfs[f"{parent}/{term}"] = None
if isinstance(value, str):
if value != "":
if if_str_represents_float(value) is True:
self.tfs[f"{parent}/{term}"] = np.float64(value)
elif value.isdigit() is True:
self.tfs[f"{parent}/{term}"] = np.int64(value)
else:
self.tfs[f"{parent}/{term}"] = value
else:
self.tfs[f"system/{term}"] = None
else:
pass
print(self.tfs)
print(f"{parent}/{term} ---> {type(value)}")
else:
pass
for key, val in self.tfs.items():
print(f"{key}, {val}")

def parse_and_normalize(self):
"""Perform actual parsing filling cache self.tmp."""
Expand Down
42 changes: 32 additions & 10 deletions pynxtools/dataconverter/readers/em/subparsers/image_tiff_tfs_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#
"""Configuration of the image_tiff_tfs subparser."""

from typing import List


TiffTfsConcepts = ["User/Date",
"User/Time",
Expand Down Expand Up @@ -104,16 +106,16 @@
"Scan/FrameTime",
"EScan/Scan",
"EScan/InternalScan",
"ESCAN/Dwell",
"ESCAN/PixelWidth",
"ESCAN/PixelHeight",
"ESCAN/HorFieldsize",
"ESCAN/VerFieldsize",
"ESCAN/FrameTime",
"ESCAN/LineTime",
"ESCAN/Mainslock",
"ESCAN/LineIntegration",
"ESCAN/ScanInterlacing",
"EScan/Dwell",
"EScan/PixelWidth",
"EScan/PixelHeight",
"EScan/HorFieldsize",
"EScan/VerFieldsize",
"EScan/FrameTime",
"EScan/LineTime",
"EScan/Mainslock",
"EScan/LineIntegration",
"EScan/ScanInterlacing",
"Stage/StageX",
"Stage/StageY",
"Stage/StageZ",
Expand Down Expand Up @@ -193,6 +195,26 @@
"ColdStage/Humidity",
"ColdStage/SampleBias"]


def get_fei_parent_concepts(concepts=None) -> List:
    """Get list of unique FEI parent concepts in order of first appearance.

    A concept entry is a string of the form "Parent/Child" with exactly one
    slash; the parent concept is the part in front of that slash. Entries
    which are not strings or do not contain exactly one slash are ignored.

    concepts: entries to inspect, when None the module-level TiffTfsConcepts
        list is used.
    Returns a list in which each parent concept name occurs exactly once.
    """
    if concepts is None:
        concepts = TiffTfsConcepts
    # dict.fromkeys removes duplicates while keeping insertion order, a set
    # would return the names in an order that differs between interpreter
    # runs because of string hash randomization
    return list(dict.fromkeys(
        entry.split("/")[0]
        for entry in concepts
        if isinstance(entry, str) and entry.count("/") == 1))


def get_fei_childs(concept: str, concepts=None) -> List:
    """Get all children of FEI parent concept in order of first appearance.

    A concept entry is a string of the form "Parent/Child" with exactly one
    slash; the child is the part behind that slash. Only entries whose parent
    equals concept exactly (case-sensitive) contribute a child. Entries which
    are not strings or do not contain exactly one slash are ignored.

    concept: name of the parent concept, e.g. "System".
    concepts: entries to inspect, when None the module-level TiffTfsConcepts
        list is used.
    Returns a list in which each child name occurs exactly once, the list is
    empty when the parent concept has no children.
    """
    if concepts is None:
        concepts = TiffTfsConcepts
    # include the slash in the prefix so that e.g. "Scan" does not match
    # entries of a longer parent name like "ScanMode/..."
    prefix = f"{concept}/"
    # dict.fromkeys removes duplicates while keeping insertion order, a set
    # would return the names in an order that differs between interpreter
    # runs because of string hash randomization
    return list(dict.fromkeys(
        entry.split("/")[1]
        for entry in concepts
        if isinstance(entry, str)
        and entry.count("/") == 1
        and entry.startswith(prefix)))


TiffTfsToNeXusCfg = {"/ENTRY[entry*]/measurement/EVENT_DATA_EM_SET[event_data_em_set]/EVENT_DATA_EM[event_data_em*]/start_time": {"fun": "ikz_berlin_apreo_iso8601", "terms": ["User/Date", "User/Time"]},
"IGNORE": { "fun": "load_from", "terms": "User/User" },
"IGNORE": { "fun": "load_from", "terms": "User/UserText" },
Expand Down
2 changes: 1 addition & 1 deletion pynxtools/dataconverter/readers/em/utils/image_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


# https://www.geeksforgeeks.org/python-program-to-sort-a-list-of-tuples-by-second-item/
def sort_tuple(tup):
def sort_ascendingly_by_second_argument(tup):
# convert the list of tuples to a numpy array with data type (object, int)
arr = np.array(tup, dtype=[('col1', object), ('col2', int)])
# get the indices that would sort the array based on the second column
Expand Down

0 comments on commit df72956

Please sign in to comment.