Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable remote pilot logging system #269

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions diracx-db/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB"

[project.entry-points."diracx.db.os"]
JobParametersDB = "diracx.db.os:JobParametersDB"
PilotLogsDB = "diracx.db.os:PilotLogsDB"

[tool.setuptools.packages.find]
where = ["src"]
Expand Down
6 changes: 5 additions & 1 deletion diracx-db/src/diracx/db/os/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

__all__ = ("JobParametersDB",)
__all__ = (
"JobParametersDB",
"PilotLogsDB",
)

from .job_parameters import JobParametersDB
from .pilot_logs import PilotLogsDB
21 changes: 21 additions & 0 deletions diracx-db/src/diracx/db/os/pilot_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import annotations

from diracx.db.os.utils import BaseOSDB


class PilotLogsDB(BaseOSDB):
fields = {
"PilotStamp": {"type": "keyword"},
"PilotID": {"type": "long"},
"SubmissionTime": {"type": "date"},
"LineNumber": {"type": "long"},
"Message": {"type": "text"},
"VO": {"type": "keyword"},
"timestamp": {"type": "date"},
}
index_prefix = "pilot_logs"

def index_name(self, doc_id: int) -> str:
# TODO decide how to define the index name
# use pilot ID
return f"{self.index_prefix}_{doc_id // 1e6:.0f}"
17 changes: 17 additions & 0 deletions diracx-db/src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Self

from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

from diracx.core.exceptions import InvalidQueryError
from diracx.core.extensions import select_from_extension
Expand Down Expand Up @@ -190,6 +191,13 @@ async def upsert(self, doc_id, document) -> None:
)
print(f"{response=}")

async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
# bulk inserting to database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# bulk inserting to database
"""bulk inserting to database."""

n_inserted = await async_bulk(
self.client, actions=[doc | {"_index": index_name} for doc in docs]
)
logger.info("Inserted %s documents to %s", n_inserted, index_name)

async def search(
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -231,6 +239,15 @@ async def search(

return hits

async def delete(self, query: list[dict[str, Any]]) -> None:

# Delete multiple documents by query.

Comment on lines +243 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Delete multiple documents by query.
"""Delete multiple documents by query."""

body = {}
if query:
body["query"] = apply_search_filters(self.fields, query)
await self.client.delete_by_query(body=body, index=f"{self.index_prefix}*")


def require_type(operator, field_name, field_type, allowed_types):
if field_type not in allowed_types:
Expand Down
2 changes: 2 additions & 0 deletions diracx-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ types = [
]

[project.entry-points."diracx.services"]
pilots = "diracx.routers.pilots:router"
jobs = "diracx.routers.jobs:router"
config = "diracx.routers.configuration:router"
auth = "diracx.routers.auth:router"
Expand All @@ -56,6 +57,7 @@ auth = "diracx.routers.auth:router"
[project.entry-points."diracx.access_policies"]
WMSAccessPolicy = "diracx.routers.jobs.access_policies:WMSAccessPolicy"
SandboxAccessPolicy = "diracx.routers.jobs.access_policies:SandboxAccessPolicy"
PilotLogsAccessPolicy = "diracx.routers.pilots.access_policies:PilotLogsAccessPolicy"

# Minimum version of the client supported
[project.entry-points."diracx.min_client_version"]
Expand Down
8 changes: 5 additions & 3 deletions diracx-routers/src/diracx/routers/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"SandboxMetadataDB",
"TaskQueueDB",
"PilotAgentsDB",
"PilotLogsDB",
"add_settings_annotation",
"AvailableSecurityProperties",
)
Expand All @@ -21,6 +22,7 @@
from diracx.core.properties import SecurityProperty
from diracx.core.settings import DevelopmentSettings as _DevelopmentSettings
from diracx.db.os import JobParametersDB as _JobParametersDB
from diracx.db.os import PilotLogsDB as _PilotLogsDB
from diracx.db.sql import AuthDB as _AuthDB
from diracx.db.sql import JobDB as _JobDB
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
Expand All @@ -36,7 +38,7 @@ def add_settings_annotation(cls: T) -> T:
return Annotated[cls, Depends(cls.create)] # type: ignore


# Databases
# SQL Databases
AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)]
JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)]
JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)]
Expand All @@ -46,9 +48,9 @@ def add_settings_annotation(cls: T) -> T:
]
TaskQueueDB = Annotated[_TaskQueueDB, Depends(_TaskQueueDB.transaction)]

