Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: 🎨 Enh/api server error handling #5233

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
from starlette.responses import JSONResponse

from ...services.log_streaming import (
LogDistributionBaseException,
LogStreamerNotRegistered,
LogStreamerRegistionConflict,
LogDistributionBaseError,
LogStreamerNotRegisteredError,
LogStreamerRegistionConflictError,
)
from .http_error import create_error_json_response


async def log_handling_error_handler(
_: Request, exc: LogDistributionBaseException
_: Request, exc: LogDistributionBaseError
) -> JSONResponse:
msg = f"{exc}"
status_code: int = 500
if isinstance(exc, LogStreamerNotRegistered):
if isinstance(exc, LogStreamerNotRegisteredError):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
elif isinstance(exc, LogStreamerRegistionConflict):
elif isinstance(exc, LogStreamerRegistionConflictError):
status_code = status.HTTP_409_CONFLICT

return create_error_json_response(msg, status_code=status_code)
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def upload_file(
request: Request,
file: Annotated[UploadFile, FileParam(...)],
user_id: Annotated[int, Depends(get_current_user_id)],
content_length: str | None = Header(None), # noqa: B008
content_length: str | None = Header(None),
):
"""Uploads a single file to the system"""
# TODO: For the moment we upload file here and re-upload to S3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _check_response_headers(
"application/x-ndjson",
"application/json",
} # nosec
headers: dict = dict()
headers: dict = {}
headers[b"content-type"] = b"application/x-ndjson"
return list(headers.items())

