From a207654064edc699c33c0fe759ad9ea44462f13c Mon Sep 17 00:00:00 2001 From: crnbaker Date: Mon, 3 Jun 2024 16:10:28 +0100 Subject: [PATCH 1/2] #58 - fixed card status decode_status() function bug --- src/spectrumdevice/settings/status.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/spectrumdevice/settings/status.py b/src/spectrumdevice/settings/status.py index 4042ea7..0ed6218 100644 --- a/src/spectrumdevice/settings/status.py +++ b/src/spectrumdevice/settings/status.py @@ -57,6 +57,4 @@ class StatusCode(Enum): def decode_status(code: int) -> CARD_STATUS_TYPE: """Converts the integer value received by a card when queried about its status to a list of StatusCodes.""" possible_codes = [code.value for code in StatusCode] - return CARD_STATUS_TYPE( - [StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)] - ) + return [StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)] From c49cb855819203147ecf0ce3a90dd1d0bfe16e23 Mon Sep 17 00:00:00 2001 From: crnbaker Date: Mon, 3 Jun 2024 17:03:58 +0100 Subject: [PATCH 2/2] #58 - new digitiser method get_raw_waveforms returns waveforms as 16-biut ints (rather than as floating point voltage) --- .../devices/digitiser/digitiser_card.py | 29 ++++++++++++--- .../devices/digitiser/digitiser_interface.py | 6 ++- .../devices/digitiser/digitiser_star_hub.py | 37 ++++++++++++++----- 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/src/spectrumdevice/devices/digitiser/digitiser_card.py b/src/spectrumdevice/devices/digitiser/digitiser_card.py index 5831ec3..a67ceae 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_card.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_card.py @@ -7,7 +7,7 @@ import logging from typing import List, Optional, Sequence, cast -from numpy import float_, mod, squeeze, zeros +from numpy import float_, int16, mod, squeeze, zeros from numpy.typing import NDArray from spectrum_gmbh.py_header.regs import ( @@ -105,8 +105,8 @@ def wait_for_acquisition_to_complete(self) -> None: """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY) - def get_waveforms(self) -> List[List[NDArray[float_]]]: - """Get a list of the most recently transferred waveforms, in channel order. + def get_raw_waveforms(self) -> List[List[NDArray[int16]]]: + """Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers. This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays (waveforms) and returns the list. @@ -120,7 +120,7 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]: this would the rate at which your trigger source was running). Returns: - waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition + waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: `np.array(waveforms).mean(axis=0)` @@ -167,6 +167,24 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]: (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums)) ) + repeat_acquisitions = [] + for n in range(self._batch_size): + repeat_acquisitions.append([waveform for waveform in waveforms_in_columns[n, :, :].T]) + + return repeat_acquisitions + + def get_waveforms(self) -> List[List[NDArray[float_]]]: + """Get a list of the most recently transferred waveforms, in channel order, in Volts as floats. + + See get_raw_waveforms() for details. + + Returns: + waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition + and one array per enabled channel, in channel order. To average the acquisitions: + `np.array(waveforms).mean(axis=0)` + + """ + raw_repeat_acquisitions = self.get_raw_waveforms() repeat_acquisitions = [] for n in range(self._batch_size): repeat_acquisitions.append( @@ -174,10 +192,9 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]: cast( SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num] ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform)) - for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T) + for ch_num, waveform in zip(self.enabled_analog_channel_nums, raw_repeat_acquisitions[n]) ] ) - return repeat_acquisitions def get_timestamp(self) -> Optional[datetime.datetime]: diff --git a/src/spectrumdevice/devices/digitiser/digitiser_interface.py b/src/spectrumdevice/devices/digitiser/digitiser_interface.py index 8dd8df7..4f1bf5a 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_interface.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_interface.py @@ -8,7 +8,7 @@ from datetime import datetime from typing import List, Optional -from numpy import float_, ndarray +from numpy import float_, int16, ndarray from numpy.typing import NDArray from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface @@ -104,6 +104,10 @@ def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measure def execute_continuous_fifo_acquisition(self) -> None: raise NotImplementedError() + @abstractmethod + def get_raw_waveforms(self) -> List[List[NDArray[int16]]]: + raise NotImplementedError() + @abstractmethod def get_waveforms(self) -> List[List[NDArray[float_]]]: raise NotImplementedError() diff --git a/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py b/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py index f2970a2..52e1b44 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py @@ -5,9 +5,9 @@ # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. import datetime from threading import Thread -from typing import Dict, List, Optional, Sequence +from typing import Callable, Dict, List, Optional, Sequence, TypeVar -from numpy import float_ +from numpy import float_, int16 from numpy.typing import NDArray from spectrumdevice.devices.abstract_device import ( @@ -23,6 +23,10 @@ from spectrumdevice.settings.device_modes import AcquisitionMode +WAVEFORM_TYPE_VAR = TypeVar("WAVEFORM_TYPE_VAR", NDArray[float_], NDArray[int16]) + + +# noinspection PyTypeChecker class SpectrumDigitiserStarHub( AbstractSpectrumStarHub[ SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface @@ -70,23 +74,38 @@ def wait_for_acquisition_to_complete(self) -> None: card.wait_for_acquisition_to_complete() def get_waveforms(self) -> List[List[NDArray[float_]]]: - """Get a list of the most recently transferred waveforms. + """Get a list of the most recently transferred waveforms, as floating point voltages. This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See `SpectrumDigitiserCard.get_waveforms()` for more information. - Args: - num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and - copy. Acquiring in batches (num_acquisitions > 1) can improve performance. - Returns: waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order. """ - card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {} + return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_waveforms) + + def get_raw_waveforms(self) -> List[List[NDArray[int16]]]: + """Get a list of the most recently transferred waveforms, as integers. + + This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. + See `SpectrumDigitiserCard.get_waveforms()` for more information. + + Returns: + waveforms (List[List[NDArray[int16]]]): A list lists of 1D numpy arrays, one inner list per acquisition, + and one array per enabled channel, in channel order. + """ + return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_raw_waveforms) + + def _get_waveforms_in_threads( + self, get_waveforms_method: Callable[[SpectrumDigitiserCard], List[List[WAVEFORM_TYPE_VAR]]] + ) -> List[List[WAVEFORM_TYPE_VAR]]: + """Gets waveforms from child cards in separate threads, using the SpectrumDigitiserCard method provided.""" + + card_ids_and_waveform_sets: Dict[str, list[list[WAVEFORM_TYPE_VAR]]] = {} def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None: - this_cards_waveforms = digitiser_card.get_waveforms() + this_cards_waveforms = get_waveforms_method(digitiser_card) card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]