-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enable remote pilot logging system #269
Open
martynia
wants to merge
10
commits into
DIRACGrid:main
Choose a base branch
from
martynia:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+457
−4
Open
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
8da8e2b
feat: enable remote pilot logging system
martynia e2d0bb3
test: add a policy test
martynia 02622b0
feat: add remote_logger router test
martynia 050dc97
feat: implement bulk insert
martynia fb3dbb9
fix: fix multiple inheritance.
martynia 0c1ecde
fix: remove refs to DB in check_permissions
martynia f02efba
fix: fix pyproject.toml
martynia e9ad537
fix: refactoring pilot logging code
martynia 752143c
fix: refactor pilot logging code (2)
martynia 1774fb4
fix: fix test after rebasing
martynia File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,9 @@ | ||
from __future__ import annotations | ||
|
||
__all__ = ("JobParametersDB",) | ||
__all__ = ( | ||
"JobParametersDB", | ||
"PilotLogsDB", | ||
) | ||
|
||
from .job_parameters import JobParametersDB | ||
from .pilot_logs import PilotLogsDB |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from __future__ import annotations | ||
|
||
from diracx.db.os.utils import BaseOSDB | ||
|
||
|
||
class PilotLogsDB(BaseOSDB):
    """Field mapping and index naming for pilot log documents."""

    # Mapping of document field name to its declared field type.
    fields = {
        "PilotStamp": {"type": "keyword"},
        "PilotID": {"type": "long"},
        "SubmissionTime": {"type": "date"},
        "LineNumber": {"type": "long"},
        "Message": {"type": "text"},
        "VO": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    index_prefix = "pilot_logs"

    def index_name(self, doc_id: int) -> str:
        """Return the name of the index that holds documents for *doc_id*.

        IDs are grouped into buckets of one million, so IDs 0..999_999 map to
        ``pilot_logs_0``, 1_000_000..1_999_999 to ``pilot_logs_1`` and so on.

        :param doc_id: integer identifier used for bucketing (the pilot ID).
        :return: ``"<index_prefix>_<bucket>"``.
        """
        # TODO decide how to define the index name
        # use pilot ID
        # Integer floor-division keeps the bucket exact for arbitrarily large
        # IDs; going through a float (``// 1e6``) would round above 2**53.
        return f"{self.index_prefix}_{doc_id // 1_000_000}"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ | |||||||
from typing import Any, Self | ||||||||
|
||||||||
from opensearchpy import AsyncOpenSearch | ||||||||
from opensearchpy.helpers import async_bulk | ||||||||
|
||||||||
from diracx.core.exceptions import InvalidQueryError | ||||||||
from diracx.core.extensions import select_from_extension | ||||||||
|
@@ -190,6 +191,13 @@ async def upsert(self, doc_id, document) -> None: | |||||||
) | ||||||||
print(f"{response=}") | ||||||||
|
||||||||
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
    """Insert *docs* into *index_name* using one bulk request.

    :param index_name: target index; set as ``_index`` on every action
        (overriding any ``_index`` key already present in a document).
    :param docs: documents to insert; the input dicts are not modified.
    """
    # async_bulk returns a (success_count, errors) tuple, so unpack it and
    # log only the count rather than the whole tuple.
    n_inserted, _ = await async_bulk(
        self.client, actions=[doc | {"_index": index_name} for doc in docs]
    )
    logger.info("Inserted %s documents to %s", n_inserted, index_name)
