Make scheduled flows not overlap in time #5373
I've read this in the docs:
But is there no way to keep runs of the same flow from overlapping, i.e. to cancel the scheduled run if the previous one hasn't finished yet? Thanks for your time!
Here is an example state handler that lets you skip (or cancel) a flow run if another run of the same flow is already in progress. This gives you non-overlapping flow runs, i.e. only one flow run of a specific flow at a time.

import time

import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    # Only check when this flow run is about to enter a Running state.
    if new_state.is_running():
        client = Client()
        # Look for runs of the same flow that are currently Running.
        # Note: `offset: 1` skips the first match, presumably on the
        # assumption that the current run is itself among the results.
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                               {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            # Or return a Cancelled state if you prefer that state for this use case.
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
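A possible variation on the handler above, offered only as a sketch: instead of relying on `offset: 1`, the query could exclude the current run by its ID. The names `build_other_running_runs_query` and `has_other_running_runs` are hypothetical helpers, not part of Prefect. The sketch assumes that the `flow_run` table exposes an `id` column and that the `_neq` operator is available alongside the `_eq` operator used above. It contains no Prefect calls, so the query construction and the check can be tried in isolation.

```python
from typing import Any, Dict, Tuple

# Hypothetical query: Running runs of the same flow, excluding the current run.
# Assumes `flow_run` has an `id` column and that `_neq` is supported.
OTHER_RUNNING_RUNS_QUERY = """
query($flow_id: uuid, $flow_run_id: uuid) {
  flow_run(
    where: {_and: [{flow_id: {_eq: $flow_id}},
                   {id: {_neq: $flow_run_id}},
                   {state: {_eq: "Running"}}]}
    limit: 1
  ) {
    name
    state
    start_time
  }
}
"""


def build_other_running_runs_query(
    flow_id: str, flow_run_id: str
) -> Tuple[str, Dict[str, str]]:
    """Return the query text and the variables to send along with it."""
    return OTHER_RUNNING_RUNS_QUERY, {"flow_id": flow_id, "flow_run_id": flow_run_id}


def has_other_running_runs(response: Dict[str, Any]) -> bool:
    """True if the response lists at least one other Running flow run."""
    return bool(response["data"]["flow_run"])
```

Inside the handler, the IDs would presumably come from `prefect.context` (`flow_id` and `flow_run_id`), and the returned query and variables would be passed to `client.graphql(query=..., variables=...)` as in the original snippet. Whether this behaves better than the `offset` approach depends on whether the current run is already reported as Running when the handler fires, which is worth checking in your own setup.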
Hi @igonro, did you manage to make use of the snippet above? For some reason, in my case it does not prevent the same flow from starting again.
Thanks, @igonro and @anna-geller!
How can the same thing be achieved in Prefect v2?