Skip to content

Commit

Permalink
Merge pull request #109 from PrefectHQ/scheduler-labels
Browse files Browse the repository at this point in the history
Scheduler labels
  • Loading branch information
cicdw authored Oct 13, 2020
2 parents 9b31558 + 67f1779 commit f78cdd6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
2 changes: 2 additions & 0 deletions changes/pr109.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feature:
- "Allow for scheduling changing labels on a per-flow run basis - [#109](https://github.com/PrefectHQ/server/pull/109)"
2 changes: 2 additions & 0 deletions src/prefect_server/api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Config:

class ClockSchema(Model):
parameter_defaults: Dict = Field(default_factory=dict)
labels: List[str] = Field(default=None)


class ScheduleSchema(Model):
Expand Down Expand Up @@ -548,6 +549,7 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]:
flow_id=flow_id,
scheduled_start_time=event.start_time,
parameters=event.parameter_defaults,
labels=event.labels,
idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}",
)

Expand Down
65 changes: 64 additions & 1 deletion tests/api/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ async def test_set_schedule_inactive_deletes_runs_in_utc(self, project_id):
assert {r.scheduled_start_time for r in new_runs} == start_times


class TestScheduledParameters:
class TestScheduledRunAttributes:
async def test_schedule_creates_parametrized_flow_runs(self, project_id):
clock1 = prefect.schedules.clocks.IntervalClock(
start_date=pendulum.now("UTC").add(minutes=1),
Expand Down Expand Up @@ -1039,6 +1039,69 @@ async def test_schedule_creates_parametrized_flow_runs(self, project_id):
assert all([fr.parameters == dict(x="a") for fr in flow_runs[::2]])
assert all([fr.parameters == dict(x="b") for fr in flow_runs[1::2]])

async def test_schedule_adds_labels_to_flow_runs(self, project_id):
clock1 = prefect.schedules.clocks.IntervalClock(
start_date=pendulum.now("UTC").add(minutes=1),
interval=datetime.timedelta(minutes=2),
labels=["a", "b"],
)
clock2 = prefect.schedules.clocks.IntervalClock(
start_date=pendulum.now("UTC"),
interval=datetime.timedelta(minutes=2),
labels=["c", "d"],
)

flow = prefect.Flow(
name="Test Scheduled Flow",
schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
)
flow.add_task(prefect.Parameter("x", default=1))
flow_id = await api.flows.create_flow(
project_id=project_id, serialized_flow=flow.serialize()
)
await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
assert len(await api.flows.schedule_flow_runs(flow_id)) == 10

flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
selection_set={"labels": True, "scheduled_start_time": True},
order_by={"scheduled_start_time": EnumValue("asc")},
)

assert all([fr.labels == ["a", "b"] for fr in flow_runs[::2]])
assert all([fr.labels == ["c", "d"] for fr in flow_runs[1::2]])

@pytest.mark.parametrize("labels", [[], ["a", "b"]])
async def test_schedule_does_not_overwrite_flow_labels(self, project_id, labels):
clock1 = prefect.schedules.clocks.IntervalClock(
start_date=pendulum.now("UTC").add(minutes=1),
interval=datetime.timedelta(minutes=2),
labels=labels,
)
clock2 = prefect.schedules.clocks.IntervalClock(
start_date=pendulum.now("UTC"),
interval=datetime.timedelta(minutes=2),
)

flow = prefect.Flow(
name="Test Scheduled Flow",
schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
environment=prefect.environments.LocalEnvironment(labels=["foo", "bar"]),
)
flow.add_task(prefect.Parameter("x", default=1))
flow_id = await api.flows.create_flow(
project_id=project_id, serialized_flow=flow.serialize()
)
await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
assert len(await api.flows.schedule_flow_runs(flow_id)) == 10

flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
selection_set={"labels": True, "scheduled_start_time": True},
order_by={"scheduled_start_time": EnumValue("asc")},
)

assert all([fr.labels == labels for fr in flow_runs[::2]])
assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]])


class TestScheduleRuns:
async def test_schedule_runs(self, flow_id):
Expand Down

0 comments on commit f78cdd6

Please sign in to comment.