Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel instrumentation for outgoing calls using generated clientlibs #19

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
python_requires=">=3.7",
install_requires=[
'py_zipkin>=0.10.1',
'opentelemetry-sdk>=0.26.1',
'bravado>=11.0.3'
],
keywords='zipkin',
classifiers=[
Expand Down
6 changes: 3 additions & 3 deletions swagger_zipkin/decorate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class OperationDecorator(Generic[P, T]):
:type func: callable
"""

def __init__(self, operation: Resource, func: Callable[P, T]) -> None:
def __init__(self, operation: Operation, func: Callable[P, T]) -> None:
self.operation = operation
self.func = func

Expand All @@ -56,8 +56,8 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:


def decorate_client(
api_client: Client,
func: Callable[P, T],
api_client: Resource,
func: Callable[[str, P.args, P.kwargs], T],
name: str,
) -> Resource[P, T]:
"""A helper for decorating :class:`bravado.client.SwaggerClient`.
Expand Down
154 changes: 154 additions & 0 deletions swagger_zipkin/otel_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import annotations

from contextlib import contextmanager
from typing import Any
from typing import TypeVar

from bravado.client import construct_request
from bravado.exception import HTTPError
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.span import format_span_id
from opentelemetry.trace.span import format_trace_id
from opentelemetry.trace.span import TraceFlags
from typing_extensions import ParamSpec

from swagger_zipkin.decorate_client import Client
from swagger_zipkin.decorate_client import decorate_client
from swagger_zipkin.decorate_client import Resource

T = TypeVar('T', covariant=True)
P = ParamSpec('P')

tracer = trace.get_tracer("otel_decorator")


class OtelResourceDecorator:
"""A wrapper to the swagger resource.

:param resource: A resource object. eg. `client.pet`, `client.store`.
:type resource: :class:`swaggerpy.client.Resource` or :class:`bravado_core.resource.Resource`
"""

def __init__(self, resource: Resource, client_identifier: str, smartstack_namespace: str) -> None:
self.resource = resource
self.client_identifier = client_identifier
self.smartstack_namespace = smartstack_namespace

def __getattr__(self, name: str) -> Resource:
return decorate_client(self.resource, self.with_headers, name)
benbariteau marked this conversation as resolved.
Show resolved Hide resolved

def with_headers(self, call_name: str, *args: Any, **kwargs: Any) -> Any:

with self.handle_exception():
kwargs.setdefault('_request_options', {})
request_options: dict = kwargs['_request_options']
request_options.setdefault('headers', {})

operation = getattr(self.resource, call_name)
request = construct_request(operation, request_options, **kwargs) # type: ignore

url = getattr(request, "url", "")
path = getattr(request, "path", "")
method = getattr(request, "method", "")

parent_span = trace.get_current_span()
span_name = f"{method} {path}"

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
span.set_attribute("url.path", url)
span.set_attribute("http.request.method", method)
span.set_attribute("client.namespace", self.client_identifier)
span.set_attribute("peer.service", self.smartstack_namespace)
span.set_attribute("server.namespace", self.smartstack_namespace)
span.set_attribute("http.response.status_code", "200")

self.inject_otel_headers(kwargs, span)
self.inject_zipkin_headers(kwargs, span, parent_span)

try:
operation(*args, **kwargs)
except HTTPError as e:
span.set_attribute("error.type", e.__class__.__name__)
span.set_status(trace.Status(trace.StatusCode.ERROR, e.message))
span.set_attribute("http.response.status_code", e.status_code)
raise e

return operation
Comment on lines +71 to +79
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic won't work as is. Calling an operation will never raise an HTTPError as you must call result or response on it. You'll need to create a wrapper future class in order to implement the logic you want.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry this is an issue

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for catching this! Will work on a fix.


@contextmanager
def handle_exception(
self,
) -> Any:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct return type here is Iterator[None]

try:
yield
except Exception as e:
benbariteau marked this conversation as resolved.
Show resolved Hide resolved
# not raising an exception if the instrumentation had a problem
raise e

def __dir__(self) -> list[str]:
return dir(self.resource) # pragma: no cover

Comment on lines +91 to +93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think overriding __dir__ is desirable here, especially since we're not overriding __getattr__

def inject_otel_headers(
self, kwargs: dict[str, Any], current_span: trace.Span
) -> None:
propagator = TraceContextTextMapPropagator()
carrier = kwargs['_request_options']["headers"]
propagator.inject(carrier=carrier, context=trace.set_span_in_context(current_span))

def inject_zipkin_headers(
self, kwargs: dict[str, Any], current_span: trace.Span, parent_span: trace.Span
) -> None:
current_span_context = current_span.get_span_context()
kwargs["_request_options"]["headers"]["X-B3-TraceId"] = format_trace_id(
current_span_context.trace_id
)
kwargs["_request_options"]["headers"]["X-B3-SpanId"] = format_span_id(
current_span_context.span_id
)
if parent_span is not None and parent_span.is_recording():
parent_span_context = parent_span.get_span_context()
kwargs["_request_options"]["headers"]["X-B3-ParentSpanId"] = format_span_id(
parent_span_context.span_id)

kwargs["_request_options"]["headers"]["X-B3-Sampled"] = (
"1"
if (current_span_context.trace_flags & TraceFlags.SAMPLED == TraceFlags.SAMPLED)
else "0"
)
kwargs["_request_options"]["headers"]["X-B3-Flags"] = "0"


class OtelClientDecorator:
"""A wrapper to swagger client (swagger-py or bravado) to pass on otel and zipkin
headers to the service call. It will also generate a CLIENT span for the outgoing call.

Even though client is initialised once, all the calls made will have
independent spans.

:param client: Swagger Client
:type client: :class:`swaggerpy.client.SwaggerClient` or :class:`bravado.client.SwaggerClient`.
:param client_identifier: the name of the service that is using this
generated clientlib
:type client_identifier: string
:param smartstack_namespace: the smartstack name of the paasta instance
this generated clientlib is hitting
:type smartstack_namespace: string
"""

def __init__(self, client: Client, client_identifier: str, smartstack_namespace: str):
self._client = client
self.client_identifier = client_identifier
self.smartstack_namespace = smartstack_namespace

def __getattr__(self, name: str) -> Client:
return OtelResourceDecorator(
getattr(self._client, name),
client_identifier=self.client_identifier,
smartstack_namespace=self.smartstack_namespace,
)

def __dir__(self) -> list[str]:
return dir(self._client) # pragma: no cover
Comment on lines +152 to +154
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here.

2 changes: 1 addition & 1 deletion swagger_zipkin/zipkin_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ZipkinResourceDecorator:
:type resource: :class:`swaggerpy.client.Resource` or :class:`bravado_core.resource.Resource`
"""

def __init__(self, resource: Client, context_stack: Stack | None = None) -> None:
def __init__(self, resource: Resource, context_stack: Stack | None = None) -> None:
self.resource = resource
self._context_stack = context_stack

Expand Down
186 changes: 186 additions & 0 deletions tests/otel_decorator_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
from unittest import mock

import pytest
from bravado.exception import HTTPError
from bravado.exception import HTTPInternalServerError
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_span_id
from opentelemetry.trace.span import format_trace_id

from swagger_zipkin.otel_decorator import OtelClientDecorator
from swagger_zipkin.otel_decorator import OtelResourceDecorator

memory_exporter = InMemorySpanExporter()
span_processor = SimpleSpanProcessor(memory_exporter)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(span_processor)

client_identifier = "test_client"
smartstack_namespace = "smartstack_namespace"
tracer = trace.get_tracer("otel_decorator")


@pytest.fixture
def setup():
memory_exporter.clear()


@pytest.fixture
def get_request():
mock_request = mock.Mock()
mock_request.url = "/sample-url"
mock_request.path = "/sample-url"
mock_request.method = "GET"
return mock_request


def create_request_options(parent_span: trace.Span, exported_span: trace.Span):
trace_id = format_trace_id(exported_span.get_span_context().trace_id)
span_id = format_span_id(exported_span.get_span_context().span_id)

headers = {}
headers['headers'] = {
'traceparent': f'00-{trace_id}-{span_id}-01',
'X-B3-TraceId': trace_id,
'X-B3-SpanId': span_id,
'X-B3-Flags': '0',
'X-B3-Sampled': '1',
}
if parent_span is not None:
headers['headers']['X-B3-ParentSpanId'] = format_span_id(parent_span.get_span_context().span_id)

return headers


@mock.patch(
"swagger_zipkin.otel_decorator.construct_request", autospec=True
)
def test_client_request(mock_request, get_request, setup):
mock_request.return_value = get_request

with tracer.start_as_current_span(
"parent_span", kind=trace.SpanKind.SERVER
) as parent_span:
client = mock.Mock()
wrapped_client = OtelClientDecorator(
client,
client_identifier=client_identifier,
smartstack_namespace=smartstack_namespace
)
resource = wrapped_client.resource
param1 = mock.Mock()
resource.operation(param1)

assert len(memory_exporter.get_finished_spans()) == 1
exported_span = memory_exporter.get_finished_spans()[0]

client.resource.operation.assert_called_with(
param1,
_request_options=create_request_options(parent_span, exported_span)
)

assert exported_span.kind == SpanKind.CLIENT
assert exported_span.name == f"{get_request.method} {get_request.path}"
assert exported_span.attributes["url.path"] == get_request.path
assert exported_span.attributes["http.request.method"] == get_request.method
assert exported_span.attributes["client.namespace"] == client_identifier
assert exported_span.attributes["peer.service"] == smartstack_namespace
assert exported_span.attributes["server.namespace"] == smartstack_namespace
assert exported_span.attributes["http.response.status_code"] == "200"

param2 = mock.Mock()
resource.operation(param2)

assert len(memory_exporter.get_finished_spans()) == 2
exported_span = memory_exporter.get_finished_spans()[1]

client.resource.operation.assert_called_with(
param2,
_request_options=create_request_options(parent_span, exported_span)
)

assert exported_span.name == f"{get_request.method} {get_request.path}"
assert exported_span.attributes["url.path"] == get_request.path
assert exported_span.attributes["http.request.method"] == get_request.method
assert exported_span.attributes["client.namespace"] == client_identifier
assert exported_span.attributes["peer.service"] == smartstack_namespace
assert exported_span.attributes["server.namespace"] == smartstack_namespace
assert exported_span.attributes["http.response.status_code"] == "200"


@mock.patch(
"swagger_zipkin.otel_decorator.construct_request", autospec=True
)
def test_client_request_no_parent_span(mock_request, get_request, setup):
mock_request.return_value = get_request

client = mock.Mock()
wrapped_client = OtelClientDecorator(
client,
client_identifier=client_identifier,
smartstack_namespace=smartstack_namespace
)
resource = wrapped_client.resource
param = mock.Mock()
resource.operation(param)

assert len(memory_exporter.get_finished_spans()) == 1
exported_span = memory_exporter.get_finished_spans()[0]

client.resource.operation.assert_called_with(
param,
_request_options=create_request_options(None, exported_span)
)

assert exported_span.kind == SpanKind.CLIENT
assert exported_span.name == f"{get_request.method} {get_request.path}"
assert exported_span.attributes["url.path"] == get_request.path
assert exported_span.attributes["http.request.method"] == get_request.method
assert exported_span.attributes["client.namespace"] == client_identifier
assert exported_span.attributes["peer.service"] == smartstack_namespace
assert exported_span.attributes["server.namespace"] == smartstack_namespace
assert exported_span.attributes["http.response.status_code"] == "200"


@mock.patch(
"swagger_zipkin.otel_decorator.construct_request", autospec=True
)
def test_with_headers_exception(mock_request, get_request, setup):
mock_request.return_value = get_request

# Create a mock resource and configure it to raise an exception
mock_resource = mock.MagicMock()
mock_response = mock.MagicMock()
mock_response.status_code = "500"
mock_method = mock.MagicMock(side_effect=HTTPInternalServerError(response=mock_response))
setattr(mock_resource, 'test_operation', mock_method)

decorator = OtelResourceDecorator(resource=mock_resource, client_identifier="test_client",
smartstack_namespace="smartstack_namespace")

args = ()
kwargs = {'_request_options': {'headers': {}}}

with pytest.raises(HTTPError):
decorator.with_headers("test_operation", *args, **kwargs)

assert len(memory_exporter.get_finished_spans()) == 1
exported_span = memory_exporter.get_finished_spans()[0]

expected_headers = kwargs['_request_options']['headers']
actual_headers = create_request_options(None, exported_span)['headers']
assert expected_headers == actual_headers

assert exported_span.kind == SpanKind.CLIENT
assert exported_span.name == f"{get_request.method} {get_request.path}"
assert exported_span.attributes["url.path"] == get_request.path
assert exported_span.attributes["http.request.method"] == get_request.method
assert exported_span.attributes["client.namespace"] == client_identifier
assert exported_span.attributes["peer.service"] == smartstack_namespace
assert exported_span.attributes["server.namespace"] == smartstack_namespace
assert exported_span.attributes["error.type"] == "HTTPInternalServerError"
assert exported_span.attributes["http.response.status_code"] == "500"
Loading