diff --git a/tests/sim/__init__.py b/tests/sim/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/sim/conftest.py b/tests/sim/conftest.py deleted file mode 100644 index b02740c082..0000000000 --- a/tests/sim/conftest.py +++ /dev/null @@ -1,11 +0,0 @@ -from pathlib import Path - -import pytest - -from ophyd_async.sim.demo import PatternDetector - - -@pytest.fixture -async def sim_pattern_detector(tmp_path_factory) -> PatternDetector: - path: Path = tmp_path_factory.mktemp("tmp") - return PatternDetector(name="PATTERN1", path=path) diff --git a/tests/sim/demo/__init__.py b/tests/sim/demo/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/sim/demo/test_sim_motor.py b/tests/sim/demo/test_sim_motor.py deleted file mode 100644 index d879c717b8..0000000000 --- a/tests/sim/demo/test_sim_motor.py +++ /dev/null @@ -1,56 +0,0 @@ -import asyncio -import time - -import pytest -from bluesky.plans import spiral_square -from bluesky.run_engine import RunEngine - -from ophyd_async.core import DeviceCollector -from ophyd_async.sim.demo import SimMotor - - -async def test_move_sim_in_plan(): - RE = RunEngine() - - async with DeviceCollector(): - m1 = SimMotor("M1") - m2 = SimMotor("M2") - - my_plan = spiral_square([], m1, m2, 0, 0, 4, 4, 10, 10) - - RE(my_plan) - - assert await m1.user_readback.get_value() == -2 - assert await m2.user_readback.get_value() == -2 - - -async def test_slow_move(): - async with DeviceCollector(): - m1 = SimMotor("M1", instant=False) - - await m1.velocity.set(20) - - start = time.monotonic() - await m1.set(10) - elapsed = time.monotonic() - start - - assert await m1.user_readback.get_value() == 10 - assert elapsed >= 0.5 - assert elapsed < 1 - - -async def test_stop(): - async with DeviceCollector(): - m1 = SimMotor("M1", instant=False) - - # this move should take 10 seconds but we will stop it after 0.5 - move_status = m1.set(10) - await asyncio.sleep(0.5) - await m1.stop(success=False) - new_pos = await m1.user_readback.get_value() - assert new_pos < 10 - assert new_pos >= 0.1 - - assert not move_status.success - with pytest.raises(RuntimeError, match="Motor was stopped"): - await move_status diff --git a/tests/sim/test_pattern_generator.py b/tests/sim/test_pattern_generator.py deleted file mode 100644 index a8522c59c5..0000000000 --- a/tests/sim/test_pattern_generator.py +++ /dev/null @@ -1,34 +0,0 @@ -import pytest - -from ophyd_async.sim.demo import PatternGenerator - - -@pytest.fixture -async def pattern_generator(): - # path: Path = tmp_path_factory.mktemp("tmp") - pattern_generator = PatternGenerator() - yield pattern_generator - - -async def test_init(pattern_generator: PatternGenerator): - assert pattern_generator.exposure == 0.1 - assert pattern_generator.height == 240 - assert pattern_generator.width == 320 - assert pattern_generator.image_counter == 0 - assert pattern_generator._handle_for_h5_file is None - assert pattern_generator._full_intensity_blob.shape == (240, 320) - - -def test_set_exposure(pattern_generator: PatternGenerator): - pattern_generator.set_exposure(0.5) - assert pattern_generator.exposure == 0.5 - - -def test_set_x(pattern_generator: PatternGenerator): - pattern_generator.set_x(5.0) - assert pattern_generator.x == 5.0 - - -def test_set_y(pattern_generator: PatternGenerator): - pattern_generator.set_y(-3.0) - assert pattern_generator.y == -3.0 diff --git a/tests/sim/test_sim_detector.py b/tests/sim/test_sim_detector.py deleted file mode 100644 index 062e31004c..0000000000 --- a/tests/sim/test_sim_detector.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -from collections import defaultdict - -import bluesky.plans as bp -import h5py -import numpy as np -from bluesky import RunEngine - -from ophyd_async.core import assert_emitted -from ophyd_async.plan_stubs import ensure_connected -from ophyd_async.sim.demo import PatternDetector - - -async def test_sim_pattern_detector_initialization( - sim_pattern_detector: PatternDetector, -): - assert ( - sim_pattern_detector.pattern_generator - ), "PatternGenerator was not initialized correctly." - - -async def test_detector_creates_controller_and_writer( - sim_pattern_detector: PatternDetector, -): - assert sim_pattern_detector.writer - assert sim_pattern_detector.controller - - -def test_writes_pattern_to_file( - sim_pattern_detector: PatternDetector, - RE: RunEngine, -): - # assert that the file contains data in expected dimensions - docs = defaultdict(list) - - def capture_emitted(name, doc): - docs[name].append(doc) - - def plan(): - yield from ensure_connected(sim_pattern_detector, mock=True) - yield from bp.count([sim_pattern_detector]) - - RE(plan(), capture_emitted) - assert_emitted( - docs, start=1, descriptor=1, stream_resource=2, stream_datum=2, event=1, stop=1 - ) - path = docs["stream_resource"][0]["uri"].split("://localhost")[-1] - if os.name == "nt": - path = path.lstrip("/") - h5file = h5py.File(path) - assert list(h5file["/entry"]) == ["data", "sum"] - assert list(h5file["/entry/sum"]) == [44540.0] - assert np.sum(h5file["/entry/data/data"]) == 44540.0 diff --git a/tests/sim/test_sim_writer.py b/tests/sim/test_sim_writer.py deleted file mode 100644 index 5ee2bba97a..0000000000 --- a/tests/sim/test_sim_writer.py +++ /dev/null @@ -1,42 +0,0 @@ -from unittest.mock import patch - -import pytest - -from ophyd_async.core import DeviceCollector -from ophyd_async.sim.demo import PatternDetectorWriter, PatternGenerator - - -@pytest.fixture -async def writer(static_path_provider) -> PatternDetectorWriter: - async with DeviceCollector(mock=True): - driver = PatternGenerator() - - return PatternDetectorWriter(driver, static_path_provider, lambda: "NAME") - - -async def test_correct_descriptor_doc_after_open(writer: PatternDetectorWriter): - with patch("ophyd_async.core._signal.wait_for_value", return_value=None): - descriptor = await writer.open() - - assert descriptor == { - "NAME": { - "source": "sim://pattern-generator-hdf-file", - "shape": [240, 320], - "dtype": "array", - "external": "STREAM:", - }, - "NAME-sum": { - "source": "sim://pattern-generator-hdf-file", - "shape": [], - "dtype": "number", - "external": "STREAM:", - }, - } - - await writer.close() - - -async def test_collect_stream_docs(writer: PatternDetectorWriter): - await writer.open() - [item async for item in writer.collect_stream_docs(1)] - assert writer.pattern_generator._handle_for_h5_file diff --git a/tests/sim/test_streaming_plan.py b/tests/sim/test_streaming_plan.py deleted file mode 100644 index 222f92d685..0000000000 --- a/tests/sim/test_streaming_plan.py +++ /dev/null @@ -1,55 +0,0 @@ -from collections import defaultdict - -from bluesky import plans as bp -from bluesky.run_engine import RunEngine - -from ophyd_async.core import assert_emitted -from ophyd_async.plan_stubs import ensure_connected -from ophyd_async.sim.demo import PatternDetector - - -# NOTE the async operations with h5py are non-trival -# because of lack of native support for async operations -# see https://github.com/h5py/h5py/issues/837 -async def test_streaming_plan(RE: RunEngine, sim_pattern_detector: PatternDetector): - names = [] - docs = [] - - def append_and_print(name, doc): - names.append(name) - docs.append(doc) - - RE.subscribe(append_and_print) - - def plan(): - yield from ensure_connected(sim_pattern_detector, mock=True) - yield from bp.count([sim_pattern_detector], num=1) - - RE(plan()) - - # NOTE - double resource because double stream - assert names == [ - "start", - "descriptor", - "stream_resource", - "stream_resource", - "stream_datum", - "stream_datum", - "event", - "stop", - ] - await sim_pattern_detector.writer.close() - - -async def test_plan(RE: RunEngine, sim_pattern_detector: PatternDetector): - docs = defaultdict(list) - - def plan(): - yield from ensure_connected(sim_pattern_detector, mock=True) - yield from bp.count([sim_pattern_detector]) - - RE(plan(), lambda name, doc: docs[name].append(doc)) - - assert_emitted( - docs, start=1, descriptor=1, stream_resource=2, stream_datum=2, event=1, stop=1 - ) diff --git a/tests/tango/test_base_device.py b/tests/tango/test_base_device.py index 60c6db9762..14ec56ea6a 100644 --- a/tests/tango/test_base_device.py +++ b/tests/tango/test_base_device.py @@ -23,13 +23,14 @@ AttrQuality, AttrWriteType, CmdArgType, + GreenMode, DevState, ) from tango.asyncio import DeviceProxy as AsyncDeviceProxy from tango.asyncio_executor import set_global_executor from tango.server import Device, attribute, command from tango.test_context import MultiDeviceTestContext -from tango.test_utils import assert_close +from tango.test_utils import assert_close, green_mode class TestEnum(IntEnum): @@ -49,7 +50,7 @@ class TestEnum(IntEnum): class TestDevice(Device): __test__ = False - _array = [[1, 2, 3], [4, 5, 6]] + _array = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]] _justvalue = 5 _writeonly = 6 @@ -259,12 +260,13 @@ def get_test_descriptor(python_type: type[T], value: T, is_cmd: bool) -> dict: # -------------------------------------------------------------------- -@pytest.fixture(scope="module") -def tango_test_device(): - with MultiDeviceTestContext( - [{"class": TestDevice, "devices": [{"name": "test/device/1"}]}], process=True - ) as context: - yield context.get_device_access("test/device/1") +@pytest.fixture() +async def tango_test_device(): + return MultiDeviceTestContext( + [{"class": TestDevice, "devices": [{"name": "test/device/1"}]}], + green_mode=GreenMode.Asyncio, + process=True, + ) # -------------------------------------------------------------------- @@ -298,20 +300,27 @@ def compare_values(expected, received): # -------------------------------------------------------------------- + + @pytest.mark.asyncio async def test_connect(tango_test_device): values, description = await describe_class(tango_test_device) + test_device = TestTangoReadable(tango_test_device) + return async with DeviceCollector(): - test_device = TestTangoReadable(tango_test_device) + ... assert test_device.name == "test_device" assert description == await test_device.describe() compare_values(values, await test_device.read()) +""" + + # -------------------------------------------------------------------- -@pytest.mark.asyncio + async def test_set_trl(tango_test_device): values, description = await describe_class(tango_test_device) test_device = TestTangoReadable(name="test_device") @@ -325,7 +334,7 @@ async def test_set_trl(tango_test_device): # -------------------------------------------------------------------- -@pytest.mark.asyncio + @pytest.mark.parametrize("proxy", [True, False, None]) async def test_connect_proxy(tango_test_device, proxy: bool | None): if proxy is None: @@ -345,7 +354,7 @@ async def test_connect_proxy(tango_test_device, proxy: bool | None): # -------------------------------------------------------------------- -@pytest.mark.asyncio + async def test_with_bluesky(tango_test_device): # now let's do some bluesky stuff RE = RunEngine() @@ -355,7 +364,7 @@ async def test_with_bluesky(tango_test_device): # -------------------------------------------------------------------- -@pytest.mark.asyncio + async def test_tango_demo(demo_test_context): with demo_test_context: detector = TangoDetector( @@ -380,3 +389,4 @@ async def test_tango_demo(demo_test_context): await stop_status assert all([set_status.done, stop_status.done]) assert all([set_status.success, stop_status.success]) +""" diff --git a/tests/tango/test_tango_signals.py b/tests/tango/test_tango_signals.py deleted file mode 100644 index 4919549e53..0000000000 --- a/tests/tango/test_tango_signals.py +++ /dev/null @@ -1,772 +0,0 @@ -import asyncio -import textwrap -import time -from enum import Enum, IntEnum -from random import choice - -import numpy as np -import numpy.typing as npt -import pytest -from bluesky.protocols import Reading -from test_base_device import TestDevice - -from ophyd_async.core import SignalBackend, SignalR, SignalRW, SignalW, SignalX, T -from ophyd_async.tango.core import ( - TangoSignalBackend, - tango_signal_r, - tango_signal_rw, - tango_signal_w, - tango_signal_x, -) -from tango import AttrDataFormat, AttrWriteType, DevState -from tango.asyncio import DeviceProxy -from tango.asyncio_executor import set_global_executor -from tango.server import Device, attribute, command -from tango.test_context import MultiDeviceTestContext -from tango.test_utils import assert_close - - -def __tango_signal_auto(*args, **kwargs): - raise RuntimeError("Fix this later") - - -# -------------------------------------------------------------------- -""" -Since TangoTest does not support EchoMode, we create our own Device. - -""" - - -class TestEnum(IntEnum): - __test__ = False - A = 0 - B = 1 - - -BASE_TYPES_SET = ( - # type_name, tango_name, py_type, sample_values - ("boolean", "DevBoolean", bool, (True, False)), - ("short", "DevShort", int, (1, 2, 3, 4, 5)), - ("ushort", "DevUShort", int, (1, 2, 3, 4, 5)), - ("long", "DevLong", int, (1, 2, 3, 4, 5)), - ("ulong", "DevULong", int, (1, 2, 3, 4, 5)), - ("long64", "DevLong64", int, (1, 2, 3, 4, 5)), - ("char", "DevUChar", int, (1, 2, 3, 4, 5)), - ("float", "DevFloat", float, (1.1, 2.2, 3.3, 4.4, 5.5)), - ("double", "DevDouble", float, (1.1, 2.2, 3.3, 4.4, 5.5)), - ("string", "DevString", str, ("aaa", "bbb", "ccc")), - ("state", "DevState", DevState, (DevState.ON, DevState.MOVING, DevState.ALARM)), - ("enum", "DevEnum", TestEnum, (TestEnum.A, TestEnum.B)), -) - -ATTRIBUTES_SET = [] -COMMANDS_SET = [] - -for type_name, tango_type_name, py_type, values in BASE_TYPES_SET: - ATTRIBUTES_SET.extend( - [ - ( - f"{type_name}_scalar_attr", - tango_type_name, - AttrDataFormat.SCALAR, - py_type, - choice(values), - choice(values), - ), - ( - f"{type_name}_spectrum_attr", - tango_type_name, - AttrDataFormat.SPECTRUM, - npt.NDArray[py_type], - [choice(values), choice(values), choice(values)], - [choice(values), choice(values), choice(values)], - ), - ( - f"{type_name}_image_attr", - tango_type_name, - AttrDataFormat.IMAGE, - npt.NDArray[py_type], - [ - [choice(values), choice(values), choice(values)], - [choice(values), choice(values), choice(values)], - ], - [ - [choice(values), choice(values), choice(values)], - [choice(values), choice(values), choice(values)], - ], - ), - ] - ) - - if tango_type_name == "DevUChar": - continue - else: - COMMANDS_SET.append( - ( - f"{type_name}_scalar_cmd", - tango_type_name, - AttrDataFormat.SCALAR, - py_type, - choice(values), - choice(values), - ) - ) - if tango_type_name in ["DevState", "DevEnum"]: - continue - else: - COMMANDS_SET.append( - ( - f"{type_name}_spectrum_cmd", - tango_type_name, - AttrDataFormat.SPECTRUM, - npt.NDArray[py_type], - [choice(values), choice(values), choice(values)], - [choice(values), choice(values), choice(values)], - ) - ) - - -# -------------------------------------------------------------------- -# TestDevice -# -------------------------------------------------------------------- -# -------------------------------------------------------------------- -@pytest.fixture(scope="module") -def tango_test_device(): - with MultiDeviceTestContext( - [{"class": TestDevice, "devices": [{"name": "test/device/1"}]}], process=True - ) as context: - yield context.get_device_access("test/device/1") - - -# -------------------------------------------------------------------- -# Echo device -# -------------------------------------------------------------------- -class EchoDevice(Device): - attr_values = {} - - def initialize_dynamic_attributes(self): - for name, typ, form, _, _, _ in ATTRIBUTES_SET: - attr = attribute( - name=name, - dtype=typ, - dformat=form, - access=AttrWriteType.READ_WRITE, - fget=self.read, - fset=self.write, - max_dim_x=3, - max_dim_y=2, - enum_labels=[member.name for member in TestEnum], - ) - self.add_attribute(attr) - self.set_change_event(name, True, False) - - for name, typ, form, _, _, _ in COMMANDS_SET: - cmd = command( - f=getattr(self, name), - dtype_in=typ, - dformat_in=form, - dtype_out=typ, - dformat_out=form, - ) - self.add_command(cmd) - - def read(self, attr): - attr.set_value(self.attr_values[attr.get_name()]) - - def write(self, attr): - new_value = attr.get_write_value() - self.attr_values[attr.get_name()] = new_value - self.push_change_event(attr.get_name(), new_value) - - echo_command_code = textwrap.dedent( - """\ - def echo_command(self, arg): - return arg - """ - ) - - for name, _, _, _, _, _ in COMMANDS_SET: - exec(echo_command_code.replace("echo_command", name)) - - -# -------------------------------------------------------------------- -def assert_enum(initial_value, readout_value): - if type(readout_value) in [list, tuple]: - for _initial_value, _readout_value in zip( - initial_value, readout_value, strict=False - ): - assert_enum(_initial_value, _readout_value) - else: - assert initial_value == readout_value - - -# -------------------------------------------------------------------- -# fixtures to run Echo device -# -------------------------------------------------------------------- -@pytest.fixture(scope="module") -def echo_device(): - with MultiDeviceTestContext( - [{"class": EchoDevice, "devices": [{"name": "test/device/1"}]}], process=True - ) as context: - yield context.get_device_access("test/device/1") - - -# -------------------------------------------------------------------- -@pytest.fixture(autouse=True) -def reset_tango_asyncio(): - set_global_executor(None) - - -# -------------------------------------------------------------------- -# helpers to run tests -# -------------------------------------------------------------------- -def get_test_descriptor(python_type: type[T], value: T, is_cmd: bool) -> dict: - if python_type in [bool, int]: - return {"dtype": "integer", "shape": []} - if python_type in [float]: - return {"dtype": "number", "shape": []} - if python_type in [str]: - return {"dtype": "string", "shape": []} - if issubclass(python_type, DevState): - return {"dtype": "string", "shape": []} - if issubclass(python_type, Enum): - return { - "dtype": "string", - "shape": [], - } - - return { - "dtype": "array", - "shape": [np.iinfo(np.int32).max] if is_cmd else list(np.array(value).shape), - } - - -# -------------------------------------------------------------------- -async def make_backend( - typ: type | None, - pv: str, - connect: bool = True, - allow_events: bool | None = True, -) -> TangoSignalBackend: - backend = TangoSignalBackend(typ, pv, pv) - backend.allow_events(allow_events) - if connect: - await backend.connect(1) - return backend - - -# -------------------------------------------------------------------- -async def prepare_device(echo_device: str, pv: str, put_value: T) -> None: - proxy = await DeviceProxy(echo_device) - setattr(proxy, pv, put_value) - - -# -------------------------------------------------------------------- -class MonitorQueue: - def __init__(self, backend: SignalBackend): - self.updates: asyncio.Queue[Reading] = asyncio.Queue() - self.backend = backend - self.subscription = backend.set_callback(self.updates.put_nowait) - - async def assert_updates(self, expected_value): - expected_reading = { - "timestamp": pytest.approx(time.time(), rel=0.1), - "alarm_severity": 0, - } - update_reading = dict(await asyncio.wait_for(self.updates.get(), timeout=5)) - update_value = update_reading.pop("value") - assert_close(update_value, expected_value) - backend_reading = dict( - await asyncio.wait_for(self.backend.get_reading(), timeout=5) - ) - backend_reading.pop("value") - backend_value = await asyncio.wait_for(self.backend.get_value(), timeout=5) - assert_close(backend_value, expected_value) - assert update_reading == expected_reading == backend_reading - - def close(self): - self.backend.set_callback(None) - - -# -------------------------------------------------------------------- -async def assert_monitor_then_put( - echo_device: str, - pv: str, - initial_value: T, - put_value: T, - descriptor: dict, - datatype: type[T] | None = None, -): - await prepare_device(echo_device, pv, initial_value) - source = echo_device + "/" + pv - backend = await make_backend(datatype, source, allow_events=True) - # Make a monitor queue that will monitor for updates - q = MonitorQueue(backend) - try: - assert dict(source=source, **descriptor) == await backend.get_datakey("") - # Check initial value - await q.assert_updates(initial_value) - # Put to new value and check that - await backend.put(put_value, wait=True) - assert_close(put_value, await backend.get_setpoint()) - await q.assert_updates(put_value) - finally: - q.close() - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value", - ATTRIBUTES_SET, - ids=[x[0] for x in ATTRIBUTES_SET], -) -async def test_backend_get_put_monitor_attr( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, -): - try: - # Set a timeout for the operation to prevent it from running indefinitely - await asyncio.wait_for( - assert_monitor_then_put( - echo_device, - pv, - initial_value, - put_value, - get_test_descriptor(py_type, initial_value, False), - py_type, - ), - timeout=100, # Timeout in seconds - ) - except asyncio.TimeoutError: - pytest.fail("Test timed out") - except Exception as e: - pytest.fail(f"Test failed with exception: {e}") - - -# -------------------------------------------------------------------- -async def assert_put_read( - echo_device: str, - pv: str, - put_value: T, - descriptor: dict, - datatype: type[T] | None = None, -): - source = echo_device + "/" + pv - backend = await make_backend(datatype, source) - # Make a monitor queue that will monitor for updates - assert dict(source=source, **descriptor) == await backend.get_datakey("") - # Put to new value and check that - await backend.put(put_value, wait=True) - - expected_reading = { - "timestamp": pytest.approx(time.time(), rel=0.1), - "alarm_severity": 0, - } - - assert_close(await backend.get_value(), put_value) - - get_reading = dict(await backend.get_reading()) - get_reading.pop("value") - assert expected_reading == get_reading - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value", - COMMANDS_SET, - ids=[x[0] for x in COMMANDS_SET], -) -async def test_backend_get_put_monitor_cmd( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, -): - # With the given datatype, check we have the correct initial value and putting works - descriptor = get_test_descriptor(py_type, initial_value, True) - await assert_put_read(echo_device, pv, put_value, descriptor, py_type) - # # With guessed datatype, check we can set it back to the initial value - await assert_put_read(echo_device, pv, put_value, descriptor) - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - await asyncio.gather(*tasks) - del echo_device - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value, use_proxy", - [ - ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - use_proxy, - ) - for ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - ) in ATTRIBUTES_SET - for use_proxy in [True, False] - ], - ids=[f"{x[0]}_{use_proxy}" for x in ATTRIBUTES_SET for use_proxy in [True, False]], -) -async def test_tango_signal_r( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, - use_proxy: bool, -): - await prepare_device(echo_device, pv, initial_value) - source = echo_device + "/" + pv - proxy = await DeviceProxy(echo_device) if use_proxy else None - - timeout = 0.2 - signal = tango_signal_r( - datatype=py_type, - read_trl=source, - device_proxy=proxy, - timeout=timeout, - name="test_signal", - ) - await signal.connect() - reading = await signal.read() - assert_close(reading["test_signal"]["value"], initial_value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value, use_proxy", - [ - ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - use_proxy, - ) - for ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - ) in ATTRIBUTES_SET - for use_proxy in [True, False] - ], - ids=[f"{x[0]}_{use_proxy}" for x in ATTRIBUTES_SET for use_proxy in [True, False]], -) -async def test_tango_signal_w( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, - use_proxy: bool, -): - await prepare_device(echo_device, pv, initial_value) - source = echo_device + "/" + pv - proxy = await DeviceProxy(echo_device) if use_proxy else None - - timeout = 0.2 - signal = tango_signal_w( - datatype=py_type, - write_trl=source, - device_proxy=proxy, - timeout=timeout, - name="test_signal", - ) - await signal.connect() - status = signal.set(put_value, wait=True, timeout=timeout) - await status - assert status.done is True and status.success is True - - status = signal.set(put_value, wait=False, timeout=timeout) - await status - assert status.done is True and status.success is True - - status = signal.set(put_value, wait=True) - await status - assert status.done is True and status.success is True - - status = signal.set(put_value, wait=False) - await status - assert status.done is True and status.success is True - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value, use_proxy", - [ - ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - use_proxy, - ) - for ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - ) in ATTRIBUTES_SET - for use_proxy in [True, False] - ], - ids=[f"{x[0]}_{use_proxy}" for x in ATTRIBUTES_SET for use_proxy in [True, False]], -) -async def test_tango_signal_rw( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, - use_proxy: bool, -): - await prepare_device(echo_device, pv, initial_value) - source = echo_device + "/" + pv - proxy = await DeviceProxy(echo_device) if use_proxy else None - - timeout = 0.2 - signal = tango_signal_rw( - datatype=py_type, - read_trl=source, - write_trl=source, - device_proxy=proxy, - timeout=timeout, - name="test_signal", - ) - await signal.connect() - reading = await signal.read() - assert_close(reading["test_signal"]["value"], initial_value) - await signal.set(put_value) - location = await signal.locate() - assert_close(location["setpoint"], put_value) - assert_close(location["readback"], put_value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("use_proxy", [True, False]) -async def test_tango_signal_x(tango_test_device: str, use_proxy: bool): - proxy = await DeviceProxy(tango_test_device) if use_proxy else None - timeout = 0.2 - signal = tango_signal_x( - write_trl=tango_test_device + "/" + "clear", - device_proxy=proxy, - timeout=timeout, - name="test_signal", - ) - await signal.connect() - status = signal.trigger() - await status - assert status.done is True and status.success is True - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.skip("Not sure if we need tango_signal_auto") -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value, use_proxy", - [ - ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - use_proxy, - ) - for ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - ) in ATTRIBUTES_SET - for use_proxy in [True, False] - ], - ids=[f"{x[0]}_{use_proxy}" for x in ATTRIBUTES_SET for use_proxy in [True, False]], -) -async def test_tango_signal_auto_attrs( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, - use_proxy: bool, -): - await prepare_device(echo_device, pv, initial_value) - source = echo_device + "/" + pv - proxy = await DeviceProxy(echo_device) if use_proxy else None - timeout = 0.2 - - async def _test_signal(dtype, proxy): - signal = await __tango_signal_auto( - datatype=dtype, - trl=source, - device_proxy=proxy, - timeout=timeout, - name="test_signal", - ) - assert type(signal) is SignalRW - await signal.connect() - reading = await signal.read() - value = reading["test_signal"]["value"] - if isinstance(value, np.ndarray): - value = value.tolist() - assert_close(value, initial_value) - - await signal.set(put_value, wait=True, timeout=timeout) - reading = await signal.read() - value = reading["test_signal"]["value"] - if isinstance(value, np.ndarray): - value = value.tolist() - assert_close(value, put_value) - - dtype = py_type - await _test_signal(dtype, proxy) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.skip("Not sure if we need tango_signal_auto") -@pytest.mark.parametrize( - "pv, tango_type, d_format, py_type, initial_value, put_value, use_dtype, use_proxy", - [ - ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - use_dtype, - use_proxy, - ) - for ( - pv, - tango_type, - d_format, - py_type, - initial_value, - put_value, - ) in COMMANDS_SET - for use_dtype in [True, False] - for use_proxy in [True, False] - ], - ids=[ - f"{x[0]}_{use_dtype}_{use_proxy}" - for x in COMMANDS_SET - for use_dtype in [True, False] - for use_proxy in [True, False] - ], -) -async def test_tango_signal_auto_cmds( - echo_device: str, - pv: str, - tango_type: str, - d_format: AttrDataFormat, - py_type: type[T], - initial_value: T, - put_value: T, - use_dtype: bool, - use_proxy: bool, -): - source = echo_device + "/" + pv - timeout = 0.2 - - async def _test_signal(dtype, proxy): - signal = await __tango_signal_auto( - datatype=dtype, - trl=source, - device_proxy=proxy, - name="test_signal", - timeout=timeout, - ) - # Ophyd SignalX does not support types - assert type(signal) in [SignalR, SignalRW, SignalW] - await signal.connect() - assert signal - reading = await signal.read() - assert reading["test_signal"]["value"] is None - - await signal.set(put_value, wait=True, timeout=0.1) - reading = await signal.read() - value = reading["test_signal"]["value"] - if isinstance(value, np.ndarray): - value = value.tolist() - assert_close(value, put_value) - - proxy = await DeviceProxy(echo_device) if use_proxy else None - dtype = py_type if use_dtype else None - await _test_signal(dtype, proxy) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.skip("Not sure if we need tango_signal_auto") -@pytest.mark.parametrize("use_proxy", [True, False]) -async def test_tango_signal_auto_cmds_void(tango_test_device: str, use_proxy: bool): - proxy = await DeviceProxy(tango_test_device) if use_proxy else None - signal = await __tango_signal_auto( - datatype=None, - trl=tango_test_device + "/" + "clear", - device_proxy=proxy, - ) - assert type(signal) is SignalX - await signal.connect() - assert signal - await signal.trigger(wait=True) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.skip("Not sure if we need tango_signal_auto") -async def test_tango_signal_auto_badtrl(tango_test_device: str): - proxy = await DeviceProxy(tango_test_device) - with pytest.raises(RuntimeError) as exc_info: - await __tango_signal_auto( - datatype=None, - trl=tango_test_device + "/" + "badtrl", - device_proxy=proxy, - ) - assert f"Cannot find badtrl in {tango_test_device}" in str(exc_info.value) diff --git a/tests/tango/test_tango_transport.py b/tests/tango/test_tango_transport.py deleted file mode 100644 index 51048c4e9c..0000000000 --- a/tests/tango/test_tango_transport.py +++ /dev/null @@ -1,856 +0,0 @@ -import asyncio -from enum import Enum - -import numpy as np -import numpy.typing as npt -import pytest -from test_base_device import TestDevice -from test_tango_signals import ( - EchoDevice, - make_backend, - prepare_device, -) - -from ophyd_async.core import ( - NotConnected, -) -from ophyd_async.tango.core import ( - AttributeProxy, - CommandProxy, - TangoSignalBackend, - ensure_proper_executor, - get_dtype_extended, - get_python_type, - get_tango_trl, - get_trl_descriptor, -) -from tango import ( - CmdArgType, - DevState, -) -from tango.asyncio import DeviceProxy -from tango.asyncio_executor import ( - AsyncioExecutor, - get_global_executor, - set_global_executor, -) -from tango.test_context import MultiDeviceTestContext - - -# -------------------------------------------------------------------- -@pytest.fixture(scope="module") -def tango_test_device(): - with MultiDeviceTestContext( - [{"class": TestDevice, "devices": [{"name": "test/device/1"}]}], process=True - ) as context: - yield context.get_device_access("test/device/1") - - -# -------------------------------------------------------------------- -@pytest.fixture(scope="module") -def echo_device(): - with MultiDeviceTestContext( - [{"class": EchoDevice, "devices": [{"name": "test/device/1"}]}], process=True - ) as context: - yield context.get_device_access("test/device/1") - - -# -------------------------------------------------------------------- -@pytest.fixture(autouse=True) -def reset_tango_asyncio(): - set_global_executor(None) - - -# -------------------------------------------------------------------- -class HelperClass: - @ensure_proper_executor - async def mock_func(self): - return "executed" - - -# Test function -@pytest.mark.asyncio -async def test_ensure_proper_executor(): - # Instantiate the helper class and call the decorated method - helper_instance = HelperClass() - result = await helper_instance.mock_func() - - # Assertions - assert result == "executed" - assert isinstance(get_global_executor(), AsyncioExecutor) - - -# -------------------------------------------------------------------- -@pytest.mark.parametrize( - "tango_type, expected", - [ - (CmdArgType.DevVoid, (False, None, "string")), - (CmdArgType.DevBoolean, (False, bool, "integer")), - (CmdArgType.DevShort, (False, int, "integer")), - (CmdArgType.DevLong, (False, int, "integer")), - (CmdArgType.DevFloat, (False, float, "number")), - (CmdArgType.DevDouble, (False, float, "number")), - (CmdArgType.DevUShort, (False, int, "integer")), - (CmdArgType.DevULong, (False, int, "integer")), - (CmdArgType.DevString, (False, str, "string")), - (CmdArgType.DevVarCharArray, (True, list[str], "string")), - (CmdArgType.DevVarShortArray, (True, int, "integer")), - (CmdArgType.DevVarLongArray, (True, int, "integer")), - (CmdArgType.DevVarFloatArray, (True, float, "number")), - (CmdArgType.DevVarDoubleArray, (True, float, "number")), - (CmdArgType.DevVarUShortArray, (True, int, "integer")), - (CmdArgType.DevVarULongArray, (True, int, "integer")), - (CmdArgType.DevVarStringArray, (True, str, "string")), - # (CmdArgType.DevVarLongStringArray, (True, str, "string")), - # (CmdArgType.DevVarDoubleStringArray, (True, str, "string")), - (CmdArgType.DevState, (False, CmdArgType.DevState, "string")), - (CmdArgType.ConstDevString, (False, str, "string")), - (CmdArgType.DevVarBooleanArray, (True, bool, "integer")), - (CmdArgType.DevUChar, (False, int, "integer")), - (CmdArgType.DevLong64, (False, int, "integer")), - (CmdArgType.DevULong64, (False, int, "integer")), - (CmdArgType.DevVarLong64Array, (True, int, "integer")), - (CmdArgType.DevVarULong64Array, (True, int, "integer")), - (CmdArgType.DevEncoded, (False, list[str], "string")), - (CmdArgType.DevEnum, (False, Enum, "string")), - # (CmdArgType.DevPipeBlob, (False, list[str], "string")), - (float, (False, float, "number")), - ], -) -def test_get_python_type(tango_type, expected): - if tango_type is not float: - assert get_python_type(tango_type) == expected - else: - # get_python_type should raise a TypeError - with pytest.raises(TypeError) as exc_info: - get_python_type(tango_type) - assert str(exc_info.value) == "Unknown TangoType" - - -# -------------------------------------------------------------------- -@pytest.mark.parametrize( - "datatype, expected", - [ - (npt.NDArray[np.float64], np.dtype("float64")), - (npt.NDArray[np.int8], np.dtype("int8")), - (npt.NDArray[np.uint8], np.dtype("uint8")), - (npt.NDArray[np.int32], np.dtype("int32")), - (npt.NDArray[np.int64], np.dtype("int64")), - (npt.NDArray[np.uint16], np.dtype("uint16")), - (npt.NDArray[np.uint32], np.dtype("uint32")), - (npt.NDArray[np.uint64], np.dtype("uint64")), - (npt.NDArray[np.bool_], np.dtype("bool")), - (npt.NDArray[DevState], CmdArgType.DevState), - (npt.NDArray[np.str_], np.dtype("str")), - (npt.NDArray[np.float32], np.dtype("float32")), - (npt.NDArray[np.complex64], np.dtype("complex64")), - (npt.NDArray[np.complex128], np.dtype("complex128")), - ], -) -def test_get_dtype_extended(datatype, expected): - assert get_dtype_extended(datatype) == expected - - -# -------------------------------------------------------------------- - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "datatype, tango_resource, expected_descriptor", - [ - ( - int, - "test/device/1/justvalue", - {"source": "test/device/1/justvalue", "dtype": "integer", "shape": []}, - ), - ( - float, - "test/device/1/limitedvalue", - {"source": "test/device/1/limitedvalue", "dtype": "number", "shape": []}, - ), - ( - npt.NDArray[float], - "test/device/1/array", - {"source": "test/device/1/array", "dtype": "array", "shape": [2, 3]}, - ), - # Add more test cases as needed - ], -) -async def test_get_trl_descriptor( - tango_test_device, datatype, tango_resource, expected_descriptor -): - proxy = await DeviceProxy(tango_test_device) - tr_configs = { - tango_resource.split("/")[-1]: await proxy.get_attribute_config( - tango_resource.split("/")[-1] - ) - } - descriptor = get_trl_descriptor(datatype, tango_resource, tr_configs) - assert descriptor == expected_descriptor - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "trl, proxy_needed, expected_type, should_raise", - [ - ("test/device/1/justvalue", True, AttributeProxy, False), - ("test/device/1/justvalue", False, AttributeProxy, False), - ("test/device/1/clear", True, CommandProxy, False), - ("test/device/1/clear", False, CommandProxy, False), - ("test/device/1/nonexistent", True, None, True), - ], -) -async def test_get_tango_trl( - tango_test_device, trl, proxy_needed, expected_type, should_raise -): - proxy = await DeviceProxy(tango_test_device) if proxy_needed else None - if should_raise: - with pytest.raises(RuntimeError): - await get_tango_trl(trl, proxy, 1) - else: - result = await get_tango_trl(trl, proxy, 1) - assert isinstance(result, expected_type) - - -# -------------------------------------------------------------------- - - -@pytest.mark.asyncio -@pytest.mark.parametrize("attr", ["justvalue", "array"]) -async def test_attribute_proxy_get(tango_test_device, attr): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, attr) - val = None - val = await attr_proxy.get() - assert val is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "attr, wait", - [("justvalue", True), ("justvalue", False), ("array", True), ("array", False)], -) -async def test_attribute_proxy_put(tango_test_device, attr, wait): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, attr) - - old_value = await attr_proxy.get() - new_value = old_value + 1 - status = await attr_proxy.put(new_value, wait=wait, timeout=0.1) - if status: - await status - else: - if not wait: - raise AssertionError("If wait is False, put should return a status object") - await asyncio.sleep(1.0) - updated_value = await attr_proxy.get() - if isinstance(new_value, np.ndarray): - assert np.all(updated_value == new_value) - else: - assert updated_value == new_value - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("wait", [True, False]) -async def test_attribute_proxy_put_force_timeout(tango_test_device, wait): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "slow_attribute") - with pytest.raises(TimeoutError) as exc_info: - status = await attr_proxy.put(3.0, wait=wait, timeout=0.1) - await status - assert "attr put failed" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("wait", [True, False]) -async def test_attribute_proxy_put_exceptions(tango_test_device, wait): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "raise_exception_attr") - with pytest.raises(RuntimeError) as exc_info: - status = await attr_proxy.put(3.0, wait=wait) - await status - assert "device failure" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize( - "attr, new_value", [("justvalue", 10), ("array", np.array([[2, 3, 4], [5, 6, 7]]))] -) -async def test_attribute_proxy_get_w_value(tango_test_device, attr, new_value): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, attr) - await attr_proxy.put(new_value) - await asyncio.sleep(1.0) - attr_proxy_value = await attr_proxy.get() - if isinstance(new_value, np.ndarray): - assert np.all(attr_proxy_value == new_value) - else: - assert attr_proxy_value == new_value - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_get_config(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "justvalue") - config = await attr_proxy.get_config() - assert config.writable is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_get_reading(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "justvalue") - reading = await attr_proxy.get_reading() - assert reading["value"] is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_has_subscription(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "justvalue") - expected = bool(attr_proxy._callback) - has_subscription = attr_proxy.has_subscription() - assert has_subscription is expected - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_subscribe_callback(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - backend = await make_backend(float, source) - attr_proxy = backend.proxies[source] - val = None - - def callback(reading): - nonlocal val - val = reading["value"] - - attr_proxy.subscribe_callback(callback) - assert attr_proxy.has_subscription() - old_value = await attr_proxy.get() - new_value = old_value + 1 - await attr_proxy.put(new_value) - await asyncio.sleep(0.2) - attr_proxy.unsubscribe_callback() - assert val == new_value - - attr_proxy.set_polling(False) - attr_proxy.support_events = False - with pytest.raises(RuntimeError) as exc_info: - attr_proxy.subscribe_callback(callback) - assert "Cannot set a callback" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_unsubscribe_callback(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - backend = await make_backend(float, source) - attr_proxy = backend.proxies[source] - - def callback(reading): - pass - - attr_proxy.subscribe_callback(callback) - assert attr_proxy.has_subscription() - attr_proxy.unsubscribe_callback() - assert not attr_proxy.has_subscription() - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_set_polling(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "justvalue") - attr_proxy.set_polling(True, 0.1, 1, 0.1) - assert attr_proxy._allow_polling - assert attr_proxy._polling_period == 0.1 - assert attr_proxy._abs_change == 1 - assert attr_proxy._rel_change == 0.1 - attr_proxy.set_polling(False) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_poll(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, "floatvalue") - attr_proxy.support_events = False - - def callback(reading): - nonlocal val - val = reading["value"] - - def bad_callback(): - pass - - # Test polling with absolute change - val = None - - attr_proxy.set_polling(True, 0.1, 1, 1.0) - attr_proxy.subscribe_callback(callback) - current_value = await attr_proxy.get() - new_value = current_value + 2 - await attr_proxy.put(new_value) - polling_period = attr_proxy._polling_period - await asyncio.sleep(polling_period) - assert val is not None - attr_proxy.unsubscribe_callback() - - # Test polling with relative change - val = None - attr_proxy.set_polling(True, 0.1, 100, 0.1) - attr_proxy.subscribe_callback(callback) - current_value = await attr_proxy.get() - new_value = current_value * 2 - await attr_proxy.put(new_value) - polling_period = attr_proxy._polling_period - await asyncio.sleep(polling_period) - assert val is not None - attr_proxy.unsubscribe_callback() - - # Test polling with small changes. This should not update last_reading - attr_proxy.set_polling(True, 0.1, 100, 1.0) - attr_proxy.subscribe_callback(callback) - await asyncio.sleep(0.2) - current_value = await attr_proxy.get() - new_value = current_value + 1 - val = None - await attr_proxy.put(new_value) - polling_period = attr_proxy._polling_period - await asyncio.sleep(polling_period * 2) - assert val is None - attr_proxy.unsubscribe_callback() - - # Test polling with bad callback - attr_proxy.subscribe_callback(bad_callback) - await asyncio.sleep(0.2) - assert "Could not poll the attribute" in str(attr_proxy.exception) - attr_proxy.unsubscribe_callback() - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("attr", ["array", "label"]) -async def test_attribute_poll_stringsandarrays(tango_test_device, attr): - device_proxy = await DeviceProxy(tango_test_device) - attr_proxy = AttributeProxy(device_proxy, attr) - attr_proxy.support_events = False - - def callback(reading): - nonlocal val - val = reading["value"] - - val = None - attr_proxy.set_polling(True, 0.1) - attr_proxy.subscribe_callback(callback) - await asyncio.sleep(0.2) - assert val is not None - if isinstance(val, np.ndarray): - await attr_proxy.put(np.array([[2, 3, 4], [5, 6, 7]])) - await asyncio.sleep(0.5) - assert np.all(val == np.array([[2, 3, 4], [5, 6, 7]])) - if isinstance(val, str): - await attr_proxy.put("new label") - await asyncio.sleep(0.5) - assert val == "new label" - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_attribute_poll_exceptions(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - # Try to poll a non-existent attribute - attr_proxy = AttributeProxy(device_proxy, "nonexistent") - attr_proxy.support_events = False - attr_proxy.set_polling(True, 0.1) - - def callback(reading, value): - pass - - attr_proxy.subscribe_callback(callback) - await asyncio.sleep(0.2) - assert "Could not poll the attribute" in str(attr_proxy.exception) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_proxy_put_wait(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "clear") - - cmd_proxy._last_reading = None - await cmd_proxy.put(None, wait=True) - assert cmd_proxy._last_reading["value"] == "Received clear command" - - # Force timeout - cmd_proxy = CommandProxy(device_proxy, "slow_command") - cmd_proxy._last_reading = None - with pytest.raises(TimeoutError) as exc_info: - await cmd_proxy.put(None, wait=True, timeout=0.1) - assert "command failed" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_proxy_put_nowait(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "slow_command") - - # Reply before timeout - cmd_proxy._last_reading = None - status = await cmd_proxy.put(None, wait=False, timeout=0.5) - assert cmd_proxy._last_reading is None - await status - assert cmd_proxy._last_reading["value"] == "Completed slow command" - - # Timeout - cmd_proxy._last_reading = None - status = await cmd_proxy.put(None, wait=False, timeout=0.1) - with pytest.raises(TimeoutError) as exc_info: - await status - assert str(exc_info.value) == "Timeout while waiting for command reply" - - # No timeout - cmd_proxy._last_reading = None - status = await cmd_proxy.put(None, wait=False) - assert cmd_proxy._last_reading is None - await status - assert cmd_proxy._last_reading["value"] == "Completed slow command" - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("wait", [True, False]) -async def test_command_proxy_put_exceptions(tango_test_device, wait): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "raise_exception_cmd") - with pytest.raises(RuntimeError) as exc_info: - await cmd_proxy.put(None, wait=True) - assert "device failure" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_get(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "clear") - await cmd_proxy.put(None, wait=True, timeout=1.0) - reading = cmd_proxy._last_reading - assert reading["value"] is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_get_config(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "clear") - config = await cmd_proxy.get_config() - assert config.out_type is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_get_reading(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "clear") - await cmd_proxy.put(None, wait=True, timeout=1.0) - reading = await cmd_proxy.get_reading() - assert reading["value"] is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_command_set_polling(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - cmd_proxy = CommandProxy(device_proxy, "clear") - cmd_proxy.set_polling(True, 0.1) - # Set polling in the command proxy currently does nothing - assert True - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_init(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - assert transport is not None - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_source(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source) - transport_source = transport.source("", True) - assert transport_source == source - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_datatype_allowed(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - backend = await make_backend(float, source) - - assert backend.datatype_allowed(int) - assert backend.datatype_allowed(float) - assert backend.datatype_allowed(str) - assert backend.datatype_allowed(bool) - assert backend.datatype_allowed(np.ndarray) - assert backend.datatype_allowed(Enum) - assert backend.datatype_allowed(DevState) - assert not backend.datatype_allowed(list) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_connect(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - backend = await make_backend(float, source, connect=False) - assert backend is not None - await backend.connect(1) - backend.read_trl = "" - with pytest.raises(RuntimeError) as exc_info: - await backend.connect(1) - assert "trl not set" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_connect_and_store_config(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - await transport._connect_and_store_config(source, 1) - assert transport.trl_configs[source] is not None - - with pytest.raises(RuntimeError) as exc_info: - await transport._connect_and_store_config("", 1) - assert "trl not set" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_put(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - - with pytest.raises(NotConnected) as exc_info: - await transport.put(1.0) - assert "Not connected" in str(exc_info.value) - - await transport.connect(1) - source = transport.source("", True) - await transport.put(2.0) - val = await transport.proxies[source].get_w_value() - assert val == 2.0 - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_get_datakey(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - await transport.connect(1) - datakey = await transport.get_datakey(source) - assert datakey["source"] == source - assert datakey["dtype"] == "number" - assert datakey["shape"] == [] - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_get_reading(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - - with pytest.raises(NotConnected) as exc_info: - await transport.put(1.0) - assert "Not connected" in str(exc_info.value) - - await transport.connect(1) - reading = await transport.get_reading() - assert reading["value"] == 1.0 - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_get_value(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - - with pytest.raises(NotConnected) as exc_info: - await transport.put(1.0) - assert "Not connected" in str(exc_info.value) - - await transport.connect(1) - value = await transport.get_value() - assert value == 1.0 - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_get_setpoint(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - - with pytest.raises(NotConnected) as exc_info: - await transport.put(1.0) - assert "Not connected" in str(exc_info.value) - - await transport.connect(1) - new_setpoint = 2.0 - await transport.put(new_setpoint) - setpoint = await transport.get_setpoint() - assert setpoint == new_setpoint - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_set_callback(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - - with pytest.raises(NotConnected) as exc_info: - await transport.put(1.0) - assert "Not connected" in str(exc_info.value) - - await transport.connect(1) - val = None - - def callback(reading): - nonlocal val - val = reading["value"] - - # Correct usage - transport.set_callback(callback) - current_value = await transport.get_value() - new_value = current_value + 2 - await transport.put(new_value) - await asyncio.sleep(0.1) - assert val == new_value - - # Try to add second callback - with pytest.raises(RuntimeError) as exc_info: - transport.set_callback(callback) - assert "Cannot set a callback when one is already set" - - transport.set_callback(None) - - # Try to add a callback to a non-callable proxy - transport.allow_events(False) - transport.set_polling(False) - with pytest.raises(RuntimeError) as exc_info: - transport.set_callback(callback) - assert "Cannot set event" in str(exc_info.value) - - # Try to add a non-callable callback - transport.allow_events(True) - with pytest.raises(RuntimeError) as exc_info: - transport.set_callback(1) - assert "Callback must be a callable" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_set_polling(echo_device): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - transport.set_polling(True, 0.1, 1, 0.1) - assert transport._polling == (True, 0.1, 1, 0.1) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -@pytest.mark.parametrize("allow", [True, False]) -async def test_tango_transport_allow_events(echo_device, allow): - await prepare_device(echo_device, "float_scalar_attr", 1.0) - source = echo_device + "/" + "float_scalar_attr" - transport = await make_backend(float, source, connect=False) - transport.allow_events(allow) - assert transport.support_events == allow - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_read_and_write_trl(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - # Must use a FQTRL, at least on windows. - read_trl = tango_test_device + "/" + "readback" - write_trl = tango_test_device + "/" + "setpoint" - - # Test with existing proxy - transport = TangoSignalBackend(float, read_trl, write_trl, device_proxy) - await transport.connect(1) - reading = await transport.get_reading() - initial_value = reading["value"] - new_value = initial_value + 1.0 - await transport.put(new_value) - updated_value = await transport.get_value() - assert updated_value == new_value - - # Without pre-existing proxy - transport = TangoSignalBackend(float, read_trl, write_trl, None) - await transport.connect(1) - reading = await transport.get_reading() - initial_value = reading["value"] - new_value = initial_value + 1.0 - await transport.put(new_value) - updated_value = await transport.get_value() - assert updated_value == new_value - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_read_only_trl(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - trl = device_proxy.dev_name() - read_trl = trl + "/" + "readonly" - - # Test with existing proxy - transport = TangoSignalBackend(int, read_trl, read_trl, device_proxy) - await transport.connect(1) - with pytest.raises(RuntimeError) as exc_info: - await transport.put(1) - assert "is not writable" in str(exc_info.value) - - -# -------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_tango_transport_nonexistent_trl(tango_test_device): - device_proxy = await DeviceProxy(tango_test_device) - trl = device_proxy.dev_name() - nonexistent_trl = trl + "/" + "nonexistent" - - # Test with existing proxy - transport = TangoSignalBackend(int, nonexistent_trl, nonexistent_trl, device_proxy) - with pytest.raises(RuntimeError) as exc_info: - await transport.connect(1) - assert "cannot be found" in str(exc_info.value) - - # Without pre-existing proxy - transport = TangoSignalBackend(int, nonexistent_trl, nonexistent_trl, None) - with pytest.raises(RuntimeError) as exc_info: - await transport.connect(1) - assert "cannot be found" in str(exc_info.value)