Expand All @@ -36,7 +36,8 @@ def is_last_response(response_headers: dict[bytes, bytes], message: dict[str, An
return True
if (more_body := message.get("more_body")) is not None:
return not more_body
raise RuntimeError("Could not determine if last response")
msg = "Could not determine if last response"
raise RuntimeError(msg)


class ApiServerProfilerMiddleware:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
from simcore_service_api_server.api.errors.log_handling_error import (
log_handling_error_handler,
)
from simcore_service_api_server.services.log_streaming import (
LogDistributionBaseException,
)
from simcore_service_api_server.services.log_streaming import LogDistributionBaseError
from starlette import status
from starlette.exceptions import HTTPException

Expand Down Expand Up @@ -108,7 +106,7 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI:
app.add_exception_handler(HTTPException, http_error_handler)
app.add_exception_handler(RequestValidationError, http422_error_handler)
app.add_exception_handler(HTTPStatusError, httpx_client_error_handler)
app.add_exception_handler(LogDistributionBaseException, log_handling_error_handler)
app.add_exception_handler(LogDistributionBaseError, log_handling_error_handler)
app.add_exception_handler(CustomBaseError, custom_error_handler)

# SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from enum import auto

import httpx
from models_library.utils.enums import StrAutoEnum
from pydantic.errors import PydanticErrorMixin
from servicelib.fastapi.httpx_utils import to_httpx_command


class _BaseAppError(PydanticErrorMixin, ValueError):
    """Common base class for this module's application errors."""

    @classmethod
    def get_full_class_name(cls) -> str:
        """Return the dotted ``<module>.<class name>`` path of ``cls``.

        The result can be used as a unique code identifying the error type.
        """
        return ".".join((cls.__module__, cls.__name__))


class BackendEnum(StrAutoEnum):
    """Backend services that a ``BackendServiceError`` can be attributed to."""

    # NOTE(review): member values are produced by ``auto()``; presumably
    # ``StrAutoEnum`` derives them from the member names -- confirm against
    # models_library.utils.enums
    CATALOG = auto()
    DIRECTOR = auto()
    STORAGE = auto()
    WEBSERVER = auto()


class BackendServiceError(_BaseAppError):
    """Error describing a failed interaction with one of the backend services.

    Concrete subclasses assign ``service`` to the backend they stand for.
    """

    # httpx error this one was built from, when there is one
    http_status_error: httpx.HTTPStatusError | None = None
    # backend the error is attributed to; only annotated here, set by subclasses
    service: BackendEnum

    msg_template = "{service} error"

    @classmethod
    def from_httpx_status_error(
        cls, error: httpx.HTTPStatusError, **ctx
    ) -> "BackendServiceError":
        """Build an instance of ``cls`` wrapping ``error``.

        Args:
            error: the failed-status error raised by httpx.
            **ctx: extra keyword arguments forwarded to the constructor.

        Returns:
            A new error carrying ``error`` and the class-level ``service``.
        """
        return cls(http_status_error=error, service=cls.service, **ctx)

    def get_debug_message(self) -> str:
        """Return the error message extended with request/response details.

        When an ``http_status_error`` is attached, three tab-indented lines
        are appended: the request as rendered by ``to_httpx_command``, the
        response body, and the status code followed by the latency in
        microseconds.
        """
        msg = f"{self}"
        if http_status_error := getattr(self, "http_status_error", None):
            resp = http_status_error.response
            # request
            msg += f"\n\t'{to_httpx_command(http_status_error.request)}'"
            # response
            msg += f"\n\t'{resp.text}'"
            # status, latency
            # NOTE: httpx raises RuntimeError on ``.elapsed`` until the response
            # has been read or closed. This method is used while logging
            # another failure, so a missing latency must not raise from here.
            try:
                latency = f"{resp.elapsed.total_seconds()*1E6}us"
            except RuntimeError:
                latency = "latency n/a"
            msg += f"\n\t{resp.status_code}, {latency}"
        return msg


class DirectorError(BackendServiceError):
    """Backend service error attributed to ``BackendEnum.DIRECTOR``."""

    service = BackendEnum.DIRECTOR


class WebServerError(BackendServiceError):
    """Backend service error attributed to ``BackendEnum.WEBSERVER``."""

    service = BackendEnum.WEBSERVER
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import logging
from contextlib import contextmanager
from typing import Any, ClassVar
from typing import Any, ClassVar, TypeAlias
from uuid import UUID

import httpx
from fastapi import FastAPI
from fastapi.exceptions import HTTPException
from httpx import HTTPStatusError, codes
from httpx import codes
from models_library.clusters import ClusterID
from models_library.projects_nodes import NodeID
from models_library.projects_pipeline import ComputationTask
from models_library.projects_state import RunningState
from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, PositiveInt, parse_raw_as
from servicelib.error_codes import create_error_code
from servicelib.fastapi.httpx_utils import to_httpx_command
from simcore_service_api_server.db.repositories.groups_extra_properties import (
GroupsExtraPropertiesRepository,
)
from starlette import status

from ..core.errors import DirectorError
from ..core.settings import DirectorV2Settings
from ..models.schemas.jobs import PercentageInt
from ..utils.client_base import BaseServiceClientApi, setup_client_instance

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


# API MODELS ---------------------------------------------
Expand Down Expand Up @@ -59,43 +63,78 @@ class TaskLogFileGet(BaseModel):
)


NodeName = str
DownloadLink = AnyUrl

# API CLASS ---------------------------------------------
NodeName: TypeAlias = str
DownloadLink: TypeAlias = AnyUrl


@contextmanager
def _handle_errors_context(project_id: UUID):
try:
yield

# except ValidationError
except HTTPStatusError as err:
msg = (
f"Failed {err.request.url} with status={err.response.status_code}: {err.response.json()}",
)
except httpx.HTTPStatusError as err:
if codes.is_client_error(err.response.status_code):
# client errors are mapped
logger.debug(msg)
if err.response.status_code == status.HTTP_404_NOT_FOUND:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job {project_id} not found",
) from err

raise err
# FIXME: what to do with these
raise DirectorError.from_httpx_status_error(err) from err

else:
# server errors are logged and re-raised as 503
assert codes.is_server_error(err.response.status_code) # nosec

oec = create_error_code(err)
err_detail = (
f"Service handling job '{project_id}' unexpectedly failed [{oec}]"
)
_logger.exception(
"%s: %s",
err_detail,
DirectorError.from_httpx_status_error(err).get_debug_message(),
extra={"error_code": oec},
)

raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=err_detail,
) from err

# server errors are logged and re-raised as 503
assert codes.is_server_error(err.response.status_code) # nosec
except httpx.TimeoutException as err:
# TODO: refer resource?
oec = create_error_code(err)
err_detail = (
f"Service handling job operation on '{project_id}' timed out [{oec}]"
)
_logger.exception(
"%s: %s",
err_detail,
to_httpx_command(err.request),
extra={"error_code": oec},
)