# Opensearch databases
# OpenSearch Databases
JobParametersDB = Annotated[_JobParametersDB, Depends(_JobParametersDB.session)]

PilotLogsDB = Annotated[_PilotLogsDB, Depends(_PilotLogsDB.session)]

# Miscellaneous
Config = Annotated[_Config, Depends(ConfigSource.create)]
Expand Down
11 changes: 11 additions & 0 deletions diracx-routers/src/diracx/routers/pilots/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from __future__ import annotations

from logging import getLogger

from ..fastapi_classes import DiracxRouter
from .logging import router as logging_router

logger = getLogger(__name__)

router = DiracxRouter()
router.include_router(logging_router)
63 changes: 63 additions & 0 deletions diracx-routers/src/diracx/routers/pilots/access_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from enum import StrEnum, auto
from typing import Annotated, Callable

from fastapi import Depends, HTTPException, status

from diracx.core.properties import (
GENERIC_PILOT,
NORMAL_USER,
OPERATOR,
PILOT,
SERVICE_ADMINISTRATOR,
)
from diracx.routers.access_policies import BaseAccessPolicy

from ..utils.users import AuthorizedUserInfo


class ActionType(StrEnum):
#: Create/update pilot log records
CREATE = auto()
#: delete pilot logs
DELETE = auto()
#: Search
QUERY = auto()


class PilotLogsAccessPolicy(BaseAccessPolicy):
"""Rules:
Only PILOT, GENERIC_PILOT, SERVICE_ADMINISTRATOR and OPERATOR can process log records.
Policies for other actions to be determined.
"""

@staticmethod
async def policy(
policy_name: str,
user_info: AuthorizedUserInfo,
/,
*,
action: ActionType | None = None,
):

if action is None:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, detail="Action is a mandatory argument"
)

if GENERIC_PILOT in user_info.properties and action == ActionType.CREATE:
return user_info
if PILOT in user_info.properties and action == ActionType.CREATE:
return user_info
if NORMAL_USER in user_info.properties and action == ActionType.QUERY:
return user_info
if SERVICE_ADMINISTRATOR in user_info.properties:
return user_info
if OPERATOR in user_info.properties:
return user_info

raise HTTPException(status.HTTP_403_FORBIDDEN, detail=user_info.properties)


CheckPilotLogsPolicyCallable = Annotated[Callable, Depends(PilotLogsAccessPolicy.check)]
157 changes: 157 additions & 0 deletions diracx-routers/src/diracx/routers/pilots/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

import datetime
import logging

from fastapi import HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import NoResultFound

from diracx.core.exceptions import InvalidQueryError
from diracx.core.properties import OPERATOR, SERVICE_ADMINISTRATOR
from diracx.db.sql.pilot_agents.schema import PilotAgents
from diracx.db.sql.utils import BaseSQLDB

from ..dependencies import PilotLogsDB
from ..fastapi_classes import DiracxRouter
from ..utils.users import AuthorizedUserInfo
from .access_policies import ActionType, CheckPilotLogsPolicyCallable

logger = logging.getLogger(__name__)
router = DiracxRouter()


class LogLine(BaseModel):
line_no: int
line: str


class LogMessage(BaseModel):
pilot_stamp: str
lines: list[LogLine]
vo: str


