Skip to content

Commit

Permalink
Merge pull request #111 from PrefectHQ/scheduler-enhancement
Browse files Browse the repository at this point in the history
Allow for scheduling different parameters and run labels at the same time
  • Loading branch information
cicdw authored Oct 14, 2020
2 parents 7380c7f + 3d06688 commit ac1a4c6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
2 changes: 2 additions & 0 deletions changes/pr111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
fix:
- "Allow for scheduling different parameters and different run labels at the exact same time - [#111](https://github.com/PrefectHQ/server/pull/111)"
18 changes: 16 additions & 2 deletions src/prefect_server/api/flows.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import hashlib
import json
import uuid
from typing import Any, Dict, List

Expand Down Expand Up @@ -541,16 +543,28 @@ async def schedule_flow_runs(flow_id: str, max_runs: int = None) -> List[str]:
# schedule every event with an idempotent flow run
for event in flow_schedule.next(n=max_runs, return_events=True):

# if the event has parameter defaults or labels, we do allow for
# same-time scheduling
if event.parameter_defaults or event.labels is not None:
md5 = hashlib.md5()
param_string = str(sorted(json.dumps(event.parameter_defaults)))
label_string = str(sorted(json.dumps(event.labels)))
md5.update((param_string + label_string).encode("utf-8"))
idempotency_key = (
f"auto-scheduled:{event.start_time.in_tz('UTC')}:{md5.hexdigest()}"
)
# if this run was already scheduled, continue
if last_scheduled_run and event.start_time <= last_scheduled_run:
elif last_scheduled_run and event.start_time <= last_scheduled_run:
continue
else:
idempotency_key = f"auto-scheduled:{event.start_time.in_tz('UTC')}"

run_id = await api.runs.create_flow_run(
flow_id=flow_id,
scheduled_start_time=event.start_time,
parameters=event.parameter_defaults,
labels=event.labels,
idempotency_key=f"auto-scheduled:{event.start_time.in_tz('UTC')}",
idempotency_key=idempotency_key,
)

logger.debug(
Expand Down
79 changes: 79 additions & 0 deletions tests/api/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,85 @@ async def test_schedule_does_not_overwrite_flow_labels(self, project_id, labels)
assert all([fr.labels == labels for fr in flow_runs[::2]])
assert all([fr.labels == ["bar", "foo"] for fr in flow_runs[1::2]])

async def test_doesnt_schedule_same_time_twice(self, project_id):
now = pendulum.now("UTC")
clock1 = prefect.schedules.clocks.IntervalClock(
start_date=now.add(minutes=1),
interval=datetime.timedelta(minutes=2),
)
clock2 = prefect.schedules.clocks.IntervalClock(
start_date=now.add(minutes=1),
interval=datetime.timedelta(minutes=2),
)

flow = prefect.Flow(
name="Test Scheduled Flow",
schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
)
flow.add_task(prefect.Parameter("x", default=1))
flow_id = await api.flows.create_flow(
project_id=project_id, serialized_flow=flow.serialize()
)
await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10

flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
selection_set={"parameters": True, "scheduled_start_time": True},
order_by={"scheduled_start_time": EnumValue("asc")},
)

assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 10

@pytest.mark.parametrize(
"attrs",
[
[
dict(parameter_defaults=dict(x="a")),
dict(parameter_defaults=dict(x="b")),
],
[dict(parameter_defaults=dict(x="a")), dict(parameter_defaults=None)],
[dict(parameter_defaults=dict(x="a")), dict(labels=["b"])],
[dict(labels=["c", "d"]), dict(labels=["c"])],
[dict(labels=None), dict(labels=["ef"])],
[
dict(labels=None),
dict(labels=[]),
], # the scheduler should distinguish between none vs. empty
],
)
async def test_allows_for_same_time_if_event_attrs_are_different(
self, project_id, attrs
):
now = pendulum.now("UTC")
clock1 = prefect.schedules.clocks.IntervalClock(
start_date=now.add(minutes=1),
interval=datetime.timedelta(minutes=2),
**attrs[0],
)
clock2 = prefect.schedules.clocks.IntervalClock(
start_date=now.add(minutes=1),
interval=datetime.timedelta(minutes=2),
**attrs[1],
)

flow = prefect.Flow(
name="Test Scheduled Flow",
schedule=prefect.schedules.Schedule(clocks=[clock1, clock2]),
)
flow.add_task(prefect.Parameter("x", default=1))
flow_id = await api.flows.create_flow(
project_id=project_id, serialized_flow=flow.serialize()
)
await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).delete()
assert len(set((await api.flows.schedule_flow_runs(flow_id)))) == 10

flow_runs = await models.FlowRun.where({"flow_id": {"_eq": flow_id}}).get(
selection_set={"parameters": True, "scheduled_start_time": True},
order_by={"scheduled_start_time": EnumValue("asc")},
)

assert len(set([fr.scheduled_start_time for fr in flow_runs])) == 5


class TestScheduleRuns:
async def test_schedule_runs(self, flow_id):
Expand Down

0 comments on commit ac1a4c6

Please sign in to comment.