Skip to content

Commit

Permalink
allow parsing of binary data for files with one spectrum
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaspie committed Feb 14, 2024
1 parent 6482f71 commit 46841d5
Showing 1 changed file with 139 additions and 130 deletions.
269 changes: 139 additions & 130 deletions pynxtools/dataconverter/readers/xps/phi/spe_pro_phi.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,18 +527,51 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
except KeyError:
pass

# Create keys for writing to data and detector
entry = construct_entry_name(region_parent)
scan_key = construct_data_key(spectrum)
detector_data_key_child = construct_detector_data_key(spectrum)
detector_data_key = f'{path_map["detector"]}/{detector_data_key_child}/counts'

energy = np.array(spectrum["energy"])
intensity = np.array(spectrum["data"])

if entry not in self._xps_dict["data"]:
self._xps_dict["data"][entry] = xr.Dataset()

# Write averaged cycle data to 'data'.
all_scan_data = [
np.array(value)
for key, value in self._xps_dict["data"][entry].items()
if scan_key.split("_")[0] in key
]

# Write averaged cycle data to 'data'.
averaged_scans = np.mean(all_scan_data, axis=0)
if averaged_scans.size == 1:
# on first scan in cycle
averaged_scans = intensity

try:
self._xps_dict["data"][entry][scan_key.split("_")[0]] = xr.DataArray(
data=averaged_scans,
coords={"energy": energy},
)
except ValueError:
pass

# Write scan data to 'data'.
self._xps_dict["data"][entry][scan_key] = xr.DataArray(
data=intensity, coords={"energy": energy}
)

# Write raw intensities to 'detector'.
self._xps_dict[detector_data_key] = intensity


# =============================================================================
# # Create keys for writing to data and detector
# entry = construct_entry_name(region_parent)
# scan_key = construct_data_key(spectrum)
# detector_data_key_child = construct_detector_data_key(spectrum)
# detector_data_key = f'{path_map["detector"]}/{detector_data_key_child}/counts'
#
#
# x_units = spectrum["x_units"]
# energy = np.array(spectrum["data"]["x"])
# intensity = np.array(spectrum["data"]["y"])
#
# if entry not in self._xps_dict["data"]:
# self._xps_dict["data"][entry] = xr.Dataset()
Expand Down Expand Up @@ -571,13 +604,13 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
# # Write averaged cycle data to 'data'.
# self._xps_dict["data"][entry][scan_key.split("_")[0]] = xr.DataArray(
# data=averaged_scans,
# coords={x_units: energy},
# coords={"energy": energy},
# )
# if self.parser.export_settings["Separate Scan Data"]:
# # Write average cycle data to 'data'.
# self._xps_dict["data"][entry][scan_key] = xr.DataArray(
# data=averaged_channels,
# coords={x_units: energy},
# coords={"energy": energy},
# )
#
# if (
Expand All @@ -588,7 +621,7 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
# channel_no = spectrum["channel_no"]
# self._xps_dict["data"][entry][
# f"{scan_key}_chan{channel_no}"
# ] = xr.DataArray(data=intensity, coords={x_units: energy})
# ] = xr.DataArray(data=intensity, coords={"energy": energy})
# =============================================================================


Expand All @@ -608,6 +641,13 @@ def __init__(self):
self.spectra = []
self.metadata = PhiMetadata()

self.binary_header_length = 4
self.spectra_header_length = 24
self.float_buffer = 4

self.binary_header = None
self.spectra_header = None

