Skip to content

Commit

Permalink
Add worker.executed-flow-run event (PrefectHQ#9227)
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald authored Apr 17, 2023
1 parent b7c35f2 commit 9abbc54
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 11 deletions.
15 changes: 14 additions & 1 deletion src/prefect/events/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterable, List, Tuple, cast
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
from uuid import UUID, uuid4

import pendulum
Expand All @@ -25,6 +25,10 @@ def items(self) -> Iterable[Tuple[str, str]]:
def __getitem__(self, label: str) -> str:
return self.__root__[label]

def __setitem__(self, label: str, value: str) -> str:
self.__root__[label] = value
return value


class Resource(Labelled):
"""An observable business object of interest to the user"""
Expand Down Expand Up @@ -113,6 +117,15 @@ class Event(PrefectBaseModel):
default_factory=uuid4,
description="The client-provided identifier of this event",
)
follows: Optional[UUID] = Field(
None,
description=(
"The ID of an event that is known to have occurred prior to this "
"one. If set, this may be used to establish a more precise "
"ordering of causally-related events when they occur close enough "
"together in time that the system may receive them out-of-order."
),
)

@property
def involved_resources(self) -> Iterable[Resource]:
Expand Down
25 changes: 22 additions & 3 deletions src/prefect/events/utilities.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import pendulum

from prefect.server.utilities.schemas import DateTimeTZ

from .schemas import Event, RelatedResource
from .worker import EventsWorker

TIGHT_TIMING = timedelta(minutes=5)


def emit_event(
event: str,
Expand All @@ -14,7 +19,8 @@ def emit_event(
related: Optional[Union[List[Dict[str, str]], List[RelatedResource]]] = None,
payload: Optional[Dict[str, Any]] = None,
id: Optional[UUID] = None,
) -> None:
follows: Optional[Event] = None,
) -> Event:
"""
Send an event to Prefect Cloud.
Expand All @@ -27,14 +33,21 @@ def emit_event(
payload: An open-ended set of data describing what happened.
id: The sender-provided identifier for this event. Defaults to a random
UUID.
follows: The event that preceded this one. If the preceding event
happened more than 5 minutes prior to this event the follows
relationship will not be set.
Returns:
The event that was emitted.
"""
event_kwargs = {
"event": event,
"resource": resource,
}

if occurred is not None:
event_kwargs["occurred"] = occurred
if occurred is None:
occurred = pendulum.now()
event_kwargs["occurred"] = occurred

if related is not None:
event_kwargs["related"] = related
Expand All @@ -45,5 +58,11 @@ def emit_event(
if id is not None:
event_kwargs["id"] = id

if follows is not None:
if -TIGHT_TIMING < (occurred - follows.occurred) < TIGHT_TIMING:
event_kwargs["follows"] = follows.id

event_obj = Event(**event_kwargs)
EventsWorker.instance().send(event_obj)

return event_obj
39 changes: 35 additions & 4 deletions src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from prefect._internal.compatibility.experimental import experimental
from prefect.client.orchestration import PrefectClient, get_client
from prefect.engine import propose_state
from prefect.events import emit_event
from prefect.events import Event, emit_event
from prefect.events.related import object_as_related_resource, tags_as_related_resources
from prefect.events.schemas import RelatedResource
from prefect.exceptions import (
Expand Down Expand Up @@ -714,7 +714,7 @@ async def _submit_run_and_capture_errors(
# TODO: Add functionality to handle base job configuration and
# job configuration variables when kicking off a flow run
configuration = await self._get_configuration(flow_run)
self._emit_flow_run_submitted_event(configuration)
submitted_event = self._emit_flow_run_submitted_event(configuration)
result = await self.run(
flow_run=flow_run,
task_status=task_status,
Expand Down Expand Up @@ -759,6 +759,8 @@ async def _submit_run_and_capture_errors(
),
)

self._emit_flow_run_executed_event(result, configuration, submitted_event)

return result

def get_status(self):
Expand Down Expand Up @@ -935,7 +937,9 @@ def _event_resource(self):
"prefect.worker-type": self.type,
}

def _emit_flow_run_submitted_event(self, configuration: BaseJobConfiguration):
def _emit_flow_run_submitted_event(
self, configuration: BaseJobConfiguration
) -> Event:
related = configuration._related_resources()

if self._work_pool:
Expand All @@ -945,8 +949,35 @@ def _emit_flow_run_submitted_event(self, configuration: BaseJobConfiguration):
)
)

