From 3722aa164323baedcaa26bb9c6f53ecbfa0269f5 Mon Sep 17 00:00:00 2001
From: Chris White
Date: Wed, 14 Oct 2020 11:29:27 -0700
Subject: [PATCH 1/4] Allow for scheduling different parameters and run labels at the same time

Events that carry parameter defaults or labels now get an idempotency key
that also contains an md5 digest of those attributes, so two clocks that
fire at the same instant with different attributes each create their own
run. Events without either keep the plain time-based key and the
last_scheduled_run short-circuit.

The digest is computed over canonical JSON (parameter keys sorted, labels
sorted) so that ordering does not change the key while different values do.

---
Review note: the digest previously hashed str(sorted(json.dumps(...))),
which sorts the *characters* of the JSON text, so e.g. x="ab" and x="ba"
produced the same key and one of the two runs was dropped. The blob ids on
the "index" lines were not regenerated after this change.

 src/prefect_server/api/flows.py | 18 ++++++++-
 tests/api/test_flows.py         | 66 +++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py
index 2c66e2ef..0fa1e8c2 100644
--- a/src/prefect_server/api/flows.py
+++ b/src/prefect_server/api/flows.py
@@ -1,4 +1,6 @@
 import datetime
+import hashlib
+import json
 import uuid
 from typing import Any, Dict, List
 
@@ -541,16 +543,28 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]:
     # schedule every event with an idempotent flow run
     for event in flow_schedule.next(n=max_runs, return_events=True):
 
+        # if the event has parameter defaults or labels, we do allow for
+        # same-time scheduling
+        if event.parameter_defaults or event.labels:
+            md5 = hashlib.md5()
+            param_string = json.dumps(event.parameter_defaults, sort_keys=True)
+            label_string = json.dumps(event.labels and sorted(event.labels))
+            md5.update((param_string + label_string).encode("utf-8"))
+            idempotency_key = (
+                f"auto-scheduled:{event.start_time.in_tz('UTC')}:{md5.hexdigest()}"
+            )
         # if this run was already scheduled, continue
-        if last_scheduled_run and event.start_time <= last_scheduled_run:
+        elif last_scheduled_run and event.start_time <= last_scheduled_run:
             continue
+        else:
+            idempotency_key = f"auto-scheduled:{event.start_time.in_tz('UTC')}"
 
         run_id = await api.runs.create_flow_run(
             flow_id=flow_id,
             scheduled_start_time=event.start_time,
             parameters=event.parameter_defaults,
             labels=event.labels,
-            idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}",
+            idempotency_key=idempotency_key,
         )
 
         logger.debug(
diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py
index 75fc9da3..43bd2de6 100644
--- a/tests/api/test_flows.py
+++ b/tests/api/test_flows.py
@@ -1102,6 +1102,72 @@ async def test_schedule_does_not_overwrite_flow_labels(self, project_id, labels)
         assert all([fr.labels == labels for fr in flow_runs[::2]])
         assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]])
 
+    async def test_doesnt_schedule_same_time_twice(self, project_id):
+        now = pendulum.now("UTC")
+        clock1 = prefect.schedules.clocks.IntervalClock(
+            start_date=now.add(minutes=1),
+            interval=datetime.timedelta(minutes=2),
+        )
+        clock2 = prefect.schedules.clocks.IntervalClock(
+            start_date=now.add(minutes=1),
+            interval=datetime.timedelta(minutes=2),
+        )
+
+        flow = prefect.Flow(
+            name="Test Scheduled Flow",
+            schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
+        )
+        flow.add_task(prefect.Parameter("x", default=1))
+        flow_id = await api.flows.create_flow(
+            project_id=project_id, serialized_flow=flow.serialize()
+        )
+        await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
+        assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10
+
+        flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
+            selection_set={"parameters": True, "scheduled_start_time": True},
+            order_by={"scheduled_start_time": EnumValue("asc")},
+        )
+
+        assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 10
+
+    @pytest.mark.parametrize(
+        "attrs",
+        [[dict(parameter_defaults=dict(x="a")), dict(parameter_defaults=dict(x="b"))]],
+    )
+    async def test_allows_for_same_time_if_event_attrs_are_different(
+        self, project_id, attrs
+    ):
+        now = pendulum.now("UTC")
+        clock1 = prefect.schedules.clocks.IntervalClock(
+            start_date=now.add(minutes=1),
+            interval=datetime.timedelta(minutes=2),
+            **attrs[0],
+        )
+        clock2 = prefect.schedules.clocks.IntervalClock(
+            start_date=now.add(minutes=1),
+            interval=datetime.timedelta(minutes=2),
+            **attrs[1],
+        )
+
+        flow = prefect.Flow(
+            name="Test Scheduled Flow",
+            schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
+        )
+        flow.add_task(prefect.Parameter("x", default=1))
+        flow_id = await api.flows.create_flow(
+            project_id=project_id, serialized_flow=flow.serialize()
+        )
+        await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
+        assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10
+
+        flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
+            selection_set={"parameters": True, "scheduled_start_time": True},
+            order_by={"scheduled_start_time": EnumValue("asc")},
+        )
+
+        assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 5
+
 
 class TestScheduleRuns:
     async def test_schedule_runs(self, flow_id):

