Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Include spans for dark storage #9535

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ dependencies = [
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry.instrumentation.fastapi",
"opentelemetry-instrumentation-httpx",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
Expand Down
3 changes: 3 additions & 0 deletions src/ert/dark_storage/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import FastAPI, Request, status
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from ert.dark_storage.endpoints import router as endpoints_router
from ert.dark_storage.exceptions import ErtStorageError
Expand Down Expand Up @@ -107,3 +108,5 @@ async def healthcheck() -> str:


app.include_router(endpoints_router)

FastAPIInstrumentor.instrument_app(app)
2 changes: 1 addition & 1 deletion src/ert/logging/storage_log.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ loggers:
ert.shared.storage.info:
level: INFO
handlers: [infohandler, file]
propagate: False
propagate: True
ert.shared.status:
level: INFO
res:
Expand Down
73 changes: 58 additions & 15 deletions src/ert/services/_storage_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,25 @@
import signal
import socket
import string
import sys
import threading
import time
import warnings
from typing import Any

import uvicorn
import yaml
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from uvicorn.supervisors import ChangeReload

from ert.logging import STORAGE_LOG_CONFIG
from ert.plugins import ErtPluginContext
from ert.shared import __file__ as ert_shared_path
from ert.shared import find_available_socket
from ert.shared.storage.command import add_parser_options
from ert.trace import tracer, tracer_provider

DARK_STORAGE_APP = "ert.dark_storage.app:app"


class Server(uvicorn.Server):
Expand Down Expand Up @@ -81,7 +86,9 @@
return connection_info


def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None:
def run_server(

Check failure on line 89 in src/ert/services/_storage_main.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Function is missing a type annotation for one or more arguments
args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config=None
) -> None:
if args is None:
args = parse_args()

Expand All @@ -102,12 +109,16 @@
# Appropriated from uvicorn.main:run
os.environ["ERT_STORAGE_NO_TOKEN"] = "1"
os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project)
config = uvicorn.Config("ert.dark_storage.app:app", **config_args)
config = (
uvicorn.Config(DARK_STORAGE_APP, **config_args)
if uvicorn_config is None
else uvicorn_config
) # uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system
server = Server(config, json.dumps(connection_info))

logger = logging.getLogger("ert.shared.storage.info")
log_level = logging.INFO if args.verbose else logging.WARNING
logger.setLevel(log_level)
if args.verbose and logger.level > logging.INFO:
logger.setLevel(logging.INFO)
logger.info("Storage server is ready to accept requests. Listening on:")
for url in connection_info["urls"]:
logger.info(f" {url}")
Expand Down Expand Up @@ -143,21 +154,53 @@
os.kill(os.getpid(), signal.SIGTERM)


if __name__ == "__main__":
def main():

Check failure on line 157 in src/ert/services/_storage_main.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Function is missing a return type annotation
args = parse_args()
config_args: dict[str, Any] = {}
with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file:
logging_conf = yaml.safe_load(conf_file)
logging.config.dictConfig(logging_conf)
config_args.update(log_config=logging_conf)
warnings.filterwarnings("ignore", category=DeprecationWarning)
uvicorn.config.LOGGING_CONFIG.clear()
uvicorn.config.LOGGING_CONFIG.update(logging_conf)
_stopped = threading.Event()

if args.debug:
config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)])
uvicorn_config = uvicorn.Config(
DARK_STORAGE_APP, **config_args
) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext

ctx = (
TraceContextTextMapPropagator().extract(
carrier={"traceparent": args.traceparent}
)
if args.traceparent
else None
)

stopped = threading.Event()
terminate_on_parent_death_thread = threading.Thread(
target=terminate_on_parent_death, args=[_stopped, 1.0]
target=terminate_on_parent_death, args=[stopped, 1.0]
)
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider):
terminate_on_parent_death_thread.start()
try:
run_server(debug=False)
finally:
_stopped.set()
terminate_on_parent_death_thread.join()
with tracer.start_as_current_span("run_storage_server", ctx):
logger = logging.getLogger("ert.shared.storage.info")
try:
logger.info("Starting dark storage")
run_server(args, debug=False, uvicorn_config=uvicorn_config)
except SystemExit:
logger.info("Stopping dark storage")
finally:
stopped.set()
terminate_on_parent_death_thread.join()


def sigterm_handler(_signo, _stack_frame):

Check failure on line 198 in src/ert/services/_storage_main.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Function is missing a type annotation
sys.exit(0)


signal.signal(signal.SIGTERM, sigterm_handler)


if __name__ == "__main__":
main()

Check failure on line 206 in src/ert/services/_storage_main.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Call to untyped function "main" in typed context
10 changes: 10 additions & 0 deletions src/ert/services/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

import httpx
import requests
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

from ert.dark_storage.client import Client, ConnInfo
from ert.services._base_service import BaseService, _Context, local_exec_args
from ert.trace import get_traceparent

HTTPXClientInstrumentor().instrument()


class StorageService(BaseService):
Expand All @@ -21,6 +25,7 @@ def __init__(
conn_info: Mapping[str, Any] | Exception | None = None,
project: str | None = None,
verbose: bool = False,
traceparent: str | None = "inherit_parent",
):
self._url: str | None = None

Expand All @@ -29,6 +34,11 @@ def __init__(
exec_args.extend(["--project", str(project)])
if verbose:
exec_args.append("--verbose")
if traceparent:
traceparent = (
get_traceparent() if traceparent == "inherit_parent" else traceparent
)
exec_args.extend(["--traceparent", str(traceparent)])

super().__init__(exec_args, timeout, conn_info, project)

Expand Down
6 changes: 6 additions & 0 deletions src/ert/shared/storage/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ def add_parser_options(ap: ArgumentParser) -> None:
help="Path to directory in which to create storage_server.json",
default=os.getcwd(),
)
ap.add_argument(
"--traceparent",
type=str,
help="Trace parent id to be used by the storage root span",
default=None,
)
ap.add_argument(
"--host", type=str, default=os.environ.get("ERT_STORAGE_HOST", "127.0.0.1")
)
Expand Down
8 changes: 8 additions & 0 deletions src/ert/trace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

resource = Resource(attributes={SERVICE_NAME: "ert"})
tracer_provider = TracerProvider(
Expand All @@ -13,3 +14,10 @@

def get_trace_id() -> str:
return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id)


def get_traceparent() -> str | None:
carrier = {}

Check failure on line 20 in src/ert/trace.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Need type annotation for "carrier" (hint: "carrier: dict[<type>, <type>] = ...")
# Write the current context into the carrier.
TraceContextTextMapPropagator().inject(carrier)
return carrier.get("traceparent")
Loading