Skip to content

Commit

Permalink
Merge branch 'bluesky:main' into handle-field-signal-rbv
Browse files Browse the repository at this point in the history
  • Loading branch information
jwlodek authored Dec 3, 2024
2 parents de4bc08 + 666d222 commit eee702a
Show file tree
Hide file tree
Showing 20 changed files with 236 additions and 73 deletions.
2 changes: 2 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Fixes #ISSUE

### Checks for reviewer
- [ ] Would the PR title make sense to a user on a set of release notes
- [ ] If the change requires a bump in an IOC version, is that specified in a `##Changes` section in the body of the PR
- [ ] If the change requires a bump in the PandABlocks-ioc version, is the `ophyd_async.fastcs.panda._hdf_panda.MINIMUM_PANDA_IOC` variable updated to match
32 changes: 26 additions & 6 deletions src/ophyd_async/core/_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ class Device(HasName, Connectable):
_connect_task: asyncio.Task | None = None
# The mock if we have connected in mock mode
_mock: LazyMock | None = None
# The separator to use when making child names
_child_name_separator: str = "-"

def __init__(
self, name: str = "", connector: DeviceConnector | None = None
) -> None:
self._connector = connector or DeviceConnector()
self._connector.create_children_from_annotations(self)
self.set_name(name)
if name:
self.set_name(name)

@property
def name(self) -> str:
Expand All @@ -97,21 +100,30 @@ def log(self) -> LoggerAdapter:
getLogger("ophyd_async.devices"), {"ophyd_async_device_name": self.name}
)

def set_name(self, name: str):
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
"""Set ``self.name=name`` and each ``self.child.name=name+"-child"``.
Parameters
----------
name:
New name to set
child_name_separator:
Use this as a separator instead of "-". Use "_" instead to make the same
names as the equivalent ophyd sync device.
"""
self._name = name
if child_name_separator:
self._child_name_separator = child_name_separator
# Ensure logger is recreated after a name change
if "log" in self.__dict__:
del self.log
for child_name, child in self.children():
child_name = f"{self.name}-{child_name.strip('_')}" if self.name else ""
child.set_name(child_name)
for attr_name, child in self.children():
child_name = (
f"{self.name}{self._child_name_separator}{attr_name}"
if self.name
else ""
)
child.set_name(child_name, child_name_separator=self._child_name_separator)