|
||||||||
async def search( | ||||||||
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None | ||||||||
) -> list[dict[str, Any]]: | ||||||||
|
@@ -231,6 +239,15 @@ async def search( | |||||||
|
||||||||
return hits | ||||||||
|
||||||||
async def delete(self, query: list[dict[str, Any]]) -> None:
    """Delete multiple documents by query.

    :param query: search filters, converted with ``apply_search_filters``
        against ``self.fields``. When it is empty the request body carries
        no ``query`` entry at all.

    The request targets every index whose name starts with
    ``self.index_prefix``.
    """
    request_body: dict[str, Any] = (
        {"query": apply_search_filters(self.fields, query)} if query else {}
    )
    target_indices = f"{self.index_prefix}*"
    await self.client.delete_by_query(body=request_body, index=target_indices)
|
||||||||
|
||||||||
def require_type(operator, field_name, field_type, allowed_types): | ||||||||
if field_type not in allowed_types: | ||||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from __future__ import annotations | ||
|
||
from logging import getLogger | ||
|
||
from ..fastapi_classes import DiracxRouter | ||
from .logging import router as logging_router | ||
|
||
logger = getLogger(__name__) | ||
|
||
router = DiracxRouter() | ||
router.include_router(logging_router) |
63 changes: 63 additions & 0 deletions
63
diracx-routers/src/diracx/routers/pilots/access_policies.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from __future__ import annotations | ||
|
||
from enum import StrEnum, auto | ||
from typing import Annotated, Callable | ||
|
||
from fastapi import Depends, HTTPException, status | ||
|
||
from diracx.core.properties import ( | ||
GENERIC_PILOT, | ||
NORMAL_USER, | ||
OPERATOR, | ||
PILOT, | ||
SERVICE_ADMINISTRATOR, | ||
) | ||
from diracx.routers.access_policies import BaseAccessPolicy | ||
|
||
from ..utils.users import AuthorizedUserInfo | ||
|
||
|
||
class ActionType(StrEnum):
    """Actions on pilot logs that the access policy distinguishes.

    Values are produced by ``auto()``; for a ``StrEnum`` that is the
    lower-cased member name (``"create"``, ``"delete"``, ``"query"``).
    """

    #: Create/update pilot log records
    CREATE = auto()
    #: Delete pilot logs
    DELETE = auto()
    #: Search pilot logs
    QUERY = auto()
|
||
|
||
class PilotLogsAccessPolicy(BaseAccessPolicy):
    """Access rules for pilot log records.

    Rules implemented by :meth:`policy`:

    * ``SERVICE_ADMINISTRATOR`` and ``OPERATOR`` may perform any action.
    * ``PILOT`` and ``GENERIC_PILOT`` may only ``CREATE`` log records.
    * ``NORMAL_USER`` may only ``QUERY`` log records.
    * Anything else is rejected.

    Policies for other actions to be determined.
    """

    @staticmethod
    async def policy(
        policy_name: str,
        user_info: AuthorizedUserInfo,
        /,
        *,
        action: ActionType | None = None,
    ):
        """Return *user_info* when the caller may perform *action*.

        :raises HTTPException: 400 when *action* is not given, 403 (with the
            caller's properties as detail) when no rule grants access.
        """
        if action is None:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, detail="Action is a mandatory argument"
            )

        granted = user_info.properties

        # Privileged properties are allowed whatever the action is.
        if SERVICE_ADMINISTRATOR in granted or OPERATOR in granted:
            return user_info

        # Pilots may only write their logs.
        if action == ActionType.CREATE and (
            GENERIC_PILOT in granted or PILOT in granted
        ):
            return user_info

        # Regular users may only read logs.
        if action == ActionType.QUERY and NORMAL_USER in granted:
            return user_info

        raise HTTPException(status.HTTP_403_FORBIDDEN, detail=user_info.properties)
|
||
|
||
CheckPilotLogsPolicyCallable = Annotated[Callable, Depends(PilotLogsAccessPolicy.check)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
from __future__ import annotations | ||
|
||
import datetime | ||
import logging | ||
|
||
from fastapi import HTTPException, status | ||
from pydantic import BaseModel | ||
from sqlalchemy import select | ||
from sqlalchemy.exc import NoResultFound | ||
|
||
from diracx.core.exceptions import InvalidQueryError | ||
from diracx.core.properties import OPERATOR, SERVICE_ADMINISTRATOR | ||
from diracx.db.sql.pilot_agents.schema import PilotAgents | ||
from diracx.db.sql.utils import BaseSQLDB | ||
|
||
from ..dependencies import PilotLogsDB | ||
from ..fastapi_classes import DiracxRouter | ||
from ..utils.users import AuthorizedUserInfo | ||
from .access_policies import ActionType, CheckPilotLogsPolicyCallable | ||
|
||
logger = logging.getLogger(__name__) | ||
router = DiracxRouter() | ||
|
||
|
||
class LogLine(BaseModel):
    """One line of a pilot log, as sent in a ``LogMessage``."""

    # Line number; stored as "LineNumber" by send_message.
    line_no: int
    # Line text; stored as "Message" by send_message.
    line: str
|
||
|
||
class LogMessage(BaseModel):
    """Request body of the log upload route: a batch of lines from one pilot."""

    # Stamp used to look the pilot up in PilotAgentsDB.
    pilot_stamp: str
    # Log lines to store.
    lines: list[LogLine]
    # NOTE(review): send_message records the VO from the authenticated user
    # info, not from this field - confirm whether it is still needed.
    vo: str
|
||
|
||
class DateRange(BaseModel):
    """Optional bounds of a date range; both default to ``None``."""

    min: str | None = None  # expects a string in ISO 8601 ("%Y-%m-%dT%H:%M:%S.%f%z")
    max: str | None = None  # expects a string in ISO 8601 ("%Y-%m-%dT%H:%M:%S.%f%z")
|
||
|
||
@router.post("/")
async def send_message(
    data: LogMessage,
    pilot_logs_db: PilotLogsDB,
    check_permissions: CheckPilotLogsPolicyCallable,
) -> int:
    """Store the log lines of one pilot and return that pilot's ID.

    The pilot ID and submission time are looked up in PilotAgentsDB from
    ``data.pilot_stamp`` and copied onto every stored line, so that logs can
    still be selected and deleted by pilot creation date after the pilot
    itself has been removed from PilotAgentsDB (logs can outlive pilots).

    :raises HTTPException: 400 when no pilot matches ``data.pilot_stamp``.
    """
    logger.warning(f"Message received '{data}'")
    user_info = await check_permissions(action=ActionType.CREATE)

    # NOTE(review): a PilotAgentsDB object and its engine context are set up
    # on every request here - consider injecting the DB as a dependency.
    pilot_agents_cls = BaseSQLDB.available_implementations("PilotAgentsDB")[0]
    pilot_agents = pilot_agents_cls(BaseSQLDB.available_urls()["PilotAgentsDB"])

    lookup = select(PilotAgents.pilot_id, PilotAgents.submission_time).where(
        PilotAgents.pilot_stamp == data.pilot_stamp
    )
    try:
        async with pilot_agents.engine_context(), pilot_agents:
            pilot_id, submission_time = (
                await pilot_agents.conn.execute(lookup)
            ).one()
    except NoResultFound as exc:
        logger.error(
            f"Cannot determine PilotID for requested PilotStamp: {data.pilot_stamp}, Error: {exc}."
        )
        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    # One document per log line; the VO comes from the authenticated user.
    docs = [
        {
            "PilotStamp": data.pilot_stamp,
            "PilotID": pilot_id,
            "SubmissionTime": submission_time,
            "VO": user_info.vo,
            "LineNumber": entry.line_no,
            "Message": entry.line,
        }
        for entry in data.lines
    ]
    await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(pilot_id), docs)
    return pilot_id
