Timeout does not stop execution #682
Comments
Hello @matt3o, this issue can be boiled down to this:

    import asyncio
    from transitions.extensions import AsyncMachine
    from transitions.extensions.asyncio import AsyncTimeout
    from transitions.extensions.states import add_state_features


    @add_state_features(AsyncTimeout)
    class MyMachine(AsyncMachine):

        async def on_enter_B(self):
            print("ENTERED")
            await asyncio.sleep(2)  # runs longer than the 1 s timeout of state B
            print("DONE")

        async def handle_timeout(self):
            print("CANCEL")
            await self.to_A()


    async def run():
        m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}], initial='A')
        await m.to_B()

    asyncio.run(run())

Obviously,

Without queuing one could transition away from the current state and cause a

    from contextvars import Context
    from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
    import asyncio
    from enum import Enum
    from transitions.extensions.states import add_state_features, Tags
    from transitions.extensions import GraphMachine
    import logging
    import time

    logger = logging.getLogger("tmp")
    logging.getLogger('transitions').setLevel(logging.INFO)
    logging.getLogger('asyncio').setLevel(logging.WARNING)


    class States(str, Enum):
        start = "Start handling of request"
        parse_input = "Parse Input"
        finished = "Finished handling of request"
        error = "Error"


    states = [
        States.start,
        {"name": States.parse_input, 'timeout': 5, "on_timeout": "timeout_handler"},
        {"name": States.finished, "final": True},
        States.error
    ]


    @add_state_features(Tags, AsyncTimeout)
    class CustomGraphMachine(GraphMachine):
        pass


    @add_state_features(Tags, AsyncTimeout)
    class CustomAsyncMachine(AsyncMachine):
        pass


    class ChatStateMachine:

        def __init__(self):
            self.machine = CustomAsyncMachine(
                model=self,
                states=states,
                auto_transitions=True,
                initial=States.start,
                send_event=True,
                on_exception='handle_error',
                name="ChatStateMachine",
                before_state_change=[self.default_on_exit],
                after_state_change=[self.default_on_enter],
            )
            self.start = time.time()

        async def default_on_enter(self, event):
            # print(event)
            state = event.state.name
            print(f"Entering state: {state}")
            # Fire and forget the async database tracking logic
            asyncio.create_task(self.update_database(state))
            # await asyncio.sleep(0.01)

        async def default_on_exit(self, event):
            # print(event)
            state = event.state.name
            print(f"Exiting state: {state}")
            # Fire and forget the async database tracking logic
            # asyncio.create_task(self.update_database(state))

        async def update_database(self, state_name):
            # Simulate async database update
            await asyncio.sleep(0.1)
            print(f"Database updated with state: {state_name}")

        async def on_enter_parse_input(self, event):
            print(f"on_enter_parse_input {time.time() - self.start:.2f} seconds")
            await asyncio.sleep(20)
            print(f"resuming after sleep {time.time() - self.start:.2f} seconds")
            # await asyncio.wait_for(asyncio.sleep(30), 6)

        async def handle_error(self, event):
            print(f"Received error: {event.error=}")
            # if not event.state.name == States.fatal_error:
            #     await self.to_fatal_error_raised()
            # raise event.error
            del event.error

        async def timeout_handler(self, event):
            print(f"timeout_handler() after {time.time() - self.start:.2f} seconds")
            print("TIMEOUT")
            # Run the transition in a task with a fresh context (the workaround)
            await asyncio.create_task(self.to_error(), context=Context())

        async def run(self):
            await self.to_parse_input()
            await asyncio.sleep(0.5)  # let the update database task finish
            await self.to_finished()


    async def run_state_machine():
        chat_sm = ChatStateMachine()
        await chat_sm.run()


    if __name__ == "__main__":
        asyncio.run(run_state_machine())

This is just a workaround. A part of the fix will/could be that this 'context switch' will be handled by

I don't see an easy way to make

    import asyncio
    from transitions.extensions import AsyncMachine
    from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
    from transitions.extensions.states import add_state_features


    @add_state_features(AsyncTimeout)
    class MyMachine(AsyncMachine):

        async def on_enter_A(self, event_data: AsyncEventData):
            print("Enter A...")

        async def on_enter_B(self, event_data: AsyncEventData):
            print("Enter B...")
            await asyncio.sleep(event_data.kwargs.get("sleep", 0))
            print("... Done in B.")

        async def handle_timeout(self, event_data: AsyncEventData):
            print("Timeout!")
            # With a queue, the running transition has to be cancelled explicitly
            if event_data.machine.has_queue:
                await event_data.machine.switch_model_context(self)
            await self.to_A()

        async def handle_error(self, event_data):
            print(f"Error: {event_data.error=}")


    async def run():
        m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}],
                      transitions=[{"trigger": "try_something", "source": "*",
                                    "dest": "A", "conditions": [lambda event_data: False]}],
                      initial='A', queued=True, send_event=True, on_exception="handle_error")
        print("First round")
        await m.to_B(sleep=2)
        print("Second round")
        await m.to_B(sleep=0.5)
        while not m.is_A():
            await m.try_something()  # when queued=True, this will always return True
            print("Try something else...")
            await asyncio.sleep(0.2)

    asyncio.run(run())

Output:

If you have some feedback, let me know.

- see #682 for details - renamed `AsyncMachine.switch_model_context` to `cancel_running_transitions`
I updated the development branch with another approach, which hopefully tackles most scenarios where errors are (re-)raised and keeps internals internal, since it no longer requires cancelling ongoing transitions manually.

I am trying to wrap my head around all the possible outcomes for an async timeout. When a timeout raises an exception and

This means an exception raised in a timeout cannot be 'forwarded' to the initial event caller. However, a CancelledError can be intercepted and, if the cancellation was caused by a timeout, raised as another error (e.g. TimeoutError). How to determine whether the cancellation was triggered by the timeout depends on the design of the state machine. A sequential order of

Diagram

    flowchart TD
        A[TimeoutState] --[timeout]--> B
        B[on_timeout] --> L["trigger event?"]
        L --"yes & queue"--> X
        L --"yes & !queue"--> D
        L --"no"--> X[continue...]
        B[on_timeout] --[error]--> C
        C["on_exception(TimeoutError)"] --[error]--> D
        C --> L
        D[cancel_running_transitions] --> E
        E[running tasks?] --yes--> G
        E --"no"--> X
        G["on_exception(CancelledError)"] --> X
        G --error--> I
        I --"other"--> M["raised to event caller"]
        I[clear_queue] --"CancelledError"--> X
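
The interception step described in this comment (catch the cancellation, check whether a timeout caused it, re-raise it as a timeout error) can be sketched with plain asyncio, independent of transitions. This is only an illustration under stated assumptions: `run_with_watchdog`, the `state` flag and the inner helpers are hypothetical names, and the flag plays the same role as a "timeout was called" marker kept on a model.

```python
import asyncio


async def run_with_watchdog(work_seconds, timeout_seconds):
    """Hypothetical helper: run some work and cancel it from a watchdog task."""
    state = {"timed_out": False}  # assumption: a plain flag marks timeout-caused cancels

    async def work():
        await asyncio.sleep(work_seconds)
        return "done"

    task = asyncio.create_task(work())

    async def watchdog():
        await asyncio.sleep(timeout_seconds)
        state["timed_out"] = True  # remember why the task gets cancelled
        task.cancel()

    watcher = asyncio.create_task(watchdog())
    try:
        return await task
    except asyncio.CancelledError:
        if state["timed_out"]:
            # The cancellation came from the watchdog: surface it as a timeout
            raise TimeoutError("cancelled by timeout") from None
        raise  # any other cancellation is passed on unchanged
    finally:
        watcher.cancel()  # no effect if the watchdog has already finished
```

Calling `asyncio.run(run_with_watchdog(1.0, 0.05))` should raise `TimeoutError` to the caller, while a short job such as `run_with_watchdog(0.01, 1.0)` returns normally.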
Sample code

    import asyncio
    from asyncio import CancelledError
    from transitions.extensions import AsyncMachine
    from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
    from transitions.extensions.states import add_state_features


    @add_state_features(AsyncTimeout)
    class MyMachine(AsyncMachine):

        async def on_enter_A(self, event_data: AsyncEventData):
            print("Enter A...")

        async def on_enter_B(self, event_data: AsyncEventData):
            self.timeout_called = False
            await self.to_C()  # this will not be called when B is blocking
            seconds = event_data.kwargs.get("sleep", 100)
            await asyncio.sleep(seconds)

        async def on_enter_C(self, event_data: AsyncEventData):
            print("We are in C now. B was (hopefully!) non-blocking")

        async def handle_timeout(self, event_data: AsyncEventData):
            print("Trigger Timeout!")
            self.timeout_called = True
            raise TimeoutError()

        async def handle_error(self, event_data):
            print(f"Handle Error: {event_data.error=}")
            # A cancellation that follows a timeout is re-raised as TimeoutError
            if isinstance(event_data.error, CancelledError) and self.timeout_called:
                print("CancelledError after a timeout!")
                raise TimeoutError()
            raise event_data.error


    async def run():
        m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                              {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                      transitions=[{"trigger": "try_something", "source": "*",
                                    "dest": "A", "conditions": [lambda event_data: False]}],
                      initial='A', queued=True, send_event=True, on_exception="handle_error")
        print("# Scenario A: No queue and blocking to_B...")
        try:
            await m.to_B()
        except TimeoutError:
            print("Caught a timeout without queue.")
        print("# Scenario B: No queue and non-blocking to_B...")
        await m.to_B(sleep=0.1)
        await asyncio.sleep(2)
        m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                              {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                      transitions=[{"trigger": "try_something", "source": "*",
                                    "dest": "A", "conditions": [lambda event_data: False]}],
                      initial='A', queued=True, send_event=True, on_exception="handle_error")
        print("# Scenario C: queue and blocking to_B...")
        try:
            await m.to_B()
        except TimeoutError:
            print("Caught a timeout without queue.")
        print("# Scenario D: queue and non-blocking to_B...")
        await m.to_B(sleep=0.1)
        await asyncio.sleep(2)

    asyncio.run(run())

Output

    # Scenario A: No queue and blocking to_B...
    Trigger Timeout!
    Handle Error: event_data.error=TimeoutError()
    Handle Error: event_data.error=CancelledError('timeout')
    CancelledError after a timeout!
    Caught a timeout without queue.
    # Scenario B: No queue and non-blocking to_B...
    We are in C now. B was (hopefully!) non-blocking
    Trigger Timeout!
    Handle Error: event_data.error=TimeoutError()
    # Scenario C: queue and blocking to_B...
    Trigger Timeout!
    Handle Error: event_data.error=TimeoutError()
    Handle Error: event_data.error=CancelledError('timeout')
    CancelledError after a timeout!
    Caught a timeout without queue.
    # Scenario D: queue and non-blocking to_B...
    We are in C now. B was (hopefully!) non-blocking
    Trigger Timeout!
    Handle Error: event_data.error=TimeoutError()

Sorry for the super late response @aleneum. I'll try it this week or next week and come back to you.

Describe the bug
What I wanted: the timeout should immediately stop the execution of the currently running state. For example, I have blocking database calls which should simply end in that case. That is at least what I expected the timeout to do.
However, in the default implementation the timeout does not interrupt the execution of the state. By now, I know the timeout runs in a separate thread.
I tried to raise a TimeoutError in that thread (i.e. in the timeout_handler() function), but that does not work because the exception is never caught in the main loop. I think it would be possible to at least catch the exception via event.state.runner, if I am not mistaken.
This approach should not work on a blocked main process loop, which is exactly where I would've wanted this feature.
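
To illustrate the point about a blocked loop, here is a small sketch using only asyncio (it does not involve transitions). The names `demo`, `timer` and `events` are made up for this example. It records the order in which a 0.05 s timer and a 0.2 s piece of work complete, once with a blocking `time.sleep` and once with `await asyncio.sleep`.

```python
import asyncio
import time


async def demo(block):
    """Hypothetical demo: does a pending timer fire while the 'work' is running?"""
    events = []

    async def timer():
        await asyncio.sleep(0.05)
        events.append("timeout fired")

    timer_task = asyncio.create_task(timer())
    if block:
        time.sleep(0.2)  # blocks the event loop, so the timer task cannot run
    else:
        await asyncio.sleep(0.2)  # yields to the loop, so the timer fires in between
    events.append("work finished")
    await timer_task
    return events
```

With `block=True` the timer can only fire after the blocking call has returned, so a timeout handler scheduled on the same loop has no chance to interrupt it. Moving such blocking calls off the loop (for instance with `asyncio.to_thread`) lets the timer fire, although the thread itself still cannot be cancelled.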
My workaround will probably be to call asyncio.wait_for / asyncio.timeout
Minimal working example
Output of that snippet (filenames censored):
Expected behavior
If you activate the line
    await asyncio.wait_for(asyncio.sleep(30), 6)
you can see the expected behaviour. The code now correctly raises a TimeoutError which can be caught and handled in the error handler.
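
A minimal sketch of that `asyncio.wait_for` workaround, reduced to plain asyncio so it runs without transitions. `slow_state_body` and `guarded` are hypothetical stand-ins for a long-running state callback and the code that awaits it.

```python
import asyncio


async def slow_state_body(seconds):
    # Stand-in for the work done inside a state callback (assumption)
    await asyncio.sleep(seconds)
    return "finished"


async def guarded(seconds, limit):
    """Cancel the awaited work once `limit` seconds have passed."""
    try:
        return await asyncio.wait_for(slow_state_body(seconds), timeout=limit)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner coroutine at this point
        return "timed out"
```

Here the timeout is raised in the coroutine that awaits the work, so it can be handled there or passed on to an error handler. As with the timer case, this only helps when the wrapped work actually awaits; a synchronous blocking call inside it would still hold up the loop.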