Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SNS support. #197

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
41 changes: 37 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
# The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator,
# one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then
# update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`.
FROM python:3.11 AS build

# Stage 1: Install ADOT Python in the /operator-build folder
FROM public.ecr.aws/docker/library/python:3.11 AS build

WORKDIR /operator-build

Expand All @@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr

RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro

FROM public.ecr.aws/amazonlinux/amazonlinux:minimal
# Stage 2: Build the cp-utility binary
FROM public.ecr.aws/docker/library/rust:1.75 as builder

WORKDIR /usr/src/cp-utility
COPY ./tools/cp-utility .

## TARGETARCH is defined by buildx
# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope
ARG TARGETARCH

# Run validations and audit only on amd64 because it is faster and those two steps
# are only used to validate the source code and don't require anything that is
# architecture specific.

# Validations
# Validate formatting
RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi

# Audit dependencies
RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi


# Cross-compile based on the target platform.
RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \
elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \
else false; \
fi \
&& rustup target add ${ARCH}-unknown-linux-musl \
&& cargo test --target ${ARCH}-unknown-linux-musl \
&& cargo install --target ${ARCH}-unknown-linux-musl --path . --root .

# Stage 3: Build the distribution image by copying the THIRD-PARTY-LICENSES, the custom built cp command from stage 2, and the installed ADOT Python from stage 1 to their respective destinations
FROM scratch

# Required to copy attribute files to distributed docker images
ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES

COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp
COPY --from=build /operator-build/workspace /autoinstrumentation

RUN chmod -R go+r /autoinstrumentation
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
_HTTP_METHOD: str = SpanAttributes.HTTP_METHOD
_HTTP_URL: str = SpanAttributes.HTTP_URL
_MESSAGING_OPERATION: str = SpanAttributes.MESSAGING_OPERATION
_MESSAGING_DESTINATION: str = SpanAttributes.MESSAGING_DESTINATION
_MESSAGING_SYSTEM: str = SpanAttributes.MESSAGING_SYSTEM
_NET_PEER_NAME: str = SpanAttributes.NET_PEER_NAME
_NET_PEER_PORT: str = SpanAttributes.NET_PEER_PORT
Expand All @@ -78,6 +79,7 @@
_NORMALIZED_KINESIS_SERVICE_NAME: str = "AWS::Kinesis"
_NORMALIZED_S3_SERVICE_NAME: str = "AWS::S3"
_NORMALIZED_SQS_SERVICE_NAME: str = "AWS::SQS"
_NORMALIZED_SNS_SERVICE_NAME: str = "AWS::SNS"
thpierce marked this conversation as resolved.
Show resolved Hide resolved
_DB_CONNECTION_STRING_TYPE: str = "DB::Connection"

# Special DEPENDENCY attribute value if GRAPHQL_OPERATION_TYPE attribute key is present.
Expand Down Expand Up @@ -372,6 +374,12 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
remote_resource_identifier = _escape_delimiters(
SqsUrlParser.get_queue_name(span.attributes.get(AWS_QUEUE_URL))
)
elif (
is_key_present(span, _MESSAGING_DESTINATION)
and _normalize_remote_service_name(span, _get_remote_service(span, _RPC_SERVICE)) == "AWS::SNS"
):
remote_resource_type = _NORMALIZED_SNS_SERVICE_NAME + "::Topic"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_MESSAGING_DESTINATION))
elif is_db_span(span):
remote_resource_type = _DB_CONNECTION_STRING_TYPE
remote_resource_identifier = _get_db_connection(span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def _apply_botocore_instrumentation_patches() -> None:
"""Botocore instrumentation patches

Adds patches to provide additional support and Java parity for Kinesis, S3, and SQS.
Adds patches to provide additional support for Kinesis, S3, and SQS.
"""
_apply_botocore_kinesis_patch()
_apply_botocore_s3_patch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ def test_normalize_remote_service_name_aws_sdk(self):
self.validate_aws_sdk_service_normalization("Kinesis", "AWS::Kinesis")
self.validate_aws_sdk_service_normalization("S3", "AWS::S3")
self.validate_aws_sdk_service_normalization("SQS", "AWS::SQS")
self.validate_aws_sdk_service_normalization("SNS", "AWS::SNS")

def validate_aws_sdk_service_normalization(self, service_name: str, expected_remote_service: str):
self._mock_attribute([SpanAttributes.RPC_SYSTEM, SpanAttributes.RPC_SERVICE], ["aws-api", service_name])
Expand Down Expand Up @@ -977,6 +978,16 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table^^name")
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [None])