logger.exception(
"director-v2 service failed: %s. Re-rasing as service unavailable (503)",
msg,
# SEE https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=err_detail,
) from err

except httpx.HTTPError as err:
oec = create_error_code(err)
err_detail = f"Unexpected error while processing job '{project_id}' [{oec}]"
_logger.exception(
"%s: %s",
err_detail,
to_httpx_command(err.request),
extra={"error_code": oec},
)

raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Director service failed",
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=err_detail,
) from err


Expand All @@ -106,17 +145,18 @@ async def create_computation(
user_id: PositiveInt,
product_name: str,
) -> ComputationTaskGet:
response = await self.client.post(
"/v2/computations",
json={
"user_id": user_id,
"project_id": str(project_id),
"start_pipeline": False,
"product_name": product_name,
},
)
response.raise_for_status()
return ComputationTaskGet(**response.json())
with _handle_errors_context(project_id):
response = await self.client.post(
"/v2/computations",
json={
"user_id": user_id,
"project_id": str(project_id),
"start_pipeline": False,
"product_name": product_name,
},
)
response.raise_for_status()
return ComputationTaskGet(**response.json())

async def start_computation(
self,
Expand All @@ -128,7 +168,6 @@ async def start_computation(
) -> ComputationTaskGet:
with _handle_errors_context(project_id):
extras = {}

use_on_demand_clusters = (
await groups_extra_properties_repository.use_on_demand_clusters(
user_id, product_name
Expand All @@ -155,56 +194,61 @@ async def start_computation(
async def get_computation(
self, project_id: UUID, user_id: PositiveInt
) -> ComputationTaskGet:
response = await self.client.get(
f"/v2/computations/{project_id}",
params={
"user_id": user_id,
},
)
response.raise_for_status()
return ComputationTaskGet(**response.json())
with _handle_errors_context(project_id):
response = await self.client.get(
f"/v2/computations/{project_id}",
params={
"user_id": user_id,
},
)
response.raise_for_status()

return ComputationTaskGet(**response.json())

async def stop_computation(
self, project_id: UUID, user_id: PositiveInt
) -> ComputationTaskGet:
response = await self.client.post(
f"/v2/computations/{project_id}:stop",
json={
"user_id": user_id,
},
)
with _handle_errors_context(project_id):
response = await self.client.post(
f"/v2/computations/{project_id}:stop",
json={
"user_id": user_id,
},
)

return ComputationTaskGet(**response.json())
return ComputationTaskGet(**response.json())

async def delete_computation(self, project_id: UUID, user_id: PositiveInt):
await self.client.request(
"DELETE",
f"/v2/computations/{project_id}",
json={
"user_id": user_id,
"force": True,
},
)
with _handle_errors_context(project_id):
await self.client.request(
"DELETE",
f"/v2/computations/{project_id}",
json={
"user_id": user_id,
"force": True,
},
)

async def get_computation_logs(
self, user_id: PositiveInt, project_id: UUID
) -> dict[NodeName, DownloadLink]:
response = await self.client.get(
f"/v2/computations/{project_id}/tasks/-/logfile",
params={
"user_id": user_id,
},
)
with _handle_errors_context(project_id):
response = await self.client.get(
f"/v2/computations/{project_id}/tasks/-/logfile",
params={
"user_id": user_id,
},
)

# probably not found
response.raise_for_status()
# probably not found
response.raise_for_status()

node_to_links: dict[NodeName, DownloadLink] = {}
for r in parse_raw_as(list[TaskLogFileGet], response.text or "[]"):
if r.download_link:
node_to_links[f"{r.task_id}"] = r.download_link
node_to_links: dict[NodeName, DownloadLink] = {}
for r in parse_raw_as(list[TaskLogFileGet], response.text or "[]"):
if r.download_link:
node_to_links[f"{r.task_id}"] = r.download_link

return node_to_links
return node_to_links


# MODULES APP SETUP -------------------------------------------------------------
Expand Down
Loading
Loading