def __setattr__(self, name: str, value: Any) -> None:
# Bear in mind that this function is called *a lot*, so
Expand Down Expand Up @@ -147,6 +159,10 @@ async def connect(
timeout:
Time to wait before failing with a TimeoutError.
"""
assert hasattr(self, "_connector"), (
f"{self}: doesn't have attribute `_connector`,"
" did you call `super().__init__` in your `__init__` method?"
)
if mock:
# Always connect in mock mode serially
if isinstance(mock, LazyMock):
Expand Down Expand Up @@ -247,6 +263,8 @@ class DeviceCollector:
set_name:
If True, call ``device.set_name(variable_name)`` on all collected
Devices
child_name_separator:
Use this as a separator if we call ``set_name``.
connect:
If True, call ``device.connect(mock)`` in parallel on all
collected Devices
Expand All @@ -271,11 +289,13 @@ class DeviceCollector:
def __init__(
self,
set_name=True,
child_name_separator: str = "-",
connect=True,
mock=False,
timeout: float = 10.0,
):
self._set_name = set_name
self._child_name_separator = child_name_separator
self._connect = connect
self._mock = mock
self._timeout = timeout
Expand Down Expand Up @@ -311,7 +331,7 @@ async def _on_exit(self) -> None:
for name, obj in self._objects_on_exit.items():
if name not in self._names_on_enter and isinstance(obj, Device):
if self._set_name and not obj.name:
obj.set_name(name)
obj.set_name(name, child_name_separator=self._child_name_separator)
if self._connect:
connect_coroutines[name] = obj.connect(
self._mock, timeout=self._timeout
Expand Down
47 changes: 35 additions & 12 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import functools
import time
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from typing import Any, Generic, cast

Expand Down Expand Up @@ -425,6 +426,7 @@ async def observe_value(
signal: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[SignalDatatypeT, None]:
"""Subscribe to the value of a signal so it can be iterated from.
Expand All @@ -439,25 +441,44 @@ async def observe_value(
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is still
being watched after this length, raise asyncio.TimeoutError. This should be used
instead of on an 'asyncio.wait_for' timeout
Notes
-----
Due to a rare condition with busy signals, it is not recommended to use this
function with asyncio.timeout, including in an 'asyncio.wait_for' loop. Instead,
this timeout should be given to the done_timeout parameter.
Example usage::
async for value in observe_value(sig):
do_something_with(value)
"""

async for _, value in observe_signals_value(
signal, timeout=timeout, done_status=done_status
signal,
timeout=timeout,
done_status=done_status,
done_timeout=done_timeout,
):
yield value


def _get_iteration_timeout(
timeout: float | None, overall_deadline: float | None
) -> float | None:
overall_deadline = overall_deadline - time.monotonic() if overall_deadline else None
return min([x for x in [overall_deadline, timeout] if x is not None], default=None)


async def observe_signals_value(
*signals: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]:
"""Subscribe to the value of a signal so it can be iterated from.
Expand All @@ -472,6 +493,10 @@ async def observe_signals_value(
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is still
being watched after this length, raise asyncio.TimeoutError. This should be used
instead of on an 'asyncio.wait_for' timeout
Notes
-----
Expand All @@ -486,12 +511,6 @@ async def observe_signals_value(
q: asyncio.Queue[tuple[SignalR[SignalDatatypeT], SignalDatatypeT] | Status] = (
asyncio.Queue()
)
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

cbs: dict[SignalR, Callback] = {}
for signal in signals:
Expand All @@ -504,13 +523,17 @@ def queue_value(value: SignalDatatypeT, signal=signal):

if done_status is not None:
done_status.add_callback(q.put_nowait)

overall_deadline = time.monotonic() + done_timeout if done_timeout else None
try:
while True:
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await get_value()
if overall_deadline and time.monotonic() >= overall_deadline:
raise asyncio.TimeoutError(
f"observe_value was still observing signals "
f"{[signal.source for signal in signals]} after "
f"timeout {done_timeout}s"
)
iteration_timeout = _get_iteration_timeout(timeout, overall_deadline)
item = await asyncio.wait_for(q.get(), iteration_timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
7 changes: 5 additions & 2 deletions src/ophyd_async/epics/adcore/_hdf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
wait_for_value,
)

from ._core_io import NDArrayBaseIO, NDFileHDFIO
from ._core_io import Callback, NDArrayBaseIO, NDFileHDFIO
from ._utils import (
FileWriteMode,
convert_param_dtype_to_np,
Expand Down Expand Up @@ -71,6 +71,7 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
# Never use custom xml layout file but use the one defined
# in the source code file NDFileHDF5LayoutXML.cpp
self.hdf.xml_file_name.set(""),
self.hdf.enable_callbacks.set(Callback.ENABLE),
)

assert (
Expand All @@ -80,7 +81,9 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
# Overwrite num_capture to go forever
await self.hdf.num_capture.set(0)
# Wait for it to start, stashing the status that tells us when it finishes
self._capture_status = await set_and_wait_for_value(self.hdf.capture, True)
self._capture_status = await set_and_wait_for_value(
self.hdf.capture, True, wait_for_set_completion=False
)
name = self._name_provider()
detector_shape = await self._dataset_describer.shape()
np_dtype = await self._dataset_describer.np_datatype()
Expand Down
6 changes: 4 additions & 2 deletions src/ophyd_async/epics/core/_pvi_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ def _get_signal_details(entry: Entry) -> tuple[type[Signal], str, str]:


class PviDeviceConnector(DeviceConnector):
def __init__(self, prefix: str = "") -> None:
def __init__(self, prefix: str = "", error_hint: str = "") -> None:
# TODO: what happens if we get a leading "pva://" here?
self.prefix = prefix
self.pvi_pv = prefix + "PVI"
self.error_hint = error_hint

def create_children_from_annotations(self, device: Device):
if not hasattr(self, "filler"):
Expand Down Expand Up @@ -85,7 +86,8 @@ async def connect_real(
if e:
self._fill_child(name, e, i)
# Check that all the requested children have been filled
self.filler.check_filled(f"{self.pvi_pv}: {entries}")
suffix = f"\n{self.error_hint}" if self.error_hint else ""
self.filler.check_filled(f"{self.pvi_pv}: {entries}{suffix}")
# Set the name of the device to name all children
device.set_name(device.name)
return await super().connect_real(device, timeout, force_reconnect)
4 changes: 2 additions & 2 deletions src/ophyd_async/epics/demo/_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def __init__(self, prefix: str, name="") -> None:

super().__init__(name=name)

def set_name(self, name: str):
super().set_name(name)
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
super().set_name(name, child_name_separator=child_name_separator)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)

Expand Down
13 changes: 8 additions & 5 deletions src/ophyd_async/epics/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_x
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_w


class MotorLimitsException(Exception):
Expand Down Expand Up @@ -76,7 +76,10 @@ def __init__(self, prefix: str, name="") -> None:
self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM")
self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM")

self.motor_stop = epics_signal_x(prefix + ".STOP")
# Note:cannot use epics_signal_x here, as the motor record specifies that
# we must write 1 to stop the motor. Simply processing the record is not
# sufficient.
self.motor_stop = epics_signal_w(int, prefix + ".STOP")
# Whether set() should complete successfully or not
self._set_success = True

Expand All @@ -91,8 +94,8 @@ def __init__(self, prefix: str, name="") -> None:

super().__init__(name=name)

def set_name(self, name: str):
super().set_name(name)
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
super().set_name(name, child_name_separator=child_name_separator)
# Readback should be named the same as its parent in read()
self.user_readback.set_name(name)

Expand Down Expand Up @@ -178,7 +181,7 @@ async def stop(self, success=False):
self._set_success = success
# Put with completion will never complete as we are waiting for completion on
# the move above, so need to pass wait=False
await self.motor_stop.trigger(wait=False)
await self.motor_stop.set(1, wait=False)

async def _prepare_velocity(
self, start_position: float, end_position: float, time_for_move: float
Expand Down
6 changes: 6 additions & 0 deletions src/ophyd_async/epics/testing/test_records.db
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,9 @@ record(lsi, "$(device)longstr2") {
field(INP, {const:"a string that is just longer than forty characters"})
field(PINI, "YES")
}

record(calc, "$(device)ticking") {
field(INPA, "$(device)ticking")
field(CALC, "A+1")
field(SCAN, ".1 second")
}
4 changes: 2 additions & 2 deletions src/ophyd_async/fastcs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from ophyd_async.epics.core import PviDeviceConnector


def fastcs_connector(device: Device, uri: str) -> DeviceConnector:
def fastcs_connector(device: Device, uri: str, error_hint: str = "") -> DeviceConnector:
# TODO: add Tango support based on uri scheme
connector = PviDeviceConnector(uri)
connector = PviDeviceConnector(uri, error_hint)
connector.create_children_from_annotations(device)
return connector
5 changes: 4 additions & 1 deletion src/ophyd_async/fastcs/panda/_hdf_panda.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from ._control import PandaPcapController
from ._writer import PandaHDFWriter

MINIMUM_PANDA_IOC = "0.11.4"


class HDFPanda(CommonPandaBlocks, StandardDetector):
def __init__(
Expand All @@ -18,8 +20,9 @@ def __init__(
config_sigs: Sequence[SignalR] = (),
name: str = "",
):
error_hint = f"Is PandABlocks-ioc at least version {MINIMUM_PANDA_IOC}?"
# This has to be first so we make self.pcap
connector = fastcs_connector(self, prefix)
connector = fastcs_connector(self, prefix, error_hint)
controller = PandaPcapController(pcap=self.pcap)
writer = PandaHDFWriter(
path_provider=path_provider,
Expand Down
22 changes: 22 additions & 0 deletions src/ophyd_async/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio


async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True):
"""Allow any ready asyncio tasks to be woken up.
Used in:
- Tests to allow tasks like ``set()`` to start so that signal
puts can be tested
- `observe_value` to allow it to be wrapped in `asyncio.wait_for`
with a timeout
"""
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready: # type: ignore # noqa: SLF001
return
if raise_if_exceeded:
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")
Loading

0 comments on commit eee702a

Please sign in to comment.