# Validate behaviour of AWS_TOPIC_ARN attribute, then remove it
self._mock_attribute(
[SpanAttributes.MESSAGING_DESTINATION, SpanAttributes.RPC_SERVICE],
["arn:aws:sns:us-west-2:012345678901:test_topic", "SNS"],
keys,
values,
)
self._validate_remote_resource_attributes("AWS::SNS::Topic", "arn:aws:sns:us-west-2:012345678901:test_topic")
self._mock_attribute([SpanAttributes.MESSAGING_DESTINATION, SpanAttributes.RPC_SERVICE], [None, None])

self._mock_attribute([SpanAttributes.RPC_SYSTEM], [None])

def test_client_db_span_with_remote_resource_attributes(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ def _test_unpatched_botocore_instrumentation(self):

# SQS
self.assertTrue("sqs" in _KNOWN_EXTENSIONS, "Upstream has removed the SQS extension")
attributes: Dict[str, str] = _do_extract_sqs_attributes()
self.assertTrue("aws.queue_url" in attributes)
self.assertFalse("aws.sqs.queue_url" in attributes)
self.assertFalse("aws.sqs.queue_name" in attributes)
sqs_attributes: Dict[str, str] = _do_extract_sqs_attributes()
self.assertTrue("aws.queue_url" in sqs_attributes)
self.assertFalse("aws.sqs.queue_url" in sqs_attributes)
self.assertFalse("aws.sqs.queue_name" in sqs_attributes)

def _test_patched_botocore_instrumentation(self):
# Kinesis
Expand Down Expand Up @@ -160,6 +160,6 @@ def _do_extract_attributes(service_name: str, params: Dict[str, str]) -> Dict[st
mock_call_context: MagicMock = MagicMock()
mock_call_context.params = params
attributes: Dict[str, str] = {}
sqs_extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context)
sqs_extension.extract_attributes(attributes)
extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context)
extension.extract_attributes(attributes)
return attributes
31 changes: 27 additions & 4 deletions contract-tests/tests/test/amazon/botocore/botocore_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def test_s3_create_bucket(self):
SpanAttributes.AWS_S3_BUCKET: "test-bucket-name",
},
span_name="S3.CreateBucket",
span_kind="CLIENT",
)

def test_s3_create_object(self):
Expand All @@ -115,6 +116,7 @@ def test_s3_create_object(self):
SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name",
},
span_name="S3.PutObject",
span_kind="CLIENT",
)

def test_s3_get_object(self):
Expand All @@ -132,6 +134,7 @@ def test_s3_get_object(self):
SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name",
},
span_name="S3.GetObject",
span_kind="CLIENT",
)

def test_s3_error(self):
Expand All @@ -149,6 +152,7 @@ def test_s3_error(self):
SpanAttributes.AWS_S3_BUCKET: "-",
},
span_name="S3.CreateBucket",
span_kind="CLIENT",
)

def test_s3_fault(self):
Expand All @@ -166,6 +170,7 @@ def test_s3_fault(self):
SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name",
},
span_name="S3.CreateBucket",
span_kind="CLIENT",
)

def test_dynamodb_create_table(self):
Expand All @@ -183,6 +188,7 @@ def test_dynamodb_create_table(self):
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"],
},
span_name="DynamoDB.CreateTable",
span_kind="CLIENT",
)

def test_dynamodb_put_item(self):
Expand All @@ -200,6 +206,7 @@ def test_dynamodb_put_item(self):
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"],
},
span_name="DynamoDB.PutItem",
span_kind="CLIENT",
)

def test_dynamodb_error(self):
Expand All @@ -217,6 +224,7 @@ def test_dynamodb_error(self):
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"],
},
span_name="DynamoDB.PutItem",
span_kind="CLIENT",
)

def test_dynamodb_fault(self):
Expand All @@ -234,6 +242,7 @@ def test_dynamodb_fault(self):
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"],
},
span_name="DynamoDB.PutItem",
span_kind="CLIENT",
)

def test_sqs_create_queue(self):
Expand All @@ -251,6 +260,7 @@ def test_sqs_create_queue(self):
_AWS_QUEUE_NAME: "test_queue",
},
span_name="SQS.CreateQueue",
span_kind="CLIENT",
)

def test_sqs_send_message(self):
Expand All @@ -268,6 +278,7 @@ def test_sqs_send_message(self):
_AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
},
span_name="SQS.SendMessage",
span_kind="CLIENT",
)

def test_sqs_receive_message(self):
Expand All @@ -285,6 +296,7 @@ def test_sqs_receive_message(self):
_AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue",
},
span_name="SQS.ReceiveMessage",
span_kind="CLIENT",
)

def test_sqs_error(self):
Expand All @@ -302,6 +314,7 @@ def test_sqs_error(self):
_AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror",
},
span_name="SQS.SendMessage",
span_kind="CLIENT",
)

