diff --git a/changes/pr111.yaml b/changes/pr111.yaml new file mode 100644 index 00000000..59342f86 --- /dev/null +++ b/changes/pr111.yaml @@ -0,0 +1,2 @@ +fix: + - "Allow for scheduling different parameters and different run labels at the exact same time - [#111](https://github.com/PrefectHQ/server/pull/111)" diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index 2c66e2ef..2bd85be3 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -1,4 +1,6 @@ import datetime +import hashlib +import json import uuid from typing import Any, Dict, List @@ -541,16 +543,28 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]: # schedule every event with an idempotent flow run for event in flow_schedule.next(n=max_runs, return_events=True): + # if the event has parameter defaults or labels, we do allow for + # same-time scheduling + if event.parameter_defaults or event.labels is not None: + md5 = hashlib.md5() + param_string = str(sorted(json.dumps(event.parameter_defaults))) + label_string = str(sorted(json.dumps(event.labels))) + md5.update((param_string + label_string).encode("utf-8")) + idempotency_key = ( + f"auto-scheduled:{event.start_time.in_tz('UTC')}:{md5.hexdigest()}" + ) # if this run was already scheduled, continue - if last_scheduled_run and event.start_time <= last_scheduled_run: + elif last_scheduled_run and event.start_time <= last_scheduled_run: continue + else: + idempotency_key = f"auto-scheduled:{event.start_time.in_tz('UTC')}" run_id = await api.runs.create_flow_run( flow_id=flow_id, scheduled_start_time=event.start_time, parameters=event.parameter_defaults, labels=event.labels, - idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}", + idempotency_key=idempotency_key, ) logger.debug( diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index 75fc9da3..e35feef5 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -1102,6 +1102,85 @@ async def test_schedule_does_not_overwrite_flow_labels(self, project_id, labels) assert all([fr.labels == labels for fr in flow_runs[::2]]) assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]]) + async def test_doesnt_schedule_same_time_twice(self, project_id): + now = pendulum.now("UTC") + clock1 = prefect.schedules.clocks.IntervalClock( + start_date=now.add(minutes=1), + interval=datetime.timedelta(minutes=2), + ) + clock2 = prefect.schedules.clocks.IntervalClock( + start_date=now.add(minutes=1), + interval=datetime.timedelta(minutes=2), + ) + + flow = prefect.Flow( + name="Test Scheduled Flow", + schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]), + ) + flow.add_task(prefect.Parameter("x", default=1)) + flow_id = await api.flows.create_flow( + project_id=project_id, serialized_flow=flow.serialize() + ) + await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete() + assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10 + + flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get( + selection_set={"parameters": True, "scheduled_start_time": True}, + order_by={"scheduled_start_time": EnumValue("asc")}, + ) + + assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 10 + + @pytest.mark.parametrize( + "attrs", + [ + [ + dict(parameter_defaults=dict(x="a")), + dict(parameter_defaults=dict(x="b")), + ], + [dict(parameter_defaults=dict(x="a")), dict(parameter_defaults=None)], + [dict(parameter_defaults=dict(x="a")), dict(labels=["b"])], + [dict(labels=["c", "d"]), dict(labels=["c"])], + [dict(labels=None), dict(labels=["ef"])], + [ + dict(labels=None), + dict(labels=[]), + ], # the scheduler should distinguish between none vs. empty + ], + ) + async def test_allows_for_same_time_if_event_attrs_are_different( + self, project_id, attrs + ): + now = pendulum.now("UTC") + clock1 = prefect.schedules.clocks.IntervalClock( + start_date=now.add(minutes=1), + interval=datetime.timedelta(minutes=2), + **attrs[0], + ) + clock2 = prefect.schedules.clocks.IntervalClock( + start_date=now.add(minutes=1), + interval=datetime.timedelta(minutes=2), + **attrs[1], + ) + + flow = prefect.Flow( + name="Test Scheduled Flow", + schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]), + ) + flow.add_task(prefect.Parameter("x", default=1)) + flow_id = await api.flows.create_flow( + project_id=project_id, serialized_flow=flow.serialize() + ) + await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete() + assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10 + + flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get( + selection_set={"parameters": True, "scheduled_start_time": True}, + order_by={"scheduled_start_time": EnumValue("asc")}, + ) + + assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 5 + class TestScheduleRuns: async def test_schedule_runs(self, flow_id):