Skip to content

Commit

Permalink
Stop running all services in ephemeral app (#16120)
Browse files Browse the repository at this point in the history
Co-authored-by: nate nowack <[email protected]>
  • Loading branch information
cicdw and zzstoatzz authored Nov 26, 2024
1 parent 11eef8a commit 3da0f05
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 31 deletions.
65 changes: 34 additions & 31 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ def create_app(
"""
settings = settings or prefect.settings.get_current_settings()
cache_key = (settings.hash_key(), ephemeral)
ephemeral = ephemeral or bool(os.getenv("PREFECT__SERVER_EPHEMERAL"))

from prefect.logging.configuration import setup_logging

Expand Down Expand Up @@ -525,42 +526,11 @@ async def add_block_types():
async def start_services():
"""Start additional services when the Prefect REST API starts up."""

if ephemeral:
app.state.services = None
return

service_instances = []
if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
service_instances.append(services.scheduler.Scheduler())
service_instances.append(services.scheduler.RecentDeploymentsScheduler())

if prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED.value():
service_instances.append(services.late_runs.MarkLateRuns())

if prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED.value():
service_instances.append(services.pause_expirations.FailExpiredPauses())

if prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED.value():
service_instances.append(
services.cancellation_cleanup.CancellationCleanup()
)

if prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value():
service_instances.append(services.telemetry.Telemetry())

if prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value():
service_instances.append(
services.flow_run_notifications.FlowRunNotifications()
)

if prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED.value():
service_instances.append(services.foreman.Foreman())

if prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED.value():
service_instances.append(ReactiveTriggers())
service_instances.append(ProactiveTriggers())
service_instances.append(Actions())

if prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED:
service_instances.append(TaskRunRecorder())

Expand All @@ -570,6 +540,38 @@ async def start_services():
if prefect.settings.PREFECT_API_EVENTS_STREAM_OUT_ENABLED:
service_instances.append(stream.Distributor())

# don't run services in ephemeral mode
if not ephemeral:
if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
service_instances.append(services.scheduler.Scheduler())
service_instances.append(
services.scheduler.RecentDeploymentsScheduler()
)

if prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED.value():
service_instances.append(services.late_runs.MarkLateRuns())

if prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED.value():
service_instances.append(services.pause_expirations.FailExpiredPauses())

if prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED.value():
service_instances.append(
services.cancellation_cleanup.CancellationCleanup()
)

if prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value():
service_instances.append(
services.flow_run_notifications.FlowRunNotifications()
)

if prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED.value():
service_instances.append(services.foreman.Foreman())

if prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED.value():
service_instances.append(ReactiveTriggers())
service_instances.append(ProactiveTriggers())
service_instances.append(Actions())

loop = asyncio.get_running_loop()

app.state.services = {
Expand Down Expand Up @@ -828,6 +830,7 @@ def _run_uvicorn_command(self) -> subprocess.Popen:
# used to turn off serving the UI
server_env = {
"PREFECT_UI_ENABLED": "0",
"PREFECT__SERVER_EPHEMERAL": "1",
}
return subprocess.Popen(
args=[
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/services/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async def run_once(self):
"python_version": platform.python_version(),
"python_implementation": platform.python_implementation(),
"environment": self.telemetry_environment,
"ephemeral_server": bool(os.getenv("PREFECT__SERVER_EPHEMERAL", False)),
"api_version": SERVER_API_VERSION,
"prefect_version": prefect.__version__,
"session_id": self.session_id,
Expand Down

0 comments on commit 3da0f05

Please sign in to comment.