emit_event(
return emit_event(
event=f"prefect.worker.submitted-flow-run",
resource=self._event_resource(),
related=related,
)

def _emit_flow_run_executed_event(
self,
result: BaseWorkerResult,
configuration: BaseJobConfiguration,
submitted_event: Event,
):
related = configuration._related_resources()

if self._work_pool:
related.append(
object_as_related_resource(
kind="work-pool", role="work-pool", object=self._work_pool
)
)

for resource in related:
if resource.role == "flow-run":
resource["prefect.infrastructure.identifier"] = str(result.identifier)
resource["prefect.infrastructure.status-code"] = str(result.status_code)

emit_event(
event=f"prefect.worker.executed-flow-run",
resource=self._event_resource(),
related=related,
follows=submitted_event,
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from prefect.events.worker import EventsWorker
from prefect.states import Scheduled
from prefect.testing.utilities import AsyncMock
from prefect.workers.base import BaseJobConfiguration, BaseWorker
from prefect.workers.base import BaseJobConfiguration, BaseWorker, BaseWorkerResult


class WorkerEventsTestImpl(BaseWorker):
Expand All @@ -20,7 +20,7 @@ async def verify_submitted_deployment(self, deployment):
pass


async def test_worker_calls_run_with_expected_arguments(
async def test_worker_emits_submitted_event(
asserting_events_worker: EventsWorker,
reset_worker_events,
orion_client: PrefectClient,
Expand All @@ -43,7 +43,11 @@ async def test_worker_calls_run_with_expected_arguments(
asserting_events_worker.drain()

assert isinstance(asserting_events_worker._client, AssertingEventsClient)
assert len(asserting_events_worker._client.events) == 1

# When a worker submits a flow-run it also monitors that flow run until it's
# complete. When it's complete it fires a second 'monitored' event, which
# is covered by the test_worker_emits_monitored_event below.
assert len(asserting_events_worker._client.events) == 2

submit_event = asserting_events_worker._client.events[0]
assert submit_event.event == "prefect.worker.submitted-flow-run"
Expand Down Expand Up @@ -89,3 +93,87 @@ async def test_worker_calls_run_with_expected_arguments(
"prefect.resource.name": work_pool.name,
},
]


async def test_worker_emits_executed_event(
asserting_events_worker: EventsWorker,
reset_worker_events,
orion_client: PrefectClient,
worker_deployment_wq1,
work_pool,
):
flow_run = await orion_client.create_flow_run_from_deployment(
worker_deployment_wq1.id,
state=Scheduled(scheduled_time=pendulum.now("utc")),
tags=["flow-run-one"],
)

flow = await orion_client.read_flow(flow_run.flow_id)

worker_result = BaseWorkerResult(status_code=1, identifier="process123")
run_flow_fn = AsyncMock(return_value=worker_result)

async with WorkerEventsTestImpl(work_pool_name=work_pool.name) as worker:
worker._work_pool = work_pool
worker.run = run_flow_fn
await worker.get_and_submit_flow_runs()

asserting_events_worker.drain()

assert isinstance(asserting_events_worker._client, AssertingEventsClient)

# When a worker submits a flow-run it also monitors that flow run until
# it's complete. When it's submits it fires a 'sumbitted' event,
# which is covered by the test_worker_emits_submitted_event above.
assert len(asserting_events_worker._client.events) == 2

submitted_event = asserting_events_worker._client.events[0]
executed_event = asserting_events_worker._client.events[1]

assert executed_event.event == "prefect.worker.executed-flow-run"

assert dict(executed_event.resource.items()) == {
"prefect.resource.id": f"prefect.worker.events-test.{worker.get_name_slug()}",
"prefect.resource.name": worker.name,
"prefect.version": str(__version__),
"prefect.worker-type": worker.type,
}

assert len(executed_event.related) == 6

related = [dict(r.items()) for r in executed_event.related]

assert related == [
{
"prefect.resource.id": f"prefect.deployment.{worker_deployment_wq1.id}",
"prefect.resource.role": "deployment",
"prefect.resource.name": worker_deployment_wq1.name,
},
{
"prefect.resource.id": f"prefect.flow.{flow.id}",
"prefect.resource.role": "flow",
"prefect.resource.name": flow.name,
},
{
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.role": "flow-run",
"prefect.resource.name": flow_run.name,
"prefect.infrastructure.status-code": "1",
"prefect.infrastructure.identifier": "process123",
},
{
"prefect.resource.id": "prefect.tag.flow-run-one",
"prefect.resource.role": "tag",
},
{
"prefect.resource.id": "prefect.tag.test",
"prefect.resource.role": "tag",
},
{
"prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
"prefect.resource.role": "work-pool",
"prefect.resource.name": work_pool.name,
},
]

assert executed_event.follows == submitted_event.id
2 changes: 2 additions & 0 deletions tests/events/test_event_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def test_json_representation():
],
payload={"hello": "world"},
id=uuid4(),
follows=uuid4(),
)

jsonified = json.loads(event.json().encode())
Expand All @@ -94,6 +95,7 @@ def test_json_representation():
],
"payload": {"hello": "world"},
"id": str(event.id),
"follows": str(event.follows),
}


