+ + +
+
+ +
+
+ +
+ +
+ + +
+ +
+ + +
+
+ + + + + +
+ +

Source code for ophyd_async.plan_stubs._settings

+from __future__ import annotations
+
+import asyncio
+from collections.abc import Callable, Mapping
+from typing import Any
+
+import bluesky.plan_stubs as bps
+import numpy as np
+from bluesky.utils import MsgGenerator, plan
+
+from ophyd_async.core import (
+    Device,
+    Settings,
+    SettingsProvider,
+    SignalRW,
+    T,
+    walk_rw_signals,
+)
+from ophyd_async.core._table import Table
+
+from ._wait_for_awaitable import wait_for_awaitable
+
+
+@plan
+def _get_values_of_signals(
+    signals: Mapping[T, SignalRW],
+) -> MsgGenerator[dict[T, Any]]:
+    coros = [sig.get_value() for sig in signals.values()]
+    values = yield from wait_for_awaitable(asyncio.gather(*coros))
+    named_values = dict(zip(signals, values, strict=True))
+    return named_values
+
+
+@plan
+def get_current_settings(device: Device) -> MsgGenerator[Settings]:
+    signals = walk_rw_signals(device)
+    named_values = yield from _get_values_of_signals(signals)
+    signal_values = {signals[name]: value for name, value in named_values.items()}
+    return Settings(device, signal_values)
+
+
+
+[docs] +@plan +def store_settings( + provider: SettingsProvider, name: str, device: Device +) -> MsgGenerator[None]: + """Walk a Device for SignalRWs and store their values with a provider associated + with the given name. + """ + signals = walk_rw_signals(device) + named_values = yield from _get_values_of_signals(signals) + yield from wait_for_awaitable(provider.store(name, named_values))
+ + + +
+[docs] +@plan +def retrieve_settings( + provider: SettingsProvider, name: str, device: Device +) -> MsgGenerator[Settings]: + """Retrieve named Settings for a Device from a provider.""" + named_values = yield from wait_for_awaitable(provider.retrieve(name)) + signals = walk_rw_signals(device) + unknown_names = set(named_values) - set(signals) + if unknown_names: + raise NameError(f"Unknown signal names {sorted(unknown_names)}") + signal_values = {signals[name]: value for name, value in named_values.items()} + return Settings(device, signal_values)
+ + + +
+[docs] +@plan +def apply_settings(settings: Settings) -> MsgGenerator[None]: + """Set every SignalRW to the given value in Settings. If value is None ignore it.""" + signal_values = { + signal: value for signal, value in settings.items() if value is not None + } + if signal_values: + for signal, value in signal_values.items(): + yield from bps.abs_set(signal, value, group="apply_settings") + yield from bps.wait("apply_settings")
+ + + +
+[docs] +@plan +def apply_settings_if_different( + settings: Settings, + apply_plan: Callable[[Settings], MsgGenerator[None]], + current_settings: Settings | None = None, +) -> MsgGenerator[None]: + """Set every SignalRW in settings to its given value if it is different to the + current value. + + Parameters + ---------- + apply_plan: + A device specific plan which takes the Settings to apply and applies them to + the Device. Used to add device specific ordering to setting the signals. + current_settings: + If given, should be a superset of settings containing the current value of + the Settings in the Device. If not given it will be created by reading just + the signals given in settings. + """ + if current_settings is None: + # If we aren't give the current settings, then get the + # values of just the signals we were asked to change. + # This allows us to use this plan with Settings for a subset + # of signals in the Device without retrieving them all + signal_values = yield from _get_values_of_signals( + {sig: sig for sig in settings} + ) + current_settings = Settings(settings.device, signal_values) + + def _is_different(current, required) -> bool: + if isinstance(current, Table): + current = current.model_dump() + if isinstance(required, Table): + required = required.model_dump() + return current.keys() != required.keys() or any( + _is_different(current[k], required[k]) for k in current + ) + elif isinstance(current, np.ndarray): + return not np.array_equal(current, required) + else: + return current != required + + settings_to_change, _ = settings.partition( + lambda sig: _is_different(current_settings[sig], settings[sig]) + ) + yield from apply_plan(settings_to_change)
+ +
+ +
+ + + + + +
+ +
+
+
+ +
+ + + + +
+
+ +
+ +