def test_sqs_fault(self):
Expand All @@ -319,6 +332,7 @@ def test_sqs_fault(self):
_AWS_QUEUE_NAME: "invalid_test",
},
span_name="SQS.CreateQueue",
span_kind="CLIENT",
)

def test_kinesis_put_record(self):
Expand All @@ -336,6 +350,7 @@ def test_kinesis_put_record(self):
_AWS_STREAM_NAME: "test_stream",
},
span_name="Kinesis.PutRecord",
span_kind="CLIENT",
)

def test_kinesis_error(self):
Expand All @@ -353,6 +368,7 @@ def test_kinesis_error(self):
_AWS_STREAM_NAME: "invalid_stream",
},
span_name="Kinesis.PutRecord",
span_kind="CLIENT",
)

def test_kinesis_fault(self):
Expand All @@ -370,16 +386,19 @@ def test_kinesis_fault(self):
_AWS_STREAM_NAME: "test_stream",
},
span_name="Kinesis.PutRecord",
span_kind="CLIENT",
)

@override
def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSpan], path: str, **kwargs) -> None:
target_spans: List[Span] = []
for resource_scope_span in resource_scope_spans:
# pylint: disable=no-member
if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT:
if resource_scope_span.span.kind in (Span.SPAN_KIND_CLIENT, Span.SPAN_KIND_PRODUCER):
thpierce marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(
resource_scope_span.span.kind, Span.SpanKind.Value("SPAN_KIND_" + kwargs.get("span_kind"))
)
target_spans.append(resource_scope_span.span)

self.assertEqual(len(target_spans), 1)
self._assert_aws_attributes(
target_spans[0].attributes,
Expand Down Expand Up @@ -420,7 +439,10 @@ def _assert_semantic_conventions_span_attributes(
target_spans: List[Span] = []
for resource_scope_span in resource_scope_spans:
# pylint: disable=no-member
if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT:
if resource_scope_span.span.kind in (Span.SPAN_KIND_CLIENT, Span.SPAN_KIND_PRODUCER):
self.assertEqual(
resource_scope_span.span.kind, Span.SpanKind.Value("SPAN_KIND_" + kwargs.get("span_kind"))
)
target_spans.append(resource_scope_span.span)

self.assertEqual(len(target_spans), 1)
Expand All @@ -443,6 +465,7 @@ def _assert_semantic_conventions_attributes(
request_specific_attributes: dict,
) -> None:
attributes_dict: Dict[str, AnyValue] = self._get_attributes_dict(attributes_list)

self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_METHOD, operation)
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_SYSTEM, "aws-api")
self._assert_str_attribute(attributes_dict, SpanAttributes.RPC_SERVICE, service.split("::")[-1])
Expand Down Expand Up @@ -486,7 +509,7 @@ def _assert_metric_attributes(
self._assert_str_attribute(attribute_dict, AWS_LOCAL_OPERATION, "InternalOperation")
self._assert_str_attribute(attribute_dict, AWS_REMOTE_SERVICE, kwargs.get("remote_service"))
self._assert_str_attribute(attribute_dict, AWS_REMOTE_OPERATION, kwargs.get("remote_operation"))
self._assert_str_attribute(attribute_dict, AWS_SPAN_KIND, "CLIENT")
self._assert_str_attribute(attribute_dict, AWS_SPAN_KIND, kwargs.get("span_kind"))
remote_resource_type = kwargs.get("remote_resource_type", "None")
remote_resource_identifier = kwargs.get("remote_resource_identifier", "None")
if remote_resource_type != "None":
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ changedir =

commands_pre =
; Install without -e to test the actual installation
3.{7,8,9,10,11}: python -m pip install -U pip setuptools wheel
3.{7,8,9,10,11}: python -m pip install -U pip setuptools wheel moto
; Install common packages for all the tests. These are not needed in all the
; cases but it saves a lot of boilerplate in this file.
test: pip install botocore
test: pip install "opentelemetry-api[test] @ {env:CORE_REPO}#egg=opentelemetry-api&subdirectory=opentelemetry-api"
test: pip install "opentelemetry-sdk[test] @ {env:CORE_REPO}#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk"
test: pip install "opentelemetry-instrumentation[test] @ {env:CONTRIB_REPO}#egg=opentelemetry-instrumentation&subdirectory=opentelemetry-instrumentation"
test: pip install "opentelemetry-exporter-otlp[test] @ {env:CORE_REPO}#egg=opentelemetry-exporter-otlp&subdirectory=exporter/opentelemetry-exporter-otlp"
test: pip install "opentelemetry-test-utils[test] @ {env:CORE_REPO}#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils"
aws-opentelemetry-distro: pip install {toxinidir}/aws-opentelemetry-distro

commands =
Expand Down