class DateRange(BaseModel):
min: str | None = None # expects a string in ISO 8601 ("%Y-%m-%dT%H:%M:%S.%f%z")
max: str | None = None # expects a string in ISO 8601 ("%Y-%m-%dT%H:%M:%S.%f%z")


@router.post("/")
async def send_message(
data: LogMessage,
pilot_logs_db: PilotLogsDB,
check_permissions: CheckPilotLogsPolicyCallable,
) -> int:

logger.warning(f"Message received '{data}'")
user_info = await check_permissions(action=ActionType.CREATE)
pilot_id = 0 # need to get pilot id from pilot_stamp (via PilotAgentsDB)
# also add a timestamp to be able to select and delete logs based on pilot creation dates, even if corresponding
# pilots have been already deleted from PilotAgentsDB (so the logs can live longer than pilots).
submission_time = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
pilot_agents_db = BaseSQLDB.available_implementations("PilotAgentsDB")[0]
url = BaseSQLDB.available_urls()["PilotAgentsDB"]
db = pilot_agents_db(url)

try:
async with db.engine_context():
async with db:
stmt = select(PilotAgents.pilot_id, PilotAgents.submission_time).where(
PilotAgents.pilot_stamp == data.pilot_stamp
)
pilot_id, submission_time = (await db.conn.execute(stmt)).one()
except NoResultFound as exc:
logger.error(
f"Cannot determine PilotID for requested PilotStamp: {data.pilot_stamp}, Error: {exc}."
)
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

docs = []
for line in data.lines:
docs.append(
{
"PilotStamp": data.pilot_stamp,
"PilotID": pilot_id,
"SubmissionTime": submission_time,
"VO": user_info.vo,
"LineNumber": line.line_no,
"Message": line.line,
}
)
await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(pilot_id), docs)
return pilot_id


@router.get("/logs")
async def get_logs(
pilot_id: int,
db: PilotLogsDB,
check_permissions: CheckPilotLogsPolicyCallable,
) -> list[dict]:

logger.warning(f"Retrieving logs for pilot ID '{pilot_id}'")
user_info = await check_permissions(action=ActionType.QUERY)

# here, users with privileged properties will see logs from all VOs. Is it what we want ?
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
if _non_privileged(user_info):
search_params.append(
{"parameter": "VO", "operator": "eq", "value": user_info.vo}
)
result = await db.search(
["Message"],
search_params,
[{"parameter": "LineNumber", "direction": "asc"}],
)
if not result:
return [{"Message": f"No logs for pilot ID = {pilot_id}"}]
return result


@router.delete("/logs")
async def delete(
pilot_id: int,
data: DateRange,
db: PilotLogsDB,
check_permissions: CheckPilotLogsPolicyCallable,
) -> str:
"""Delete either logs for a specific PilotID or a creation date range.
Non-privileged users can only delete log files within their own VO.
"""
message = "no-op"
user_info = await check_permissions(action=ActionType.DELETE)
non_privil_params = {"parameter": "VO", "operator": "eq", "value": user_info.vo}

# id pilot_id is provided we ignore data.min and data.max
if data.min and data.max and not pilot_id:
raise InvalidQueryError(
"This query requires a range operator definition in DiracX"
)

if pilot_id:
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
if _non_privileged(user_info):
search_params.append(non_privil_params)
await db.delete(search_params)
message = f"Logs for pilot ID '{pilot_id}' successfully deleted"

elif data.min:
logger.warning(f"Deleting logs for pilots with submission data >='{data.min}'")
search_params = [
{"parameter": "SubmissionTime", "operator": "gt", "value": data.min}
]
if _non_privileged(user_info):
search_params.append(non_privil_params)
await db.delete(search_params)
message = f"Logs for for pilots with submission data >='{data.min}' successfully deleted"

return message


def _non_privileged(user_info: AuthorizedUserInfo):
return (
SERVICE_ADMINISTRATOR not in user_info.properties
and OPERATOR not in user_info.properties
)
Loading
Loading