From 2eea5cfefeeb2db87a0e4074255fef6b9468a0f2 Mon Sep 17 00:00:00 2001
From: Chris White
Date: Wed, 14 Oct 2020 11:30:29 -0700
Subject: [PATCH 2/4] Add changelog entry

---
 changes/pr111.yaml | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 changes/pr111.yaml

diff --git a/changes/pr111.yaml b/changes/pr111.yaml
new file mode 100644
index 00000000..59342f86
--- /dev/null
+++ b/changes/pr111.yaml
@@ -0,0 +1,2 @@
+fix:
+  - "Allow for scheduling different parameters and different run labels at the exact same time - [#111](https://github.com/PrefectHQ/server/pull/111)"

From 5f4f68f7ef5fa78f31a0d7d952cd91c0d644baf1 Mon Sep 17 00:00:00 2001
From: Chris White
Date: Wed, 14 Oct 2020 11:36:03 -0700
Subject: [PATCH 3/4] Parametrize over more event attribute possibilities

---
 tests/api/test_flows.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py
index 43bd2de6..c1ce5bc6 100644
--- a/tests/api/test_flows.py
+++ b/tests/api/test_flows.py
@@ -1133,7 +1133,16 @@ async def test_doesnt_schedule_same_time_twice(self, project_id):
 
     @pytest.mark.parametrize(
         "attrs",
-        [[dict(parameter_defaults=dict(x="a")), dict(parameter_defaults=dict(x="b"))]],
+        [
+            [
+                dict(parameter_defaults=dict(x="a")),
+                dict(parameter_defaults=dict(x="b")),
+            ],
+            [dict(parameter_defaults=dict(x="a")), dict(parameter_defaults=None)],
+            [dict(parameter_defaults=dict(x="a")), dict(labels=["b"])],
+            [dict(labels=["c", "d"]), dict(labels=["c"])],
+            [dict(labels=None), dict(labels=["ef"])],
+        ],
     )
     async def test_allows_for_same_time_if_event_attrs_are_different(
         self, project_id, attrs

From 3d06688117593eac9727485a996764db812de94c Mon Sep 17 00:00:00 2001
From: Chris White
Date: Wed, 14 Oct 2020 11:55:06 -0700
Subject: [PATCH 4/4] Ensure empty label lists are disambiguated from None

---
 src/prefect_server/api/flows.py | 2 +-
 tests/api/test_flows.py         | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py
index 0fa1e8c2..2bd85be3 100644
--- a/src/prefect_server/api/flows.py
+++ b/src/prefect_server/api/flows.py
@@ -545,7 +545,7 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]:
 
         # if the event has parameter defaults or labels, we do allow for
         # same-time scheduling
-        if event.parameter_defaults or event.labels:
+        if event.parameter_defaults or event.labels is not None:
             md5 = hashlib.md5()
             param_string = json.dumps(event.parameter_defaults, sort_keys=True)
             label_string = json.dumps(event.labels and sorted(event.labels))
diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py
index c1ce5bc6..e35feef5 100644
--- a/tests/api/test_flows.py
+++ b/tests/api/test_flows.py
@@ -1142,6 +1142,10 @@ async def test_doesnt_schedule_same_time_twice(self, project_id):
             [dict(parameter_defaults=dict(x="a")), dict(labels=["b"])],
             [dict(labels=["c", "d"]), dict(labels=["c"])],
             [dict(labels=None), dict(labels=["ef"])],
+            [
+                dict(labels=None),
+                dict(labels=[]),
+            ],  # the scheduler should distinguish between none vs. empty
         ],
     )
     async def test_allows_for_same_time_if_event_attrs_are_different(