Skip to content

Commit

Permalink
Move testing function to new testing directory
Browse files Browse the repository at this point in the history
  • Loading branch information
olliesilvester committed Nov 29, 2024
1 parent aabfdcb commit fb10a80
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 25 deletions.
2 changes: 0 additions & 2 deletions src/ophyd_async/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
get_unique,
in_micros,
wait_for_connection,
wait_for_pending_wakeups,
)

__all__ = [
Expand Down Expand Up @@ -193,5 +192,4 @@
"in_micros",
"wait_for_connection",
"completed_status",
"wait_for_pending_wakeups",
]
21 changes: 0 additions & 21 deletions src/ophyd_async/core/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,24 +311,3 @@ def __call__(self) -> Mock:
if self.parent is not None:
self.parent().attach_mock(self._mock, self.name)
return self._mock


async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True):
"""Allow any ready asyncio tasks to be woken up.
Used in:
- Tests to allow tasks like ``set()`` to start so that signal
puts can be tested
- `observe_value` to allow it to be wrapped in `asyncio.wait_for`
with a timeout
"""
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready: # type: ignore # noqa: SLF001
return
if raise_if_exceeded:
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")
22 changes: 22 additions & 0 deletions src/ophyd_async/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio


async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True):
"""Allow any ready asyncio tasks to be woken up.
Used in:
- Tests to allow tasks like ``set()`` to start so that signal
puts can be tested
- `observe_value` to allow it to be wrapped in `asyncio.wait_for`
with a timeout
"""
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready: # type: ignore # noqa: SLF001
return
if raise_if_exceeded:
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")
2 changes: 1 addition & 1 deletion tests/epics/demo/test_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
get_mock,
get_mock_put,
set_mock_value,
wait_for_pending_wakeups,
)
from ophyd_async.epics import demo
from ophyd_async.testing import wait_for_pending_wakeups


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion tests/epics/test_motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
set_mock_put_proceeds,
set_mock_value,
soft_signal_rw,
wait_for_pending_wakeups,
)
from ophyd_async.epics import motor
from ophyd_async.testing import wait_for_pending_wakeups


@pytest.fixture
Expand Down

0 comments on commit fb10a80

Please sign in to comment.