From ca1c51f350ebc8bde59304bc1bdfdf578f321081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Thu, 12 Dec 2024 15:19:10 +0100 Subject: [PATCH] WIP: Include spans from dark storage --- pyproject.toml | 2 ++ src/ert/dark_storage/app.py | 4 +++ src/ert/logging/storage_log.conf | 4 +-- src/ert/services/_storage_main.py | 47 ++++++++++++++++++++++------- src/ert/services/storage_service.py | 2 ++ 5 files changed, 46 insertions(+), 13 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e353f4ceb9c..566afdf7b82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "openpyxl", # extra dependency for pandas (excel) "opentelemetry-api", "opentelemetry-sdk", + "opentelemetry.instrumentation.fastapi", + "opentelemetry-instrumentation-httpx", "opentelemetry-instrumentation-threading", "orjson", "packaging", diff --git a/src/ert/dark_storage/app.py b/src/ert/dark_storage/app.py index 03bcf7ef9fc..760564620e3 100644 --- a/src/ert/dark_storage/app.py +++ b/src/ert/dark_storage/app.py @@ -10,6 +10,8 @@ from ert.dark_storage.endpoints import router as endpoints_router from ert.dark_storage.exceptions import ErtStorageError +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + class JSONEncoder(json.JSONEncoder): """ @@ -107,3 +109,5 @@ async def healthcheck() -> str: app.include_router(endpoints_router) + +FastAPIInstrumentor.instrument_app(app) \ No newline at end of file diff --git a/src/ert/logging/storage_log.conf b/src/ert/logging/storage_log.conf index d850e528a49..24c8cf75dbe 100644 --- a/src/ert/logging/storage_log.conf +++ b/src/ert/logging/storage_log.conf @@ -1,5 +1,5 @@ version: 1 -disable_existing_loggers: True, +disable_existing_loggers: False, formatters: standard: format: '%(asctime)s [%(levelname)s] %(name)s: %(message)s' @@ -32,7 +32,7 @@ loggers: ert.shared.storage.info: level: INFO handlers: [infohandler, file] - propagate: False + propagate: True ert.shared.status: 
level: INFO res: diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index cfa83049be5..a5ffafce40e 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -7,6 +7,7 @@ import signal import socket import string +import sys import threading import time import warnings @@ -21,6 +22,9 @@ from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket from ert.shared.storage.command import add_parser_options +from ert.trace import get_trace_id, trace, tracer, tracer_provider + +from opentelemetry.trace.span import Span class Server(uvicorn.Server): @@ -80,8 +84,8 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> Dict[str, An return connection_info - def run_server(args: Optional[argparse.Namespace] = None, debug: bool = False) -> None: + trace_id = get_trace_id() if args is None: args = parse_args() @@ -106,7 +110,7 @@ def run_server(args: Optional[argparse.Namespace] = None, debug: bool = False) - server = Server(config, json.dumps(connection_info)) logger = logging.getLogger("ert.shared.storage.info") - log_level = logging.INFO if args.verbose else logging.WARNING + log_level = logging.INFO if args.verbose else logging.INFO logger.setLevel(log_level) logger.info("Storage server is ready to accept requests. Listening on:") for url in connection_info["urls"]: @@ -121,11 +125,13 @@ def run_server(args: Optional[argparse.Namespace] = None, debug: bool = False) - supervisor = ChangeReload(config, target=server.run, sockets=[sock]) supervisor.run() else: + print("Start server") server.run(sockets=[sock]) + print("Server stopped") def terminate_on_parent_death( - stopped: threading.Event, poll_interval: float = 1.0 + stopped: threading.Event, span: Span, poll_interval: float = 1.0 ) -> None: """ Quit the server when the parent process is no longer running. 
@@ -138,12 +144,12 @@ def check_parent_alive() -> bool: if stopped.is_set(): return time.sleep(poll_interval) - + span.end() # Parent is no longer alive, terminate this process. os.kill(os.getpid(), signal.SIGTERM) -if __name__ == "__main__": +def main(): with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file: logging_conf = yaml.safe_load(conf_file) logging.config.dictConfig(logging_conf) @@ -154,10 +160,29 @@ def check_parent_alive() -> bool: terminate_on_parent_death_thread = threading.Thread( target=terminate_on_parent_death, args=[_stopped, 1.0] ) - with ErtPluginContext(logger=logging.getLogger()) as context: + with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider) as context: terminate_on_parent_death_thread.start() - try: - run_server(debug=False) - finally: - _stopped.set() - terminate_on_parent_death_thread.join() + with tracer.start_as_current_span(f"run_storage_server") as currentSpan: + try: + current_span = trace.get_current_span() + print(f"Opertation ID: {get_trace_id()}") + run_server(debug=False) + except BaseException as err: + print(f"Stopped with exception {err}") + finally: + print("Closing1") + _stopped.set() + terminate_on_parent_death_thread.join() + print("Closing2") + + + +def sigterm_handler(_signo, _stack_frame): + print("handle sigterm") + sys.exit(0) + +signal.signal(signal.SIGTERM, sigterm_handler) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index d1a1226e876..39d89e5a994 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -9,6 +9,8 @@ from ert.dark_storage.client import Client, ConnInfo from ert.services._base_service import BaseService, _Context, local_exec_args +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +HTTPXClientInstrumentor().instrument() class StorageService(BaseService): service_name = "storage"