self.settings_map = {
"FileDesc": "file_description",
"acq_filename": "acquisition_filename",
Expand Down Expand Up @@ -811,6 +851,7 @@ def parse_file(self, file, **kwargs):

self.add_regions_and_areas_to_spectra(regions, areas)

self.parse_binary_header(data)
self.parse_data_into_spectra(data)

self.add_metadata_to_each_spectrum()
Expand Down Expand Up @@ -999,7 +1040,8 @@ def parse_spectral_regions(self, header):

region.validate_types()

return regions_full + regions
return regions
# return regions_full + regions

def parse_spatial_areas(self, header):
"""
Expand Down Expand Up @@ -1056,138 +1098,103 @@ def add_regions_and_areas_to_spectra(self, regions, areas):
for area in areas:
concatenated = {**region.dict(), **area.dict()}

self.spectra += [concatenated]
self.spectra += [concatenated]

def parse_data_into_spectra(self, binary_data):
n_spectra = self.metadata.no_spectral_regions

binary_header, spectra_header = self.parse_binary_header(binary_data)
def parse_binary_header(self, binary_data):
"""
Read the binary headers
Assuming the headers are 4 bytes unsigned integers
Each spectrum gets 24 unsigned 4 byte integers
for spectrum_no in range(n_spectra):
n_points = spectra_header[spectrum_no][8]
parsed_data = self.parse_binary_data(binary_data, spectrum_no, n_points)
Parameters
----------
binary_data : TYPE
DESCRIPTION.
# =============================================================================
# for i, spectrum in enumerate(self.spectra):
# spectrum.update({
# "binary_header": binary_header,
# "spectra_header": spectra_header[i],
# "data": parsed_data[i]
# }
# )
# =============================================================================
"""
binary_header = struct.unpack("I", binary_data[: self.binary_header_length])[0]

return spectra_header, binary_data
for i, spectrum in enumerate(self.spectra):
start = (
self.binary_header_length * self.spectra_header_length * i
+ self.binary_header_length
)
stop = start + self.binary_header_length * self.spectra_header_length
spectrum_header = struct.unpack(
"I" * self.spectra_header_length, binary_data[start:stop]
)
n_values = spectrum_header[8]
spectrum.update(
{
"binary_header": binary_header,
"spectrum_header": np.array(spectrum_header),
"n_values": n_values,
}
)

def parse_binary_header(self, binary_data):
n_spectra = self.metadata.no_spectral_regions
# Read the binary headers
# Assuming the headers are 4 bytes unsigned integers
# Each spectrum gets 24 unsigned 4 byte integers
binary_header_length = 4
spectra_header_length = 24
def parse_data_into_spectra(self, binary_data):
"""
Parse the data of all individual spectra.
binary_header = struct.unpack("I", binary_data[:binary_header_length])[0]
Parameters
----------
binary_data : bytes
Binary XPS data, format is 64 bit float.
spectra_header = np.zeros((n_spectra, 24), dtype=np.uint32)
for i in range(n_spectra):
start = (
binary_header_length * spectra_header_length * i + binary_header_length
)
stop = start + binary_header_length * spectra_header_length
spectra_header[i] = struct.unpack(
"I" * spectra_header_length, binary_data[start:stop]
"""
print(type(binary_data))
offset = self.spectra_header_length + self.binary_header_length

for i, spectrum in enumerate(self.spectra):
n_values = spectrum["n_values"]
# print(n_values)
start = (i + 1) * offset * self.float_buffer
stop = (i + 1) * (n_values + offset) * self.float_buffer
print(start, stop)

binary_spectrum_data = binary_data[start:stop]
parsed_data = self._parse_binary_data(binary_spectrum_data)

spectrum.update(
{
"data": parsed_data,
}
)
print(stop)

spectra_header = np.array(spectra_header)
def _parse_binary_data(self, binary_spectrum_data):
"""
For each spectrum, parse the XPS data by
return binary_header, spectra_header
Parameters
----------
binary_spectrum_data : bytes
Binary data containing the intensity data for one
spectrum.
def parse_binary_data(self, binary_data, spectrum_no, n_points):
parsed_data = 0
stop = 100
Returns
-------
parsed_data : TYPE
DESCRIPTION.
"""
# format is 64 bit float
encoding = ["f", 4]
buffer = encoding[1]
encoding = encoding[0]

stream = []
for result in data:
length = result[1] * buffer
data = result[0]
for i in range(0, length, buffer):
stream.append(struct.unpack(encoding, data[i : i + buffer])[0])

spectrum_data = struct.unpack_from(
"<f", binary_data[100:104], 4
) # n_points * 24 + 4 * 5)#[0]
print(spectrum_data)

# Assuming the 5th element of each 24-byte header contains the points per spectrum
# and is stored as an unsigned 4-byte integer.

# =============================================================================
# spectrum_data = np.frombuffer(
# binary_data,
# dtype=float,
# count=200,#n_points,
# offset=524,#n_spectra * 24 + 4 * 5,
# )
# if spectrum_no == 0:
# print(spectrum_data)
# =============================================================================

# print(spectrum_data)

# print(spectrum_data)

# np.fromfile(v_file_ref, dtype=np.float64, count=npoints)

# =============================================================================
#
# for i, data_line in enumerate(binary_data_lines):
# spectrum_data = struct.unpack_from('<I', data, n_spectra * 24 + 4 * 5)[0]
# print("ahsd")
# print(spectrum_data)
# =============================================================================
# spectrum_data = np.frombuffer(data_line, dtype=np.float64, count=npoints)
# Each spectrum gets 24 unsigned 4 byte integers
# binary_data_spec = np.frombuffer(data_line, dtype=np.float64)#, count=24*n_spectra)
# print(binary_data_spec)

# parsed_data[i] = spectrum_data
parsed_data = []

return parsed_data
n_values = int(len(binary_spectrum_data) / self.float_buffer)

# =============================================================================
# def load_phispe_spectra(v_file_ref):
# # Assuming the following global variables are defined elsewhere in the Python code:
# # nSpectra, nVersion, wSpectrumNames, wSpectrumScales
# global nSpectra, nVersion, wSpectrumNames, wSpectrumScales
#
# # load spectra
# for ic in range(nSpectra):
# # set up
# npoints = wSpectrumScales[ic][0]
# sName = wSpectrumNames[ic]
# # Create a numpy array for the spectrum
# wSpectrum = np.zeros(npoints)
#
# # Set scale for x-axis (not directly applicable in numpy, but keeping for reference)
# x_scale_start = wSpectrumScales[ic][1]
# x_scale_end = wSpectrumScales[ic][2]
# x_scale = np.linspace(x_scale_start, x_scale_end, npoints)
#
# wSpectrum = np.fromfile(v_file_ref, dtype=np.float64, count=npoints)
#
#
# # Assign the spectrum to a variable with the name sName in the global scope
# globals()[sName] = wSpectrum
#
# return 0
# =============================================================================
for i in range(n_values):
start = i * self.float_buffer
stop = (i + 1) * self.float_buffer
parsed_data.append(
struct.unpack_from("<f", binary_spectrum_data[start:stop])[0]
)
import matplotlib.pyplot as plt

plt.plot(parsed_data)
plt.show()

return parsed_data

def extract_unit(self, key, value):
"""
Expand All @@ -1212,7 +1219,6 @@ def extract_unit(self, key, value):
unit : str
Associated unit.
"""
unit_map = {"seconds": "s", "uA": "micro-A", "(min)": "min"}
special_cases = {
Expand Down Expand Up @@ -1552,3 +1558,6 @@ def _convert_stage_positions(value):
parser = PhiParser()
d = parser.parse_file(file)
d0 = d[0]

spectra_header = np.array([s["spectrum_header"] for s in parser.spectra])
n_values = np.array([s["n_values"] for s in parser.spectra])

0 comments on commit 46841d5

Please sign in to comment.