|
||
|
||
@router.get("/logs")
async def get_logs(
    pilot_id: int,
    db: PilotLogsDB,
    check_permissions: CheckPilotLogsPolicyCallable,
) -> list[dict]:
    """Return the log messages of pilot *pilot_id*, ordered by line number.

    Non-privileged callers only see lines recorded for their own VO. When
    nothing matches, a single placeholder record saying so is returned
    instead of an empty list.
    """
    logger.warning(f"Retrieving logs for pilot ID '{pilot_id}'")
    user_info = await check_permissions(action=ActionType.QUERY)

    # here, users with privileged properties will see logs from all VOs. Is it what we want ?
    filters = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
    if _non_privileged(user_info):
        filters += [{"parameter": "VO", "operator": "eq", "value": user_info.vo}]

    # NOTE(review): search() is called with its default page size, so
    # presumably only the first page of lines is returned - confirm.
    ordering = [{"parameter": "LineNumber", "direction": "asc"}]
    hits = await db.search(["Message"], filters, ordering)
    return hits or [{"Message": f"No logs for pilot ID = {pilot_id}"}]
|
||
|
||
@router.delete("/logs")
async def delete(
    pilot_id: int,
    data: DateRange,
    db: PilotLogsDB,
    check_permissions: CheckPilotLogsPolicyCallable,
) -> str:
    """Delete either logs for a specific PilotID or a creation date range.

    Non-privileged users can only delete log files within their own VO.

    * A non-zero *pilot_id* deletes that pilot's logs; ``data`` is ignored.
    * Otherwise, ``data.min`` alone deletes logs of pilots submitted strictly
      after that date (the filter uses the ``gt`` operator).
    * Otherwise nothing is deleted and ``"no-op"`` is returned.

    :return: a human-readable description of what was done.
    :raises InvalidQueryError: when both ``data.min`` and ``data.max`` are
        given without a pilot ID (bounded ranges are not supported yet).
    """
    user_info = await check_permissions(action=ActionType.DELETE)

    # If pilot_id is provided we ignore data.min and data.max.
    # NOTE(review): pilot_id is a required int, so "not provided" in practice
    # means pilot_id == 0 - confirm this is the intended API.
    if data.min and data.max and not pilot_id:
        raise InvalidQueryError(
            "This query requires a range operator definition in DiracX"
        )

    if pilot_id:
        search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
        message = f"Logs for pilot ID '{pilot_id}' successfully deleted"
    elif data.min:
        # The messages say ">" because the filter below is a strict "gt".
        logger.warning(
            "Deleting logs for pilots with submission date > '%s'", data.min
        )
        search_params = [
            {"parameter": "SubmissionTime", "operator": "gt", "value": data.min}
        ]
        message = (
            f"Logs for pilots with submission date > '{data.min}' successfully deleted"
        )
    else:
        # Neither a pilot ID nor a lower bound: nothing to delete.
        return "no-op"

    # Restrict non-privileged callers to documents of their own VO.
    if _non_privileged(user_info):
        search_params.append(
            {"parameter": "VO", "operator": "eq", "value": user_info.vo}
        )
    await db.delete(search_params)
    return message
|
||
|
||
def _non_privileged(user_info: AuthorizedUserInfo) -> bool:
    """Return True when the user is neither a service administrator nor an operator."""
    granted = user_info.properties
    return not (SERVICE_ADMINISTRATOR in granted or OPERATOR in granted)
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.