diff --git a/README.md b/README.md index a4ec175..796412a 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ for controlling devices: |----------------------------|-----------------------------------------------------------------------| | `SpectrumDigitiserCard` | Controlling individual digitiser cards | | `SpectrumDigitiserStarHub` | Controlling digitiser cards aggregated with a StarHub | -| `SpectrumAWGCard` | Controlling individual AWG cards (Not yet implemented) | +| `SpectrumAWGCard` | Controlling individual AWG cards | | `SpectrumAWGStarHub` | Controlling AWG cards aggregated with a StarHub (Not yet implemented) | `spectrumdevice` also includes mock classes for testing software without drivers installed or hardware connected: @@ -157,11 +157,12 @@ independently configuring each channel. For example, to change the vertical range of channel 2 of a digitiser card to 1V: ```python -card.channels[2].set_vertical_range_in_mv(1000) +card.analog_channels[2].set_vertical_range_in_mv(1000) ``` and then print the vertical offset: + ```python -print(card.channels[2].vertical_offset_in_percent) +print(card.analog_channels[2].vertical_offset_in_percent) ``` ### Configuring everything at once @@ -230,6 +231,7 @@ buffer = transfer_buffer_factory( buffer_type=BufferType.SPCM_BUF_DATA, # must be SPCM_BUF_DATA to transfer samples from digitiser direction=BufferDirection.SPCM_DIR_CARDTOPC, # must be SPCM_DIR_CARDTOPC to transfer samples from digitiser size_in_samples=size_in_samples, + bytes_per_sampe=card.bytes_per_sample, board_memory_offset_bytes=board_memory_offset_bytes, notify_size_in_pages=notify_size_in_pages ) diff --git a/src/example_scripts/awg_example.py b/src/example_scripts/awg_example.py new file mode 100644 index 0000000..bd246d4 --- /dev/null +++ b/src/example_scripts/awg_example.py @@ -0,0 +1,55 @@ +from time import sleep + +from matplotlib.pyplot import plot, show +from numpy import int16, iinfo, linspace, sin, pi + +from spectrumdevice.devices.awg.awg_card import SpectrumAWGCard +from spectrumdevice.settings import TriggerSettings, TriggerSource, ExternalTriggerMode +from spectrumdevice.settings.channel import OutputChannelStopLevelMode +from spectrumdevice.settings.device_modes import GenerationMode + +PULSE_RATE_HZ = 5000 +NUM_PULSES = 5 +NUM_CYCLES = 2 +FREQUENCY = 20e3 +SAMPLE_RATE = 125000000 + + +if __name__ == "__main__": + + card = SpectrumAWGCard(device_number=0) + print(card) + + trigger_settings = TriggerSettings( + trigger_sources=[TriggerSource.SPC_TMASK_EXT0], + external_trigger_mode=ExternalTriggerMode.SPC_TM_POS, + external_trigger_level_in_mv=200, + ) + card.configure_trigger(trigger_settings) + + full_scale_min_value = iinfo(int16).min + full_scale_max_value = iinfo(int16).max + + duration = NUM_CYCLES / FREQUENCY + t = linspace(0, duration, int(duration * SAMPLE_RATE + 1)) + analog_wfm = (sin(2 * pi * FREQUENCY * t) * full_scale_max_value).astype(int16) + card.set_sample_rate_in_hz(SAMPLE_RATE) + card.set_generation_mode(GenerationMode.SPC_REP_STD_SINGLERESTART) + card.set_num_loops(NUM_PULSES) + card.transfer_waveform(analog_wfm) + card.analog_channels[0].set_stop_level_mode(OutputChannelStopLevelMode.SPCM_STOPLVL_ZERO) + card.analog_channels[0].set_is_switched_on(True) + card.analog_channels[0].set_signal_amplitude_in_mv(1000) + + card.start() + + for _ in range(NUM_PULSES): + card.force_trigger_event() + sleep(1 / PULSE_RATE_HZ) + print("generated pulse") + + card.stop() + card.disconnect() + + plot(t * 1e6, analog_wfm) + show() diff --git a/src/example_scripts/continuous_averaging_fifo_mode.py b/src/example_scripts/continuous_averaging_fifo_mode.py index b1a937a..305c64c 100644 --- a/src/example_scripts/continuous_averaging_fifo_mode.py +++ b/src/example_scripts/continuous_averaging_fifo_mode.py @@ -73,6 +73,7 @@ def continuous_averaging_multi_fifo_example( # Retrieve streamed waveform data until desired time has elapsed measurements_list = [] while (monotonic() - start_time) < acquisition_duration_in_seconds: + print(f"Asking for waveforms at {monotonic() - start_time}") measurements_list += [ Measurement(waveforms=frame, timestamp=card.get_timestamp()) for frame in card.get_waveforms() ] diff --git a/src/example_scripts/standard_single_mode.py b/src/example_scripts/standard_single_mode.py index a97a25f..1f3c903 100644 --- a/src/example_scripts/standard_single_mode.py +++ b/src/example_scripts/standard_single_mode.py @@ -30,6 +30,8 @@ def standard_single_mode_example( card = SpectrumDigitiserCard(device_number=device_number, ip_address=ip_address) else: # Set up a mock device + for item in MockSpectrumDigitiserCard.__mro__: + print(item) card = MockSpectrumDigitiserCard( device_number=device_number, model=ModelNumber.TYP_M2P5966_X4, @@ -75,7 +77,7 @@ def standard_single_mode_example( from matplotlib.pyplot import plot, show, xlabel, tight_layout, ylabel meas = standard_single_mode_example( - mock_mode=False, trigger_source=TriggerSource.SPC_TMASK_EXT0, device_number=1, ip_address="169.254.13.35" + mock_mode=True, trigger_source=TriggerSource.SPC_TMASK_EXT0, device_number=1, ip_address="169.254.13.35" ) # Plot waveforms diff --git a/src/example_scripts/star_hub_example.py b/src/example_scripts/star_hub_example.py index 05f7075..2bb9b9d 100644 --- a/src/example_scripts/star_hub_example.py +++ b/src/example_scripts/star_hub_example.py @@ -22,7 +22,9 @@ def connect_to_star_hub_example( # Connect to each card in the hub. child_cards.append(SpectrumDigitiserCard(device_number=n, ip_address=ip_address)) # Connect to the hub itself - return SpectrumDigitiserStarHub(device_number=0, child_cards=child_cards, master_card_index=master_card_index) + return SpectrumDigitiserStarHub( + device_number=0, child_cards=tuple(child_cards), master_card_index=master_card_index + ) else: mock_child_cards = [] for n in range(num_cards): @@ -49,8 +51,8 @@ def connect_to_star_hub_example( num_measurements = 5 hub = connect_to_star_hub_example(mock_mode=False, num_cards=2, master_card_index=1, ip_address="169.254.13.35") - print(f"{hub} contains {len(hub.channels)} channels in total:") - for channel in hub.channels: + print(f"{hub} contains {len(hub.analog_channels)} channels in total:") + for channel in hub.analog_channels: print(channel) # Trigger settings diff --git a/src/spectrumdevice/__init__.py b/src/spectrumdevice/__init__.py index 839c43b..1c4a46f 100644 --- a/src/spectrumdevice/__init__.py +++ b/src/spectrumdevice/__init__.py @@ -58,7 +58,7 @@ from spectrumdevice.measurement import Measurement from .devices.digitiser.digitiser_card import SpectrumDigitiserCard -from .devices.digitiser.digitiser_channel import SpectrumDigitiserChannel +from .devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel from .devices.digitiser.digitiser_star_hub import SpectrumDigitiserStarHub from .devices.mocks import MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub from .devices.abstract_device import ( @@ -70,7 +70,7 @@ from .devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser __all__ = [ - "SpectrumDigitiserChannel", + "SpectrumDigitiserAnalogChannel", "SpectrumDigitiserCard", "SpectrumDigitiserStarHub", "MockSpectrumDigitiserCard", diff --git a/src/spectrumdevice/devices/abstract_device/__init__.py b/src/spectrumdevice/devices/abstract_device/__init__.py index 948aa23..ae29657 100644 --- a/src/spectrumdevice/devices/abstract_device/__init__.py +++ b/src/spectrumdevice/devices/abstract_device/__init__.py @@ -8,7 +8,7 @@ from spectrumdevice.devices.abstract_device.abstract_spectrum_channel import AbstractSpectrumChannel from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice from spectrumdevice.devices.abstract_device.abstract_spectrum_hub import AbstractSpectrumStarHub -from spectrumdevice.devices.abstract_device.device_interface import SpectrumChannelInterface, SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.interfaces import SpectrumChannelInterface, SpectrumDeviceInterface __all__ = [ diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py index a517134..99d48a5 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_card.py @@ -9,7 +9,7 @@ from abc import ABC, abstractmethod from functools import reduce from operator import or_ -from typing import List, Optional, Sequence, Tuple +from typing import Any, List, Optional, Sequence, Tuple, TypeVar, Generic from spectrum_gmbh.regs import ( M2CMD_DATA_STARTDMA, @@ -31,9 +31,11 @@ SPC_TIMEOUT, SPC_TRIG_ANDMASK, SPC_TRIG_ORMASK, + M2CMD_CARD_FORCETRIGGER, + SPC_MIINST_BYTESPERSAMPLE, ) from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice -from spectrumdevice.devices.abstract_device.device_interface import SpectrumChannelInterface +from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface from spectrumdevice.exceptions import ( SpectrumExternalTriggerNotEnabled, SpectrumInvalidNumberOfEnabledChannels, @@ -67,16 +69,23 @@ logger = logging.getLogger(__name__) -class AbstractSpectrumCard(AbstractSpectrumDevice, ABC): +# Use a Generic and Type Variables to allow subclasses of AbstractSpectrumCard to define whether they own AWG analog +# channels or Digitiser analog channels and IO lines +AnalogChannelInterfaceType = TypeVar("AnalogChannelInterfaceType", bound=SpectrumAnalogChannelInterface) +IOLineInterfaceType = TypeVar("IOLineInterfaceType", bound=SpectrumIOLineInterface) + + +class AbstractSpectrumCard(AbstractSpectrumDevice, Generic[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).""" - def __init__(self, device_number: int = 0, ip_address: Optional[str] = None): + def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any): """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ + super().__init__() # required for proper MRO resolution if ip_address is not None: self._visa_string = _create_visa_string_from_ip(ip_address, device_number) else: @@ -84,8 +93,9 @@ def __init__(self, device_number: int = 0, ip_address: Optional[str] = None): self._connect(self._visa_string) self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP)) self._trigger_sources: List[TriggerSource] = [] - self._channels = self._init_channels() - self._enabled_channels: List[int] = [0] + self._analog_channels = self._init_analog_channels() + self._io_lines = self._init_io_lines() + self._enabled_analog_channels: List[int] = [0] self._transfer_buffer: Optional[TransferBuffer] = None self.apply_channel_enabling() @@ -118,6 +128,8 @@ def start_transfer(self) -> None: For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it is acquired. + + # todo: docstring for AWG transfers """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA) @@ -134,6 +146,8 @@ def stop_transfer(self) -> None: For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, and transfer will automatically stop when `stop()` is called as there will be no more samples to transfer, so `stop_transfer()` should not be used. + + # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA) @@ -146,6 +160,9 @@ def wait_for_transfer_chunk_to_complete(self) -> None: be read using the `get_waveforms()` method. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms(). + + # todo: update the above docstring to take into account cases where notify size < data lemgth + # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA) @@ -182,7 +199,7 @@ def __eq__(self, other: object) -> bool: raise NotImplementedError(f"Cannot compare {self.__class__} with {other.__class__}") @property - def channels(self) -> Sequence[SpectrumChannelInterface]: + def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: """A tuple containing the channels that belong to the card. Properties of the individual channels can be set by calling the methods of the @@ -192,24 +209,37 @@ def channels(self) -> Sequence[SpectrumChannelInterface]: channels (Sequence[`SpectrumChannelInterface`]): A tuple of objects conforming to the `SpectrumChannelInterface` interface. """ - return self._channels + return self._analog_channels + + @property + def io_lines(self) -> Sequence[IOLineInterfaceType]: + """A tuple containing the Multipurpose IO Lines that belong to the card. + + Properties of the individual channels can be set by calling the methods of the + returned objects directly. + + Returns: + channels (Sequence[`SpectrumIOLineInterface`]): A tuple of objects conforming to the + `SpectrumIOLineInterface` interface. + """ + return self._io_lines @property - def enabled_channels(self) -> List[int]: + def enabled_analog_channels(self) -> List[int]: """The indices of the currently enabled channels. Returns: enabled_channels (List[int]): The indices of the currently enabled channels. """ - return self._enabled_channels + return self._enabled_analog_channels - def set_enabled_channels(self, channels_nums: List[int]) -> None: + def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change which channels are enabled. Args: channels_nums (List[int]): The integer channel indices to enable. """ if len(channels_nums) in [1, 2, 4, 8]: - self._enabled_channels = channels_nums + self._enabled_analog_channels = channels_nums self.apply_channel_enabling() else: raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.") @@ -353,7 +383,7 @@ def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: def apply_channel_enabling(self) -> None: """Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.""" - enabled_channel_spectrum_values = [self.channels[i].name.value for i in self._enabled_channels] + enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels] if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]: bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values) self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels) @@ -363,7 +393,11 @@ def apply_channel_enabling(self) -> None: ) @abstractmethod - def _init_channels(self) -> Sequence[SpectrumChannelInterface]: + def _init_analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: + raise NotImplementedError() + + @abstractmethod + def _init_io_lines(self) -> Sequence[IOLineInterfaceType]: raise NotImplementedError() @property @@ -452,6 +486,14 @@ def __str__(self) -> str: def type(self) -> CardType: return CardType(self.read_spectrum_device_register(SPC_FNCTYPE)) + def force_trigger_event(self) -> None: + """Force a trigger event to occur""" + self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER) + + @property + def bytes_per_sample(self) -> int: + return self.read_spectrum_device_register(SPC_MIINST_BYTESPERSAMPLE) + def _create_visa_string_from_ip(ip_address: str, instrument_number: int) -> str: return f"TCPIP[0]::{ip_address}::inst{instrument_number}::INSTR" diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py index 8540494..7406b97 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_channel.py @@ -1,25 +1,43 @@ """Provides a partially-implemented abstract class common to individual channels of Spectrum devices.""" +from abc import abstractmethod +from typing import Any, TypeVar, Generic # Christian Baker, King's College London # Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. -from spectrumdevice.devices.abstract_device.device_interface import SpectrumChannelInterface -from spectrumdevice.devices.abstract_device.abstract_spectrum_card import AbstractSpectrumCard -from spectrumdevice.settings.channel import SpectrumChannelName +from spectrumdevice.devices.abstract_device.interfaces import ( + SpectrumDeviceInterface, + SpectrumChannelInterface, + SpectrumAnalogChannelInterface, +) +from spectrumdevice.settings.channel import SpectrumAnalogChannelName, SpectrumChannelName -class AbstractSpectrumChannel(SpectrumChannelInterface): - """Partially implemented abstract superclass contain code common for controlling an individual channel of all - spectrum devices.""" +ChannelNameType = TypeVar("ChannelNameType", bound=SpectrumChannelName) + + +class AbstractSpectrumChannel(SpectrumChannelInterface, Generic[ChannelNameType]): + """Partially implemented abstract superclass contain code common for controlling an individual channel or IO Line of + all spectrum devices.""" - def __init__(self, channel_number: int, parent_device: AbstractSpectrumCard): - self._name = SpectrumChannelName[f"CHANNEL{channel_number}"] + def __init__(self, channel_number: int, parent_device: SpectrumDeviceInterface, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._name = self._make_name(channel_number) self._parent_device = parent_device self._enabled = True @property - def name(self) -> SpectrumChannelName: + @abstractmethod + def _name_prefix(self) -> str: + raise NotImplementedError + + @abstractmethod + def _make_name(self, channel_number: int) -> ChannelNameType: + raise NotImplementedError + + @property + def name(self) -> ChannelNameType: """The identifier assigned by the spectrum driver, formatted as an Enum by the settings package. Returns: @@ -28,7 +46,7 @@ def name(self) -> SpectrumChannelName: @property def _number(self) -> int: - return int(self.name.name.split("CHANNEL")[-1]) + return int(self.name.name.split(self._name_prefix)[-1]) def __eq__(self, other: object) -> bool: if isinstance(other, AbstractSpectrumChannel): @@ -41,3 +59,15 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) + + +class AbstractSpectrumAnalogChannel(AbstractSpectrumChannel[SpectrumAnalogChannelName], SpectrumAnalogChannelInterface): + """Partially implemented abstract superclass contain code common for controlling an individual analog channel of all + spectrum devices.""" + + @property + def _name_prefix(self) -> str: + return "CHANNEL" + + def _make_name(self, channel_number: int) -> SpectrumAnalogChannelName: + return SpectrumAnalogChannelName[f"{self._name_prefix}{channel_number}"] diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py index 5424e22..54399d3 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_device.py @@ -6,7 +6,7 @@ from abc import ABC -from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.interfaces import SpectrumDeviceInterface from spectrumdevice.exceptions import SpectrumDeviceNotConnected, SpectrumDriversNotFound from spectrumdevice.settings import SpectrumRegisterLength, TriggerSettings from spectrumdevice.settings.triggering import EXTERNAL_TRIGGER_SOURCES @@ -55,6 +55,8 @@ def start(self) -> None: For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have already been defined. + + # todo: docstring for different AWG modes """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER) @@ -63,6 +65,8 @@ def stop(self) -> None: For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE). + + # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP) diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py index 776347f..db7a154 100644 --- a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_hub.py @@ -8,13 +8,17 @@ from abc import ABC from functools import reduce from operator import or_ -from typing import List, Sequence, Tuple +from typing import Any, List, Sequence, Tuple, TypeVar, Generic from numpy import arange from spectrum_gmbh.regs import SPC_SYNC_ENABLEMASK from spectrumdevice.devices.abstract_device.abstract_spectrum_device import AbstractSpectrumDevice -from spectrumdevice.devices.abstract_device.device_interface import SpectrumChannelInterface, SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.interfaces import ( + SpectrumDeviceInterface, + SpectrumAnalogChannelInterface, + SpectrumIOLineInterface, +) from spectrumdevice.exceptions import SpectrumSettingsMismatchError from spectrumdevice.settings import ( AdvancedCardFeature, @@ -29,17 +33,15 @@ from spectrumdevice.spectrum_wrapper import destroy_handle -class AbstractSpectrumStarHub(AbstractSpectrumDevice, ABC): +CardType = TypeVar("CardType", bound=SpectrumDeviceInterface) + + +class AbstractSpectrumStarHub(AbstractSpectrumDevice, Generic[CardType], ABC): """Composite abstract class of `AbstractSpectrumCard` implementing methods common to all StarHubs. StarHubs are composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub is synchronised, aggregating the channels of all child cards.""" - def __init__( - self, - device_number: int, - child_cards: Sequence[SpectrumDeviceInterface], - master_card_index: int, - ): + def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. @@ -48,7 +50,7 @@ def __init__( master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ - self._child_cards: Sequence[SpectrumDeviceInterface] = child_cards + self._child_cards: Sequence[CardType] = child_cards self._master_card = child_cards[master_card_index] self._triggering_card = child_cards[master_card_index] child_card_logical_indices = (2**n for n in range(len(self._child_cards))) @@ -227,7 +229,7 @@ def apply_channel_enabling(self) -> None: d.apply_channel_enabling() @property - def enabled_channels(self) -> List[int]: + def enabled_analog_channels(self) -> List[int]: """The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub). @@ -237,11 +239,13 @@ def enabled_channels(self) -> List[int]: enabled_channels = [] n_channels_in_previous_card = 0 for card in self._child_cards: - enabled_channels += [channel_num + n_channels_in_previous_card for channel_num in card.enabled_channels] - n_channels_in_previous_card = len(card.channels) + enabled_channels += [ + channel_num + n_channels_in_previous_card for channel_num in card.enabled_analog_channels + ] + n_channels_in_previous_card = len(card.analog_channels) return enabled_channels - def set_enabled_channels(self, channels_nums: List[int]) -> None: + def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub). @@ -252,10 +256,10 @@ def set_enabled_channels(self, channels_nums: List[int]) -> None: channels_to_enable_all_cards = channels_nums for child_card in self._child_cards: - n_channels_in_card = len(child_card.channels) + n_channels_in_card = len(child_card.analog_channels) channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards)) num_channels_to_enable_this_card = len(channels_to_enable_this_card) - child_card.set_enabled_channels(channels_to_enable_this_card) + child_card.set_enabled_analog_channels(channels_to_enable_this_card) channels_to_enable_all_cards = [ num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:] ] @@ -270,18 +274,30 @@ def transfer_buffers(self) -> List[TransferBuffer]: return [card.transfer_buffers[0] for card in self._child_cards] @property - def channels(self) -> Sequence[SpectrumChannelInterface]: + def analog_channels(self) -> Sequence[SpectrumAnalogChannelInterface]: """A tuple containing of all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for more information. Returns: channels (Sequence[`SpectrumChannelInterface`]): A tuple of `SpectrumDigitiserChannel` objects. """ - channels: List[SpectrumChannelInterface] = [] + channels: List[SpectrumAnalogChannelInterface] = [] for device in self._child_cards: - channels += device.channels + channels += device.analog_channels return tuple(channels) + @property + def io_lines(self) -> Sequence[SpectrumIOLineInterface]: + """A tuple containing of all the Multipurpose IO Lines of the child cards of the hub. + + Returns: + channels (Sequence[`SpectrumIOLineInterface`]): A tuple of `SpectrumIOLineInterface` objects. + """ + io_lines: List[SpectrumIOLineInterface] = [] + for device in self._child_cards: + io_lines += device.io_lines + return tuple(io_lines) # todo: this is probably wrong. I don't think both cards in a netbox have IO lines + @property def timeout_in_ms(self) -> int: """The time for which the card will wait for a trigger to be received after a device has started @@ -324,6 +340,13 @@ def available_io_modes(self) -> AvailableIOModes: """ return self._master_card.available_io_modes + @property + def bytes_per_sample(self) -> int: + bytes_per_sample_each_card = [] + for d in self._child_cards: + bytes_per_sample_each_card.append(d.bytes_per_sample) + return check_settings_constant_across_devices(bytes_per_sample_each_card, __name__) + def __str__(self) -> str: return f"StarHub {self._visa_string}" diff --git a/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py new file mode 100644 index 0000000..cc294b2 --- /dev/null +++ b/src/spectrumdevice/devices/abstract_device/abstract_spectrum_io_line.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod + +from spectrumdevice.devices.abstract_device import AbstractSpectrumChannel +from spectrumdevice.devices.abstract_device.interfaces import SpectrumIOLineInterface +from spectrumdevice.settings import IOLineMode +from spectrumdevice.settings.io_lines import IO_LINE_MODE_COMMANDS, SpectrumIOLineName + + +class AbstractSpectrumIOLine(SpectrumIOLineInterface, AbstractSpectrumChannel[SpectrumIOLineName], ABC): + """Partially implemented abstract superclass contain code common for controlling an individual IO Line of all + spectrum devices.""" + + @property + def _name_prefix(self) -> str: + return "X" + + def _make_name(self, channel_number: int) -> SpectrumIOLineName: + return SpectrumIOLineName[f"{self._name_prefix}{channel_number}"] + + @abstractmethod + def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: + raise NotImplementedError + + @property + def mode(self) -> IOLineMode: + # todo: this may contain dig out settings bits, so needs a decode function that ignores those bits + return IOLineMode(self._parent_device.read_spectrum_device_register(IO_LINE_MODE_COMMANDS[self._number])) + + def set_mode(self, mode: IOLineMode) -> None: + value_to_write = self._get_io_line_mode_settings_mask(mode) | mode.value + self._parent_device.write_to_spectrum_device_register(IO_LINE_MODE_COMMANDS[self._number], value_to_write) diff --git a/src/spectrumdevice/devices/abstract_device/device_interface.py b/src/spectrumdevice/devices/abstract_device/interfaces.py similarity index 69% rename from src/spectrumdevice/devices/abstract_device/device_interface.py rename to src/spectrumdevice/devices/abstract_device/interfaces.py index 449b4c6..0ae4f6d 100644 --- a/src/spectrumdevice/devices/abstract_device/device_interface.py +++ b/src/spectrumdevice/devices/abstract_device/interfaces.py @@ -5,7 +5,7 @@ # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. from abc import ABC, abstractmethod -from typing import List, Optional, Sequence, Tuple +from typing import List, Optional, Sequence, Tuple, TypeVar, Generic from spectrumdevice.settings import ( AdvancedCardFeature, @@ -14,24 +14,53 @@ ClockMode, ExternalTriggerMode, DEVICE_STATUS_TYPE, + ModelNumber, SpectrumRegisterLength, TransferBuffer, TriggerSettings, TriggerSource, + IOLineMode, ) -from spectrumdevice.settings.channel import SpectrumChannelName +from spectrumdevice.settings.card_dependent_properties import CardType +from spectrumdevice.settings.channel import SpectrumAnalogChannelName, SpectrumChannelName +from spectrumdevice.settings.io_lines import SpectrumIOLineName +ChannelNameType = TypeVar("ChannelNameType", bound=SpectrumChannelName) -class SpectrumChannelInterface(ABC): - """Defines the common public interface for control of the channels of Digitiser and AWG devices. All properties are - read-only and must be set with their respective setter methods.""" + +class SpectrumChannelInterface(Generic[ChannelNameType], ABC): + """Defines the common public interface for control of the channels of Digitiser and AWG devices including + Multipurpose IO Lines. All properties are read-only and must be set with their respective setter methods.""" @property @abstractmethod - def name(self) -> SpectrumChannelName: + def name(self) -> ChannelNameType: raise NotImplementedError +class SpectrumAnalogChannelInterface(SpectrumChannelInterface[SpectrumAnalogChannelName], ABC): + """Defines the common public interface for control of the analog channels of Digitiser and AWG devices. All + properties are read-only and must be set with their respective setter methods.""" + + pass + + +class SpectrumIOLineInterface(SpectrumChannelInterface[SpectrumIOLineName], ABC): + """Defines the common public interface for control of the Multipurpose IO Lines of Digitiser and AWG devices. All + properties are read-only and must be set with their respective setter methods.""" + + @property + @abstractmethod + def mode(self) -> IOLineMode: + """Returns the current mode of the IO Line.""" + raise NotImplementedError() + + @abstractmethod + def set_mode(self, mode: IOLineMode) -> None: + """Sets the current mode of the IO Line""" + raise NotImplementedError() + + class SpectrumDeviceInterface(ABC): """Defines the common public interface for control of all digitiser and AWG devices, be they StarHub composite devices (e.g. the NetBox) or individual cards. All properties are read-only and must be set with their respective @@ -89,16 +118,20 @@ def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = No @property @abstractmethod - def channels(self) -> Sequence[SpectrumChannelInterface]: + def analog_channels(self) -> Sequence[SpectrumAnalogChannelInterface]: + raise NotImplementedError() + + @property + def io_lines(self) -> Sequence[SpectrumIOLineInterface]: raise NotImplementedError() @property @abstractmethod - def enabled_channels(self) -> List[int]: + def enabled_analog_channels(self) -> List[int]: raise NotImplementedError() @abstractmethod - def set_enabled_channels(self, channels_nums: List[int]) -> None: + def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: raise NotImplementedError() @property @@ -198,3 +231,22 @@ def timeout_in_ms(self) -> int: @abstractmethod def set_timeout_in_ms(self, timeout_in_ms: int) -> None: raise NotImplementedError() + + @abstractmethod + def force_trigger_event(self) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def bytes_per_sample(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def type(self) -> CardType: + raise NotImplementedError() + + @property + @abstractmethod + def model_number(self) -> ModelNumber: + raise NotImplementedError() diff --git a/src/spectrumdevice/devices/awg/__init__.py b/src/spectrumdevice/devices/awg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spectrumdevice/devices/awg/abstract_spectrum_awg.py b/src/spectrumdevice/devices/awg/abstract_spectrum_awg.py new file mode 100644 index 0000000..9b2b826 --- /dev/null +++ b/src/spectrumdevice/devices/awg/abstract_spectrum_awg.py @@ -0,0 +1,78 @@ +from abc import ABC +from copy import copy +from typing import cast + +from spectrum_gmbh.regs import SPC_CARDMODE, SPC_LOOPS +from spectrumdevice.devices.abstract_device import AbstractSpectrumDevice +from spectrumdevice.devices.awg.awg_channel import SpectrumAWGAnalogChannel +from spectrumdevice.devices.awg.awg_interface import SpectrumAWGInterface +from spectrumdevice.settings.device_modes import GenerationMode +from spectrumdevice.settings.output_channel_pairing import ( + ChannelPair, + ChannelPairingMode, + DIFFERENTIAL_CHANNEL_PAIR_COMMANDS, + DOUBLING_CHANNEL_PAIR_COMMANDS, +) + + +class AbstractSpectrumAWG(AbstractSpectrumDevice, SpectrumAWGInterface, ABC): + @property + def generation_mode(self) -> GenerationMode: + """Change the currently enabled card mode. See `GenerationMode` and the Spectrum documentation + for the available modes.""" + return GenerationMode(self.read_spectrum_device_register(SPC_CARDMODE)) + + def set_generation_mode(self, mode: GenerationMode) -> None: + self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value) + + @property + def num_loops(self) -> int: + return self.read_spectrum_device_register(SPC_LOOPS) + + def set_num_loops(self, num_loops: int) -> None: + self.write_to_spectrum_device_register(SPC_LOOPS, num_loops) + + def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None: + """Configures a pair of consecutive channels to operate either independently, in differential mode or + in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically + configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is + required by the Spectrum API. + + Args: + channel_pair (ChannelPair): The pair of channels to configure + mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL + """ + + doubling_enabled = int(mode == ChannelPairingMode.DOUBLE) + differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL) + + if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7): + raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.") + + if doubling_enabled or differential_mode_enabled: + self._mirror_even_channel_settings_on_odd_channel(channel_pair) + self._disable_odd_channel(channel_pair) + + self.write_to_spectrum_device_register( + DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled + ) + self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled) + + def _disable_odd_channel(self, channel_pair: ChannelPair) -> None: + try: + enabled_channels = copy(self.enabled_analog_channels) + enabled_channels.remove(channel_pair.value + 1) + self.set_enabled_analog_channels(enabled_channels) + except ValueError: + pass # odd numbered channel was not enable, so no need to disable it. + + def _mirror_even_channel_settings_on_odd_channel(self, channel_pair: ChannelPair) -> None: + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value + 1]).set_signal_amplitude_in_mv( + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value]).signal_amplitude_in_mv + ) + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value + 1]).set_dc_offset_in_mv( + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value]).dc_offset_in_mv + ) + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value + 1]).set_output_filter( + cast(SpectrumAWGAnalogChannel, self.analog_channels[channel_pair.value]).output_filter + ) diff --git a/src/spectrumdevice/devices/awg/awg_card.py b/src/spectrumdevice/devices/awg/awg_card.py new file mode 100644 index 0000000..14b4ff1 --- /dev/null +++ b/src/spectrumdevice/devices/awg/awg_card.py @@ -0,0 +1,78 @@ +import logging +from typing import Optional, Sequence + +from numpy import int16 +from numpy.typing import NDArray + +from spectrum_gmbh.regs import SPC_MIINST_CHPERMODULE, SPC_MIINST_MODULES, TYP_SERIESMASK, TYP_M2PEXPSERIES, SPC_MEMSIZE +from spectrumdevice.devices.abstract_device import AbstractSpectrumCard +from spectrumdevice.devices.awg.abstract_spectrum_awg import AbstractSpectrumAWG +from spectrumdevice.devices.awg.awg_channel import SpectrumAWGAnalogChannel, SpectrumAWGIOLine +from spectrumdevice.devices.awg.awg_interface import SpectrumAWGAnalogChannelInterface, SpectrumAWGIOLineInterface +from spectrumdevice.settings import TransferBuffer +from spectrumdevice.settings.card_dependent_properties import get_memsize_step_size +from spectrumdevice.settings.transfer_buffer import ( + BufferDirection, + BufferType, + set_transfer_buffer, + transfer_buffer_factory, +) + +logger = logging.getLogger(__name__) + + +class SpectrumAWGCard( + AbstractSpectrumCard[SpectrumAWGAnalogChannelInterface, SpectrumAWGIOLineInterface], AbstractSpectrumAWG +): + def _init_analog_channels(self) -> Sequence[SpectrumAWGAnalogChannelInterface]: + num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES) + num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE) + total_channels = num_modules * num_channels_per_module + return tuple([SpectrumAWGAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)]) + + def _init_io_lines(self) -> Sequence[SpectrumAWGIOLineInterface]: + if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES: + return tuple([SpectrumAWGIOLine(channel_number=n, parent_device=self) for n in range(4)]) + else: + raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.") + + def transfer_waveform(self, waveform: NDArray[int16]) -> None: + buffer = transfer_buffer_factory( + buffer_type=BufferType.SPCM_BUF_DATA, + direction=BufferDirection.SPCM_DIR_PCTOCARD, + size_in_samples=len(waveform), + bytes_per_sample=self.bytes_per_sample, + ) + if len(waveform) < 16: + raise ValueError("Waveform must be at least 16 samples long") + buffer.data_array[:] = waveform + self.define_transfer_buffer((buffer,)) + step_size = get_memsize_step_size(self._model_number) + remainder = len(waveform) % step_size + if remainder > 0: + logger.warning( + "Waveform length is not a multiple of 8 samples. Waveform in card memory will be zero-padded" + " to the next multiple of 8." + ) + coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder) + self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size) + self.start_transfer() + self.wait_for_transfer_chunk_to_complete() + + def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: + """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when + transfer_waveform is used to send a single waveform to the card. + + Args: + buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed + `TransferBuffer` The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and + BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the + length of the data to transfer. + """ + if buffer is None: + raise ValueError( + "You must provide a preconfigured buffer for transferring samples to an AWG because the" + "buffer size cannot be inferred." + ) + self._transfer_buffer = buffer[0] + set_transfer_buffer(self._handle, self._transfer_buffer) diff --git a/src/spectrumdevice/devices/awg/awg_channel.py b/src/spectrumdevice/devices/awg/awg_channel.py new file mode 100644 index 0000000..41ba3f3 --- /dev/null +++ b/src/spectrumdevice/devices/awg/awg_channel.py @@ -0,0 +1,145 @@ +from typing import Any + +from numpy import int16 + +from spectrumdevice.devices.abstract_device import AbstractSpectrumCard +from spectrumdevice.devices.abstract_device.abstract_spectrum_channel import AbstractSpectrumAnalogChannel +from spectrumdevice.devices.abstract_device.abstract_spectrum_io_line import AbstractSpectrumIOLine +from spectrumdevice.devices.awg.awg_interface import ( + SpectrumAWGAnalogChannelInterface, + SpectrumAWGIOLineInterface, + SpectrumAWGInterface, +) +from spectrumdevice.exceptions import SpectrumCardIsNotAnAWG +from spectrumdevice.settings import IOLineMode +from spectrumdevice.settings.card_dependent_properties import CardType, OUTPUT_AMPLITUDE_LIMITS_IN_MV +from spectrumdevice.settings.channel import ( + OUTPUT_AMPLITUDE_COMMANDS, + OUTPUT_CHANNEL_ON_OFF_COMMANDS, + OUTPUT_DC_OFFSET_COMMANDS, + OUTPUT_FILTER_COMMANDS, + OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS, + OUTPUT_STOP_LEVEL_MODE_COMMANDS, + OutputChannelFilter, + OutputChannelStopLevelMode, +) +from spectrumdevice.settings.io_lines import ( + DigOutIOLineModeSettings, + DigOutSourceChannel, + DigOutSourceBit, +) + + +class SpectrumAWGIOLine(AbstractSpectrumIOLine, SpectrumAWGIOLineInterface): + def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: + if parent_device.type != CardType.SPCM_TYPE_AO: + raise SpectrumCardIsNotAnAWG(parent_device.type) + super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy + self._dig_out_settings = DigOutIOLineModeSettings( + source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0, + source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15, + ) + + @property + def dig_out_settings(self) -> DigOutIOLineModeSettings: + return self._dig_out_settings + + def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None: + self._dig_out_settings = dig_out_settings + + def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: + if mode == IOLineMode.SPCM_XMODE_DIGOUT: + return self._dig_out_settings.source_channel.value | self._dig_out_settings.source_bit.value + else: + return 0 + + +class SpectrumAWGAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumAWGAnalogChannelInterface): + def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None: + if parent_device.type != CardType.SPCM_TYPE_AO: + raise SpectrumCardIsNotAnAWG(parent_device.type) + super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy + + @property + def is_switched_on(self) -> bool: + """Returns "True" if the output channel is switched on, or "False" if it is muted.""" + return bool(self._parent_device.read_spectrum_device_register(OUTPUT_CHANNEL_ON_OFF_COMMANDS[self._number])) + + def set_is_switched_on(self, is_switched_on: bool) -> None: + """Switches the output channel on ("True") or off ("False").""" + self._parent_device.write_to_spectrum_device_register( + OUTPUT_CHANNEL_ON_OFF_COMMANDS[self._number], int(is_switched_on) + ) + + @property + def dc_offset_in_mv(self) -> int: + """The current output signal DC offset in mV. + + Returns: + dc_offset (int): The currently set output signal DC offset in mV. + """ + return self._parent_device.read_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number]) + + def set_dc_offset_in_mv(self, dc_offset: int) -> None: + if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: + raise ValueError( + f"Max allowed signal DC offset for card {self._parent_device.model_number} is " + f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " + f"so {dc_offset} mV is too high." + ) + self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset) + + @property + def signal_amplitude_in_mv(self) -> int: + """The current output signal amplitude in mV. + + Returns: + amplitude (int): The currently set output signal amplitude in mV. + """ + return self._parent_device.read_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number]) + + def set_signal_amplitude_in_mv(self, amplitude: int) -> None: + if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: + raise ValueError( + f"Max allowed signal amplitude for card {self._parent_device.model_number} is " + f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " + f"so {amplitude} mV is too high." + ) + self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude) + + @property + def output_filter(self) -> OutputChannelFilter: + """The current output filter setting. + + Returns: + output_filter (OutputChannelFilter): The currently set output filter. + """ + return OutputChannelFilter( + self._parent_device.read_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number]) + ) + + def set_output_filter(self, output_filter: OutputChannelFilter) -> None: + self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value) + + @property + def stop_level_mode(self) -> OutputChannelStopLevelMode: + """Sets the behavior of the channel when the output is stopped or playback finished.""" + return OutputChannelStopLevelMode( + self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number]) + ) + + def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None: + self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value) + + @property + def stop_level_custom_value(self) -> int16: + """Sets the level to which the output will be set when the output is stopped or playback finished and + stop_level_mode is set to `OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM`.""" + return int16( + self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number]) + ) + + def set_stop_level_custom_value(self, value: int16) -> None: + self._parent_device.write_to_spectrum_device_register( + OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value) + ) diff --git a/src/spectrumdevice/devices/awg/awg_interface.py b/src/spectrumdevice/devices/awg/awg_interface.py new file mode 100644 index 0000000..15c83e1 --- /dev/null +++ b/src/spectrumdevice/devices/awg/awg_interface.py @@ -0,0 +1,116 @@ +from abc import ABC, abstractmethod + +from numpy import int16 +from numpy.typing import NDArray + +from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface +from spectrumdevice.settings.channel import OutputChannelFilter, OutputChannelStopLevelMode +from spectrumdevice.settings.device_modes import GenerationMode +from spectrumdevice.settings.io_lines import DigOutIOLineModeSettings +from spectrumdevice.settings.output_channel_pairing import ChannelPair, ChannelPairingMode + + +class SpectrumAWGIOLineInterface(SpectrumIOLineInterface, ABC): + @property + @abstractmethod + def dig_out_settings(self) -> DigOutIOLineModeSettings: + raise NotImplementedError() + + @abstractmethod + def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None: + raise NotImplementedError() + + +class SpectrumAWGAnalogChannelInterface(SpectrumAnalogChannelInterface, ABC): + """Defines the public interface for control of the channels of Spectrum AWG device. All properties are read- + only and must be set with their respective setter methods.""" + + @property + @abstractmethod + def is_switched_on(self) -> bool: + """SPC_ENABLEOUT0, SPC_ENABLEOUT01 etc""" + raise NotImplementedError() + + @abstractmethod + def set_is_switched_on(self, is_switched_on: bool) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def dc_offset_in_mv(self) -> int: + """SPC_OFFS0""" + raise NotImplementedError() + + @abstractmethod + def set_dc_offset_in_mv(self, amplitude: int) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def signal_amplitude_in_mv(self) -> int: + """SPC_AMP0""" + raise NotImplementedError() + + @abstractmethod + def set_signal_amplitude_in_mv(self, amplitude: int) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def output_filter(self) -> OutputChannelFilter: + raise NotImplementedError() + + @abstractmethod + def set_output_filter(self, filter: OutputChannelFilter) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def stop_level_mode(self) -> OutputChannelStopLevelMode: + raise NotImplementedError() + + @abstractmethod + def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def stop_level_custom_value(self) -> int16: + raise NotImplementedError() + + @abstractmethod + def set_stop_level_custom_value(self, value: int16) -> None: + raise NotImplementedError() + + +class SpectrumAWGInterface(SpectrumDeviceInterface, ABC): + """Defines the public interface for control of all Spectrum AWG devices, be they StarHub composite devices + (e.g. the NetBox) or individual AWG cards. All properties are read-only and must be set with their respective + setter methods.""" + + @property + @abstractmethod + def generation_mode(self) -> GenerationMode: + raise NotImplementedError() + + @abstractmethod + def set_generation_mode(self, mode: GenerationMode) -> None: + raise NotImplementedError() + + @abstractmethod + def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None: + raise NotImplementedError() + + @abstractmethod + def transfer_waveform(self, waveform: NDArray[int16]) -> None: + raise NotImplementedError() + + @property + @abstractmethod + def num_loops(self) -> int: + raise NotImplementedError() + + @abstractmethod + def set_num_loops(self, num_loops: int) -> None: + raise NotImplementedError() diff --git a/src/spectrumdevice/devices/digitiser/__init__.py b/src/spectrumdevice/devices/digitiser/__init__.py index d59c4a7..c81a36b 100644 --- a/src/spectrumdevice/devices/digitiser/__init__.py +++ b/src/spectrumdevice/devices/digitiser/__init__.py @@ -5,16 +5,16 @@ # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. from spectrumdevice.devices.digitiser.digitiser_card import SpectrumDigitiserCard -from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserChannel +from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel from spectrumdevice.devices.digitiser.digitiser_interface import ( - SpectrumDigitiserChannelInterface, + SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserInterface, ) from spectrumdevice.devices.digitiser.digitiser_star_hub import SpectrumDigitiserStarHub __all__ = [ - "SpectrumDigitiserChannelInterface", - "SpectrumDigitiserChannel", + "SpectrumDigitiserAnalogChannelInterface", + "SpectrumDigitiserAnalogChannel", "SpectrumDigitiserInterface", "SpectrumDigitiserCard", "SpectrumDigitiserStarHub", diff --git a/src/spectrumdevice/devices/digitiser/abstract_spectrum_digitiser.py b/src/spectrumdevice/devices/digitiser/abstract_spectrum_digitiser.py index 5fe9d04..eaebea7 100644 --- a/src/spectrumdevice/devices/digitiser/abstract_spectrum_digitiser.py +++ b/src/spectrumdevice/devices/digitiser/abstract_spectrum_digitiser.py @@ -10,13 +10,13 @@ from spectrumdevice.measurement import Measurement from spectrumdevice.devices.abstract_device import AbstractSpectrumDevice from spectrumdevice.devices.digitiser.digitiser_interface import SpectrumDigitiserInterface -from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserChannel +from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel from spectrumdevice.exceptions import SpectrumWrongAcquisitionMode from spectrumdevice.settings import AcquisitionMode, AcquisitionSettings from spectrum_gmbh.regs import M2CMD_CARD_WRITESETUP, SPC_M2CMD -class AbstractSpectrumDigitiser(SpectrumDigitiserInterface, AbstractSpectrumDevice, ABC): +class AbstractSpectrumDigitiser(AbstractSpectrumDevice, SpectrumDigitiserInterface, ABC): """Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes (`SpectrumDigitiserCard`, `SpectrumDigitiserStarHub` or their mock equivalents) which inherit the methods defined here. Note that @@ -39,28 +39,28 @@ def configure_acquisition(self, settings: AcquisitionSettings) -> None: settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples ) self.set_timeout_in_ms(settings.timeout_in_ms) - self.set_enabled_channels(settings.enabled_channels) + self.set_enabled_analog_channels(settings.enabled_channels) # Apply channel dependent settings for channel, v_range, v_offset, impedance in zip( - self.channels, + self.analog_channels, settings.vertical_ranges_in_mv, settings.vertical_offsets_in_percent, settings.input_impedances, ): - cast(SpectrumDigitiserChannel, channel).set_vertical_range_in_mv(v_range) - cast(SpectrumDigitiserChannel, channel).set_vertical_offset_in_percent(v_offset) - cast(SpectrumDigitiserChannel, channel).set_input_impedance(impedance) + cast(SpectrumDigitiserAnalogChannel, channel).set_vertical_range_in_mv(v_range) + cast(SpectrumDigitiserAnalogChannel, channel).set_vertical_offset_in_percent(v_offset) + cast(SpectrumDigitiserAnalogChannel, channel).set_input_impedance(impedance) # Only some hardware has software programmable input coupling, so coupling can be None if settings.input_couplings is not None: - for channel, coupling in zip(self.channels, settings.input_couplings): - cast(SpectrumDigitiserChannel, channel).set_input_coupling(coupling) + for channel, coupling in zip(self.analog_channels, settings.input_couplings): + cast(SpectrumDigitiserAnalogChannel, channel).set_input_coupling(coupling) # Only some hardware has software programmable input paths, so it can be None if settings.input_paths is not None: - for channel, path in zip(self.channels, settings.input_paths): - cast(SpectrumDigitiserChannel, channel).set_input_path(path) + for channel, path in zip(self.analog_channels, settings.input_paths): + cast(SpectrumDigitiserAnalogChannel, channel).set_input_path(path) # Write the configuration to the card self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) diff --git a/src/spectrumdevice/devices/digitiser/digitiser_card.py b/src/spectrumdevice/devices/digitiser/digitiser_card.py index 77e3ce3..298e62e 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_card.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_card.py @@ -23,11 +23,16 @@ SPC_MIINST_MODULES, SPC_POSTTRIGGER, SPC_SEGMENTSIZE, + TYP_SERIESMASK, + TYP_M2PEXPSERIES, ) from spectrumdevice.devices.abstract_device import AbstractSpectrumCard from spectrumdevice.devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser -from spectrumdevice.devices.digitiser.digitiser_interface import SpectrumDigitiserChannelInterface -from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserChannel +from spectrumdevice.devices.digitiser.digitiser_interface import ( + SpectrumDigitiserAnalogChannelInterface, + SpectrumDigitiserIOLineInterface, +) +from spectrumdevice.devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel, SpectrumDigitiserIOLine from spectrumdevice.devices.spectrum_timestamper import Timestamper from spectrumdevice.exceptions import ( SpectrumCardIsNotADigitiser, @@ -41,7 +46,6 @@ BufferType, create_samples_acquisition_transfer_buffer, set_transfer_buffer, - SAMPLE_DATA_TYPE, NOTIFY_SIZE_PAGE_SIZE_IN_BYTES, DEFAULT_NOTIFY_SIZE_IN_PAGES, ) @@ -49,28 +53,41 @@ logger = logging.getLogger(__name__) -class SpectrumDigitiserCard(AbstractSpectrumCard, AbstractSpectrumDigitiser): +class SpectrumDigitiserCard( + AbstractSpectrumCard[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface], + AbstractSpectrumDigitiser, +): """Class for controlling individual Spectrum digitiser cards.""" - def __init__(self, device_number: int = 0, ip_address: Optional[str] = None): + def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None: """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ - AbstractSpectrumCard.__init__(self, device_number, ip_address) + # pass unused args up the inheritance hierarchy + super().__init__(device_number=device_number, ip_address=ip_address) + if self.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(self.type) self._acquisition_mode = self.acquisition_mode self._timestamper: Optional[Timestamper] = None self._batch_size = 1 - def _init_channels(self) -> Sequence[SpectrumDigitiserChannelInterface]: + def _init_analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]: num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES) num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE) total_channels = num_modules * num_channels_per_module - return tuple([SpectrumDigitiserChannel(n, self) for n in range(total_channels)]) + return tuple( + [SpectrumDigitiserAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)] + ) + + def _init_io_lines(self) -> Sequence[SpectrumDigitiserIOLineInterface]: + if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES: + return tuple([SpectrumDigitiserIOLine(channel_number=n, parent_device=self) for n in range(4)]) + else: + raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.") def enable_timestamping(self) -> None: self._timestamper = Timestamper(self, self._handle) @@ -112,7 +129,7 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]: raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer") num_read_bytes = 0 - num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_channels) + num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channels) num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype) @@ -147,17 +164,17 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]: num_read_bytes += num_available_bytes waveforms_in_columns = raw_samples.reshape( - (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_channels)) + (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channels)) ) repeat_acquisitions = [] for n in range(self._batch_size): repeat_acquisitions.append( [ - cast(SpectrumDigitiserChannel, self.channels[ch_num]).convert_raw_waveform_to_voltage_waveform( - squeeze(waveform) - ) - for ch_num, waveform in zip(self.enabled_channels, waveforms_in_columns[n, :, :].T) + cast( + SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num] + ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform)) + for ch_num, waveform in zip(self.enabled_analog_channels, waveforms_in_columns[n, :, :].T) ] ) @@ -284,9 +301,10 @@ def _set_or_update_transfer_buffer_attribute(self, buffer: Optional[Sequence[Tra raise ValueError("Digitisers need a transfer buffer with type BufferDirection.SPCM_BUF_DATA") elif self._transfer_buffer is None: if self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): - bytes_per_sample = SAMPLE_DATA_TYPE().itemsize - samples_per_batch = self.acquisition_length_in_samples * len(self.enabled_channels) * self._batch_size - pages_per_batch = samples_per_batch * bytes_per_sample / NOTIFY_SIZE_PAGE_SIZE_IN_BYTES + samples_per_batch = ( + self.acquisition_length_in_samples * len(self.enabled_analog_channels) * self._batch_size + ) + pages_per_batch = samples_per_batch * self.bytes_per_sample / NOTIFY_SIZE_PAGE_SIZE_IN_BYTES if pages_per_batch < DEFAULT_NOTIFY_SIZE_IN_PAGES: notify_size = pages_per_batch @@ -295,11 +313,15 @@ def _set_or_update_transfer_buffer_attribute(self, buffer: Optional[Sequence[Tra # Make transfer buffer big enough to hold all samples in the batch self._transfer_buffer = create_samples_acquisition_transfer_buffer( - samples_per_batch, notify_size_in_pages=notify_size + size_in_samples=samples_per_batch, + notify_size_in_pages=notify_size, + bytes_per_sample=self.bytes_per_sample, ) elif self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE): self._transfer_buffer = create_samples_acquisition_transfer_buffer( - self.acquisition_length_in_samples * len(self.enabled_channels), notify_size_in_pages=0 + size_in_samples=self.acquisition_length_in_samples * len(self.enabled_analog_channels), + notify_size_in_pages=0, + bytes_per_sample=self.bytes_per_sample, ) else: raise ValueError("AcquisitionMode not recognised") diff --git a/src/spectrumdevice/devices/digitiser/digitiser_channel.py b/src/spectrumdevice/devices/digitiser/digitiser_channel.py index c9e6a48..4ecb8cb 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_channel.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_channel.py @@ -1,4 +1,5 @@ """Provides a concrete class for configuring the individual channels of Spectrum digitiser devices.""" +from typing import Any # Christian Baker, King's College London # Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London @@ -7,11 +8,16 @@ from numpy import ndarray from spectrum_gmbh.regs import SPC_MIINST_MAXADCVALUE -from spectrumdevice.devices.abstract_device import AbstractSpectrumCard, AbstractSpectrumChannel +from spectrumdevice.devices.abstract_device import AbstractSpectrumCard +from spectrumdevice.devices.abstract_device.abstract_spectrum_channel import AbstractSpectrumAnalogChannel +from spectrumdevice.devices.abstract_device.abstract_spectrum_io_line import AbstractSpectrumIOLine from spectrumdevice.devices.digitiser.digitiser_interface import ( - SpectrumDigitiserChannelInterface, + SpectrumDigitiserInterface, + SpectrumDigitiserAnalogChannelInterface, + SpectrumDigitiserIOLineInterface, ) from spectrumdevice.exceptions import SpectrumCardIsNotADigitiser +from spectrumdevice.settings import IOLineMode from spectrumdevice.settings.card_dependent_properties import CardType from spectrumdevice.settings.channel import ( INPUT_IMPEDANCE_COMMANDS, @@ -25,16 +31,29 @@ ) -class SpectrumDigitiserChannel(AbstractSpectrumChannel, SpectrumDigitiserChannelInterface): +class SpectrumDigitiserIOLine(AbstractSpectrumIOLine, SpectrumDigitiserIOLineInterface): + def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: + if parent_device.type != CardType.SPCM_TYPE_AI: + raise SpectrumCardIsNotADigitiser(parent_device.type) + super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy + + def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: + return 0 # no settings required for DigOut + + +class SpectrumDigitiserAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumDigitiserAnalogChannelInterface): """Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when a `SpectrumDigitiserCard` or `SpectrumDigitiserStarHub` is instantiated, and can then be accessed via the `.channels` property.""" - def __init__(self, channel_number: int, parent_device: AbstractSpectrumCard): + def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None: if parent_device.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(parent_device.type) - AbstractSpectrumChannel.__init__(self, channel_number, parent_device) + + # pass unused args up the inheritance hierarchy + super().__init__(channel_number=channel_number, parent_device=parent_device) + self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE) # used frequently so store locally instead of reading from device each time: self._vertical_range_mv = self.vertical_range_in_mv diff --git a/src/spectrumdevice/devices/digitiser/digitiser_interface.py b/src/spectrumdevice/devices/digitiser/digitiser_interface.py index cfa2d4f..23a880b 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_interface.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_interface.py @@ -11,15 +11,20 @@ from numpy import float_, ndarray from numpy.typing import NDArray -from spectrumdevice.devices.abstract_device import SpectrumChannelInterface, SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface +from spectrumdevice.devices.abstract_device.interfaces import SpectrumAnalogChannelInterface, SpectrumIOLineInterface from spectrumdevice.settings import AcquisitionMode, AcquisitionSettings from spectrumdevice import Measurement from spectrumdevice.settings.channel import InputImpedance, InputCoupling, InputPath -class SpectrumDigitiserChannelInterface(SpectrumChannelInterface, ABC): - """Defines the public interface for control of the channels of Spectrum digitiser device. All properties are read- - only and must be set with their respective setter methods.""" +class SpectrumDigitiserIOLineInterface(SpectrumIOLineInterface, ABC): + pass + + +class SpectrumDigitiserAnalogChannelInterface(SpectrumAnalogChannelInterface, ABC): + """Defines the public interface for control of the analog channels of Spectrum digitiser device. All properties are + read-only and must be set with their respective setter methods.""" @property @abstractmethod diff --git a/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py b/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py index 515ce39..bf59a68 100644 --- a/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py +++ b/src/spectrumdevice/devices/digitiser/digitiser_star_hub.py @@ -5,7 +5,7 @@ # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. import datetime from threading import Thread -from typing import Dict, List, Optional, Sequence, cast +from typing import Dict, List, Optional, Sequence from numpy import float_ from numpy.typing import NDArray @@ -16,22 +16,18 @@ from spectrumdevice.devices.abstract_device.abstract_spectrum_hub import check_settings_constant_across_devices from spectrumdevice.devices.digitiser.digitiser_card import SpectrumDigitiserCard from spectrumdevice.devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser -from spectrumdevice.settings import TransferBuffer +from spectrumdevice.settings import ModelNumber, TransferBuffer +from spectrumdevice.settings.card_dependent_properties import CardType from spectrumdevice.settings.device_modes import AcquisitionMode -class SpectrumDigitiserStarHub(AbstractSpectrumStarHub, AbstractSpectrumDigitiser): - """Composite class of `SpectrumCards` for controlling a StarHub digitiser device, for example the Spectrum NetBox. - StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the child cards - of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the control of a - StarHub device as if it were a single Spectrum card.""" +class SpectrumDigitiserStarHub(AbstractSpectrumStarHub[SpectrumDigitiserCard], AbstractSpectrumDigitiser): + """Composite class of `SpectrumDigitiserCard` for controlling a StarHub digitiser device, for example the Spectrum + NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the + child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the + control of a StarHub device as if it were a single Spectrum card.""" - def __init__( - self, - device_number: int, - child_cards: Sequence[SpectrumDigitiserCard], - master_card_index: int, - ): + def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. @@ -40,7 +36,7 @@ def __init__( master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ - AbstractSpectrumStarHub.__init__(self, device_number, child_cards, master_card_index) + super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index) self._acquisition_mode = self.acquisition_mode def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: @@ -64,7 +60,7 @@ def wait_for_acquisition_to_complete(self) -> None: """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()` for more information.""" for card in self._child_cards: - cast(SpectrumDigitiserCard, card).wait_for_acquisition_to_complete() + card.wait_for_acquisition_to_complete() def get_waveforms(self) -> List[List[NDArray[float_]]]: """Get a list of the most recently transferred waveforms. @@ -86,9 +82,7 @@ def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None: this_cards_waveforms = digitiser_card.get_waveforms() card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms - threads = [ - Thread(target=_get_waveforms, args=(cast(SpectrumDigitiserCard, card),)) for card in self._child_cards - ] + threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards] for thread in threads: thread.start() @@ -106,10 +100,10 @@ def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None: def get_timestamp(self) -> Optional[datetime.datetime]: """Get timestamp for the last acquisition""" - return cast(SpectrumDigitiserCard, self._triggering_card).get_timestamp() + return self._triggering_card.get_timestamp() def enable_timestamping(self) -> None: - cast(SpectrumDigitiserCard, self._triggering_card).enable_timestamping() + self._triggering_card.enable_timestamping() @property def acquisition_length_in_samples(self) -> int: @@ -121,7 +115,7 @@ def acquisition_length_in_samples(self) -> int: length_in_samples: The currently set acquisition length in samples.""" lengths = [] for d in self._child_cards: - lengths.append(cast(SpectrumDigitiserCard, d).acquisition_length_in_samples) + lengths.append(d.acquisition_length_in_samples) return check_settings_constant_across_devices(lengths, __name__) def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: @@ -131,7 +125,7 @@ def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: Args: length_in_samples (int): The desired acquisition length in samples.""" for d in self._child_cards: - cast(SpectrumDigitiserCard, d).set_acquisition_length_in_samples(length_in_samples) + d.set_acquisition_length_in_samples(length_in_samples) @property def post_trigger_length_in_samples(self) -> int: @@ -144,7 +138,7 @@ def post_trigger_length_in_samples(self) -> int: """ lengths = [] for d in self._child_cards: - lengths.append(cast(SpectrumDigitiserCard, d).post_trigger_length_in_samples) + lengths.append(d.post_trigger_length_in_samples) return check_settings_constant_across_devices(lengths, __name__) def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: @@ -155,7 +149,7 @@ def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: length_in_samples (int): The desired post trigger length in samples. """ for d in self._child_cards: - cast(SpectrumDigitiserCard, d).set_post_trigger_length_in_samples(length_in_samples) + d.set_post_trigger_length_in_samples(length_in_samples) @property def acquisition_mode(self) -> AcquisitionMode: @@ -167,7 +161,7 @@ def acquisition_mode(self) -> AcquisitionMode: """ modes = [] for d in self._child_cards: - modes.append(cast(SpectrumDigitiserCard, d).acquisition_mode) + modes.append(d.acquisition_mode) return AcquisitionMode(check_settings_constant_across_devices([m.value for m in modes], __name__)) def set_acquisition_mode(self, mode: AcquisitionMode) -> None: @@ -177,15 +171,27 @@ def set_acquisition_mode(self, mode: AcquisitionMode) -> None: Args: mode (`AcquisitionMode`): The desired acquisition mode.""" for d in self._child_cards: - cast(SpectrumDigitiserCard, d).set_acquisition_mode(mode) + d.set_acquisition_mode(mode) @property def batch_size(self) -> int: batch_sizes = [] for d in self._child_cards: - batch_sizes.append(cast(SpectrumDigitiserCard, d).batch_size) + batch_sizes.append(d.batch_size) return check_settings_constant_across_devices(batch_sizes, __name__) def set_batch_size(self, batch_size: int) -> None: for d in self._child_cards: - cast(SpectrumDigitiserCard, d).set_batch_size(batch_size) + d.set_batch_size(batch_size) + + def force_trigger_event(self) -> None: + for d in self._child_cards: + d.force_trigger_event() + + @property + def type(self) -> CardType: + return self._child_cards[0].type + + @property + def model_number(self) -> ModelNumber: + return self._child_cards[0].model_number diff --git a/src/spectrumdevice/devices/mocks/__init__.py b/src/spectrumdevice/devices/mocks/__init__.py index 77e024a..8bde001 100644 --- a/src/spectrumdevice/devices/mocks/__init__.py +++ b/src/spectrumdevice/devices/mocks/__init__.py @@ -6,32 +6,32 @@ import logging from time import perf_counter, sleep -from typing import List, Optional, Sequence +from typing import Any, List, Optional, Sequence -from spectrum_gmbh.regs import ( - SPC_FNCTYPE, - SPC_MIINST_CHPERMODULE, - SPC_MIINST_MODULES, - SPC_PCITYP, -) +from spectrumdevice.devices.awg.awg_card import SpectrumAWGCard from spectrumdevice.devices.digitiser import SpectrumDigitiserCard from spectrumdevice.devices.digitiser import SpectrumDigitiserStarHub -from spectrumdevice.devices.mocks.mock_abstract_devices import MockAbstractSpectrumDigitiser +from spectrumdevice.devices.mocks.mock_abstract_devices import ( + MockAbstractSpectrumDigitiser, + MockAbstractSpectrumCard, + MockAbstractSpectrumStarHub, + MockAbstractSpectrumAWG, +) from spectrumdevice.devices.mocks.mock_waveform_source import TRANSFER_CHUNK_COUNTER from spectrumdevice.devices.mocks.timestamps import MockTimestamper from spectrumdevice.exceptions import ( SpectrumNoTransferBufferDefined, SpectrumSettingsMismatchError, ) -from spectrumdevice.settings import TransferBuffer -from spectrumdevice.settings.card_dependent_properties import CardType, ModelNumber +from spectrumdevice.settings import ModelNumber, TransferBuffer +from spectrumdevice.settings.card_dependent_properties import CardType from spectrumdevice.settings.device_modes import AcquisitionMode logger = logging.getLogger(__name__) MOCK_TRANSFER_TIMEOUT_IN_S = 10 -class MockSpectrumDigitiserCard(SpectrumDigitiserCard, MockAbstractSpectrumDigitiser): +class MockSpectrumDigitiserCard(MockAbstractSpectrumDigitiser, MockAbstractSpectrumCard, SpectrumDigitiserCard): """A mock spectrum card, for testing software written to use the `SpectrumDigitiserCard` class. This class overrides methods of `SpectrumDigitiserCard` that communicate with hardware with mocked implementations, @@ -45,8 +45,8 @@ def __init__( device_number: int, model: ModelNumber, mock_source_frame_rate_hz: float, - num_modules: int = 2, - num_channels_per_module: int = 4, + num_modules: int, + num_channels_per_module: int, ): """ Args: @@ -61,17 +61,18 @@ def __init__( num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. """ - MockAbstractSpectrumDigitiser.__init__(self, mock_source_frame_rate_hz) - self._param_dict[SPC_MIINST_MODULES] = num_modules - self._param_dict[SPC_MIINST_CHPERMODULE] = num_channels_per_module - self._param_dict[SPC_PCITYP] = model.value - self._param_dict[SPC_FNCTYPE] = CardType.SPCM_TYPE_AI.value - self._param_dict[TRANSFER_CHUNK_COUNTER] = 0 - SpectrumDigitiserCard.__init__(self, device_number=device_number) - self._visa_string = f"MockCard{device_number}" + super().__init__( + device_number=device_number, + model=model, + mock_source_frame_rate_hz=mock_source_frame_rate_hz, + num_modules=num_modules, + num_channels_per_module=num_channels_per_module, + card_type=CardType.SPCM_TYPE_AI, + ) self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode self._previous_transfer_chunk_count = 0 + self._param_dict[TRANSFER_CHUNK_COUNTER] = 0 def enable_timestamping(self) -> None: self._timestamper: MockTimestamper = MockTimestamper(self, self._handle) @@ -96,7 +97,7 @@ def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """ super().set_acquisition_length_in_samples(length_in_samples) - def set_enabled_channels(self, channels_nums: List[int]) -> None: + def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set. @@ -105,8 +106,8 @@ def set_enabled_channels(self, channels_nums: List[int]) -> None: channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2]. """ - if len(list(filter(lambda x: 0 <= x < len(self.channels), channels_nums))) == len(channels_nums): - super().set_enabled_channels(channels_nums) + if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums): + super().set_enabled_analog_channels(channels_nums) else: raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.") @@ -152,29 +153,58 @@ def wait_for_acquisition_to_complete(self) -> None: logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect") -class MockSpectrumDigitiserStarHub(SpectrumDigitiserStarHub, MockAbstractSpectrumDigitiser): +class MockSpectrumAWGCard(MockAbstractSpectrumAWG, MockAbstractSpectrumCard, SpectrumAWGCard): + def __init__(self, device_number: int, model: ModelNumber, num_modules: int, num_channels_per_module: int) -> None: + """ + Args: + device_number (int): The index of the mock device to create. Used to create a name for the device which is + used internally. + model (ModelNumber): The model of card to mock. + num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this + is read from the device so does not need to be set. See the Spectrum documentation to work out how many + modules your hardware has. + num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On + real hardware, this is read from the device so does not need to be set. + """ + super().__init__( + card_type=CardType.SPCM_TYPE_AO, + device_number=device_number, + model=model, + num_modules=num_modules, + num_channels_per_module=num_channels_per_module, + ) + self._connect(self._visa_string) + + def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: + """Create or provide a `TransferBuffer` object for transferring samples from the device. + + See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it + does not write to any hardware device.""" + if buffer is None: + raise ValueError( + "You must provide a preconfigured buffer for transferring samples to an AWG because the" + "buffer size cannot be inferred." + ) + self._transfer_buffer = buffer[0] + + +class MockSpectrumDigitiserStarHub(MockAbstractSpectrumStarHub, SpectrumDigitiserStarHub): """A mock spectrum StarHub, for testing software written to use the `SpectrumStarHub` class. Overrides methods of `SpectrumStarHub` and `AbstractSpectrumDigitiser` that communicate with hardware with mocked implementations allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI.""" - def __init__( - self, - device_number: int, - child_cards: Sequence[MockSpectrumDigitiserCard], - master_card_index: int, - ): + def __init__(self, **kwargs: Any): """ Args: - child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumCard` objects defining the + child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the properties of the child cards located within the mock hub. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ - MockAbstractSpectrumDigitiser.__init__(self) - SpectrumDigitiserStarHub.__init__(self, device_number, child_cards, master_card_index) - self._visa_string = f"MockHub{device_number}" + super().__init__(param_dict=None, **kwargs) + self._visa_string = "/mock" + self._visa_string self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode diff --git a/src/spectrumdevice/devices/mocks/mock_abstract_devices.py b/src/spectrumdevice/devices/mocks/mock_abstract_devices.py index 322914f..0a11858 100644 --- a/src/spectrumdevice/devices/mocks/mock_abstract_devices.py +++ b/src/spectrumdevice/devices/mocks/mock_abstract_devices.py @@ -6,7 +6,7 @@ from abc import ABC from threading import Event, Lock, Thread -from typing import Dict, Optional +from typing import Any, Dict, Optional, Union, cast from spectrum_gmbh.regs import ( SPCM_FEAT_EXTFW_SEGSTAT, @@ -17,40 +17,35 @@ SPCM_X3_AVAILMODES, SPCM_XMODE_DISABLE, SPC_CARDMODE, + SPC_FNCTYPE, SPC_MEMSIZE, + SPC_MIINST_BYTESPERSAMPLE, SPC_MIINST_MAXADCVALUE, SPC_PCIEXTFEATURES, SPC_PCIFEATURES, + SPC_PCITYP, SPC_SEGMENTSIZE, SPC_TIMEOUT, + SPC_MIINST_MODULES, + SPC_MIINST_CHPERMODULE, ) -from spectrumdevice.devices.abstract_device import AbstractSpectrumDevice +from spectrumdevice.devices.abstract_device import AbstractSpectrumDevice, AbstractSpectrumCard, AbstractSpectrumStarHub +from spectrumdevice.devices.awg.abstract_spectrum_awg import AbstractSpectrumAWG from spectrumdevice.devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser from spectrumdevice.devices.mocks.mock_waveform_source import mock_waveform_source_factory from spectrumdevice.exceptions import SpectrumDeviceNotConnected -from spectrumdevice.settings import AcquisitionMode, SpectrumRegisterLength +from spectrumdevice.settings import AcquisitionMode, ModelNumber, SpectrumRegisterLength +from spectrumdevice.settings.card_dependent_properties import CardType +from spectrumdevice.settings.device_modes import GenerationMode class MockAbstractSpectrumDevice(AbstractSpectrumDevice, ABC): - """Overrides methods of `AbstractSpectrumDevice` that communicate with hardware with mocked implementations, allowing - software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. Instances of this - class cannot be constructed directly - instantiate `MockAbstractSpectrumDigitiser` and `MockSpectrumStarHub` objects instead, - which inherit from this class.""" - - def __init__(self) -> None: - self._param_dict: Dict[int, int] = { - SPC_PCIFEATURES: SPCM_FEAT_MULTI, - SPC_PCIEXTFEATURES: SPCM_FEAT_EXTFW_SEGSTAT, - SPCM_X0_AVAILMODES: SPCM_XMODE_DISABLE, - SPCM_X1_AVAILMODES: SPCM_XMODE_DISABLE, - SPCM_X2_AVAILMODES: SPCM_XMODE_DISABLE, - SPCM_X3_AVAILMODES: SPCM_XMODE_DISABLE, - SPC_TIMEOUT: 1000, - SPC_SEGMENTSIZE: 1000, - SPC_MEMSIZE: 1000, - } - self._buffer_lock = Lock() - self._enabled_channels = [0] + def __init__(self, param_dict: Optional[Dict[int, int]], **kwargs: Any): + if param_dict is None: + self._param_dict: Dict[int, int] = {} + else: + self._param_dict = param_dict + super().__init__(**kwargs) # required for proper MRO resolution def write_to_spectrum_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO @@ -104,22 +99,65 @@ def read_spectrum_device_register( raise SpectrumDeviceNotConnected("Mock device has been disconnected.") +class MockAbstractSpectrumCard(MockAbstractSpectrumDevice, AbstractSpectrumCard, ABC): + """Overrides methods of `AbstractSpectrumDevice` that communicate with hardware with mocked implementations, allowing + software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. Instances of this + class cannot be constructed directly - instantiate `MockAbstractSpectrumDigitiser` and `MockSpectrumStarHub` objects instead, + which inherit from this class.""" + + def __init__( + self, + model: ModelNumber, + card_type: CardType, + mode: Union[AcquisitionMode, GenerationMode], + num_modules: int, + num_channels_per_module: int, + **kwargs: Any, + ) -> None: + param_dict: dict[int, int] = {} + param_dict[SPC_PCIFEATURES] = SPCM_FEAT_MULTI + param_dict[SPC_PCIEXTFEATURES] = SPCM_FEAT_EXTFW_SEGSTAT + param_dict[SPCM_X0_AVAILMODES] = SPCM_XMODE_DISABLE + param_dict[SPCM_X1_AVAILMODES] = SPCM_XMODE_DISABLE + param_dict[SPCM_X2_AVAILMODES] = SPCM_XMODE_DISABLE + param_dict[SPCM_X3_AVAILMODES] = SPCM_XMODE_DISABLE + param_dict[SPC_TIMEOUT] = 1000 + param_dict[SPC_SEGMENTSIZE] = 1000 + param_dict[SPC_MEMSIZE] = 1000 + param_dict[SPC_PCITYP] = model.value + param_dict[SPC_FNCTYPE] = card_type.value + param_dict[SPC_CARDMODE] = cast(int, mode.value) # cast suppresses a pycharm warning + param_dict[SPC_MIINST_MODULES] = num_modules + param_dict[SPC_MIINST_CHPERMODULE] = num_channels_per_module + param_dict[SPC_MIINST_BYTESPERSAMPLE] = 2 + param_dict[SPC_MIINST_MAXADCVALUE] = 128 + self._buffer_lock = Lock() + self._enabled_channels = [0] + super().__init__( + param_dict=param_dict, **kwargs + ) # then call the rest of the inits after the params have been set + self._visa_string = "/mock" + self._visa_string + + +class MockAbstractSpectrumStarHub(MockAbstractSpectrumDevice, AbstractSpectrumStarHub, ABC): + pass + + class MockAbstractSpectrumDigitiser(MockAbstractSpectrumDevice, AbstractSpectrumDigitiser, ABC): """Overrides methods of `AbstractSpectrumDigitiser` that communicate with hardware with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. Instances of this class cannot be constructed directly - instantiate `MockAbstractSpectrumDigitiser` and `MockSpectrumStarHub` objects instead, which inherit from this class.""" - def __init__(self, source_frame_rate_hz: float = 10.0) -> None: + def __init__(self, mock_source_frame_rate_hz: float = 10.0, **kwargs: Any) -> None: """ Args: source_frame_rate_hz (float): Frame rate at which a mock waveform source will generate waveforms. """ - MockAbstractSpectrumDevice.__init__(self) - self._source_frame_rate_hz = source_frame_rate_hz - self._param_dict[SPC_CARDMODE] = AcquisitionMode.SPC_REC_STD_SINGLE.value - self._param_dict[SPC_MIINST_MAXADCVALUE] = 128 - + # use super() to ensure init of MockAbstractSpectrumDevice is only called once in child classes with multiple + # inheritance + super().__init__(mode=AcquisitionMode.SPC_REC_STD_SINGLE, **kwargs) + self._source_frame_rate_hz = mock_source_frame_rate_hz self._buffer_lock = Lock() self._acquisition_stop_event = Event() self._acquisition_thread: Optional[Thread] = None @@ -134,6 +172,7 @@ def start(self) -> None: notify_size = self.transfer_buffers[0].notify_size_in_pages # this will be 0 in STD_SINGLE_MODE waveform_source = mock_waveform_source_factory(self.acquisition_mode, self._param_dict, notify_size) amplitude = self.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE) + print(f"STARTING MOCK WAVEFORMS SOURCE WITH AMPLITUDE {amplitude}") self._acquisition_stop_event.clear() self._acquisition_thread = Thread( target=waveform_source, @@ -142,7 +181,7 @@ def start(self) -> None: self._source_frame_rate_hz, amplitude, self.transfer_buffers[0].data_array, - self.acquisition_length_in_samples * len(self.enabled_channels), + self.acquisition_length_in_samples * len(self.enabled_analog_channels), self._buffer_lock, ), ) @@ -151,3 +190,8 @@ def start(self) -> None: def stop(self) -> None: """Stops the mock waveform source and timestamp threads.""" self._acquisition_stop_event.set() + + +class MockAbstractSpectrumAWG(MockAbstractSpectrumDevice, AbstractSpectrumAWG, ABC): + def __init__(self, **kwargs: Any) -> None: + super().__init__(mode=GenerationMode.SPC_REP_STD_SINGLE, **kwargs) diff --git a/src/spectrumdevice/devices/mocks/mock_waveform_source.py b/src/spectrumdevice/devices/mocks/mock_waveform_source.py index dc9c954..7c74af1 100644 --- a/src/spectrumdevice/devices/mocks/mock_waveform_source.py +++ b/src/spectrumdevice/devices/mocks/mock_waveform_source.py @@ -132,9 +132,9 @@ def mock_waveform_source_factory( param_dict: Dict[int, int], notify_size_in_pages: float = 0, ) -> MockWaveformSource: - if acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI: + if acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): return MultiFIFOModeMockWaveformSource(param_dict, notify_size_in_pages) - elif AcquisitionMode.SPC_REC_STD_SINGLE: + elif acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE: return SingleModeMockWaveformSource(param_dict) else: raise NotImplementedError(f"Mock waveform source not yet implemented for {acquisition_mode} acquisition mode.") diff --git a/src/spectrumdevice/devices/spectrum_timestamper.py b/src/spectrumdevice/devices/spectrum_timestamper.py index c919860..0d38d70 100644 --- a/src/spectrumdevice/devices/spectrum_timestamper.py +++ b/src/spectrumdevice/devices/spectrum_timestamper.py @@ -47,7 +47,9 @@ def __init__( ): self._parent_device = parent_device self._transfer_buffer = transfer_buffer_factory( - buffer_type=BufferType.SPCM_BUF_TIMESTAMP, direction=BufferDirection.SPCM_DIR_CARDTOPC + buffer_type=BufferType.SPCM_BUF_TIMESTAMP, + direction=BufferDirection.SPCM_DIR_CARDTOPC, + bytes_per_sample=parent_device.bytes_per_sample, ) self._expected_timestamp_bytes_per_frame = BYTES_PER_TIMESTAMP diff --git a/src/spectrumdevice/settings/card_dependent_properties.py b/src/spectrumdevice/settings/card_dependent_properties.py index f090c97..85f612a 100644 --- a/src/spectrumdevice/settings/card_dependent_properties.py +++ b/src/spectrumdevice/settings/card_dependent_properties.py @@ -86,6 +86,7 @@ TYP_M2P5913_X4, TYP_M2P5911_X4, TYP_M2P5912_X4, + TYP_M2P65XX_X4, ) @@ -193,6 +194,7 @@ class ModelNumber(Enum): TYP_M2P59XX_X4 & TYP_FAMILYMASK: 8, TYP_M4I22XX_X8 & TYP_FAMILYMASK: 32, TYP_M4I44XX_X8 & TYP_FAMILYMASK: 16, + TYP_M2P65XX_X4 & TYP_FAMILYMASK: 8, } diff --git a/src/spectrumdevice/settings/card_features.py b/src/spectrumdevice/settings/card_features.py index 3f28bb2..113a6ce 100644 --- a/src/spectrumdevice/settings/card_features.py +++ b/src/spectrumdevice/settings/card_features.py @@ -80,6 +80,7 @@ class AdvancedCardFeature(Enum): SPCM_FEAT_EXTFW_SEGSTAT = SPCM_FEAT_EXTFW_SEGSTAT SPCM_FEAT_EXTFW_SEGAVERAGE = SPCM_FEAT_EXTFW_SEGAVERAGE SPCM_FEAT_EXTFW_BOXCAR = SPCM_FEAT_EXTFW_BOXCAR + SPCM_FEAT_EXTFW_PULSEGEN = 0x00000008 # not in regs.py for some reason. None of the AWG stuff seem to be, def decode_advanced_card_features(value: int) -> List[AdvancedCardFeature]: diff --git a/src/spectrumdevice/settings/channel.py b/src/spectrumdevice/settings/channel.py index 0647807..d734092 100644 --- a/src/spectrumdevice/settings/channel.py +++ b/src/spectrumdevice/settings/channel.py @@ -247,10 +247,14 @@ class OutputChannelStopLevelMode(Enum): SPCM_STOPLVL_HOLDLAST = SPCM_STOPLVL_HOLDLAST """ Output level will stay at the level of the last played sample.""" SPCM_STOPLVL_CUSTOM = SPCM_STOPLVL_CUSTOM - """ Output level will go to the value defined using AWGChannel.set_stop_level_custom_value()""" + """ Output level will go to the value defined using SpectrumAWGChannel.set_stop_level_custom_value()""" class SpectrumChannelName(Enum): + pass + + +class SpectrumAnalogChannelName(SpectrumChannelName): CHANNEL0 = CHANNEL0 CHANNEL1 = CHANNEL1 CHANNEL2 = CHANNEL2 diff --git a/src/spectrumdevice/settings/device_modes.py b/src/spectrumdevice/settings/device_modes.py index 59e93da..74f720f 100644 --- a/src/spectrumdevice/settings/device_modes.py +++ b/src/spectrumdevice/settings/device_modes.py @@ -15,6 +15,7 @@ SPC_CM_EXTERNAL, SPC_CM_EXTREFCLOCK, SPC_REP_STD_SINGLE, + SPC_REP_STD_SINGLERESTART, ) @@ -39,6 +40,8 @@ class GenerationMode(Enum): SPC_REP_STD_SINGLE = SPC_REP_STD_SINGLE """Data generation from on-board memory repeating the complete programmed memory either once, infinite or for a defined number of times after one single trigger event.""" + SPC_REP_STD_SINGLERESTART = SPC_REP_STD_SINGLERESTART + """Data generation from on-board memory. The programmed memory is repeated once after each single trigger event.""" class ClockMode(Enum): diff --git a/src/spectrumdevice/settings/io_lines.py b/src/spectrumdevice/settings/io_lines.py index f8fb674..e6185a4 100644 --- a/src/spectrumdevice/settings/io_lines.py +++ b/src/spectrumdevice/settings/io_lines.py @@ -10,6 +10,7 @@ from enum import Enum from typing import List +from spectrumdevice.settings.channel import SpectrumChannelName from spectrumdevice.spectrum_wrapper import decode_bitmap_using_list_of_ints from spectrum_gmbh.regs import ( SPCM_XMODE_DISABLE, @@ -22,9 +23,54 @@ SPCM_XMODE_RUNSTATE, SPCM_XMODE_ARMSTATE, SPCM_XMODE_CONTOUTMARK, + SPCM_XMODE_DIGOUTSRC_CH0, + SPCM_XMODE_DIGOUTSRC_CH6, + SPCM_XMODE_DIGOUTSRC_CH5, + SPCM_XMODE_DIGOUTSRC_CH4, + SPCM_XMODE_DIGOUTSRC_CH3, + SPCM_XMODE_DIGOUTSRC_CH2, + SPCM_XMODE_DIGOUTSRC_CH1, + SPCM_XMODE_DIGOUTSRC_CH7, + SPCM_XMODE_DIGOUTSRC_BIT15, + SPCM_XMODE_DIGOUTSRC_BIT14, + SPCM_XMODE_DIGOUTSRC_BIT13, + SPCM_XMODE_DIGOUTSRC_BIT12, + SPCM_X0_MODE, + SPCM_X1_MODE, + SPCM_X15_MODE, + SPCM_X14_MODE, + SPCM_X13_MODE, + SPCM_X12_MODE, + SPCM_X11_MODE, + SPCM_X10_MODE, + SPCM_X9_MODE, + SPCM_X8_MODE, + SPCM_X7_MODE, + SPCM_X6_MODE, + SPCM_X5_MODE, + SPCM_X4_MODE, ) +class SpectrumIOLineName(SpectrumChannelName): + X0 = 0x00000001 + X1 = 0x00000002 + X2 = 0x00000004 + X3 = 0x00000008 + X4 = 0x00000010 + X5 = 0x00000020 + X6 = 0x00000040 + X7 = 0x00000080 + X8 = 0x00000100 + X9 = 0x00000200 + X10 = 0x00000400 + X11 = 0x00000800 + X12 = 0x00001000 + X13 = 0x00002000 + X14 = 0x00004000 + X15 = 0x00008000 + + class IOLineMode(Enum): """Enum representing the possible modes that a devices multi-purpose I/O line can support. A list of available modes for each I/O line on a device is provided by the devices available_io_modes property. See the Spectrum @@ -42,6 +88,55 @@ class IOLineMode(Enum): SPCM_XMODE_CONTOUTMARK = SPCM_XMODE_CONTOUTMARK +IO_LINE_MODE_COMMANDS = ( + SPCM_X0_MODE, + SPCM_X1_MODE, + SPCM_X4_MODE, + SPCM_X5_MODE, + SPCM_X6_MODE, + SPCM_X7_MODE, + SPCM_X8_MODE, + SPCM_X9_MODE, + SPCM_X10_MODE, + SPCM_X11_MODE, + SPCM_X12_MODE, + SPCM_X13_MODE, + SPCM_X14_MODE, + SPCM_X15_MODE, +) + + +class DigOutSourceChannel(Enum): + SPCM_XMODE_DIGOUTSRC_CH0 = SPCM_XMODE_DIGOUTSRC_CH0 + SPCM_XMODE_DIGOUTSRC_CH1 = SPCM_XMODE_DIGOUTSRC_CH1 + SPCM_XMODE_DIGOUTSRC_CH2 = SPCM_XMODE_DIGOUTSRC_CH2 + SPCM_XMODE_DIGOUTSRC_CH3 = SPCM_XMODE_DIGOUTSRC_CH3 + SPCM_XMODE_DIGOUTSRC_CH4 = SPCM_XMODE_DIGOUTSRC_CH4 + SPCM_XMODE_DIGOUTSRC_CH5 = SPCM_XMODE_DIGOUTSRC_CH5 + SPCM_XMODE_DIGOUTSRC_CH6 = SPCM_XMODE_DIGOUTSRC_CH6 + SPCM_XMODE_DIGOUTSRC_CH7 = SPCM_XMODE_DIGOUTSRC_CH7 + + +class DigOutSourceBit(Enum): + SPCM_XMODE_DIGOUTSRC_BIT15 = SPCM_XMODE_DIGOUTSRC_BIT15 + """Use Bit15 of selected channel: channel’s resolution will be reduced to 15 bit.""" + SPCM_XMODE_DIGOUTSRC_BIT14 = SPCM_XMODE_DIGOUTSRC_BIT14 + """Use Bit14 of selected channel: channel’s resolution will be reduced to 14 bit, + even if bit 15 is not used for digital replay.""" + SPCM_XMODE_DIGOUTSRC_BIT13 = SPCM_XMODE_DIGOUTSRC_BIT13 + """Use Bit13 of selected channel: channel’s resolution will be reduced to 13 bit, + even if bit 15 and/or bit 14 are not used for digital replay.""" + SPCM_XMODE_DIGOUTSRC_BIT12 = SPCM_XMODE_DIGOUTSRC_BIT12 + """Use Bit12 of selected channel: channel’s resolution will be reduced to 12 bit, + even if bit 15 and/or bit 14 end/or bit 13 are not used for digital replay.""" + + +@dataclass +class DigOutIOLineModeSettings: + source_channel: DigOutSourceChannel + source_bit: DigOutSourceBit + + def decode_available_io_modes(value: int) -> List[IOLineMode]: """Converts the integer value received from a Spectrum device when queried about its IO line modes into a list of IOLineModes.""" diff --git a/src/spectrumdevice/settings/transfer_buffer.py b/src/spectrumdevice/settings/transfer_buffer.py index 6164cb4..d8aa1d4 100644 --- a/src/spectrumdevice/settings/transfer_buffer.py +++ b/src/spectrumdevice/settings/transfer_buffer.py @@ -13,7 +13,7 @@ from functools import partial from typing import Optional -from numpy import ndarray, zeros, int16, uint8 +from numpy import ndarray, zeros, int16, uint8, int8 from spectrumdevice.spectrum_wrapper import DEVICE_HANDLE_TYPE from spectrumdevice.spectrum_wrapper.error_handler import error_handler @@ -38,9 +38,6 @@ ) -SAMPLE_DATA_TYPE = int16 - - class BufferType(Enum): """An Enum representing the three different types of transfer buffer. See the Spectrum documentation for more information.""" @@ -147,6 +144,7 @@ def copy_contents(self) -> ndarray: def transfer_buffer_factory( buffer_type: BufferType, direction: BufferDirection, + bytes_per_sample: int, size_in_samples: Optional[int] = None, board_memory_offset_bytes: int = 0, notify_size_in_pages: float = 1, @@ -156,6 +154,7 @@ def transfer_buffer_factory( buffer_type (BufferType): Specifies whether the buffer is to be used to transfer samples, timestamps or A/B data. direction (BufferDirection): Specifies whether the buffer is to be used to transfer data from the card to the PC, or the PC to the card. + bytes_per_sample: The number of bytes per sample used by the card. Can be read using card.bytes_per_sample. size_in_samples (int): The size of the array into which samples will be written, in samples. Currently only required for BufferType.SPCM_BUF_DATA as SPCM_BUF_TIMESTAMP buffers are always 4096 uint8 long. board_memory_offset_bytes (int): Sets the offset for transfer in board memory. Default 0. See Spectrum @@ -167,10 +166,17 @@ def transfer_buffer_factory( # _check_notify_size_validity(notify_size_in_pages) + if bytes_per_sample == 1: + sample_data_type: type = int8 + elif bytes_per_sample == 2: + sample_data_type = int16 + else: + raise ValueError("Invalid number of bytes per sample. Should be 1 or 2.") + if buffer_type == BufferType.SPCM_BUF_DATA: if size_in_samples is not None: return SamplesTransferBuffer( - direction, board_memory_offset_bytes, zeros(size_in_samples, SAMPLE_DATA_TYPE), notify_size_in_pages + direction, board_memory_offset_bytes, zeros(size_in_samples, sample_data_type), notify_size_in_pages ) else: raise ValueError("You must provide a buffer size_in_samples to create a BufferType.SPCM_BUF_DATA buffer.") @@ -201,11 +207,11 @@ def _check_notify_size_validity(notify_size_in_pages: float) -> None: create_samples_acquisition_transfer_buffer = partial( - transfer_buffer_factory, BufferType.SPCM_BUF_DATA, BufferDirection.SPCM_DIR_CARDTOPC + transfer_buffer_factory, buffer_type=BufferType.SPCM_BUF_DATA, direction=BufferDirection.SPCM_DIR_CARDTOPC ) create_timestamp_acquisition_transfer_buffer = partial( - transfer_buffer_factory, BufferType.SPCM_BUF_TIMESTAMP, BufferDirection.SPCM_DIR_CARDTOPC + transfer_buffer_factory, buffer_type=BufferType.SPCM_BUF_TIMESTAMP, direction=BufferDirection.SPCM_DIR_CARDTOPC ) @@ -214,7 +220,9 @@ def set_transfer_buffer(device_handle: DEVICE_HANDLE_TYPE, buffer: TransferBuffe device_handle, buffer.type.value, buffer.direction.value, - int(buffer.notify_size_in_pages * NOTIFY_SIZE_PAGE_SIZE_IN_BYTES), + int(buffer.notify_size_in_pages * NOTIFY_SIZE_PAGE_SIZE_IN_BYTES) + if buffer.direction == BufferDirection.SPCM_DIR_CARDTOPC + else 0, buffer.data_array_pointer, buffer.board_memory_offset_bytes, buffer.data_array_length_in_bytes, diff --git a/src/tests/configuration.py b/src/tests/configuration.py index e6050ff..6b61a9f 100644 --- a/src/tests/configuration.py +++ b/src/tests/configuration.py @@ -12,20 +12,25 @@ class SpectrumTestMode(Enum): # Set to TestMode.MOCK_HARDWARE for software-only testing, even if Spectrum drivers are found on the system # Set to TestMode.REAL_HARDWARE to run tests on a real hardware device as configured below. -SINGLE_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE -STAR_HUB_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE +SINGLE_DIGITISER_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE +DIGITISER_STAR_HUB_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE +SINGLE_AWG_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE # Set IP address of real spectrum device (for use if TestMode.REAL_HARDWARE is set above). Set to None to run tests on # a local (PCIe) card. -TEST_DEVICE_IP = "169.254.13.35" +TEST_DIGITISER_IP = "169.254.13.35" +TEST_AWG_IP = None # Set the device number to connect to for real hardware tests. If using a star hub (e.g. netbox), this should be the # STAR_HUB_MASTER_CARD_INDEX. If you only have a single, local (PCIe) card, set to 0. -TEST_DEVICE_NUMBER = 1 +TEST_DIGITISER_NUMBER = 1 +TEST_AWG_NUMBER = 0 # Configure the card. These values are used to set up Mock devices as well as to check the configuration of real # hardware devices, so should match your real hardware if SpectrumTestMode.REAL_HARDWARE is being used. -NUM_MODULES_PER_CARD = 2 -NUM_CHANNELS_PER_MODULE = 4 +NUM_MODULES_PER_DIGITISER = 2 +NUM_CHANNELS_PER_DIGITISER_MODULE = 4 +NUM_MODULES_PER_AWG = 1 +NUM_CHANNELS_PER_AWG_MODULE = 1 NUM_CARDS_IN_STAR_HUB = 2 STAR_HUB_MASTER_CARD_INDEX = 1 @@ -39,13 +44,18 @@ class SpectrumTestMode(Enum): INTEGRATION_TEST_TRIGGER_SOURCE = TriggerSource.SPC_TMASK_SOFTWARE -if SINGLE_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE and not SPECTRUM_DRIVERS_FOUND: +if SINGLE_DIGITISER_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE and not SPECTRUM_DRIVERS_FOUND: raise SpectrumIOError( "Cannot run single card tests in REAL_HARDWARE mode because no Spectrum drivers were found." - "Set SINGLE_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE in configuration.py." + "Set SINGLE_DIGITISER_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE in configuration.py." ) -if STAR_HUB_TEST_MODE == SpectrumTestMode.REAL_HARDWARE and not SPECTRUM_DRIVERS_FOUND: +if SINGLE_AWG_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE and not SPECTRUM_DRIVERS_FOUND: + raise SpectrumIOError( + "Cannot run single card tests in REAL_HARDWARE mode because no Spectrum drivers were found." + "Set SINGLE_AWG_CARD_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE in configuration.py." + ) +if DIGITISER_STAR_HUB_TEST_MODE == SpectrumTestMode.REAL_HARDWARE and not SPECTRUM_DRIVERS_FOUND: raise SpectrumIOError( "Cannot run star-hub tests in REAL_HARDWARE mode because no Spectrum drivers were found" - "Set STAR_HUB_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE in configuration.py." + "Set DIGITISER_STAR_HUB_TEST_MODE = SpectrumTestMode.MOCK_HARDWARE in configuration.py." ) diff --git a/src/tests/device_factories.py b/src/tests/device_factories.py index 43476e2..297d16a 100644 --- a/src/tests/device_factories.py +++ b/src/tests/device_factories.py @@ -1,45 +1,66 @@ from spectrumdevice import SpectrumDigitiserStarHub -from spectrumdevice.devices.digitiser import SpectrumDigitiserCard -from spectrumdevice.devices.mocks import MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub +from spectrumdevice.devices.awg.awg_card import SpectrumAWGCard +from spectrumdevice.devices.awg.awg_interface import SpectrumAWGInterface +from spectrumdevice.devices.digitiser import SpectrumDigitiserCard, SpectrumDigitiserInterface +from spectrumdevice.devices.mocks import MockSpectrumAWGCard, MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub from spectrumdevice.settings import ModelNumber from tests.configuration import ( MOCK_DEVICE_TEST_FRAME_RATE_HZ, NUM_CARDS_IN_STAR_HUB, - NUM_CHANNELS_PER_MODULE, - NUM_MODULES_PER_CARD, - SINGLE_CARD_TEST_MODE, + NUM_CHANNELS_PER_DIGITISER_MODULE, + NUM_MODULES_PER_DIGITISER, + SINGLE_DIGITISER_CARD_TEST_MODE, STAR_HUB_MASTER_CARD_INDEX, - STAR_HUB_TEST_MODE, + DIGITISER_STAR_HUB_TEST_MODE, SpectrumTestMode, - TEST_DEVICE_IP, - TEST_DEVICE_NUMBER, + TEST_DIGITISER_IP, + TEST_DIGITISER_NUMBER, + SINGLE_AWG_CARD_TEST_MODE, + NUM_MODULES_PER_AWG, + NUM_CHANNELS_PER_AWG_MODULE, + TEST_AWG_IP, + TEST_AWG_NUMBER, ) -def create_spectrum_card_for_testing() -> SpectrumDigitiserCard: - """Configure a real or mock device for unit testing using the glabal constant values defined in +def create_digitiser_card_for_testing() -> SpectrumDigitiserInterface: + """Configure a real or mock device for unit testing using the global constant values defined in tests/configuration.py""" - if SINGLE_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE: - return SpectrumDigitiserCard(device_number=TEST_DEVICE_NUMBER, ip_address=TEST_DEVICE_IP) + if SINGLE_DIGITISER_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE: + return SpectrumDigitiserCard(device_number=TEST_DIGITISER_NUMBER, ip_address=TEST_DIGITISER_IP) else: return MockSpectrumDigitiserCard( - device_number=TEST_DEVICE_NUMBER, + device_number=TEST_DIGITISER_NUMBER, model=ModelNumber.TYP_M2P5966_X4, mock_source_frame_rate_hz=MOCK_DEVICE_TEST_FRAME_RATE_HZ, - num_modules=NUM_MODULES_PER_CARD, - num_channels_per_module=NUM_CHANNELS_PER_MODULE, + num_modules=NUM_MODULES_PER_DIGITISER, + num_channels_per_module=NUM_CHANNELS_PER_DIGITISER_MODULE, + ) + + +def create_awg_card_for_testing() -> SpectrumAWGInterface: + """Configure a real or mock device for unit testing using the global constant values defined in + tests/configuration.py""" + if SINGLE_AWG_CARD_TEST_MODE == SpectrumTestMode.REAL_HARDWARE: + return SpectrumAWGCard(device_number=TEST_AWG_NUMBER, ip_address=TEST_AWG_IP) + else: + return MockSpectrumAWGCard( + device_number=TEST_AWG_NUMBER, + model=ModelNumber.TYP_M2P6560_X4, + num_modules=NUM_MODULES_PER_AWG, + num_channels_per_module=NUM_CHANNELS_PER_AWG_MODULE, ) def create_spectrum_star_hub_for_testing() -> SpectrumDigitiserStarHub: """Configure a real or mock device for unit testing using the glabal constant values defined in tests/configuration.py""" - if STAR_HUB_TEST_MODE == SpectrumTestMode.REAL_HARDWARE: + if DIGITISER_STAR_HUB_TEST_MODE == SpectrumTestMode.REAL_HARDWARE: child_cards = [] for n in range(NUM_CARDS_IN_STAR_HUB): - child_cards.append(SpectrumDigitiserCard(device_number=n, ip_address=TEST_DEVICE_IP)) + child_cards.append(SpectrumDigitiserCard(device_number=n, ip_address=TEST_DIGITISER_IP)) return SpectrumDigitiserStarHub( - device_number=0, child_cards=child_cards, master_card_index=STAR_HUB_MASTER_CARD_INDEX + device_number=0, child_cards=tuple(child_cards), master_card_index=STAR_HUB_MASTER_CARD_INDEX ) else: mock_child_cards = [] @@ -49,8 +70,8 @@ def create_spectrum_star_hub_for_testing() -> SpectrumDigitiserStarHub: device_number=0, model=ModelNumber.TYP_M2P5966_X4, mock_source_frame_rate_hz=MOCK_DEVICE_TEST_FRAME_RATE_HZ, - num_modules=NUM_MODULES_PER_CARD, - num_channels_per_module=NUM_CHANNELS_PER_MODULE, + num_modules=NUM_MODULES_PER_DIGITISER, + num_channels_per_module=NUM_CHANNELS_PER_DIGITISER_MODULE, ) ) return MockSpectrumDigitiserStarHub( diff --git a/src/tests/test_integration.py b/src/tests/test_integration.py index 0fd4bb1..1e803c1 100644 --- a/src/tests/test_integration.py +++ b/src/tests/test_integration.py @@ -16,34 +16,38 @@ ACQUISITION_LENGTH, INTEGRATION_TEST_TRIGGER_SOURCE, NUM_CARDS_IN_STAR_HUB, - NUM_CHANNELS_PER_MODULE, - NUM_MODULES_PER_CARD, - SINGLE_CARD_TEST_MODE, + NUM_CHANNELS_PER_DIGITISER_MODULE, + NUM_MODULES_PER_DIGITISER, + SINGLE_DIGITISER_CARD_TEST_MODE, STAR_HUB_MASTER_CARD_INDEX, - STAR_HUB_TEST_MODE, + DIGITISER_STAR_HUB_TEST_MODE, SpectrumTestMode, - TEST_DEVICE_IP, - TEST_DEVICE_NUMBER, + TEST_DIGITISER_IP, + TEST_DIGITISER_NUMBER, ) @pytest.mark.integration class SingleCardIntegrationTests(TestCase): def setUp(self) -> None: - self._single_card_mock_mode = SINGLE_CARD_TEST_MODE == SpectrumTestMode.MOCK_HARDWARE + self._single_card_mock_mode = SINGLE_DIGITISER_CARD_TEST_MODE == SpectrumTestMode.MOCK_HARDWARE def test_standard_single_mode(self) -> None: measurement = standard_single_mode_example( mock_mode=self._single_card_mock_mode, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, - device_number=TEST_DEVICE_NUMBER, - ip_address=TEST_DEVICE_IP, + device_number=TEST_DIGITISER_NUMBER, + ip_address=TEST_DIGITISER_IP, acquisition_length=ACQUISITION_LENGTH, ) self.assertEqual(len(measurement.waveforms), 1) self.assertEqual([wfm.shape for wfm in measurement.waveforms], [(ACQUISITION_LENGTH,)]) if self._single_card_mock_mode: - self.assertAlmostEqual(measurement.waveforms[0].max() - measurement.waveforms[0].min(), 0.4, 1) + # mock waveform source generates random values covering full ADC range, which is set to += 0.2 V + expected_pk_to_pk_volts = 0.4 + self.assertAlmostEqual( + measurement.waveforms[0].max() - measurement.waveforms[0].min(), expected_pk_to_pk_volts, 1 + ) self.assertAlmostEqual(measurement.waveforms[0].mean(), 0.0, 1) two_seconds_ago = datetime.datetime.now() - datetime.timedelta(seconds=2) @@ -59,8 +63,8 @@ def test_finite_multi_fifo_mode(self) -> None: num_measurements=5, batch_size=5, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, - device_number=TEST_DEVICE_NUMBER, - ip_address=TEST_DEVICE_IP, + device_number=TEST_DIGITISER_NUMBER, + ip_address=TEST_DIGITISER_IP, acquisition_length=ACQUISITION_LENGTH, ) self.assertEqual(len(measurements), 5) @@ -72,8 +76,8 @@ def test_continuous_multi_fifo_mode(self) -> None: time_to_keep_acquiring_for_in_seconds=0.5, batch_size=1, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, - device_number=TEST_DEVICE_NUMBER, - ip_address=TEST_DEVICE_IP, + device_number=TEST_DIGITISER_NUMBER, + ip_address=TEST_DIGITISER_IP, single_acquisition_length_in_samples=ACQUISITION_LENGTH, ) self._asserts_for_fifo_mode(measurements) @@ -84,8 +88,8 @@ def test_averaging_continuous_multi_fifo_example(self) -> None: acquisition_duration_in_seconds=0.5, num_averages=2, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, - device_number=TEST_DEVICE_NUMBER, - ip_address=TEST_DEVICE_IP, + device_number=TEST_DIGITISER_NUMBER, + ip_address=TEST_DIGITISER_IP, acquisition_length=ACQUISITION_LENGTH, ) self._asserts_for_fifo_mode(measurements) @@ -107,16 +111,19 @@ def _asserts_for_fifo_mode(self, measurements: List[Measurement]) -> None: @pytest.mark.star_hub class StarHubIntegrationTests(TestCase): def setUp(self) -> None: - self._star_hub_mock_mode = STAR_HUB_TEST_MODE == SpectrumTestMode.MOCK_HARDWARE + self._star_hub_mock_mode = DIGITISER_STAR_HUB_TEST_MODE == SpectrumTestMode.MOCK_HARDWARE def test_star_hub(self) -> None: hub = connect_to_star_hub_example( mock_mode=self._star_hub_mock_mode, num_cards=NUM_CARDS_IN_STAR_HUB, master_card_index=STAR_HUB_MASTER_CARD_INDEX, - ip_address=TEST_DEVICE_IP, + ip_address=TEST_DIGITISER_IP, + ) + self.assertEqual( + len(hub.analog_channels), + NUM_CHANNELS_PER_DIGITISER_MODULE * NUM_MODULES_PER_DIGITISER * NUM_CARDS_IN_STAR_HUB, ) - self.assertEqual(len(hub.channels), NUM_CHANNELS_PER_MODULE * NUM_MODULES_PER_CARD * NUM_CARDS_IN_STAR_HUB) self.assertEqual(len(hub._child_cards), NUM_CARDS_IN_STAR_HUB) @@ -126,5 +133,5 @@ class NoDriversTest(TestCase): def test_fails_with_no_driver_without_mock_mode(self) -> None: with self.assertRaises(SpectrumDriversNotFound): standard_single_mode_example( - mock_mode=False, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, device_number=TEST_DEVICE_NUMBER + mock_mode=False, trigger_source=INTEGRATION_TEST_TRIGGER_SOURCE, device_number=TEST_DIGITISER_NUMBER ) diff --git a/src/tests/test_single_card.py b/src/tests/test_single_card.py index 380556b..064b2b2 100644 --- a/src/tests/test_single_card.py +++ b/src/tests/test_single_card.py @@ -1,74 +1,78 @@ -from typing import cast +from abc import ABC, abstractmethod +from typing import Generic, TypeVar from unittest import TestCase +from numpy import array, iinfo, int16 +from numpy.testing import assert_array_equal + from spectrum_gmbh.regs import SPC_CHENABLE -from spectrumdevice import SpectrumDigitiserCard, SpectrumDigitiserChannel +from spectrumdevice import SpectrumDigitiserAnalogChannel +from spectrumdevice.devices.abstract_device import SpectrumDeviceInterface +from spectrumdevice.devices.awg.awg_channel import SpectrumAWGAnalogChannel +from spectrumdevice.devices.awg.awg_interface import SpectrumAWGInterface from spectrumdevice.devices.digitiser import SpectrumDigitiserInterface from spectrumdevice.exceptions import ( SpectrumDeviceNotConnected, SpectrumExternalTriggerNotEnabled, SpectrumTriggerOperationNotImplemented, ) -from spectrumdevice.settings.channel import SpectrumChannelName -from spectrumdevice.settings.device_modes import AcquisitionMode, ClockMode -from spectrumdevice.settings.transfer_buffer import create_samples_acquisition_transfer_buffer +from spectrumdevice.settings.channel import SpectrumAnalogChannelName +from spectrumdevice.settings.device_modes import AcquisitionMode, ClockMode, GenerationMode +from spectrumdevice.settings.transfer_buffer import ( + create_samples_acquisition_transfer_buffer, + transfer_buffer_factory, + BufferType, + BufferDirection, +) from spectrumdevice.settings.triggering import ExternalTriggerMode, TriggerSource -from tests.configuration import ACQUISITION_LENGTH, NUM_CHANNELS_PER_MODULE, NUM_MODULES_PER_CARD -from tests.device_factories import create_spectrum_card_for_testing +from tests.configuration import ( + ACQUISITION_LENGTH, + NUM_CHANNELS_PER_DIGITISER_MODULE, + NUM_MODULES_PER_DIGITISER, + NUM_MODULES_PER_AWG, + NUM_CHANNELS_PER_AWG_MODULE, +) +from tests.device_factories import create_awg_card_for_testing, create_digitiser_card_for_testing + + +CardInterfaceVar = TypeVar("CardInterfaceVar", bound=SpectrumDeviceInterface) + +class SingleCardTest(TestCase, Generic[CardInterfaceVar], ABC): + __test__ = False -class SingleCardTest(TestCase): def setUp(self) -> None: - self._device: SpectrumDigitiserInterface = create_spectrum_card_for_testing() - self._all_spectrum_channel_identifiers = [c.value for c in SpectrumChannelName] + self._device: CardInterfaceVar = self._create_test_card() + self._all_spectrum_channel_identifiers = [c.value for c in SpectrumAnalogChannelName] self._all_spectrum_channel_identifiers.sort() # Enums are unordered so ensure channels are in ascending order - self._expected_num_channels = NUM_CHANNELS_PER_MODULE * NUM_MODULES_PER_CARD + self._expected_num_channels = self._determine_expected_num_channels() + + @abstractmethod + def _create_test_card(self) -> CardInterfaceVar: + raise NotImplementedError + + @abstractmethod + def _determine_expected_num_channels(self) -> int: + raise NotImplementedError def tearDown(self) -> None: self._device.disconnect() def test_count_channels(self) -> None: - channels = self._device.channels + channels = self._device.analog_channels self.assertEqual(self._expected_num_channels, len(channels)) - def test_get_channels(self) -> None: - channels = self._device.channels - - expected_channels = tuple( - [ - SpectrumDigitiserChannel(i, cast(SpectrumDigitiserCard, self._device)) - for i in range(self._expected_num_channels) - ] - ) - self.assertEqual(expected_channels, channels) - def test_enable_one_channel(self) -> None: - self._device.set_enabled_channels([0]) + self._device.set_enabled_analog_channels([0]) self.assertEqual( self._all_spectrum_channel_identifiers[0], self._device.read_spectrum_device_register(SPC_CHENABLE) ) def test_enable_two_channels(self) -> None: - self._device.set_enabled_channels([0, 1]) - expected_command = self._all_spectrum_channel_identifiers[0] | self._all_spectrum_channel_identifiers[1] - self.assertEqual(expected_command, self._device.read_spectrum_device_register(SPC_CHENABLE)) - - def test_acquisition_length(self) -> None: - acquisition_length = ACQUISITION_LENGTH - self._device.set_acquisition_mode(AcquisitionMode.SPC_REC_STD_SINGLE) - self._device.set_acquisition_length_in_samples(acquisition_length) - self.assertEqual(acquisition_length, self._device.acquisition_length_in_samples) - - def test_post_trigger_length(self) -> None: - post_trigger_length = ACQUISITION_LENGTH - self._device.set_acquisition_mode(AcquisitionMode.SPC_REC_STD_SINGLE) - self._device.set_post_trigger_length_in_samples(post_trigger_length) - self.assertEqual(post_trigger_length, self._device.post_trigger_length_in_samples) - - def test_acquisition_mode(self) -> None: - acquisition_mode = AcquisitionMode.SPC_REC_STD_SINGLE - self._device.set_acquisition_mode(acquisition_mode) - self.assertEqual(acquisition_mode, self._device.acquisition_mode) + if len(self._device.analog_channels) > 1: + self._device.set_enabled_analog_channels([0, 1]) + expected_command = self._all_spectrum_channel_identifiers[0] | self._all_spectrum_channel_identifiers[1] + self.assertEqual(expected_command, self._device.read_spectrum_device_register(SPC_CHENABLE)) def test_timeout(self) -> None: timeout = 1000 @@ -130,15 +134,109 @@ def test_available_io_modes(self) -> None: except Exception as e: self.assertTrue(False, f"raised an exception {e}") - def test_transfer_buffer(self) -> None: - buffer = create_samples_acquisition_transfer_buffer(ACQUISITION_LENGTH) - self._device.define_transfer_buffer([buffer]) - self.assertEqual(buffer, self._device.transfer_buffers[0]) - def test_disconnect(self) -> None: - self._device.set_acquisition_length_in_samples(ACQUISITION_LENGTH) - self.assertTrue(self._device.acquisition_length_in_samples == ACQUISITION_LENGTH) + self._device.set_sample_rate_in_hz(1000000) + self.assertTrue(self._device.sample_rate_in_hz == 1000000) self._device.disconnect() with self.assertRaises(SpectrumDeviceNotConnected): - self._device.set_acquisition_length_in_samples(ACQUISITION_LENGTH) + self._device.set_sample_rate_in_hz(1000000) self._device.reconnect() + + +class DigitiserCardTest(SingleCardTest[SpectrumDigitiserInterface]): + __test__ = True + + def _create_test_card(self) -> SpectrumDigitiserInterface: + return create_digitiser_card_for_testing() + + def _determine_expected_num_channels(self) -> int: + return NUM_CHANNELS_PER_DIGITISER_MODULE * NUM_MODULES_PER_DIGITISER + + def test_get_channels(self) -> None: + channels = self._device.analog_channels + + expected_channels = tuple( + [ + SpectrumDigitiserAnalogChannel(channel_number=i, parent_device=self._device) + for i in range(self._expected_num_channels) + ] + ) + self.assertEqual(expected_channels, channels) + + def test_acquisition_length(self) -> None: + acquisition_length = ACQUISITION_LENGTH + self._device.set_acquisition_mode(AcquisitionMode.SPC_REC_STD_SINGLE) + self._device.set_acquisition_length_in_samples(acquisition_length) + self.assertEqual(acquisition_length, self._device.acquisition_length_in_samples) + + def test_post_trigger_length(self) -> None: + post_trigger_length = ACQUISITION_LENGTH + self._device.set_acquisition_mode(AcquisitionMode.SPC_REC_STD_SINGLE) + self._device.set_post_trigger_length_in_samples(post_trigger_length) + self.assertEqual(post_trigger_length, self._device.post_trigger_length_in_samples) + + def test_acquisition_mode(self) -> None: + acquisition_mode = AcquisitionMode.SPC_REC_STD_SINGLE + self._device.set_acquisition_mode(acquisition_mode) + self.assertEqual(acquisition_mode, self._device.acquisition_mode) + + def test_transfer_buffer(self) -> None: + buffer = create_samples_acquisition_transfer_buffer( + size_in_samples=ACQUISITION_LENGTH, bytes_per_sample=self._device.bytes_per_sample + ) + self._device.define_transfer_buffer([buffer]) + self.assertEqual(buffer, self._device.transfer_buffers[0]) + + +class AWGCardTest(SingleCardTest[SpectrumAWGInterface]): + __test__ = True + + def _create_test_card(self) -> SpectrumAWGInterface: + return create_awg_card_for_testing() + + def _determine_expected_num_channels(self) -> int: + return NUM_CHANNELS_PER_AWG_MODULE * NUM_MODULES_PER_AWG + + def test_get_channels(self) -> None: + channels = self._device.analog_channels + + expected_channels = tuple( + [ + SpectrumAWGAnalogChannel(channel_number=i, parent_device=self._device) + for i in range(self._expected_num_channels) + ] + ) + self.assertEqual(expected_channels, channels) + + def test_generation_mode(self) -> None: + generation_mode = GenerationMode.SPC_REP_STD_SINGLE + self._device.set_generation_mode(generation_mode) + self.assertEqual(generation_mode, self._device.generation_mode) + + def test_num_loops(self) -> None: + self._device.set_num_loops(5) + self.assertEqual(5, self._device.num_loops) + + def test_transfer_waveform(self) -> None: + wfm = ( + array([0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8]) + * iinfo(int16).max + ).astype(int16) + self._device.transfer_waveform(wfm) + transferred_wfm = self._device.transfer_buffers[0].data_array + assert_array_equal(wfm, transferred_wfm) + + def test_transfer_too_small_waveform(self) -> None: + wfm = array([0.0]) + with self.assertRaises(ValueError): + self._device.transfer_waveform(wfm) + + def test_transfer_buffer(self) -> None: + buffer = transfer_buffer_factory( + buffer_type=BufferType.SPCM_BUF_DATA, + direction=BufferDirection.SPCM_DIR_PCTOCARD, + size_in_samples=16, + bytes_per_sample=self._device.bytes_per_sample, + ) + self._device.define_transfer_buffer([buffer]) + self.assertEqual(buffer, self._device.transfer_buffers[0]) diff --git a/src/tests/test_single_channel.py b/src/tests/test_single_channel.py index 6a58498..25043bb 100644 --- a/src/tests/test_single_channel.py +++ b/src/tests/test_single_channel.py @@ -1,14 +1,18 @@ from unittest import TestCase -from spectrumdevice import SpectrumDigitiserChannel +from numpy import iinfo, int16 + +from spectrumdevice import SpectrumDigitiserAnalogChannel +from spectrumdevice.devices.awg.awg_channel import SpectrumAWGAnalogChannel from spectrumdevice.settings import InputImpedance -from tests.device_factories import create_spectrum_card_for_testing +from spectrumdevice.settings.channel import OutputChannelFilter, OutputChannelStopLevelMode +from tests.device_factories import create_awg_card_for_testing, create_digitiser_card_for_testing -class SingleChannelTest(TestCase): +class SingleDigitiserAnalogChannelTest(TestCase): def setUp(self) -> None: - self._device = create_spectrum_card_for_testing() - self._channel = SpectrumDigitiserChannel(0, self._device) + self._device = create_digitiser_card_for_testing() + self._channel = SpectrumDigitiserAnalogChannel(channel_number=0, parent_device=self._device) def tearDown(self) -> None: self._channel._parent_device.disconnect() @@ -27,3 +31,34 @@ def test_input_impedance(self) -> None: impedance = InputImpedance.ONE_MEGA_OHM self._channel.set_input_impedance(impedance) self.assertEqual(impedance, self._channel.input_impedance) + + +class SingleAWGAnalogChannelTest(TestCase): + def setUp(self) -> None: + self._device = create_awg_card_for_testing() + self._channel = SpectrumAWGAnalogChannel(channel_number=0, parent_device=self._device) + + def test_switched_on(self) -> None: + self._channel.set_is_switched_on(True) + self.assertTrue(self._channel.is_switched_on) + + def test_dc_offset(self) -> None: + self._channel.set_dc_offset_in_mv(100) + self.assertEqual(100, self._channel.dc_offset_in_mv) + + def test_signal_amplitude(self) -> None: + self._channel.set_signal_amplitude_in_mv(1000) + self.assertEqual(1000, self._channel.signal_amplitude_in_mv) + + def test_output_filter(self) -> None: + self._channel.set_output_filter(OutputChannelFilter.LOW_PASS_1_MHZ) + self.assertEqual(OutputChannelFilter.LOW_PASS_1_MHZ, self._channel.output_filter) + + def test_stop_level_mode(self) -> None: + self._channel.set_stop_level_mode(OutputChannelStopLevelMode.SPCM_STOPLVL_HIGH) + self.assertEqual(OutputChannelStopLevelMode.SPCM_STOPLVL_HIGH, self._channel.stop_level_mode) + + def test_stop_level_custom_value(self) -> None: + max_value = int16(iinfo(int16).max) + self._channel.set_stop_level_custom_value(max_value) + self.assertEqual(max_value, self._channel.stop_level_custom_value) diff --git a/src/tests/test_star_hub.py b/src/tests/test_star_hub.py index e13eabc..0db9fd2 100644 --- a/src/tests/test_star_hub.py +++ b/src/tests/test_star_hub.py @@ -1,14 +1,17 @@ -from typing import cast - import pytest from numpy import array from spectrum_gmbh.regs import SPC_CHENABLE -from spectrumdevice import AbstractSpectrumCard, SpectrumDigitiserChannel, SpectrumDigitiserStarHub +from spectrumdevice import SpectrumDigitiserAnalogChannel, SpectrumDigitiserStarHub from spectrumdevice.exceptions import SpectrumInvalidNumberOfEnabledChannels -from spectrumdevice.settings.channel import SpectrumChannelName +from spectrumdevice.settings.channel import SpectrumAnalogChannelName from spectrumdevice.settings.transfer_buffer import create_samples_acquisition_transfer_buffer -from tests.configuration import ACQUISITION_LENGTH, NUM_CARDS_IN_STAR_HUB, NUM_CHANNELS_PER_MODULE, NUM_MODULES_PER_CARD +from tests.configuration import ( + ACQUISITION_LENGTH, + NUM_CARDS_IN_STAR_HUB, + NUM_CHANNELS_PER_DIGITISER_MODULE, + NUM_MODULES_PER_DIGITISER, +) from tests.device_factories import create_spectrum_star_hub_for_testing from tests.test_single_card import SingleCardTest @@ -18,26 +21,26 @@ class StarHubTest(SingleCardTest): def setUp(self) -> None: self._device: SpectrumDigitiserStarHub = create_spectrum_star_hub_for_testing() - self._expected_num_channels_each_card = NUM_CHANNELS_PER_MODULE * NUM_MODULES_PER_CARD + self._expected_num_channels_each_card = NUM_CHANNELS_PER_DIGITISER_MODULE * NUM_MODULES_PER_DIGITISER self._expected_total_num_channels = self._expected_num_channels_each_card * NUM_CARDS_IN_STAR_HUB - self._all_spectrum_channel_identifiers = [c.value for c in SpectrumChannelName] + self._all_spectrum_channel_identifiers = [c.value for c in SpectrumAnalogChannelName] self._all_spectrum_channel_identifiers.sort() # Enums are unordered to ensure channels are in ascending order def tearDown(self) -> None: self._device.disconnect() def test_count_channels(self) -> None: - channels = self._device.channels + channels = self._device.analog_channels self.assertEqual(len(channels), self._expected_total_num_channels) def test_enable_one_channel(self) -> None: with self.assertRaises(SpectrumInvalidNumberOfEnabledChannels): - self._device.set_enabled_channels([0]) + self._device.set_enabled_analog_channels([0]) def test_enable_two_channels(self) -> None: - self._device.set_enabled_channels([0, self._expected_num_channels_each_card]) + self._device.set_enabled_analog_channels([0, self._expected_num_channels_each_card]) card_one_expected_command = self._all_spectrum_channel_identifiers[0] card_two_expected_command = self._all_spectrum_channel_identifiers[0] self.assertEqual( @@ -48,12 +51,12 @@ def test_enable_two_channels(self) -> None: ) def test_get_channels(self) -> None: - channels = self._device.channels + channels = self._device.analog_channels expected_channels = [] for n in range(NUM_CARDS_IN_STAR_HUB): expected_channels += [ - SpectrumDigitiserChannel(i, cast(AbstractSpectrumCard, self._device._child_cards[n])) + SpectrumDigitiserAnalogChannel(channel_number=i, parent_device=self._device._child_cards[n]) for i in range(self._expected_num_channels_each_card) ] expected_channels_tuple = tuple(expected_channels) @@ -61,7 +64,12 @@ def test_get_channels(self) -> None: def test_transfer_buffer(self) -> None: - buffer = [create_samples_acquisition_transfer_buffer(ACQUISITION_LENGTH) for _ in range(NUM_CARDS_IN_STAR_HUB)] + buffer = [ + create_samples_acquisition_transfer_buffer( + size_in_samples=ACQUISITION_LENGTH, bytes_per_sample=self._device.bytes_per_sample + ) + for _ in range(NUM_CARDS_IN_STAR_HUB) + ] self._device.define_transfer_buffer(buffer) self.assertTrue((array(self._device.transfer_buffers) == buffer).all())