Expand Down
57 changes: 57 additions & 0 deletions tests/events/test_events_emit_event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from datetime import timedelta
from uuid import UUID

import pendulum

from prefect.events import emit_event
from prefect.events.clients import AssertingEventsClient
from prefect.events.worker import EventsWorker
from prefect.server.utilities.schemas import DateTimeTZ

Expand All @@ -13,6 +17,7 @@ def test_emits_simple_event(asserting_events_worker: EventsWorker, reset_worker_

asserting_events_worker.drain()

assert isinstance(asserting_events_worker._client, AssertingEventsClient)
assert len(asserting_events_worker._client.events) == 1
event = asserting_events_worker._client.events[0]
assert event.event == "vogon.poetry.read"
Expand All @@ -38,6 +43,7 @@ def test_emits_complex_event(

asserting_events_worker.drain()

assert isinstance(asserting_events_worker._client, AssertingEventsClient)
assert len(asserting_events_worker._client.events) == 1
event = asserting_events_worker._client.events[0]
assert event.event == "vogon.poetry.read"
Expand All @@ -48,3 +54,54 @@ def test_emits_complex_event(
assert event.related[0].role == "locale"
assert event.payload == {"text": "Oh freddled gruntbuggly..."}
assert event.id == UUID(int=1)


def test_returns_event(asserting_events_worker: EventsWorker, reset_worker_events):
emitted_event = emit_event(
event="vogon.poetry.read",
resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"},
)

asserting_events_worker.drain()
assert isinstance(asserting_events_worker._client, AssertingEventsClient)
assert len(asserting_events_worker._client.events) == 1
assert emitted_event == asserting_events_worker._client.events[0]


def test_sets_follows_tight_timing(
asserting_events_worker: EventsWorker, reset_worker_events
):
destroyed_event = emit_event(
event="planet.destroyed",
resource={"prefect.resource.id": "milky-way.sol.earth"},
)

read_event = emit_event(
event="vogon.poetry.read",
resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"},
follows=destroyed_event,
)

asserting_events_worker.drain()
assert read_event.follows == destroyed_event.id


def test_does_not_set_follows_not_tight_timing(
asserting_events_worker: EventsWorker, reset_worker_events
):
destroyed_event = emit_event(
event="planet.destroyed",
occurred=pendulum.now() - timedelta(minutes=10),
resource={"prefect.resource.id": "milky-way.sol.earth"},
)

# These events are more than 5m apart so the `follows` property of the
# emitted event shouldn't be set.
read_event = emit_event(
event="vogon.poetry.read",
resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"},
follows=destroyed_event,
)

asserting_events_worker.drain()
assert read_event.follows == None

0 comments on commit 9abbc54

Please sign in to comment.