diff --git a/src/example_scripts/awg_example.py b/src/example_scripts/awg_example.py index bd246d4..660c673 100644 --- a/src/example_scripts/awg_example.py +++ b/src/example_scripts/awg_example.py @@ -1,9 +1,10 @@ from time import sleep from matplotlib.pyplot import plot, show -from numpy import int16, iinfo, linspace, sin, pi +from numpy import int16 from spectrumdevice.devices.awg.awg_card import SpectrumAWGCard +from spectrumdevice.devices.awg.synthesis import make_full_scale_sine_waveform from spectrumdevice.settings import TriggerSettings, TriggerSource, ExternalTriggerMode from spectrumdevice.settings.channel import OutputChannelStopLevelMode from spectrumdevice.settings.device_modes import GenerationMode @@ -27,12 +28,8 @@ ) card.configure_trigger(trigger_settings) - full_scale_min_value = iinfo(int16).min - full_scale_max_value = iinfo(int16).max + t, analog_wfm = make_full_scale_sine_waveform(FREQUENCY, SAMPLE_RATE, NUM_CYCLES, dtype=int16) - duration = NUM_CYCLES / FREQUENCY - t = linspace(0, duration, int(duration * SAMPLE_RATE + 1)) - analog_wfm = (sin(2 * pi * FREQUENCY * t) * full_scale_max_value).astype(int16) card.set_sample_rate_in_hz(SAMPLE_RATE) card.set_generation_mode(GenerationMode.SPC_REP_STD_SINGLERESTART) card.set_num_loops(NUM_PULSES) diff --git a/src/example_scripts/trigger_awg_with_pulse_generator_example.py b/src/example_scripts/trigger_awg_with_pulse_generator_example.py new file mode 100644 index 0000000..27a5508 --- /dev/null +++ b/src/example_scripts/trigger_awg_with_pulse_generator_example.py @@ -0,0 +1,68 @@ +from time import sleep + +from numpy import int16 + +from spectrumdevice.devices.awg.awg_card import SpectrumAWGCard +from spectrumdevice.devices.awg.synthesis import make_full_scale_sine_waveform +from spectrumdevice.settings import TriggerSettings, TriggerSource, ExternalTriggerMode, IOLineMode +from spectrumdevice.settings.channel import OutputChannelStopLevelMode +from spectrumdevice.settings.device_modes import GenerationMode +from spectrumdevice.settings.pulse_generator import ( + PulseGeneratorOutputSettings, + PulseGeneratorTriggerSettings, + PulseGeneratorTriggerMode, + PulseGeneratorTriggerDetectionMode, + PulseGeneratorMultiplexer1TriggerSource, + PulseGeneratorMultiplexer2TriggerSource, +) + + +SAMPLE_RATE = 40000000 + + +if __name__ == "__main__": + + card = SpectrumAWGCard(device_number=0) + + card_trigger_settings = TriggerSettings( + trigger_sources=[TriggerSource.SPC_TMASK_EXT2], # ext1 trigger source is first IOLine + external_trigger_mode=ExternalTriggerMode.SPC_TM_POS, + ) + + t, analog_wfm = make_full_scale_sine_waveform( + frequency_in_hz=1e6, sample_rate_hz=SAMPLE_RATE, num_cycles=1, dtype=int16 + ) + + # Set up AWG card + card.configure_trigger(card_trigger_settings) + card.set_sample_rate_in_hz(SAMPLE_RATE) + card.set_generation_mode(GenerationMode.SPC_REP_STD_SINGLERESTART) + card.set_num_loops(1) + card.transfer_waveform(analog_wfm) + card.analog_channels[0].set_stop_level_mode(OutputChannelStopLevelMode.SPCM_STOPLVL_ZERO) + card.analog_channels[0].set_is_switched_on(True) + card.analog_channels[0].set_signal_amplitude_in_mv(1000) + + pulse_trigger_settings = PulseGeneratorTriggerSettings( + trigger_mode=PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_SINGLESHOT, + trigger_detection_mode=PulseGeneratorTriggerDetectionMode.RISING_EDGE, + multiplexer_1_source=PulseGeneratorMultiplexer1TriggerSource.SPCM_PULSEGEN_MUX1_SRC_UNUSED, + multiplexer_1_output_inversion=False, + multiplexer_2_source=PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE, + multiplexer_2_output_inversion=False, + ) + + pulse_output_settings = PulseGeneratorOutputSettings( + period_in_seconds=1e-3, duty_cycle=0.5, num_pulses=10, delay_in_seconds=0.0, output_inversion=False + ) + + card.io_lines[0].set_mode(IOLineMode.SPCM_XMODE_PULSEGEN) + card.io_lines[0].pulse_generator.configure_trigger(pulse_trigger_settings) + card.io_lines[0].pulse_generator.configure_output(pulse_output_settings) + + card.start() + + card.io_lines[0].pulse_generator.force_trigger() + sleep(1) + card.stop() + card.disconnect() diff --git a/src/spectrumdevice/devices/abstract_device/__init__.py b/src/spectrumdevice/devices/abstract_device/__init__.py index ae29657..7a0ca98 100644 --- a/src/spectrumdevice/devices/abstract_device/__init__.py +++ b/src/spectrumdevice/devices/abstract_device/__init__.py @@ -8,13 +8,9 @@ from spectrumdevice.devices.abstract_device.abstract_spectrum_channel import AbstractSpectrumChannel from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice from spectrumdevice.devices.abstract_device.abstract_spectrum_hub import AbstractSpectrumStarHub -from spectrumdevice.devices.abstract_device.interfaces import SpectrumChannelInterface, SpectrumDeviceInterface - __all__ = [ - "SpectrumChannelInterface", "AbstractSpectrumChannel", - "SpectrumDeviceInterface", "AbstractSpectrumDevice", "AbstractSpectrumCard", "AbstractSpectrumStarHub", diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py index 99d48a5..0acfa59 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py @@ -9,7 +9,7 @@ from abc import ABC, abstractmethod from functools import reduce from operator import or_ -from typing import Any, List, Optional, Sequence, Tuple, TypeVar, Generic +from typing import Any, List, Optional, Sequence, Tuple from spectrum_gmbh.regs import ( M2CMD_DATA_STARTDMA, @@ -35,7 +35,7 @@ SPC_MIINST_BYTESPERSAMPLE, ) from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice -from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface +from spectrumdevice.devices.abstract_device.device_interface import AnalogChannelInterfaceType, IOLineInterfaceType from spectrumdevice.exceptions import ( SpectrumExternalTriggerNotEnabled, SpectrumInvalidNumberOfEnabledChannels, @@ -71,11 +71,9 @@ # Use a Generic and Type Variables to allow subclasses of AbstractSpectrumCard to define whether they own AWG analog # channels or Digitiser analog channels and IO lines -AnalogChannelInterfaceType = TypeVar("AnalogChannelInterfaceType", bound=SpectrumAnalogChannelInterface) -IOLineInterfaceType = TypeVar("IOLineInterfaceType", bound=SpectrumIOLineInterface) -class AbstractSpectrumCard(AbstractSpectrumDevice, Generic[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): +class AbstractSpectrumCard(AbstractSpectrumDevice[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).""" def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any): diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py index 7406b97..9e665f6 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py @@ -6,11 +6,12 @@ # Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. -from spectrumdevice.devices.abstract_device.interfaces import ( - SpectrumDeviceInterface, +from spectrumdevice.devices.abstract_device.channel_interfaces import ( SpectrumChannelInterface, SpectrumAnalogChannelInterface, ) +from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface +from spectrumdevice.settings import SpectrumRegisterLength from spectrumdevice.settings.channel import SpectrumAnalogChannelName, SpectrumChannelName @@ -48,6 +49,21 @@ def name(self) -> ChannelNameType: def _number(self) -> int: return int(self.name.name.split(self._name_prefix)[-1]) + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length) + + def read_parent_device_register( + self, + spectrum_register: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> int: + return self._parent_device.read_spectrum_device_register(spectrum_register, length) + def __eq__(self, other: object) -> bool: if isinstance(other, AbstractSpectrumChannel): return (self.name == other.name) and (self._parent_device == other._parent_device) diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py index 54399d3..f0dae7b 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py @@ -6,7 +6,11 @@ from abc import ABC -from spectrumdevice.devices.abstract_device.interfaces import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.device_interface import ( + SpectrumDeviceInterface, + AnalogChannelInterfaceType, + IOLineInterfaceType, +) from spectrumdevice.exceptions import SpectrumDeviceNotConnected, SpectrumDriversNotFound from spectrumdevice.settings import SpectrumRegisterLength, TriggerSettings from spectrumdevice.settings.triggering import EXTERNAL_TRIGGER_SOURCES @@ -28,7 +32,7 @@ ) -class AbstractSpectrumDevice(SpectrumDeviceInterface, ABC): +class AbstractSpectrumDevice(SpectrumDeviceInterface[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Abstract superclass which implements methods common to all Spectrum devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes listed in spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py index db7a154..6811a5e 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py @@ -14,11 +14,11 @@ from spectrum_gmbh.regs import SPC_SYNC_ENABLEMASK from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice -from spectrumdevice.devices.abstract_device.interfaces import ( - SpectrumDeviceInterface, +from spectrumdevice.devices.abstract_device.channel_interfaces import ( SpectrumAnalogChannelInterface, SpectrumIOLineInterface, ) +from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface from spectrumdevice.exceptions import SpectrumSettingsMismatchError from spectrumdevice.settings import ( AdvancedCardFeature, diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py index cc294b2..925f3cd 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py @@ -1,15 +1,26 @@ from abc import ABC, abstractmethod +from typing import Any, Optional from spectrumdevice.devices.abstract_device import AbstractSpectrumChannel -from spectrumdevice.devices.abstract_device.interfaces import SpectrumIOLineInterface +from spectrumdevice.devices.abstract_device.channel_interfaces import SpectrumIOLineInterface +from spectrumdevice.exceptions import SpectrumFeatureNotSupportedByCard +from spectrumdevice.features.pulse_generator.pulse_generator import PulseGenerator +from spectrumdevice.features.pulse_generator.interfaces import PulseGeneratorInterface from spectrumdevice.settings import IOLineMode -from spectrumdevice.settings.io_lines import IO_LINE_MODE_COMMANDS, SpectrumIOLineName +from spectrumdevice.settings.io_lines import IO_LINE_MODE_COMMANDS, SpectrumIOLineName, decode_enabled_io_line_mode class AbstractSpectrumIOLine(SpectrumIOLineInterface, AbstractSpectrumChannel[SpectrumIOLineName], ABC): """Partially implemented abstract superclass contain code common for controlling an individual IO Line of all spectrum devices.""" + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + try: + self._pulse_generator: Optional[PulseGenerator] = PulseGenerator(parent=self) + except SpectrumFeatureNotSupportedByCard: + self._pulse_generator = None + @property def _name_prefix(self) -> str: return "X" @@ -23,9 +34,21 @@ def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: @property def mode(self) -> IOLineMode: - # todo: this may contain dig out settings bits, so needs a decode function that ignores those bits - return IOLineMode(self._parent_device.read_spectrum_device_register(IO_LINE_MODE_COMMANDS[self._number])) + return decode_enabled_io_line_mode( + self._parent_device.read_spectrum_device_register(IO_LINE_MODE_COMMANDS[self._number]) + ) def set_mode(self, mode: IOLineMode) -> None: value_to_write = self._get_io_line_mode_settings_mask(mode) | mode.value self._parent_device.write_to_spectrum_device_register(IO_LINE_MODE_COMMANDS[self._number], value_to_write) + + @property + def pulse_generator(self) -> PulseGeneratorInterface: + """Gets the IO line's pulse generator.""" + if self._pulse_generator is not None: + return self._pulse_generator + else: + raise SpectrumFeatureNotSupportedByCard( + call_description=self.__str__() + ".pulse_generator()", + message="Pulse generator firmware option not enabled.", + ) diff --git a/src/spectrumdevice/devices/abstract_device/channel_interfaces.py b/src/spectrumdevice/devices/abstract_device/channel_interfaces.py new file mode 100644 index 0000000..16cbfe1 --- /dev/null +++ b/src/spectrumdevice/devices/abstract_device/channel_interfaces.py @@ -0,0 +1,74 @@ +"""Defines a common public interface for controlling all Spectrum devices and their channels.""" + +# Christian Baker, King's College London +# Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London +# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. + +from abc import ABC, abstractmethod +from typing import TypeVar, Generic + +from spectrumdevice.features.pulse_generator.interfaces import PulseGeneratorInterface +from spectrumdevice.settings import ( + SpectrumRegisterLength, + IOLineMode, +) +from spectrumdevice.settings.channel import SpectrumAnalogChannelName, SpectrumChannelName +from spectrumdevice.settings.io_lines import SpectrumIOLineName + +ChannelNameType = TypeVar("ChannelNameType", bound=SpectrumChannelName) + + +class SpectrumChannelInterface(Generic[ChannelNameType], ABC): + """Defines the common public interface for control of the channels of Digitiser and AWG devices including + Multipurpose IO Lines. All properties are read-only and must be set with their respective setter methods.""" + + @property + @abstractmethod + def name(self) -> ChannelNameType: + raise NotImplementedError + + @abstractmethod + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + raise NotImplementedError() + + @abstractmethod + def read_parent_device_register( + self, + spectrum_register: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> int: + raise NotImplementedError() + + +class SpectrumAnalogChannelInterface(SpectrumChannelInterface[SpectrumAnalogChannelName], ABC): + """Defines the common public interface for control of the analog channels of Digitiser and AWG devices. All + properties are read-only and must be set with their respective setter methods.""" + + pass + + +class SpectrumIOLineInterface(SpectrumChannelInterface[SpectrumIOLineName], ABC): + """Defines the common public interface for control of the Multipurpose IO Lines of Digitiser and AWG devices. All + properties are read-only and must be set with their respective setter methods.""" + + @property + @abstractmethod + def mode(self) -> IOLineMode: + """Returns the current mode of the IO Line.""" + raise NotImplementedError() + + @abstractmethod + def set_mode(self, mode: IOLineMode) -> None: + """Sets the current mode of the IO Line""" + raise NotImplementedError() + + @property + @abstractmethod + def pulse_generator(self) -> PulseGeneratorInterface: + """Gets the IO line's pulse generator.""" + raise NotImplementedError() diff --git a/src/spectrumdevice/devices/abstract_device/interfaces.py b/src/spectrumdevice/devices/abstract_device/device_interface.py similarity index 72% rename from src/spectrumdevice/devices/abstract_device/interfaces.py rename to src/spectrumdevice/devices/abstract_device/device_interface.py index 0ae4f6d..2febe12 100644 --- a/src/spectrumdevice/devices/abstract_device/interfaces.py +++ b/src/spectrumdevice/devices/abstract_device/device_interface.py @@ -1,67 +1,31 @@ -"""Defines a common public interface for controlling all Spectrum devices and their channels.""" - -# Christian Baker, King's College London -# Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London -# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. - from abc import ABC, abstractmethod from typing import List, Optional, Sequence, Tuple, TypeVar, Generic +from spectrumdevice.devices.abstract_device.channel_interfaces import ( + SpectrumAnalogChannelInterface, + SpectrumIOLineInterface, +) from spectrumdevice.settings import ( AdvancedCardFeature, AvailableIOModes, CardFeature, ClockMode, - ExternalTriggerMode, DEVICE_STATUS_TYPE, + ExternalTriggerMode, ModelNumber, SpectrumRegisterLength, TransferBuffer, TriggerSettings, TriggerSource, - IOLineMode, ) from spectrumdevice.settings.card_dependent_properties import CardType -from spectrumdevice.settings.channel import SpectrumAnalogChannelName, SpectrumChannelName -from spectrumdevice.settings.io_lines import SpectrumIOLineName - -ChannelNameType = TypeVar("ChannelNameType", bound=SpectrumChannelName) - - -class SpectrumChannelInterface(Generic[ChannelNameType], ABC): - """Defines the common public interface for control of the channels of Digitiser and AWG devices including - Multipurpose IO Lines. All properties are read-only and must be set with their respective setter methods.""" - - @property - @abstractmethod - def name(self) -> ChannelNameType: - raise NotImplementedError -class SpectrumAnalogChannelInterface(SpectrumChannelInterface[SpectrumAnalogChannelName], ABC): - """Defines the common public interface for control of the analog channels of Digitiser and AWG devices. All - properties are read-only and must be set with their respective setter methods.""" - - pass - - -class SpectrumIOLineInterface(SpectrumChannelInterface[SpectrumIOLineName], ABC): - """Defines the common public interface for control of the Multipurpose IO Lines of Digitiser and AWG devices. All - properties are read-only and must be set with their respective setter methods.""" - - @property - @abstractmethod - def mode(self) -> IOLineMode: - """Returns the current mode of the IO Line.""" - raise NotImplementedError() - - @abstractmethod - def set_mode(self, mode: IOLineMode) -> None: - """Sets the current mode of the IO Line""" - raise NotImplementedError() +AnalogChannelInterfaceType = TypeVar("AnalogChannelInterfaceType", bound=SpectrumAnalogChannelInterface) +IOLineInterfaceType = TypeVar("IOLineInterfaceType", bound=SpectrumIOLineInterface) -class SpectrumDeviceInterface(ABC): +class SpectrumDeviceInterface(Generic[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Defines the common public interface for control of all digitiser and AWG devices, be they StarHub composite devices (e.g. the NetBox) or individual cards. All properties are read-only and must be set with their respective setter methods.""" @@ -118,11 +82,11 @@ def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = No @property @abstractmethod - def analog_channels(self) -> Sequence[SpectrumAnalogChannelInterface]: + def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: raise NotImplementedError() @property - def io_lines(self) -> Sequence[SpectrumIOLineInterface]: + def io_lines(self) -> Sequence[IOLineInterfaceType]: raise NotImplementedError() @property diff --git a/src/spectrumdevice/devices/awg/awg_card.py b/src/spectrumdevice/devices/awg/awg_card.py index 14b4ff1..7fdfaa1 100644 --- a/src/spectrumdevice/devices/awg/awg_card.py +++ b/src/spectrumdevice/devices/awg/awg_card.py @@ -51,8 +51,8 @@ def transfer_waveform(self, waveform: NDArray[int16]) -> None: remainder = len(waveform) % step_size if remainder > 0: logger.warning( - "Waveform length is not a multiple of 8 samples. Waveform in card memory will be zero-padded" - " to the next multiple of 8." + "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be " + "zero-padded to the next multiple of 8." ) coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder) self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size) diff --git a/src/spectrumdevice/devices/awg/awg_interface.py b/src/spectrumdevice/devices/awg/awg_interface.py index 15c83e1..d63b938 100644 --- a/src/spectrumdevice/devices/awg/awg_interface.py +++ b/src/spectrumdevice/devices/awg/awg_interface.py @@ -3,8 +3,11 @@ from numpy import int16 from numpy.typing import NDArray -from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface -from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface +from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.channel_interfaces import ( + SpectrumAnalogChannelInterface, + SpectrumIOLineInterface, +) from spectrumdevice.settings.channel import OutputChannelFilter, OutputChannelStopLevelMode from spectrumdevice.settings.device_modes import GenerationMode from spectrumdevice.settings.io_lines import DigOutIOLineModeSettings diff --git a/src/spectrumdevice/devices/awg/synthesis.py b/src/spectrumdevice/devices/awg/synthesis.py new file mode 100644 index 0000000..7be57e7 --- /dev/null +++ b/src/spectrumdevice/devices/awg/synthesis.py @@ -0,0 +1,14 @@ +from numpy import float_, iinfo, issubdtype, signedinteger, pi, sin, linspace, int_ +from numpy.typing import NDArray + + +def make_full_scale_sine_waveform( + frequency_in_hz: float, sample_rate_hz: int, num_cycles: float, dtype: type +) -> tuple[NDArray[float_], NDArray[int_]]: + if not issubdtype(dtype, signedinteger): + raise ValueError("dtype must be a signed integer type") + + full_scale_max_value = iinfo(dtype).max + duration = num_cycles / frequency_in_hz + t = linspace(0, duration, int(duration * sample_rate_hz + 1)) + return t, (sin(2 * pi * frequency_in_hz * t) * full_scale_max_value).astype(dtype) diff --git a/src/spectrumdevice/devices/digitiser/digitiser_interface.py b/src/spectrumdevice/devices/digitiser/digitiser_interface.py index 23a880b..8dd8df7 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_interface.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_interface.py @@ -11,8 +11,11 @@ from numpy import float_, ndarray from numpy.typing import NDArray -from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface -from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface +from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.channel_interfaces import ( + SpectrumAnalogChannelInterface, + SpectrumIOLineInterface, +) from spectrumdevice.settings import AcquisitionMode, AcquisitionSettings from spectrumdevice import Measurement from spectrumdevice.settings.channel import InputImpedance, InputCoupling, InputPath diff --git a/src/spectrumdevice/devices/mocks/__init__.py b/src/spectrumdevice/devices/mocks/__init__.py index 8bde001..bd08046 100644 --- a/src/spectrumdevice/devices/mocks/__init__.py +++ b/src/spectrumdevice/devices/mocks/__init__.py @@ -23,7 +23,7 @@ SpectrumNoTransferBufferDefined, SpectrumSettingsMismatchError, ) -from spectrumdevice.settings import ModelNumber, TransferBuffer +from spectrumdevice.settings import AdvancedCardFeature, CardFeature, ModelNumber, TransferBuffer from spectrumdevice.settings.card_dependent_properties import CardType from spectrumdevice.settings.device_modes import AcquisitionMode @@ -47,6 +47,8 @@ def __init__( mock_source_frame_rate_hz: float, num_modules: int, num_channels_per_module: int, + card_features: Optional[list[CardFeature]] = None, + advanced_card_features: Optional[list[AdvancedCardFeature]] = None, ): """ Args: @@ -60,7 +62,11 @@ def __init__( modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. + card_features (list[CardFeature]): List of available features of the mock device + advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device + """ + super().__init__( device_number=device_number, model=model, @@ -68,6 +74,8 @@ def __init__( num_modules=num_modules, num_channels_per_module=num_channels_per_module, card_type=CardType.SPCM_TYPE_AI, + card_features=card_features if card_features is not None else [], + advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode @@ -154,7 +162,15 @@ def wait_for_acquisition_to_complete(self) -> None: class MockSpectrumAWGCard(MockAbstractSpectrumAWG, MockAbstractSpectrumCard, SpectrumAWGCard): - def __init__(self, device_number: int, model: ModelNumber, num_modules: int, num_channels_per_module: int) -> None: + def __init__( + self, + device_number: int, + model: ModelNumber, + num_modules: int, + num_channels_per_module: int, + card_features: Optional[list[CardFeature]], + advanced_card_features: Optional[list[AdvancedCardFeature]], + ) -> None: """ Args: device_number (int): The index of the mock device to create. Used to create a name for the device which is @@ -165,6 +181,8 @@ def __init__(self, device_number: int, model: ModelNumber, num_modules: int, num modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. + card_features (list[CardFeature]): List of available features of the mock device + advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device """ super().__init__( card_type=CardType.SPCM_TYPE_AO, @@ -172,6 +190,8 @@ def __init__(self, device_number: int, model: ModelNumber, num_modules: int, num model=model, num_modules=num_modules, num_channels_per_module=num_channels_per_module, + card_features=card_features if card_features is not None else [], + advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string) diff --git a/src/spectrumdevice/devices/mocks/mock_abstract_devices.py b/src/spectrumdevice/devices/mocks/mock_abstract_devices.py index 0a11858..b5e55bf 100644 --- a/src/spectrumdevice/devices/mocks/mock_abstract_devices.py +++ b/src/spectrumdevice/devices/mocks/mock_abstract_devices.py @@ -5,12 +5,12 @@ # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. from abc import ABC +from functools import reduce +from operator import or_ from threading import Event, Lock, Thread from typing import Any, Dict, Optional, Union, cast from spectrum_gmbh.regs import ( - SPCM_FEAT_EXTFW_SEGSTAT, - SPCM_FEAT_MULTI, SPCM_X0_AVAILMODES, SPCM_X1_AVAILMODES, SPCM_X2_AVAILMODES, @@ -29,12 +29,85 @@ SPC_MIINST_MODULES, SPC_MIINST_CHPERMODULE, ) + +from spectrum_gmbh.py_header.regs import ( + SPCM_PULSEGEN_MODE_GATED, + SPC_AMP0, + SPC_AMP1, + SPC_AMP10, + SPC_AMP11, + SPC_AMP12, + SPC_AMP13, + SPC_AMP14, + SPC_AMP15, + SPC_AMP2, + SPC_AMP3, + SPC_AMP4, + SPC_AMP5, + SPC_AMP6, + SPC_AMP7, + SPC_AMP8, + SPC_AMP9, + SPC_OFFS0, + SPC_OFFS1, + SPC_OFFS10, + SPC_OFFS11, + SPC_OFFS12, + SPC_OFFS13, + SPC_OFFS14, + SPC_OFFS15, + SPC_OFFS2, + SPC_OFFS3, + SPC_OFFS4, + SPC_OFFS5, + SPC_OFFS6, + SPC_OFFS7, + SPC_OFFS8, + SPC_OFFS9, + SPC_XIO_PULSEGEN0_CONFIG, + SPC_XIO_PULSEGEN0_HIGH, + SPC_XIO_PULSEGEN0_LEN, + SPC_XIO_PULSEGEN0_LOOPS, + SPC_XIO_PULSEGEN0_MODE, + SPC_XIO_PULSEGEN1_CONFIG, + SPC_XIO_PULSEGEN1_HIGH, + SPC_XIO_PULSEGEN1_LEN, + SPC_XIO_PULSEGEN1_LOOPS, + SPC_XIO_PULSEGEN1_MODE, + SPC_XIO_PULSEGEN2_CONFIG, + SPC_XIO_PULSEGEN2_HIGH, + SPC_XIO_PULSEGEN2_LEN, + SPC_XIO_PULSEGEN2_LOOPS, + SPC_XIO_PULSEGEN2_MODE, + SPC_XIO_PULSEGEN3_CONFIG, + SPC_XIO_PULSEGEN3_HIGH, + SPC_XIO_PULSEGEN3_LEN, + SPC_XIO_PULSEGEN3_LOOPS, + SPC_XIO_PULSEGEN3_MODE, + SPC_XIO_PULSEGEN_AVAILHIGH_MAX, + SPC_XIO_PULSEGEN_AVAILHIGH_MIN, + SPC_XIO_PULSEGEN_AVAILHIGH_STEP, + SPC_XIO_PULSEGEN_AVAILLEN_MAX, + SPC_XIO_PULSEGEN_AVAILLEN_MIN, + SPC_XIO_PULSEGEN_AVAILLEN_STEP, + SPC_XIO_PULSEGEN_AVAILLOOPS_MAX, + SPC_XIO_PULSEGEN_AVAILLOOPS_MIN, + SPC_XIO_PULSEGEN_AVAILLOOPS_STEP, + SPC_XIO_PULSEGEN_CLOCK, + SPC_XIO_PULSEGEN_ENABLE, +) from spectrumdevice.devices.abstract_device import AbstractSpectrumDevice, AbstractSpectrumCard, AbstractSpectrumStarHub from spectrumdevice.devices.awg.abstract_spectrum_awg import AbstractSpectrumAWG from spectrumdevice.devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser from spectrumdevice.devices.mocks.mock_waveform_source import mock_waveform_source_factory -from spectrumdevice.exceptions import SpectrumDeviceNotConnected -from spectrumdevice.settings import AcquisitionMode, ModelNumber, SpectrumRegisterLength +from spectrumdevice.exceptions import MockRegisterNotImplemented, SpectrumDeviceNotConnected +from spectrumdevice.settings import ( + AcquisitionMode, + AdvancedCardFeature, + CardFeature, + ModelNumber, + SpectrumRegisterLength, +) from spectrumdevice.settings.card_dependent_properties import CardType from spectrumdevice.settings.device_modes import GenerationMode @@ -93,8 +166,9 @@ def read_spectrum_device_register( if spectrum_register in self._param_dict: return self._param_dict[spectrum_register] else: - self._param_dict[spectrum_register] = -1 - return -1 + raise MockRegisterNotImplemented( + f"Register {spectrum_register} has not been implemented in the mock device." + ) else: raise SpectrumDeviceNotConnected("Mock device has been disconnected.") @@ -112,11 +186,15 @@ def __init__( mode: Union[AcquisitionMode, GenerationMode], num_modules: int, num_channels_per_module: int, + card_features: list[CardFeature], + advanced_card_features: list[AdvancedCardFeature], **kwargs: Any, ) -> None: param_dict: dict[int, int] = {} - param_dict[SPC_PCIFEATURES] = SPCM_FEAT_MULTI - param_dict[SPC_PCIEXTFEATURES] = SPCM_FEAT_EXTFW_SEGSTAT + param_dict[SPC_PCIFEATURES] = reduce(or_, [f.value for f in card_features]) if card_features else 0 + param_dict[SPC_PCIEXTFEATURES] = ( + reduce(or_, [f.value for f in advanced_card_features]) if advanced_card_features else 0 + ) param_dict[SPCM_X0_AVAILMODES] = SPCM_XMODE_DISABLE param_dict[SPCM_X1_AVAILMODES] = SPCM_XMODE_DISABLE param_dict[SPCM_X2_AVAILMODES] = SPCM_XMODE_DISABLE @@ -131,6 +209,83 @@ def __init__( param_dict[SPC_MIINST_CHPERMODULE] = num_channels_per_module param_dict[SPC_MIINST_BYTESPERSAMPLE] = 2 param_dict[SPC_MIINST_MAXADCVALUE] = 128 + # Pulse generation: + param_dict[SPC_XIO_PULSEGEN_CLOCK] = 1000 + param_dict[SPC_XIO_PULSEGEN_ENABLE] = 0 + param_dict[SPC_XIO_PULSEGEN0_CONFIG] = 0 + param_dict[SPC_XIO_PULSEGEN1_CONFIG] = 0 + param_dict[SPC_XIO_PULSEGEN2_CONFIG] = 0 + param_dict[SPC_XIO_PULSEGEN3_CONFIG] = 0 + param_dict[SPC_XIO_PULSEGEN0_MODE] = SPCM_PULSEGEN_MODE_GATED + param_dict[SPC_XIO_PULSEGEN1_MODE] = SPCM_PULSEGEN_MODE_GATED + param_dict[SPC_XIO_PULSEGEN2_MODE] = SPCM_PULSEGEN_MODE_GATED + param_dict[SPC_XIO_PULSEGEN3_MODE] = SPCM_PULSEGEN_MODE_GATED + # ...pulse period + param_dict[SPC_XIO_PULSEGEN_AVAILLEN_MIN] = 2 + param_dict[SPC_XIO_PULSEGEN_AVAILLEN_MAX] = 1000 + param_dict[SPC_XIO_PULSEGEN_AVAILLEN_STEP] = 2 + param_dict[SPC_XIO_PULSEGEN0_LEN] = 2 + param_dict[SPC_XIO_PULSEGEN1_LEN] = 2 + param_dict[SPC_XIO_PULSEGEN2_LEN] = 2 + param_dict[SPC_XIO_PULSEGEN3_LEN] = 2 + # ...pulse high voltage duration + param_dict[SPC_XIO_PULSEGEN_AVAILHIGH_MIN] = 1 + param_dict[SPC_XIO_PULSEGEN_AVAILHIGH_MAX] = 500 + param_dict[SPC_XIO_PULSEGEN_AVAILHIGH_STEP] = 1 + param_dict[SPC_XIO_PULSEGEN0_HIGH] = 1 + param_dict[SPC_XIO_PULSEGEN1_HIGH] = 1 + param_dict[SPC_XIO_PULSEGEN2_HIGH] = 1 + param_dict[SPC_XIO_PULSEGEN3_HIGH] = 1 + # ...number of pulses + param_dict[SPC_XIO_PULSEGEN_AVAILLOOPS_MIN] = 1 + param_dict[SPC_XIO_PULSEGEN_AVAILLOOPS_MAX] = 1000 + param_dict[SPC_XIO_PULSEGEN_AVAILLOOPS_STEP] = 1 + param_dict[SPC_XIO_PULSEGEN0_LOOPS] = 0 + param_dict[SPC_XIO_PULSEGEN1_LOOPS] = 0 + param_dict[SPC_XIO_PULSEGEN2_LOOPS] = 0 + param_dict[SPC_XIO_PULSEGEN3_LOOPS] = 0 + # ...trigger delay + param_dict[602007] = 0 # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs for some reason + param_dict[602008] = 1000000 # SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs for some reason + param_dict[602009] = 1 # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs for some reason + param_dict[601003] = 0 # SPC_XIO_PULSEGEN0_DELAY not in regs for some reason + param_dict[601103] = 0 # SPC_XIO_PULSEGEN1_DELAY not in regs for some reason + param_dict[601203] = 0 # SPC_XIO_PULSEGEN2_DELAY not in regs for some reason + param_dict[601303] = 0 # SPC_XIO_PULSEGEN3_DELAY not in regs for some reason + # Channel settings + param_dict[SPC_AMP0] = 200 + param_dict[SPC_AMP1] = 200 + param_dict[SPC_AMP2] = 200 + param_dict[SPC_AMP3] = 200 + param_dict[SPC_AMP4] = 200 + param_dict[SPC_AMP5] = 200 + param_dict[SPC_AMP6] = 200 + param_dict[SPC_AMP7] = 200 + param_dict[SPC_AMP8] = 200 + param_dict[SPC_AMP9] = 200 + param_dict[SPC_AMP10] = 200 + param_dict[SPC_AMP11] = 200 + param_dict[SPC_AMP12] = 200 + param_dict[SPC_AMP13] = 200 + param_dict[SPC_AMP14] = 200 + param_dict[SPC_AMP15] = 200 + param_dict[SPC_OFFS0] = 0 + param_dict[SPC_OFFS1] = 0 + param_dict[SPC_OFFS2] = 0 + param_dict[SPC_OFFS3] = 0 + param_dict[SPC_OFFS4] = 0 + param_dict[SPC_OFFS5] = 0 + param_dict[SPC_OFFS6] = 0 + param_dict[SPC_OFFS7] = 0 + param_dict[SPC_OFFS8] = 0 + param_dict[SPC_OFFS9] = 0 + param_dict[SPC_OFFS10] = 0 + param_dict[SPC_OFFS11] = 0 + param_dict[SPC_OFFS12] = 0 + param_dict[SPC_OFFS13] = 0 + param_dict[SPC_OFFS14] = 0 + param_dict[SPC_OFFS15] = 0 + self._buffer_lock = Lock() self._enabled_channels = [0] super().__init__( diff --git a/src/spectrumdevice/exceptions.py b/src/spectrumdevice/exceptions.py index 1f38465..40512de 100644 --- a/src/spectrumdevice/exceptions.py +++ b/src/spectrumdevice/exceptions.py @@ -1,4 +1,6 @@ """Defines exceptions raised by spectrumdevice device classes.""" +from typing import Optional + from spectrumdevice.settings.card_dependent_properties import CARD_TYPE_DESCRIPTIONS, CardType @@ -46,10 +48,11 @@ class SpectrumApiCallFailed(IOError): def __init__( self, call_description: str, - error_code: int, + error_code: Optional[int] = None, message: str = "Unknown", ) -> None: - super().__init__(f'"{call_description}" failed with "{message}" ({self.error_code_string(error_code)})') + code_suffix = ({self.error_code_string(error_code)}) if error_code is not None else "" + super().__init__(f'"{call_description}" failed with "{message}" {code_suffix}') @classmethod def error_code_string(cls, error_code: int) -> str: @@ -60,6 +63,14 @@ class SpectrumFIFOModeHardwareBufferOverrun(SpectrumApiCallFailed): pass +class SpectrumFeatureNotSupportedByCard(SpectrumApiCallFailed): + pass + + +class SpectrumParameterValueOutOfRange(SpectrumApiCallFailed): + pass + + class SpectrumWrongAcquisitionMode(IOError): def __init__(self, msg: str) -> None: super().__init__(f"Incorrect acquisition mode: {msg}") @@ -97,5 +108,15 @@ class SpectrumCardIsNotAnAWG(SpectrumWrongCardType): pass -class SpectrumFeatureNotSupportedByCard(IOError): +class SpectrumInvalidParameterValue(ValueError): + def __init__( + self, param_name: str, requested_value: float, param_min: float, param_max: float, param_step: float + ) -> None: + super().__init__( + f"The requested {param_name} value of {requested_value} is invalid. It must be between " + f"{param_min} and {param_max} inclusive, and a multiple of {param_step}." + ) + + +class MockRegisterNotImplemented(ValueError): pass diff --git a/src/spectrumdevice/features/__init__.py b/src/spectrumdevice/features/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spectrumdevice/features/pulse_generator/__init__.py b/src/spectrumdevice/features/pulse_generator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spectrumdevice/features/pulse_generator/interfaces.py b/src/spectrumdevice/features/pulse_generator/interfaces.py new file mode 100644 index 0000000..fb8f10d --- /dev/null +++ b/src/spectrumdevice/features/pulse_generator/interfaces.py @@ -0,0 +1,253 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from spectrumdevice.settings import SpectrumRegisterLength +from spectrumdevice.settings.pulse_generator import ( + PulseGeneratorMultiplexerTriggerSource, + PulseGeneratorOutputSettings, + PulseGeneratorTriggerDetectionMode, + PulseGeneratorTriggerMode, + PulseGeneratorTriggerSettings, +) + +MultiplexerTriggerSourceTypeVar = TypeVar( + "MultiplexerTriggerSourceTypeVar", bound=PulseGeneratorMultiplexerTriggerSource +) + + +class PulseGeneratorMultiplexerInterface(Generic[MultiplexerTriggerSourceTypeVar], ABC): + @property + @abstractmethod + def number(self) -> int: + raise NotImplementedError() + + def read_parent_device_register( + self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO + ) -> int: + raise NotImplementedError() + + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + raise NotImplementedError() + + @property + def output_inversion(self) -> bool: + raise NotImplementedError() + + def set_output_inversion(self, inverted: bool) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def trigger_source(self) -> MultiplexerTriggerSourceTypeVar: + raise NotImplementedError() + + @abstractmethod + def set_trigger_source(self, trigger_source: MultiplexerTriggerSourceTypeVar) -> None: + raise NotImplementedError() + + +class PulseGeneratorInterface(ABC): + @abstractmethod + def configure_output( + self, settings: PulseGeneratorOutputSettings, coerce: bool = True + ) -> PulseGeneratorOutputSettings: + raise NotImplementedError() + + @abstractmethod + def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None: + raise NotImplementedError() + + @abstractmethod + def force_trigger(self) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def number(self) -> int: + raise NotImplementedError() + + @abstractmethod + def read_parent_device_register( + self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO + ) -> int: + raise NotImplementedError() + + @abstractmethod + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def multiplexer_1(self) -> PulseGeneratorMultiplexerInterface: + raise NotImplementedError() + + @property + @abstractmethod + def multiplexer_2(self) -> PulseGeneratorMultiplexerInterface: + raise NotImplementedError() + + @property + @abstractmethod + def clock_rate_in_hz(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def clock_period_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def enabled(self) -> bool: + raise NotImplementedError() + + @abstractmethod + def enable(self) -> None: + raise NotImplementedError() + + @abstractmethod + def disable(self) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def output_inversion(self) -> bool: + raise NotImplementedError() + + @abstractmethod + def set_output_inversion(self, inverted: bool) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode: + raise NotImplementedError() + + @abstractmethod + def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def trigger_mode(self) -> PulseGeneratorTriggerMode: + raise NotImplementedError() + + @abstractmethod + def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def min_allowed_period_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def max_allowed_period_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def allowed_period_step_size_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def period_in_seconds(self) -> float: + raise NotImplementedError() + + @abstractmethod + def set_period_in_seconds(self, period: float, coerce: bool = False) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def min_allowed_high_voltage_duration_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def max_allowed_high_voltage_duration_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def allowed_high_voltage_duration_step_size_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def duration_of_high_voltage_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def duration_of_low_voltage_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def duty_cycle(self) -> float: + raise NotImplementedError() + + @abstractmethod + def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def min_allowed_pulses(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def max_allowed_pulses(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def allowed_num_pulses_step_size(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def num_pulses(self) -> int: + raise NotImplementedError() + + @abstractmethod + def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def min_allowed_delay_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def max_allowed_delay_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def allowed_delay_step_size_in_seconds(self) -> float: + raise NotImplementedError() + + @property + @abstractmethod + def delay_in_seconds(self) -> float: + raise NotImplementedError() + + @abstractmethod + def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float: + raise NotImplementedError() diff --git a/src/spectrumdevice/features/pulse_generator/multiplexer.py b/src/spectrumdevice/features/pulse_generator/multiplexer.py new file mode 100644 index 0000000..823d3e3 --- /dev/null +++ b/src/spectrumdevice/features/pulse_generator/multiplexer.py @@ -0,0 +1,88 @@ +from abc import ABC + +from spectrumdevice.features.pulse_generator.interfaces import ( + MultiplexerTriggerSourceTypeVar, + PulseGeneratorInterface, + PulseGeneratorMultiplexerInterface, +) +from spectrumdevice.settings import SpectrumRegisterLength +from spectrumdevice.settings.pulse_generator import ( + PULSE_GEN_CONFIG_COMMANDS, + PULSE_GEN_MUX1_COMMANDS, + PULSE_GEN_MUX2_COMMANDS, + PULSE_GEN_MUX_INVERSION_COMMANDS, + PulseGeneratorMultiplexer1TriggerSource, + PulseGeneratorMultiplexer2TriggerSource, + decode_pulse_gen_config, +) +from spectrumdevice.spectrum_wrapper import toggle_bitmap_value + + +class PulseGeneratorMultiplexer(PulseGeneratorMultiplexerInterface[MultiplexerTriggerSourceTypeVar], ABC): + def __init__(self, parent: PulseGeneratorInterface) -> None: + self._parent_pulse_gen = parent + + def read_parent_device_register( + self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO + ) -> int: + return self._parent_pulse_gen.read_parent_device_register(spectrum_register, length) + + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + self._parent_pulse_gen.write_to_parent_device_register(spectrum_register, value, length) + + @property + def output_inversion(self) -> bool: + currently_enabled_config_options = decode_pulse_gen_config( + self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._parent_pulse_gen.number]) + ) + return PULSE_GEN_MUX_INVERSION_COMMANDS[self.number] in currently_enabled_config_options + + def set_output_inversion(self, inverted: bool) -> None: + current_register_value = self.read_parent_device_register( + PULSE_GEN_CONFIG_COMMANDS[self._parent_pulse_gen.number] + ) + new_register_value = toggle_bitmap_value( + current_register_value, PULSE_GEN_MUX_INVERSION_COMMANDS[self.number], inverted + ) + self.write_to_parent_device_register( + PULSE_GEN_CONFIG_COMMANDS[self._parent_pulse_gen.number], new_register_value + ) + + +class PulseGeneratorMultiplexer1(PulseGeneratorMultiplexer[PulseGeneratorMultiplexer1TriggerSource]): + @property + def number(self) -> int: + return 0 # use zero-indexed value for use getting command from PULSE_GEN_MUX1_COMMANDS tuple + + @property + def trigger_source(self) -> PulseGeneratorMultiplexer1TriggerSource: + return PulseGeneratorMultiplexer1TriggerSource( + self.read_parent_device_register(PULSE_GEN_MUX1_COMMANDS[self._parent_pulse_gen.number]) + ) + + def set_trigger_source(self, trigger_source: PulseGeneratorMultiplexer1TriggerSource) -> None: + self.write_to_parent_device_register( + PULSE_GEN_MUX1_COMMANDS[self._parent_pulse_gen.number], trigger_source.value + ) + + +class PulseGeneratorMultiplexer2(PulseGeneratorMultiplexer[PulseGeneratorMultiplexer2TriggerSource]): + @property + def number(self) -> int: + return 1 # use zero-indexed value for use getting command from PULSE_GEN_MUX1_COMMANDS tuple + + @property + def trigger_source(self) -> PulseGeneratorMultiplexer2TriggerSource: + return PulseGeneratorMultiplexer2TriggerSource( + self.read_parent_device_register(PULSE_GEN_MUX2_COMMANDS[self._parent_pulse_gen.number]) + ) + + def set_trigger_source(self, trigger_source: PulseGeneratorMultiplexer2TriggerSource) -> None: + self.write_to_parent_device_register( + PULSE_GEN_MUX2_COMMANDS[self._parent_pulse_gen.number], trigger_source.value + ) diff --git a/src/spectrumdevice/features/pulse_generator/pulse_generator.py b/src/spectrumdevice/features/pulse_generator/pulse_generator.py new file mode 100644 index 0000000..3c965a3 --- /dev/null +++ b/src/spectrumdevice/features/pulse_generator/pulse_generator.py @@ -0,0 +1,415 @@ +import numpy as np +from numpy import clip, int16, iinfo + +from spectrum_gmbh.py_header.regs import ( + SPCM_PULSEGEN_CONFIG_INVERT, + SPC_PCIEXTFEATURES, + SPC_XIO_PULSEGEN_AVAILLEN_MAX, + SPC_XIO_PULSEGEN_AVAILLEN_MIN, + SPC_XIO_PULSEGEN_AVAILLEN_STEP, + SPC_XIO_PULSEGEN_AVAILLOOPS_MAX, + SPC_XIO_PULSEGEN_AVAILLOOPS_MIN, + SPC_XIO_PULSEGEN_AVAILLOOPS_STEP, + SPC_XIO_PULSEGEN_CLOCK, + SPC_XIO_PULSEGEN_ENABLE, + SPC_XIO_PULSEGEN_COMMAND, + SPCM_PULSEGEN_CMD_FORCE, + SPC_M2CMD, + M2CMD_CARD_WRITESETUP, +) +from spectrumdevice.devices.abstract_device.channel_interfaces import SpectrumIOLineInterface +from spectrumdevice.exceptions import ( + SpectrumFeatureNotSupportedByCard, + SpectrumInvalidParameterValue, + SpectrumIOError, +) +from spectrumdevice.features.pulse_generator.interfaces import ( + PulseGeneratorInterface, +) +from spectrumdevice.features.pulse_generator.multiplexer import PulseGeneratorMultiplexer1, PulseGeneratorMultiplexer2 +from spectrumdevice.settings import AdvancedCardFeature, SpectrumRegisterLength +from spectrumdevice.settings.card_features import decode_advanced_card_features +from spectrumdevice.settings.pulse_generator import ( + PULSE_GEN_CONFIG_COMMANDS, + PULSE_GEN_DELAY_COMMANDS, + PULSE_GEN_ENABLE_COMMANDS, + PULSE_GEN_PULSE_PERIOD_COMMANDS, + PULSE_GEN_HIGH_DURATION_COMMANDS, + PULSE_GEN_NUM_REPEATS_COMMANDS, + PULSE_GEN_TRIGGER_MODE_COMMANDS, + PulseGeneratorOutputSettings, + PulseGeneratorTriggerDetectionMode, + PulseGeneratorTriggerMode, + PulseGeneratorTriggerSettings, + decode_enabled_pulse_gens, + decode_pulse_gen_config, + PulseGeneratorMultiplexer2TriggerSource, +) +from spectrumdevice.spectrum_wrapper import toggle_bitmap_value + + +class PulseGenerator(PulseGeneratorInterface): + def __init__(self, parent: SpectrumIOLineInterface): + self._parent_io_line = parent + # last char of IO line name is IO line chanel number, which is used to set pulse generator number + self._number = int(parent.name.name[-1]) + available_advanced_features = decode_advanced_card_features( + self.read_parent_device_register(SPC_PCIEXTFEATURES) + ) + if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features: + raise SpectrumFeatureNotSupportedByCard( + call_description=self.__str__() + ".__init__()", + message="Pulse generator firmware option not installed on device.", + ) + self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self) + self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self) + + def configure_output( + self, settings: PulseGeneratorOutputSettings, coerce: bool = True + ) -> PulseGeneratorOutputSettings: + """Configure all pulse generator output settings at once. By default, all values are coerced to the + nearest values allowed by the hardware, and the coerced values are returned.""" + self.set_output_inversion(settings.output_inversion) + coerced_settings = PulseGeneratorOutputSettings( + period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce), + duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce), + num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce), + delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce), + output_inversion=settings.output_inversion, + ) + self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) + return coerced_settings + + def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None: + self.set_trigger_mode(settings.trigger_mode) + self.set_trigger_detection_mode(settings.trigger_detection_mode) + self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source) + self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source) + self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion) + self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion) + self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) + + def force_trigger(self) -> None: + if ( + self._multiplexer_2.trigger_source + != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE + ): + raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'") + self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE) + + @property + def number(self) -> int: + return self._number + + @property + def multiplexer_1(self) -> PulseGeneratorMultiplexer1: + return self._multiplexer_1 + + @property + def multiplexer_2(self) -> PulseGeneratorMultiplexer2: + return self._multiplexer_2 + + def read_parent_device_register( + self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO + ) -> int: + return self._parent_io_line.read_parent_device_register(spectrum_register, length) + + def write_to_parent_device_register( + self, + spectrum_register: int, + value: int, + length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, + ) -> None: + self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length) + + def _convert_clock_cycles_to_seconds(self, clock_cycles: int) -> float: + return clock_cycles * self.clock_period_in_seconds + + def _convert_seconds_to_clock_cycles(self, seconds: float) -> float: + # round to nearest milli-cycle to avoid floating point precision problems + return round(seconds * self.clock_rate_in_hz * 1e3) / 1e3 + + def _get_enabled_pulse_generator_ids(self) -> list[int]: + return decode_enabled_pulse_gens(self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)) + + @property + def clock_rate_in_hz(self) -> int: + return self.read_parent_device_register(SPC_XIO_PULSEGEN_CLOCK) + + @property + def clock_period_in_seconds(self) -> float: + return 1 / self.clock_rate_in_hz + + @property + def enabled(self) -> bool: + return PULSE_GEN_ENABLE_COMMANDS[self._number] in self._get_enabled_pulse_generator_ids() + + def enable(self) -> None: + current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) + new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True) + self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value) + + def disable(self) -> None: + current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) + new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False) + self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value) + + @property + def output_inversion(self) -> bool: + currently_enabled_config_options = decode_pulse_gen_config( + self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) + ) + return SPCM_PULSEGEN_CONFIG_INVERT in currently_enabled_config_options + + def set_output_inversion(self, inverted: bool) -> None: + current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) + new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted) + self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value) + + @property + def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode: + currently_enabled_config_options = decode_pulse_gen_config( + self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) + ) + if PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value in currently_enabled_config_options: + return PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH + else: + return PulseGeneratorTriggerDetectionMode.RISING_EDGE + + def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None: + current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) + high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value + new_register_value = toggle_bitmap_value( + current_register_value, + high_voltage_mode_value, + mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH, + ) + self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value) + + @property + def trigger_mode(self) -> PulseGeneratorTriggerMode: + return PulseGeneratorTriggerMode( + self.read_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number]) + ) + + def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None: + self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value) + + @property + def min_allowed_period_in_seconds(self) -> float: + reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN) + reg_val = 0 if reg_val == -1 else reg_val + return self._convert_clock_cycles_to_seconds(reg_val) + + @property + def max_allowed_period_in_seconds(self) -> float: + reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX) + reg_val = iinfo(int16).max if reg_val == -1 else reg_val + return self._convert_clock_cycles_to_seconds(reg_val) + + @property + def _allowed_period_step_size_in_clock_cycles(self) -> int: + return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_STEP) + + @property + def allowed_period_step_size_in_seconds(self) -> float: + return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles) + + @property + def period_in_seconds(self) -> float: + return self._convert_clock_cycles_to_seconds( + self.read_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number]) + ) + + def set_period_in_seconds(self, period: float, coerce: bool = False) -> float: + """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value + will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and + allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is + requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of + active channels and the sample rate.""" + period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period) + coerced_period = _coerce_fractional_value_to_allowed_integer( + period_in_clock_cycles, + int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)), + int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)), + self._allowed_period_step_size_in_clock_cycles, + ) + if not coerce and coerced_period != period_in_clock_cycles: + raise SpectrumInvalidParameterValue( + "pulse generator period", + period, + self.min_allowed_period_in_seconds, + self.max_allowed_period_in_seconds, + self.allowed_period_step_size_in_seconds, + ) + + self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period)) + return self._convert_clock_cycles_to_seconds(coerced_period) + + @property + def min_allowed_high_voltage_duration_in_seconds(self) -> float: + reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN) + reg_val = 0 if reg_val == -1 else reg_val + return self._convert_clock_cycles_to_seconds(reg_val) + + @property + def max_allowed_high_voltage_duration_in_seconds(self) -> float: + reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX) + reg_val = iinfo(int16).max if reg_val == -1 else reg_val + return self._convert_clock_cycles_to_seconds(reg_val) + + @property + def _allowed_high_voltage_duration_step_size_in_clock_cycles(self) -> int: + return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_STEP) + + @property + def allowed_high_voltage_duration_step_size_in_seconds(self) -> float: + return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles) + + @property + def duration_of_high_voltage_in_seconds(self) -> float: + return self._convert_clock_cycles_to_seconds( + self.read_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number]) + ) + + @property + def duration_of_low_voltage_in_seconds(self) -> float: + return self.period_in_seconds - self.duration_of_high_voltage_in_seconds + + @property + def duty_cycle(self) -> float: + return self.duration_of_high_voltage_in_seconds / self.period_in_seconds + + def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float: + """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and + use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested + an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active + channels and the sample rate. + """ + requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles( + self.period_in_seconds * duty_cycle + ) + clipped_duration = _coerce_fractional_value_to_allowed_integer( + requested_high_v_duration_in_clock_cycles, + int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)), + int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)), + self._allowed_high_voltage_duration_step_size_in_clock_cycles, + ) + if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles: + raise SpectrumInvalidParameterValue( + "high-voltage duration", + self.period_in_seconds * duty_cycle, + self.min_allowed_high_voltage_duration_in_seconds, + self.max_allowed_high_voltage_duration_in_seconds, + self.allowed_high_voltage_duration_step_size_in_seconds, + ) + self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration) + return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds + + @property + def min_allowed_pulses(self) -> int: + return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MIN) + + @property + def max_allowed_pulses(self) -> int: + reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MAX) + # my card has this register set to -2, which I assume means no limit (can't work it out from the docs) + return reg_val if reg_val > 0 else iinfo(int16).max + + @property + def allowed_num_pulses_step_size(self) -> int: + return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_STEP) + + @property + def num_pulses(self) -> int: + """The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously.""" + return self.read_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number]) + + def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int: + """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated + continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, + max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a + SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.""" + + num_pulses = max(0, num_pulses) # make negative value 0 to enable continuous pulse generation + + coerced_num_pulses = _coerce_fractional_value_to_allowed_integer( + float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size + ) + + if not coerce and coerced_num_pulses != num_pulses: + raise SpectrumInvalidParameterValue( + "number of pulses", + num_pulses, + self.min_allowed_pulses, + self.max_allowed_pulses, + self.allowed_num_pulses_step_size, + ) + + self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses) + return coerced_num_pulses + + @property + def min_allowed_delay_in_seconds(self) -> float: + reg_value = self.read_parent_device_register(602007) # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs.py + reg_value = 0 if reg_value == -1 else reg_value + return self._convert_clock_cycles_to_seconds(reg_value) + + @property + def max_allowed_delay_in_seconds(self) -> float: + reg_value = self.read_parent_device_register(602008) # SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs.py + reg_value = iinfo(int16).max if reg_value == -1 else reg_value + return self._convert_clock_cycles_to_seconds(reg_value) + + @property + def allowed_delay_step_size_in_seconds(self) -> float: + return self._convert_clock_cycles_to_seconds( + self.read_parent_device_register(602009) # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs.py + ) + + @property + def delay_in_seconds(self) -> float: + """The delay between the trigger and the first pulse transmission""" + return self._convert_clock_cycles_to_seconds( + self.read_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number]) + ) + + def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float: + """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is + coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and + allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised + if the requested value is invalid.""" + + requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds) + clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer( + requested_delay_in_clock_cycles, + int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)), + int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)), + int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)), + ) + + if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles: + raise SpectrumInvalidParameterValue( + "delay in seconds", + requested_delay_in_clock_cycles, + self.min_allowed_delay_in_seconds, + self.max_allowed_delay_in_seconds, + self.allowed_delay_step_size_in_seconds, + ) + + self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles) + return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles) + + def __str__(self) -> str: + return f"Pulse generator {self._number} of {self._parent_io_line}." + + +def _coerce_fractional_value_to_allowed_integer( + fractional_value: float, min_allowed: int, max_allowed: int, step: int +) -> int: + coerced = int(round(fractional_value / step) * step) + if min_allowed == -1: + min_allowed = 0 + if max_allowed == -1: + max_allowed = np.iinfo(int16).max + return int(clip(coerced, min_allowed, max_allowed)) diff --git a/src/spectrumdevice/settings/__init__.py b/src/spectrumdevice/settings/__init__.py index 43fbe13..54d958d 100644 --- a/src/spectrumdevice/settings/__init__.py +++ b/src/spectrumdevice/settings/__init__.py @@ -45,7 +45,8 @@ @dataclass class TriggerSettings: - """A dataclass collecting all settings related to triggering. See Spectrum documentation.""" + """A dataclass collecting all settings related to triggering generation and acquisition. See Spectrum documentation. + Note that pulse generators have their own trigger options.""" trigger_sources: List[TriggerSource] """The trigger sources to enable""" diff --git a/src/spectrumdevice/settings/card_features.py b/src/spectrumdevice/settings/card_features.py index 113a6ce..866b116 100644 --- a/src/spectrumdevice/settings/card_features.py +++ b/src/spectrumdevice/settings/card_features.py @@ -31,6 +31,7 @@ SPCM_FEAT_EXTFW_SEGSTAT, SPCM_FEAT_EXTFW_SEGAVERAGE, SPCM_FEAT_EXTFW_BOXCAR, + SPCM_FEAT_EXTFW_PULSEGEN, ) # Devices return same value for SPCM_FEAT_STARHUB4, SPCM_FEAT_STARHUB4, SPCM_FEAT_STARHUB6_EXTM and @@ -80,7 +81,7 @@ class AdvancedCardFeature(Enum): SPCM_FEAT_EXTFW_SEGSTAT = SPCM_FEAT_EXTFW_SEGSTAT SPCM_FEAT_EXTFW_SEGAVERAGE = SPCM_FEAT_EXTFW_SEGAVERAGE SPCM_FEAT_EXTFW_BOXCAR = SPCM_FEAT_EXTFW_BOXCAR - SPCM_FEAT_EXTFW_PULSEGEN = 0x00000008 # not in regs.py for some reason. None of the AWG stuff seem to be, + SPCM_FEAT_EXTFW_PULSEGEN = SPCM_FEAT_EXTFW_PULSEGEN def decode_advanced_card_features(value: int) -> List[AdvancedCardFeature]: diff --git a/src/spectrumdevice/settings/io_lines.py b/src/spectrumdevice/settings/io_lines.py index e6185a4..83dd7fb 100644 --- a/src/spectrumdevice/settings/io_lines.py +++ b/src/spectrumdevice/settings/io_lines.py @@ -10,6 +10,7 @@ from enum import Enum from typing import List +from spectrumdevice.exceptions import SpectrumIOError from spectrumdevice.settings.channel import SpectrumChannelName from spectrumdevice.spectrum_wrapper import decode_bitmap_using_list_of_ints from spectrum_gmbh.regs import ( @@ -17,6 +18,7 @@ SPCM_XMODE_ASYNCIN, SPCM_XMODE_ASYNCOUT, SPCM_XMODE_DIGIN, + SPCM_XMODE_PULSEGEN, SPCM_XMODE_TRIGIN, SPCM_XMODE_DIGOUT, SPCM_XMODE_TRIGOUT, @@ -86,6 +88,7 @@ class IOLineMode(Enum): SPCM_XMODE_RUNSTATE = SPCM_XMODE_RUNSTATE SPCM_XMODE_ARMSTATE = SPCM_XMODE_ARMSTATE SPCM_XMODE_CONTOUTMARK = SPCM_XMODE_CONTOUTMARK + SPCM_XMODE_PULSEGEN = SPCM_XMODE_PULSEGEN IO_LINE_MODE_COMMANDS = ( @@ -144,6 +147,16 @@ def decode_available_io_modes(value: int) -> List[IOLineMode]: return [IOLineMode(found_value) for found_value in decode_bitmap_using_list_of_ints(value, possible_values)] +def decode_enabled_io_line_mode(value: int) -> IOLineMode: + """DigOutSourceChannel and DigOutSourceBit are bitmapped on to IOLine mode in the IO_LINE_MODE_COMMANDS register, + so need to extract only the IOLine mode bits for determining the currently enabled mode.""" + possible_values = [mode.value for mode in IOLineMode] + active_modes = [IOLineMode(found_value) for found_value in decode_bitmap_using_list_of_ints(value, possible_values)] + if len(active_modes) != 1: + raise SpectrumIOError("Could not read enabled IO line mode") + return active_modes[0] + + @dataclass class AvailableIOModes: """Stores a list of the available IOLineMode settings on each of the four I/O lines (X0, X1, X2 and X3) on a diff --git a/src/spectrumdevice/settings/pulse_generator.py b/src/spectrumdevice/settings/pulse_generator.py new file mode 100644 index 0000000..3705411 --- /dev/null +++ b/src/spectrumdevice/settings/pulse_generator.py @@ -0,0 +1,211 @@ +from dataclasses import dataclass +from enum import Enum + +from spectrum_gmbh.py_header.regs import ( + SPCM_PULSEGEN_CONFIG_HIGH, + SPCM_PULSEGEN_CONFIG_INVERT, + SPCM_PULSEGEN_CONFIG_MUX1_INVERT, + SPCM_PULSEGEN_CONFIG_MUX2_INVERT, + SPCM_PULSEGEN_ENABLE0, + SPCM_PULSEGEN_ENABLE1, + SPCM_PULSEGEN_ENABLE2, + SPCM_PULSEGEN_ENABLE3, + SPCM_PULSEGEN_MODE_GATED, + SPCM_PULSEGEN_MODE_SINGLESHOT, + SPCM_PULSEGEN_MODE_TRIGGERED, + SPCM_PULSEGEN_MUX1_SRC_ARM, + SPCM_PULSEGEN_MUX1_SRC_RUN, + SPCM_PULSEGEN_MUX1_SRC_UNUSED, + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN0, + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN1, + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN2, + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN3, + SPCM_PULSEGEN_MUX2_SRC_SOFTWARE, + SPCM_PULSEGEN_MUX2_SRC_UNUSED, + SPCM_PULSEGEN_MUX2_SRC_XIO0, + SPCM_PULSEGEN_MUX2_SRC_XIO1, + SPCM_PULSEGEN_MUX2_SRC_XIO2, + SPCM_PULSEGEN_MUX2_SRC_XIO3, + SPC_XIO_PULSEGEN0_CONFIG, + SPC_XIO_PULSEGEN0_HIGH, + SPC_XIO_PULSEGEN0_LEN, + SPC_XIO_PULSEGEN0_LOOPS, + SPC_XIO_PULSEGEN0_MODE, + SPC_XIO_PULSEGEN0_MUX1_SRC, + SPC_XIO_PULSEGEN0_MUX2_SRC, + SPC_XIO_PULSEGEN1_CONFIG, + SPC_XIO_PULSEGEN1_HIGH, + SPC_XIO_PULSEGEN1_LEN, + SPC_XIO_PULSEGEN1_LOOPS, + SPC_XIO_PULSEGEN1_MODE, + SPC_XIO_PULSEGEN1_MUX1_SRC, + SPC_XIO_PULSEGEN1_MUX2_SRC, + SPC_XIO_PULSEGEN2_CONFIG, + SPC_XIO_PULSEGEN2_HIGH, + SPC_XIO_PULSEGEN2_LEN, + SPC_XIO_PULSEGEN2_LOOPS, + SPC_XIO_PULSEGEN2_MODE, + SPC_XIO_PULSEGEN2_MUX1_SRC, + SPC_XIO_PULSEGEN2_MUX2_SRC, + SPC_XIO_PULSEGEN3_CONFIG, + SPC_XIO_PULSEGEN3_HIGH, + SPC_XIO_PULSEGEN3_LEN, + SPC_XIO_PULSEGEN3_LOOPS, + SPC_XIO_PULSEGEN3_MODE, + SPC_XIO_PULSEGEN3_MUX1_SRC, + SPC_XIO_PULSEGEN3_MUX2_SRC, +) +from spectrumdevice.spectrum_wrapper import decode_bitmap_using_list_of_ints + +PULSE_GEN_ENABLE_COMMANDS = (SPCM_PULSEGEN_ENABLE0, SPCM_PULSEGEN_ENABLE1, SPCM_PULSEGEN_ENABLE2, SPCM_PULSEGEN_ENABLE3) + + +def decode_enabled_pulse_gens(value: int) -> list[int]: + """Converts the integer value received by a Spectrum device when queried about its enabled pulse gens into a list of + ids of the enable pulse generators.""" + possible_values = [v for v in PULSE_GEN_ENABLE_COMMANDS] + return [found_value for found_value in decode_bitmap_using_list_of_ints(value, possible_values)] + + +class PulseGeneratorTriggerMode(Enum): + SPCM_PULSEGEN_MODE_GATED = SPCM_PULSEGEN_MODE_GATED + """Pulse generator will start if the trigger condition or “gate” is met and will stop, if either the gate becomes + inactive or the defined number of LOOPS have been generated. Will reset its loop counter, when the gate becomes LOW. + """ + SPCM_PULSEGEN_MODE_TRIGGERED = SPCM_PULSEGEN_MODE_TRIGGERED + """The pulse generator will start if the trigger condition is met and will replay the defined number of loops + before re-arm- ing itself and waiting for another trigger event. Changes in the trigger signal while replaying will + be ignored.""" + SPCM_PULSEGEN_MODE_SINGLESHOT = SPCM_PULSEGEN_MODE_SINGLESHOT + """The pulse generator will start if the trigger condition is met and will replay the defined number of loops once. + """ + + +PULSE_GEN_TRIGGER_MODE_COMMANDS = ( + SPC_XIO_PULSEGEN0_MODE, + SPC_XIO_PULSEGEN1_MODE, + SPC_XIO_PULSEGEN2_MODE, + SPC_XIO_PULSEGEN3_MODE, +) + + +class PulseGeneratorMultiplexerTriggerSource: + pass + + +class PulseGeneratorMultiplexer1TriggerSource(PulseGeneratorMultiplexerTriggerSource, Enum): + SPCM_PULSEGEN_MUX1_SRC_UNUSED = SPCM_PULSEGEN_MUX1_SRC_UNUSED + """Inputs of MUX1 are not used in creating the trigger condition and instead a static logic HIGH is used for MUX1. + """ + SPCM_PULSEGEN_MUX1_SRC_RUN = SPCM_PULSEGEN_MUX1_SRC_RUN + """This input of MUX1 reflects the current run state of the card. If acquisition/output is running the signal is + HIGH. If card has stopped the signal is LOW. The signal is identical to XIO output using SPCM_XMODE_RUNSTATE.""" + SPCM_PULSEGEN_MUX1_SRC_ARM = SPCM_PULSEGEN_MUX1_SRC_ARM + """This input of MUX1 reflects the current ARM state of the card. If the card is armed and ready to receive a + trigger the signal is HIGH. If the card isn’t running or the card is still acquiring pretrigger data or the trigger + has already been detected. the signal is LOW. The signal is identical to XIO output using SPCM_XMODE_ARMSTATE.""" + + +PULSE_GEN_MUX1_COMMANDS = ( + SPC_XIO_PULSEGEN0_MUX1_SRC, + SPC_XIO_PULSEGEN1_MUX1_SRC, + SPC_XIO_PULSEGEN2_MUX1_SRC, + SPC_XIO_PULSEGEN3_MUX1_SRC, +) + + +class PulseGeneratorMultiplexer2TriggerSource(PulseGeneratorMultiplexerTriggerSource, Enum): + SPCM_PULSEGEN_MUX2_SRC_UNUSED = SPCM_PULSEGEN_MUX2_SRC_UNUSED + SPCM_PULSEGEN_MUX2_SRC_SOFTWARE = SPCM_PULSEGEN_MUX2_SRC_SOFTWARE + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN0 = SPCM_PULSEGEN_MUX2_SRC_PULSEGEN0 + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN1 = SPCM_PULSEGEN_MUX2_SRC_PULSEGEN1 + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN2 = SPCM_PULSEGEN_MUX2_SRC_PULSEGEN2 + SPCM_PULSEGEN_MUX2_SRC_PULSEGEN3 = SPCM_PULSEGEN_MUX2_SRC_PULSEGEN3 + SPCM_PULSEGEN_MUX2_SRC_XIO0 = SPCM_PULSEGEN_MUX2_SRC_XIO0 + SPCM_PULSEGEN_MUX2_SRC_XIO1 = SPCM_PULSEGEN_MUX2_SRC_XIO1 + SPCM_PULSEGEN_MUX2_SRC_XIO2 = SPCM_PULSEGEN_MUX2_SRC_XIO2 + SPCM_PULSEGEN_MUX2_SRC_XIO3 = SPCM_PULSEGEN_MUX2_SRC_XIO3 + + +PULSE_GEN_MUX2_COMMANDS = ( + SPC_XIO_PULSEGEN0_MUX2_SRC, + SPC_XIO_PULSEGEN1_MUX2_SRC, + SPC_XIO_PULSEGEN2_MUX2_SRC, + SPC_XIO_PULSEGEN3_MUX2_SRC, +) + + +class PulseGeneratorTriggerDetectionMode(Enum): + RISING_EDGE = 0 # this value is not defined in reg as really its just "HIGH" mode on or off + SPCM_PULSEGEN_CONFIG_HIGH = SPCM_PULSEGEN_CONFIG_HIGH + + +@dataclass +class PulseGeneratorTriggerSettings: + trigger_mode: PulseGeneratorTriggerMode + trigger_detection_mode: PulseGeneratorTriggerDetectionMode + multiplexer_1_source: PulseGeneratorMultiplexer1TriggerSource + multiplexer_1_output_inversion: bool + multiplexer_2_source: PulseGeneratorMultiplexer2TriggerSource + multiplexer_2_output_inversion: bool + + +PULSE_GEN_CONFIG_COMMANDS = ( + SPC_XIO_PULSEGEN0_CONFIG, + SPC_XIO_PULSEGEN1_CONFIG, + SPC_XIO_PULSEGEN2_CONFIG, + SPC_XIO_PULSEGEN3_CONFIG, +) + + +PULSE_GEN_MUX_INVERSION_COMMANDS = (SPCM_PULSEGEN_CONFIG_MUX1_INVERT, SPCM_PULSEGEN_CONFIG_MUX2_INVERT) + + +def decode_pulse_gen_config(value: int) -> list[int]: + """Converts the integer value received by a Spectrum device when queried about its pulse gen configuration into a + list of int16 values of the enabled configuration options.""" + possible_values = [ + SPCM_PULSEGEN_CONFIG_MUX1_INVERT, + SPCM_PULSEGEN_CONFIG_MUX2_INVERT, + SPCM_PULSEGEN_CONFIG_INVERT, + int(PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value), + ] + return [found_value for found_value in decode_bitmap_using_list_of_ints(value, possible_values)] + + +PULSE_GEN_PULSE_PERIOD_COMMANDS = ( + SPC_XIO_PULSEGEN0_LEN, + SPC_XIO_PULSEGEN1_LEN, + SPC_XIO_PULSEGEN2_LEN, + SPC_XIO_PULSEGEN3_LEN, +) + +PULSE_GEN_HIGH_DURATION_COMMANDS = ( + SPC_XIO_PULSEGEN0_HIGH, + SPC_XIO_PULSEGEN1_HIGH, + SPC_XIO_PULSEGEN2_HIGH, + SPC_XIO_PULSEGEN3_HIGH, +) + +PULSE_GEN_NUM_REPEATS_COMMANDS = ( + SPC_XIO_PULSEGEN0_LOOPS, + SPC_XIO_PULSEGEN1_LOOPS, + SPC_XIO_PULSEGEN2_LOOPS, + SPC_XIO_PULSEGEN3_LOOPS, +) + +PULSE_GEN_DELAY_COMMANDS = ( + 601003, # SPC_XIO_PULSEGEN0_DELAY not in regs.py for some reason + 601103, # SPC_XIO_PULSEGEN1_DELAY not in regs.py for some reason + 601203, # SPC_XIO_PULSEGEN2_DELAY not in regs.py for some reason + 601303, # SPC_XIO_PULSEGEN3_DELAY not in regs.py for some reason +) + + +@dataclass +class PulseGeneratorOutputSettings: + period_in_seconds: float + duty_cycle: float + num_pulses: int + delay_in_seconds: float + output_inversion: bool diff --git a/src/spectrumdevice/spectrum_wrapper/__init__.py b/src/spectrumdevice/spectrum_wrapper/__init__.py index c768331..0a85ee9 100644 --- a/src/spectrumdevice/spectrum_wrapper/__init__.py +++ b/src/spectrumdevice/spectrum_wrapper/__init__.py @@ -52,6 +52,13 @@ def decode_bitmap_using_list_of_ints(bitmap_value: int, test_values: List[int]) return values_in_bitmap +def toggle_bitmap_value(bitmap_value: int, option: int, enabled: bool) -> int: + if enabled: + return bitmap_value | option # set relevant bit to one + else: + return bitmap_value & ~option # set relevant bit to zero + + def get_spectrum_i32_api_param(device_handle: DEVICE_HANDLE_TYPE, spectrum_command: int) -> int: param = int32(0) error_handler(spcm_dwGetParam_i32)(device_handle, spectrum_command, byref(param)) diff --git a/src/spectrumdevice/spectrum_wrapper/error_handler.py b/src/spectrumdevice/spectrum_wrapper/error_handler.py index 1381a81..66f433a 100644 --- a/src/spectrumdevice/spectrum_wrapper/error_handler.py +++ b/src/spectrumdevice/spectrum_wrapper/error_handler.py @@ -8,7 +8,13 @@ from importlib import resources from typing import Callable, Dict, Any -from spectrumdevice.exceptions import SpectrumApiCallFailed, SpectrumFIFOModeHardwareBufferOverrun +from spectrum_gmbh.py_header.spcerr import ERR_FEATURE, ERR_VALUE +from spectrumdevice.exceptions import ( + SpectrumApiCallFailed, + SpectrumFIFOModeHardwareBufferOverrun, + SpectrumFeatureNotSupportedByCard, + SpectrumParameterValueOutOfRange, +) from spectrum_gmbh.spcerr import ( ERR_OK, ERR_LASTERR, @@ -34,7 +40,11 @@ def _parse_errors_table() -> Dict[int, str]: KNOWN_ERRORS_WITH_DESCRIPTIONS = _parse_errors_table() ERROR_CODES_TO_IGNORE = [ERR_OK] ERROR_CODES_TO_REPORT_BUT_NOT_RAISE = [ERR_LASTERR, ERR_TIMEOUT, ERR_ABORT] -ERROR_CODES_WITH_EXCEPTIONS = {ERR_FIFOHWOVERRUN: SpectrumFIFOModeHardwareBufferOverrun} +ERROR_CODES_WITH_EXCEPTIONS = { + ERR_FIFOHWOVERRUN: SpectrumFIFOModeHardwareBufferOverrun, + ERR_FEATURE: SpectrumFeatureNotSupportedByCard, + ERR_VALUE: SpectrumParameterValueOutOfRange, +} def error_handler(func: Callable) -> Callable: diff --git a/src/tests/device_factories.py b/src/tests/device_factories.py index 297d16a..37b4324 100644 --- a/src/tests/device_factories.py +++ b/src/tests/device_factories.py @@ -3,7 +3,7 @@ from spectrumdevice.devices.awg.awg_interface import SpectrumAWGInterface from spectrumdevice.devices.digitiser import SpectrumDigitiserCard, SpectrumDigitiserInterface from spectrumdevice.devices.mocks import MockSpectrumAWGCard, MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub -from spectrumdevice.settings import ModelNumber +from spectrumdevice.settings import AdvancedCardFeature, CardFeature, ModelNumber from tests.configuration import ( MOCK_DEVICE_TEST_FRAME_RATE_HZ, NUM_CARDS_IN_STAR_HUB, @@ -35,6 +35,11 @@ def create_digitiser_card_for_testing() -> SpectrumDigitiserInterface: mock_source_frame_rate_hz=MOCK_DEVICE_TEST_FRAME_RATE_HZ, num_modules=NUM_MODULES_PER_DIGITISER, num_channels_per_module=NUM_CHANNELS_PER_DIGITISER_MODULE, + card_features=[CardFeature.SPCM_FEAT_MULTI], + advanced_card_features=[ + AdvancedCardFeature.SPCM_FEAT_EXTFW_SEGSTAT, + AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN, + ], ) @@ -49,6 +54,11 @@ def create_awg_card_for_testing() -> SpectrumAWGInterface: model=ModelNumber.TYP_M2P6560_X4, num_modules=NUM_MODULES_PER_AWG, num_channels_per_module=NUM_CHANNELS_PER_AWG_MODULE, + card_features=[CardFeature.SPCM_FEAT_MULTI], + advanced_card_features=[ + AdvancedCardFeature.SPCM_FEAT_EXTFW_SEGSTAT, + AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN, + ], ) diff --git a/src/tests/test_pulse_generator.py b/src/tests/test_pulse_generator.py new file mode 100644 index 0000000..89a055b --- /dev/null +++ b/src/tests/test_pulse_generator.py @@ -0,0 +1,186 @@ +from unittest import TestCase + +from spectrumdevice import MockSpectrumDigitiserCard +from spectrumdevice.exceptions import SpectrumFeatureNotSupportedByCard, SpectrumInvalidParameterValue +from spectrumdevice.settings import ModelNumber +from spectrumdevice.settings.pulse_generator import ( + PulseGeneratorMultiplexer1TriggerSource, + PulseGeneratorMultiplexer2TriggerSource, + PulseGeneratorOutputSettings, + PulseGeneratorTriggerDetectionMode, + PulseGeneratorTriggerMode, + PulseGeneratorTriggerSettings, +) +from tests.configuration import ( + MOCK_DEVICE_TEST_FRAME_RATE_HZ, + NUM_CHANNELS_PER_DIGITISER_MODULE, + NUM_MODULES_PER_DIGITISER, + TEST_DIGITISER_NUMBER, +) +from tests.device_factories import create_awg_card_for_testing + + +class PulseGeneratorTest(TestCase): + def setUp(self) -> None: + self._awg = create_awg_card_for_testing() + self._awg.set_sample_rate_in_hz(1000000) + self._awg.analog_channels[0].set_is_switched_on(True) + self._awg.analog_channels[0].set_signal_amplitude_in_mv(1000) + + def tearDown(self) -> None: + self._awg.reset() + self._awg.disconnect() + + def test_pulse_gen_feat_not_available(self) -> None: + mock_digitiser_without_pulse_gen = MockSpectrumDigitiserCard( + device_number=TEST_DIGITISER_NUMBER, + model=ModelNumber.TYP_M2P5966_X4, + mock_source_frame_rate_hz=MOCK_DEVICE_TEST_FRAME_RATE_HZ, + num_modules=NUM_MODULES_PER_DIGITISER, + num_channels_per_module=NUM_CHANNELS_PER_DIGITISER_MODULE, + card_features=[], + advanced_card_features=[], + ) + with self.assertRaises(SpectrumFeatureNotSupportedByCard): + _ = mock_digitiser_without_pulse_gen.io_lines[0].pulse_generator + + def test_get_pulse_gens(self) -> None: + for io_line in self._awg.io_lines: + _ = io_line.pulse_generator + + def test_enable_disable(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + self.assertFalse(pg.enabled) + pg.enable() + self.assertTrue(pg.enabled) + pg.disable() + self.assertFalse(pg.enabled) + + def test_output_inversion(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + self.assertFalse(pg.output_inversion) + pg.set_output_inversion(True) + self.assertTrue(pg.output_inversion) + pg.set_output_inversion(False) + self.assertFalse(pg.output_inversion) + + def test_trigger_detection_mode(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + self.assertEqual(PulseGeneratorTriggerDetectionMode.RISING_EDGE, pg.trigger_detection_mode) + pg.set_trigger_detection_mode(PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH) + self.assertEqual(PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH, pg.trigger_detection_mode) + pg.set_trigger_detection_mode(PulseGeneratorTriggerDetectionMode.RISING_EDGE) + self.assertEqual(PulseGeneratorTriggerDetectionMode.RISING_EDGE, pg.trigger_detection_mode) + + def test_trigger_mode(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_trigger_mode(PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_TRIGGERED) + self.assertEqual(PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_TRIGGERED, pg.trigger_mode) + pg.set_trigger_mode(PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_GATED) + self.assertEqual(PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_GATED, pg.trigger_mode) + + def test_pulse_period(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_period_in_seconds(pg.min_allowed_period_in_seconds) + self.assertEqual(pg.min_allowed_period_in_seconds, pg.period_in_seconds) + + def test_coerce_pulse_period(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_period_in_seconds(pg.max_allowed_period_in_seconds + 1, coerce=True) + self.assertAlmostEqual(pg.max_allowed_period_in_seconds, pg.period_in_seconds, places=5) + + def test_invalid_pulse_period(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + with self.assertRaises(SpectrumInvalidParameterValue): + pg.set_period_in_seconds(pg.max_allowed_period_in_seconds + 1) + + def test_duty_cycle(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_period_in_seconds(pg.max_allowed_period_in_seconds) + duty_cycle = pg.min_allowed_high_voltage_duration_in_seconds / pg.period_in_seconds + pg.set_duty_cycle(duty_cycle) + self.assertAlmostEqual(duty_cycle, pg.duty_cycle, places=5) + + def test_coerce_duty_cycle(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_period_in_seconds(pg.max_allowed_period_in_seconds) + pg.set_duty_cycle(1.1, coerce=True) + self.assertAlmostEqual( + pg.max_allowed_high_voltage_duration_in_seconds, pg.duration_of_high_voltage_in_seconds, places=5 + ) + + def test_invalid_duty_cycle(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + with self.assertRaises(SpectrumInvalidParameterValue): + pg.set_period_in_seconds(pg.max_allowed_period_in_seconds) + pg.set_duty_cycle(1.1) + + def test_num_pulses(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_num_pulses(pg.min_allowed_pulses) + self.assertEqual(pg.min_allowed_pulses, pg.num_pulses) + + def test_coerce_num_pulses(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_num_pulses(pg.max_allowed_pulses + 1, coerce=True) + self.assertEqual(pg.max_allowed_pulses, pg.num_pulses) + + def test_invalid_num_pulses(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + with self.assertRaises(SpectrumInvalidParameterValue): + pg.set_num_pulses(pg.max_allowed_pulses + 1) + + def test_delay(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_delay_in_seconds(pg.min_allowed_delay_in_seconds) + self.assertEqual(pg.min_allowed_delay_in_seconds, pg.delay_in_seconds) + + def test_coerce_delay(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + pg.set_delay_in_seconds(pg.max_allowed_delay_in_seconds + 1, coerce=True) + self.assertAlmostEqual(pg.max_allowed_delay_in_seconds, pg.delay_in_seconds, places=5) + + def test_invalid_delay(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + with self.assertRaises(SpectrumInvalidParameterValue): + pg.set_delay_in_seconds(pg.max_allowed_delay_in_seconds + 1) + + def test_configure_trigger(self) -> None: + trigger_settings = PulseGeneratorTriggerSettings( + trigger_mode=PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_TRIGGERED, + trigger_detection_mode=PulseGeneratorTriggerDetectionMode.RISING_EDGE, + multiplexer_1_source=PulseGeneratorMultiplexer1TriggerSource.SPCM_PULSEGEN_MUX1_SRC_UNUSED, + multiplexer_1_output_inversion=False, + multiplexer_2_source=PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE, + multiplexer_2_output_inversion=False, + ) + pg = self._awg.io_lines[0].pulse_generator + pg.configure_trigger(trigger_settings) + + self.assertEqual(PulseGeneratorTriggerMode.SPCM_PULSEGEN_MODE_TRIGGERED, pg.trigger_mode) + self.assertEqual(PulseGeneratorTriggerDetectionMode.RISING_EDGE, pg.trigger_detection_mode) + self.assertEqual( + PulseGeneratorMultiplexer1TriggerSource.SPCM_PULSEGEN_MUX1_SRC_UNUSED, pg.multiplexer_1.trigger_source + ) + self.assertFalse(pg.multiplexer_1.output_inversion) + self.assertEqual( + PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE, pg.multiplexer_2.trigger_source + ) + self.assertFalse(pg.multiplexer_2.output_inversion) + + def test_configure_output(self) -> None: + pg = self._awg.io_lines[0].pulse_generator + duty_cycle = pg.min_allowed_high_voltage_duration_in_seconds / pg.max_allowed_period_in_seconds + output_settings = PulseGeneratorOutputSettings( + period_in_seconds=pg.max_allowed_period_in_seconds, + duty_cycle=duty_cycle, + num_pulses=pg.max_allowed_pulses, + delay_in_seconds=pg.max_allowed_delay_in_seconds, + output_inversion=True, + ) + pg.configure_output(output_settings, coerce=False) + self.assertEqual(pg.max_allowed_period_in_seconds, pg.period_in_seconds) + self.assertEqual(duty_cycle, pg.duty_cycle) + self.assertEqual(pg.max_allowed_pulses, pg.num_pulses) + self.assertEqual(pg.max_allowed_delay_in_seconds, pg.delay_in_seconds) + self.assertTrue(pg.output_inversion) diff --git a/src/tests/test_single_card.py b/src/tests/test_single_card.py index 064b2b2..95fbe49 100644 --- a/src/tests/test_single_card.py +++ b/src/tests/test_single_card.py @@ -7,7 +7,7 @@ from spectrum_gmbh.regs import SPC_CHENABLE from spectrumdevice import SpectrumDigitiserAnalogChannel -from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface from spectrumdevice.devices.awg.awg_channel import SpectrumAWGAnalogChannel from spectrumdevice.devices.awg.awg_interface import SpectrumAWGInterface from spectrumdevice.devices.digitiser import SpectrumDigitiserInterface diff --git a/src/tests/test_star_hub.py b/src/tests/test_star_hub.py index 0db9fd2..875048d 100644 --- a/src/tests/test_star_hub.py +++ b/src/tests/test_star_hub.py @@ -13,11 +13,13 @@ NUM_MODULES_PER_DIGITISER, ) from tests.device_factories import create_spectrum_star_hub_for_testing -from tests.test_single_card import SingleCardTest +from tests.test_single_card import DigitiserCardTest @pytest.mark.star_hub -class StarHubTest(SingleCardTest): +class StarHubTest(DigitiserCardTest): + __test__ = True + def setUp(self) -> None: self._device: SpectrumDigitiserStarHub = create_spectrum_star_hub_for_testing()