From 8af729580e1683552322b8c374dd63f37b79555f Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 13 Oct 2020 11:28:41 -0700 Subject: [PATCH 1/3] Add clock specific labels to scheduler flow runs --- src/prefect_server/api/flows.py | 2 ++ tests/api/test_flows.py | 64 ++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index b0c4fd17..b7f59022 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -29,6 +29,7 @@ class Config: class ClockSchema(Model): parameter_defaults: Dict = Field(default_factory=dict) + labels: List[str] = Field(default_factory=list) class ScheduleSchema(Model): @@ -548,6 +549,7 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]: flow_id=flow_id, scheduled_start_time=event.start_time, parameters=event.parameter_defaults, + labels=event.labels or None, idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}", ) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index bd76e344..57baed07 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -1007,7 +1007,7 @@ async def test_set_schedule_inactive_deletes_runs_in_utc(self, project_id): assert {r.scheduled_start_time for r in new_runs} == start_times -class TestScheduledParameters: +class TestScheduledRunAttributes: async def test_schedule_creates_parametrized_flow_runs(self, project_id): clock1 = prefect.schedules.clocks.IntervalClock( start_date=pendulum.now("UTC").add(minutes=1), @@ -1039,6 +1039,68 @@ async def test_schedule_creates_parametrized_flow_runs(self, project_id): assert all([fr.parameters == dict(x="a") for fr in flow_runs[::2]]) assert all([fr.parameters == dict(x="b") for fr in flow_runs[1::2]]) + async def test_schedule_adds_labels_to_flow_runs(self, project_id): + clock1 = prefect.schedules.clocks.IntervalClock( + start_date=pendulum.now("UTC").add(minutes=1), + interval=datetime.timedelta(minutes=2), + labels=["a", "b"], + ) + clock2 = prefect.schedules.clocks.IntervalClock( + start_date=pendulum.now("UTC"), + interval=datetime.timedelta(minutes=2), + labels=["c", "d"], + ) + + flow = prefect.Flow( + name="Test Scheduled Flow", + schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]), + ) + flow.add_task(prefect.Parameter("x", default=1)) + flow_id = await api.flows.create_flow( + project_id=project_id, serialized_flow=flow.serialize() + ) + await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete() + assert len(await api.flows.schedule_flow_runs(flow_id)) == 10 + + flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get( + selection_set={"labels": True, "scheduled_start_time": True}, + order_by={"scheduled_start_time": EnumValue("asc")}, + ) + + assert all([fr.labels == ["a", "b"] for fr in flow_runs[::2]]) + assert all([fr.labels == ["c", "d"] for fr in flow_runs[1::2]]) + + async def test_schedule_does_not_overwrite_flow_labels(self, project_id): + clock1 = prefect.schedules.clocks.IntervalClock( + start_date=pendulum.now("UTC").add(minutes=1), + interval=datetime.timedelta(minutes=2), + labels=["a", "b"], + ) + clock2 = prefect.schedules.clocks.IntervalClock( + start_date=pendulum.now("UTC"), + interval=datetime.timedelta(minutes=2), + ) + + flow = prefect.Flow( + name="Test Scheduled Flow", + schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]), + environment=prefect.environments.LocalEnvironment(labels=["foo", "bar"]), + ) + flow.add_task(prefect.Parameter("x", default=1)) + flow_id = await api.flows.create_flow( + project_id=project_id, serialized_flow=flow.serialize() + ) + await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete() + assert len(await api.flows.schedule_flow_runs(flow_id)) == 10 + + flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get( + selection_set={"labels": True, "scheduled_start_time": True}, + order_by={"scheduled_start_time": EnumValue("asc")}, + ) + + assert all([fr.labels == ["a", "b"] for fr in flow_runs[::2]]) + assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]]) + class TestScheduleRuns: async def test_schedule_runs(self, flow_id): From 5380e8698255c68e37c454ab787ae50c5a75886f Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 13 Oct 2020 11:29:36 -0700 Subject: [PATCH 2/3] Add changelog entry --- changes/pr109.yaml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/pr109.yaml diff --git a/changes/pr109.yaml b/changes/pr109.yaml new file mode 100644 index 00000000..8ae36a31 --- /dev/null +++ b/changes/pr109.yaml @@ -0,0 +1,2 @@ +feature: + - "Allow for scheduling changing labels on a per-flow run basis - [#109](https://github.com/PrefectHQ/server/pull/109)" From 67f177978d62d42ed3c83edcf3d74426c1db2866 Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 13 Oct 2020 12:11:44 -0700 Subject: [PATCH 3/3] Use labels from event always, allowing for empty lists to be passed by the user --- src/prefect_server/api/flows.py | 4 ++-- tests/api/test_flows.py | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/prefect_server/api/flows.py b/src/prefect_server/api/flows.py index b7f59022..2c66e2ef 100644 --- a/src/prefect_server/api/flows.py +++ b/src/prefect_server/api/flows.py @@ -29,7 +29,7 @@ class Config: class ClockSchema(Model): parameter_defaults: Dict = Field(default_factory=dict) - labels: List[str] = Field(default_factory=list) + labels: List[str] = Field(default=None) class ScheduleSchema(Model): @@ -549,7 +549,7 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]: flow_id=flow_id, scheduled_start_time=event.start_time, parameters=event.parameter_defaults, - labels=event.labels or None, + labels=event.labels, idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}", ) diff --git a/tests/api/test_flows.py b/tests/api/test_flows.py index 57baed07..75fc9da3 100644 --- a/tests/api/test_flows.py +++ b/tests/api/test_flows.py @@ -1070,11 +1070,12 @@ async def test_schedule_adds_labels_to_flow_runs(self, project_id): assert all([fr.labels == ["a", "b"] for fr in flow_runs[::2]]) assert all([fr.labels == ["c", "d"] for fr in flow_runs[1::2]]) - async def test_schedule_does_not_overwrite_flow_labels(self, project_id): + @pytest.mark.parametrize("labels", [[], ["a", "b"]]) + async def test_schedule_does_not_overwrite_flow_labels(self, project_id, labels): clock1 = prefect.schedules.clocks.IntervalClock( start_date=pendulum.now("UTC").add(minutes=1), interval=datetime.timedelta(minutes=2), - labels=["a", "b"], + labels=labels, ) clock2 = prefect.schedules.clocks.IntervalClock( start_date=pendulum.now("UTC"), @@ -1098,7 +1099,7 @@ async def test_schedule_does_not_overwrite_flow_labels(self, project_id): order_by={"scheduled_start_time": EnumValue("asc")}, ) - assert all([fr.labels == ["a", "b"] for fr in flow_runs[::2]]) + assert all([fr.labels == labels for fr in flow_runs[::2]]) assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]])