Skip to content

Commit

Permalink
Merge pull request #59 from KCL-BMEIS/58-card-status-broken
Browse files Browse the repository at this point in the history
Card status fixed, and new method to get raw waveforms from digitisers
  • Loading branch information
crnbaker authored Jun 3, 2024
2 parents 540218e + c49cb85 commit 33a2025
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 19 deletions.
29 changes: 23 additions & 6 deletions src/spectrumdevice/devices/digitiser/digitiser_card.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from typing import List, Optional, Sequence, cast

from numpy import float_, mod, squeeze, zeros
from numpy import float_, int16, mod, squeeze, zeros
from numpy.typing import NDArray

from spectrum_gmbh.py_header.regs import (
Expand Down Expand Up @@ -105,8 +105,8 @@ def wait_for_acquisition_to_complete(self) -> None:
"""
self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms, in channel order.
def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
"""Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers.
This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
(waveforms) and returns the list.
Expand All @@ -120,7 +120,7 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]:
this would the rate at which your trigger source was running).
Returns:
waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
and one array per enabled channel, in channel order. To average the acquisitions:
`np.array(waveforms).mean(axis=0)`
Expand Down Expand Up @@ -167,17 +167,34 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]:
(self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
)

repeat_acquisitions = []
for n in range(self._batch_size):
repeat_acquisitions.append([waveform for waveform in waveforms_in_columns[n, :, :].T])

return repeat_acquisitions

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms, in channel order, in Volts as floats.
See get_raw_waveforms() for details.
Returns:
waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
and one array per enabled channel, in channel order. To average the acquisitions:
`np.array(waveforms).mean(axis=0)`
"""
raw_repeat_acquisitions = self.get_raw_waveforms()
repeat_acquisitions = []
for n in range(self._batch_size):
repeat_acquisitions.append(
[
cast(
SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T)
for ch_num, waveform in zip(self.enabled_analog_channel_nums, raw_repeat_acquisitions[n])
]
)

return repeat_acquisitions

def get_timestamp(self) -> Optional[datetime.datetime]:
Expand Down
6 changes: 5 additions & 1 deletion src/spectrumdevice/devices/digitiser/digitiser_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datetime import datetime
from typing import List, Optional

from numpy import float_, ndarray
from numpy import float_, int16, ndarray
from numpy.typing import NDArray

from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface
Expand Down Expand Up @@ -104,6 +104,10 @@ def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measure
def execute_continuous_fifo_acquisition(self) -> None:
raise NotImplementedError()

@abstractmethod
def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
raise NotImplementedError()

@abstractmethod
def get_waveforms(self) -> List[List[NDArray[float_]]]:
raise NotImplementedError()
Expand Down
37 changes: 28 additions & 9 deletions src/spectrumdevice/devices/digitiser/digitiser_star_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.
import datetime
from threading import Thread
from typing import Dict, List, Optional, Sequence
from typing import Callable, Dict, List, Optional, Sequence, TypeVar

from numpy import float_
from numpy import float_, int16
from numpy.typing import NDArray

from spectrumdevice.devices.abstract_device import (
Expand All @@ -23,6 +23,10 @@
from spectrumdevice.settings.device_modes import AcquisitionMode


WAVEFORM_TYPE_VAR = TypeVar("WAVEFORM_TYPE_VAR", NDArray[float_], NDArray[int16])


# noinspection PyTypeChecker
class SpectrumDigitiserStarHub(
AbstractSpectrumStarHub[
SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface
Expand Down Expand Up @@ -70,23 +74,38 @@ def wait_for_acquisition_to_complete(self) -> None:
card.wait_for_acquisition_to_complete()

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms.
"""Get a list of the most recently transferred waveforms, as floating point voltages.
This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
See `SpectrumDigitiserCard.get_waveforms()` for more information.
Args:
num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and
copy. Acquiring in batches (num_acquisitions > 1) can improve performance.
Returns:
waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
and one array per enabled channel, in channel order.
"""
card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {}
return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_waveforms)

def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
"""Get a list of the most recently transferred waveforms, as integers.
This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
See `SpectrumDigitiserCard.get_waveforms()` for more information.
Returns:
waveforms (List[List[NDArray[int16]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
and one array per enabled channel, in channel order.
"""
return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_raw_waveforms)

def _get_waveforms_in_threads(
self, get_waveforms_method: Callable[[SpectrumDigitiserCard], List[List[WAVEFORM_TYPE_VAR]]]
) -> List[List[WAVEFORM_TYPE_VAR]]:
"""Gets waveforms from child cards in separate threads, using the SpectrumDigitiserCard method provided."""

card_ids_and_waveform_sets: Dict[str, list[list[WAVEFORM_TYPE_VAR]]] = {}

def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None:
this_cards_waveforms = digitiser_card.get_waveforms()
this_cards_waveforms = get_waveforms_method(digitiser_card)
card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms

threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]
Expand Down
4 changes: 1 addition & 3 deletions src/spectrumdevice/settings/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,4 @@ class StatusCode(Enum):
def decode_status(code: int) -> CARD_STATUS_TYPE:
"""Converts the integer value received by a card when queried about its status to a list of StatusCodes."""
possible_codes = [code.value for code in StatusCode]
return CARD_STATUS_TYPE(
[StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)]
)
return [StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)]

0 comments on commit 33a2025

Please sign in to comment.