diff --git a/src/prefect/events/schemas.py b/src/prefect/events/schemas.py index 58a5879f4aa8..c6b0cea8f173 100644 --- a/src/prefect/events/schemas.py +++ b/src/prefect/events/schemas.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Iterable, List, Tuple, cast +from typing import Any, Dict, Iterable, List, Optional, Tuple, cast from uuid import UUID, uuid4 import pendulum @@ -25,6 +25,10 @@ def items(self) -> Iterable[Tuple[str, str]]: def __getitem__(self, label: str) -> str: return self.__root__[label] + def __setitem__(self, label: str, value: str) -> str: + self.__root__[label] = value + return value + class Resource(Labelled): """An observable business object of interest to the user""" @@ -113,6 +117,15 @@ class Event(PrefectBaseModel): default_factory=uuid4, description="The client-provided identifier of this event", ) + follows: Optional[UUID] = Field( + None, + description=( + "The ID of an event that is known to have occurred prior to this " + "one. If set, this may be used to establish a more precise " + "ordering of causally-related events when they occur close enough " + "together in time that the system may receive them out-of-order." + ), + ) @property def involved_resources(self) -> Iterable[Resource]: diff --git a/src/prefect/events/utilities.py b/src/prefect/events/utilities.py index eac21154f584..76cf5c8d6a4c 100644 --- a/src/prefect/events/utilities.py +++ b/src/prefect/events/utilities.py @@ -1,11 +1,16 @@ +from datetime import timedelta from typing import Any, Dict, List, Optional, Union from uuid import UUID +import pendulum + from prefect.server.utilities.schemas import DateTimeTZ from .schemas import Event, RelatedResource from .worker import EventsWorker +TIGHT_TIMING = timedelta(minutes=5) + def emit_event( event: str, @@ -14,7 +19,8 @@ def emit_event( related: Optional[Union[List[Dict[str, str]], List[RelatedResource]]] = None, payload: Optional[Dict[str, Any]] = None, id: Optional[UUID] = None, -) -> None: + follows: Optional[Event] = None, +) -> Event: """ Send an event to Prefect Cloud. @@ -27,14 +33,21 @@ def emit_event( payload: An open-ended set of data describing what happened. id: The sender-provided identifier for this event. Defaults to a random UUID. + follows: The event that preceded this one. If the preceding event + happened more than 5 minutes prior to this event the follows + relationship will not be set. + + Returns: + The event that was emitted. """ event_kwargs = { "event": event, "resource": resource, } - if occurred is not None: - event_kwargs["occurred"] = occurred + if occurred is None: + occurred = pendulum.now() + event_kwargs["occurred"] = occurred if related is not None: event_kwargs["related"] = related @@ -45,5 +58,11 @@ def emit_event( if id is not None: event_kwargs["id"] = id + if follows is not None: + if -TIGHT_TIMING < (occurred - follows.occurred) < TIGHT_TIMING: + event_kwargs["follows"] = follows.id + event_obj = Event(**event_kwargs) EventsWorker.instance().send(event_obj) + + return event_obj diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index 97c05e63d8d5..afe0cceb8a85 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -12,7 +12,7 @@ from prefect._internal.compatibility.experimental import experimental from prefect.client.orchestration import PrefectClient, get_client from prefect.engine import propose_state -from prefect.events import emit_event +from prefect.events import Event, emit_event from prefect.events.related import object_as_related_resource, tags_as_related_resources from prefect.events.schemas import RelatedResource from prefect.exceptions import ( @@ -714,7 +714,7 @@ async def _submit_run_and_capture_errors( # TODO: Add functionality to handle base job configuration and # job configuration variables when kicking off a flow run configuration = await self._get_configuration(flow_run) - self._emit_flow_run_submitted_event(configuration) + submitted_event = self._emit_flow_run_submitted_event(configuration) result = await self.run( flow_run=flow_run, task_status=task_status, @@ -759,6 +759,8 @@ async def _submit_run_and_capture_errors( ), ) + self._emit_flow_run_executed_event(result, configuration, submitted_event) + return result def get_status(self): @@ -935,7 +937,9 @@ def _event_resource(self): "prefect.worker-type": self.type, } - def _emit_flow_run_submitted_event(self, configuration: BaseJobConfiguration): + def _emit_flow_run_submitted_event( + self, configuration: BaseJobConfiguration + ) -> Event: related = configuration._related_resources() if self._work_pool: @@ -945,8 +949,35 @@ def _emit_flow_run_submitted_event(self, configuration: BaseJobConfiguration): ) ) - emit_event( + return emit_event( event=f"prefect.worker.submitted-flow-run", resource=self._event_resource(), related=related, ) + + def _emit_flow_run_executed_event( + self, + result: BaseWorkerResult, + configuration: BaseJobConfiguration, + submitted_event: Event, + ): + related = configuration._related_resources() + + if self._work_pool: + related.append( + object_as_related_resource( + kind="work-pool", role="work-pool", object=self._work_pool + ) + ) + + for resource in related: + if resource.role == "flow-run": + resource["prefect.infrastructure.identifier"] = str(result.identifier) + resource["prefect.infrastructure.status-code"] = str(result.status_code) + + emit_event( + event=f"prefect.worker.executed-flow-run", + resource=self._event_resource(), + related=related, + follows=submitted_event, + ) diff --git a/tests/events/instrumentation/test_events_workers_instrumentation.py b/tests/events/instrumentation/test_events_workers_instrumentation.py index 3269913097a1..0096f9cb3b2b 100644 --- a/tests/events/instrumentation/test_events_workers_instrumentation.py +++ b/tests/events/instrumentation/test_events_workers_instrumentation.py @@ -6,7 +6,7 @@ from prefect.events.worker import EventsWorker from prefect.states import Scheduled from prefect.testing.utilities import AsyncMock -from prefect.workers.base import BaseJobConfiguration, BaseWorker +from prefect.workers.base import BaseJobConfiguration, BaseWorker, BaseWorkerResult class WorkerEventsTestImpl(BaseWorker): @@ -20,7 +20,7 @@ async def verify_submitted_deployment(self, deployment): pass -async def test_worker_calls_run_with_expected_arguments( +async def test_worker_emits_submitted_event( asserting_events_worker: EventsWorker, reset_worker_events, orion_client: PrefectClient, @@ -43,7 +43,11 @@ async def test_worker_calls_run_with_expected_arguments( asserting_events_worker.drain() assert isinstance(asserting_events_worker._client, AssertingEventsClient) - assert len(asserting_events_worker._client.events) == 1 + + # When a worker submits a flow-run it also monitors that flow run until it's + # complete. When it's complete it fires a second 'monitored' event, which + # is covered by the test_worker_emits_monitored_event below. + assert len(asserting_events_worker._client.events) == 2 submit_event = asserting_events_worker._client.events[0] assert submit_event.event == "prefect.worker.submitted-flow-run" @@ -89,3 +93,87 @@ async def test_worker_calls_run_with_expected_arguments( "prefect.resource.name": work_pool.name, }, ] + + +async def test_worker_emits_executed_event( + asserting_events_worker: EventsWorker, + reset_worker_events, + orion_client: PrefectClient, + worker_deployment_wq1, + work_pool, +): + flow_run = await orion_client.create_flow_run_from_deployment( + worker_deployment_wq1.id, + state=Scheduled(scheduled_time=pendulum.now("utc")), + tags=["flow-run-one"], + ) + + flow = await orion_client.read_flow(flow_run.flow_id) + + worker_result = BaseWorkerResult(status_code=1, identifier="process123") + run_flow_fn = AsyncMock(return_value=worker_result) + + async with WorkerEventsTestImpl(work_pool_name=work_pool.name) as worker: + worker._work_pool = work_pool + worker.run = run_flow_fn + await worker.get_and_submit_flow_runs() + + asserting_events_worker.drain() + + assert isinstance(asserting_events_worker._client, AssertingEventsClient) + + # When a worker submits a flow-run it also monitors that flow run until + # it's complete. When it's submits it fires a 'sumbitted' event, + # which is covered by the test_worker_emits_submitted_event above. + assert len(asserting_events_worker._client.events) == 2 + + submitted_event = asserting_events_worker._client.events[0] + executed_event = asserting_events_worker._client.events[1] + + assert executed_event.event == "prefect.worker.executed-flow-run" + + assert dict(executed_event.resource.items()) == { + "prefect.resource.id": f"prefect.worker.events-test.{worker.get_name_slug()}", + "prefect.resource.name": worker.name, + "prefect.version": str(__version__), + "prefect.worker-type": worker.type, + } + + assert len(executed_event.related) == 6 + + related = [dict(r.items()) for r in executed_event.related] + + assert related == [ + { + "prefect.resource.id": f"prefect.deployment.{worker_deployment_wq1.id}", + "prefect.resource.role": "deployment", + "prefect.resource.name": worker_deployment_wq1.name, + }, + { + "prefect.resource.id": f"prefect.flow.{flow.id}", + "prefect.resource.role": "flow", + "prefect.resource.name": flow.name, + }, + { + "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", + "prefect.resource.role": "flow-run", + "prefect.resource.name": flow_run.name, + "prefect.infrastructure.status-code": "1", + "prefect.infrastructure.identifier": "process123", + }, + { + "prefect.resource.id": "prefect.tag.flow-run-one", + "prefect.resource.role": "tag", + }, + { + "prefect.resource.id": "prefect.tag.test", + "prefect.resource.role": "tag", + }, + { + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.role": "work-pool", + "prefect.resource.name": work_pool.name, + }, + ] + + assert executed_event.follows == submitted_event.id diff --git a/tests/events/test_event_schemas.py b/tests/events/test_event_schemas.py index 3560c5dfffc4..e83a166d9bab 100644 --- a/tests/events/test_event_schemas.py +++ b/tests/events/test_event_schemas.py @@ -79,6 +79,7 @@ def test_json_representation(): ], payload={"hello": "world"}, id=uuid4(), + follows=uuid4(), ) jsonified = json.loads(event.json().encode()) @@ -94,6 +95,7 @@ def test_json_representation(): ], "payload": {"hello": "world"}, "id": str(event.id), + "follows": str(event.follows), } diff --git a/tests/events/test_events_emit_event.py b/tests/events/test_events_emit_event.py index 7e971c1214e1..898f47a9ddcb 100644 --- a/tests/events/test_events_emit_event.py +++ b/tests/events/test_events_emit_event.py @@ -1,6 +1,10 @@ +from datetime import timedelta from uuid import UUID +import pendulum + from prefect.events import emit_event +from prefect.events.clients import AssertingEventsClient from prefect.events.worker import EventsWorker from prefect.server.utilities.schemas import DateTimeTZ @@ -13,6 +17,7 @@ def test_emits_simple_event(asserting_events_worker: EventsWorker, reset_worker_ asserting_events_worker.drain() + assert isinstance(asserting_events_worker._client, AssertingEventsClient) assert len(asserting_events_worker._client.events) == 1 event = asserting_events_worker._client.events[0] assert event.event == "vogon.poetry.read" @@ -38,6 +43,7 @@ def test_emits_complex_event( asserting_events_worker.drain() + assert isinstance(asserting_events_worker._client, AssertingEventsClient) assert len(asserting_events_worker._client.events) == 1 event = asserting_events_worker._client.events[0] assert event.event == "vogon.poetry.read" @@ -48,3 +54,54 @@ def test_emits_complex_event( assert event.related[0].role == "locale" assert event.payload == {"text": "Oh freddled gruntbuggly..."} assert event.id == UUID(int=1) + + +def test_returns_event(asserting_events_worker: EventsWorker, reset_worker_events): + emitted_event = emit_event( + event="vogon.poetry.read", + resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"}, + ) + + asserting_events_worker.drain() + assert isinstance(asserting_events_worker._client, AssertingEventsClient) + assert len(asserting_events_worker._client.events) == 1 + assert emitted_event == asserting_events_worker._client.events[0] + + +def test_sets_follows_tight_timing( + asserting_events_worker: EventsWorker, reset_worker_events +): + destroyed_event = emit_event( + event="planet.destroyed", + resource={"prefect.resource.id": "milky-way.sol.earth"}, + ) + + read_event = emit_event( + event="vogon.poetry.read", + resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"}, + follows=destroyed_event, + ) + + asserting_events_worker.drain() + assert read_event.follows == destroyed_event.id + + +def test_does_not_set_follows_not_tight_timing( + asserting_events_worker: EventsWorker, reset_worker_events +): + destroyed_event = emit_event( + event="planet.destroyed", + occurred=pendulum.now() - timedelta(minutes=10), + resource={"prefect.resource.id": "milky-way.sol.earth"}, + ) + + # These events are more than 5m apart so the `follows` property of the + # emitted event shouldn't be set. + read_event = emit_event( + event="vogon.poetry.read", + resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"}, + follows=destroyed_event, + ) + + asserting_events_worker.drain() + assert read_event.follows == None