diff --git a/Dockerfile b/Dockerfile
index e8aa35db1..23fc2c907 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,7 +3,9 @@
 # The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator,
 # one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then
 # update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`.
-FROM python:3.11 AS build
+
+# Stage 1: Install ADOT Python in the /operator-build folder
+FROM public.ecr.aws/docker/library/python:3.11 AS build
 
 WORKDIR /operator-build
 
@@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr
 
 RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro
 
-FROM public.ecr.aws/amazonlinux/amazonlinux:minimal
+# Stage 2: Build the cp-utility binary
+FROM public.ecr.aws/docker/library/rust:1.75 as builder
+
+WORKDIR /usr/src/cp-utility
+COPY ./tools/cp-utility .
+
+## TARGETARCH is defined by buildx
+# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope
+ARG TARGETARCH
+
+# Run validations and audit only on amd64 because it is faster and those two steps
+# are only used to validate the source code and don't require anything that is
+# architecture specific.
+
+# Validations
+# Validate formatting
+RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi
+
+# Audit dependencies
+RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi
+
+
+# Cross-compile based on the target platform.
+RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \
+    elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \
+    else false; \
+    fi \
+    && rustup target add ${ARCH}-unknown-linux-musl \
+    && cargo test --target ${ARCH}-unknown-linux-musl \
+    && cargo install --target ${ARCH}-unknown-linux-musl --path . --root .
+
+# Stage 3: Build the distribution image by copying the THIRD-PARTY-LICENSES, the custom built cp command from stage 2, and the installed ADOT Python from stage 1 to their respective destinations
+FROM scratch
 
 # Required to copy attribute files to distributed docker images
 ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES
 
+COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp
 COPY --from=build /operator-build/workspace /autoinstrumentation
-
-RUN chmod -R go+r /autoinstrumentation
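The end result is a `scratch`-based image that ships only the license files, the statically linked `cp-utility` binary (staged at `/bin/cp`), and the installed distro. Since a `scratch` image has no shell or coreutils, that binary is what the operator's init container invokes to copy `/autoinstrumentation` into the application pod. A minimal Python sketch of what the injection amounts to (the target directory and both helper names are hypothetical; the real copy is performed by `cp-utility`, and the operator is what updates `PYTHONPATH`):

```python
import os
import shutil

# Hypothetical mount point; the real target is a volume chosen by the operator.
SOURCE_DIR = "/autoinstrumentation"
TARGET_DIR = "/otel-auto-instrumentation-python"


def copy_distro() -> None:
    """Roughly what the init container's `cp` invocation does: replicate the
    packaged distro into a volume shared with the application container."""
    shutil.copytree(SOURCE_DIR, TARGET_DIR, dirs_exist_ok=True)


def extended_pythonpath(existing: str) -> str:
    """The operator then extends PYTHONPATH with the copied directory (shown
    here as a prepend; the exact ordering is the operator's choice) so the
    application's interpreter picks up the distro."""
    return TARGET_DIR if not existing else TARGET_DIR + os.pathsep + existing
```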
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
index f6498ac76..e3f711688 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py
@@ -16,3 +16,4 @@
 AWS_QUEUE_URL: str = "aws.sqs.queue_url"
 AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
 AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
+AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
index 577d28f63..1e8f83338 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py
@@ -15,6 +15,7 @@
     AWS_REMOTE_RESOURCE_TYPE,
     AWS_REMOTE_SERVICE,
     AWS_SPAN_KIND,
+    AWS_STREAM_CONSUMER_NAME,
     AWS_STREAM_NAME,
 )
 from amazon.opentelemetry.distro._aws_span_processing_util import (
@@ -361,6 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
     elif is_key_present(span, AWS_STREAM_NAME):
         remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
         remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME))
+    elif is_key_present(span, AWS_STREAM_CONSUMER_NAME):
+        remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer"
+        remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_NAME))
     elif is_key_present(span, _AWS_BUCKET_NAME):
         remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
         remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
index cf73fb345..072478c8b 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
@@ -94,3 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
         stream_name = self._call_context.params.get("StreamName")
         if stream_name:
             attributes["aws.kinesis.stream_name"] = stream_name
+        consumer_name = self._call_context.params.get("ConsumerName")
+        if consumer_name:
+            attributes["aws.stream.consumer_name"] = consumer_name
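One subtlety worth calling out: because the new `StreamConsumer` branch sits after the `AWS_STREAM_NAME` branch in the `elif` chain, a span that carries both attributes resolves to `AWS::Kinesis::Stream`. A standalone sketch of that precedence, which the test additions below pin down (the `kinesis_remote_resource` helper is hypothetical; the real code also runs identifiers through `_escape_delimiters` and reads them off a span):

```python
from typing import Dict, Optional, Tuple

_KINESIS = "AWS::Kinesis"


def kinesis_remote_resource(attributes: Dict[str, str]) -> Optional[Tuple[str, str]]:
    # Mirrors the elif ordering above: the stream name is checked first,
    # so it wins whenever both keys are present.
    if "aws.kinesis.stream_name" in attributes:
        return _KINESIS + "::Stream", attributes["aws.kinesis.stream_name"]
    if "aws.stream.consumer_name" in attributes:
        return _KINESIS + "::StreamConsumer", attributes["aws.stream.consumer_name"]
    return None


assert kinesis_remote_resource({"aws.stream.consumer_name": "c"}) == ("AWS::Kinesis::StreamConsumer", "c")
assert kinesis_remote_resource(
    {"aws.kinesis.stream_name": "s", "aws.stream.consumer_name": "c"}
) == ("AWS::Kinesis::Stream", "s")
```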
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
index 072e6eeb0..e961385fa 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
@@ -18,6 +18,7 @@
     AWS_REMOTE_RESOURCE_TYPE,
     AWS_REMOTE_SERVICE,
     AWS_SPAN_KIND,
+    AWS_STREAM_CONSUMER_NAME,
     AWS_STREAM_NAME,
 )
 from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
@@ -950,6 +951,32 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
         self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
         self._mock_attribute([AWS_STREAM_NAME], [None])
 
+        # Validate behaviour of AWS_STREAM_CONSUMER_NAME attribute, then remove it.
+        self._mock_attribute(
+            [AWS_STREAM_CONSUMER_NAME],
+            ["aws_stream_consumer_name"],
+            keys,
+            values,
+        )
+        self._validate_remote_resource_attributes(
+            "AWS::Kinesis::StreamConsumer",
+            "aws_stream_consumer_name",
+        )
+        self._mock_attribute([AWS_STREAM_CONSUMER_NAME], [None])
+
+        # Validate AWS_STREAM_NAME takes precedence when AWS_STREAM_CONSUMER_NAME is also present, then remove both.
+        self._mock_attribute(
+            [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME],
+            [
+                "aws_stream_name",
+                "aws_stream_consumer_name",
+            ],
+            keys,
+            values,
+        )
+        self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
+        self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME], [None, None])
+
         # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it.
         self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values)
         self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table_name")
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
index 124abfad9..f0b4a4172 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
@@ -11,6 +11,7 @@
 from opentelemetry.semconv.trace import SpanAttributes
 
 _STREAM_NAME: str = "streamName"
+_CONSUMER_NAME: str = "consumerName"
 _BUCKET_NAME: str = "bucketName"
 _QUEUE_NAME: str = "queueName"
 _QUEUE_URL: str = "queueUrl"
@@ -99,6 +100,8 @@ def _test_patched_botocore_instrumentation(self):
         kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
         self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes)
         self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME)
+        self.assertTrue("aws.stream.consumer_name" in kinesis_attributes)
+        self.assertEqual(kinesis_attributes["aws.stream.consumer_name"], _CONSUMER_NAME)
 
         # S3
         self.assertTrue("s3" in _KNOWN_EXTENSIONS)
@@ -140,7 +143,7 @@ def _reset_mocks(self):
 
 
 def _do_extract_kinesis_attributes() -> Dict[str, str]:
     service_name: str = "kinesis"
-    params: Dict[str, str] = {"StreamName": _STREAM_NAME}
+    params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerName": _CONSUMER_NAME}
     return _do_extract_attributes(service_name, params)
 
diff --git a/contract-tests/README.md b/contract-tests/README.md
index 7e6ff49b2..f4f0002c2 100644
--- a/contract-tests/README.md
+++ b/contract-tests/README.md
@@ -28,6 +28,9 @@ The steps to add a new test for a library or framework are:
 * The test class should be created in `contract-tests/tests/amazon/`.
 * The test class should extend `contract_test_base.py`
 
+Note: For botocore applications, when creating new resources in [prepare_aws_server()](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/166c4cb36da6634cb070df5a312a62f6b0136a9c/contract-tests/images/applications/botocore/botocore_server.py#L215), make sure to check whether the resource already exists before creating it.
+This is because each test pulls the "aws-application-signals-tests-botocore-app" image and starts a new container that runs `prepare_aws_server()` once; only the first creation attempt can succeed, and all subsequent attempts will fail because the resources already exist.
+
 # How to run the tests locally?
 
 Pre-requirements:
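The pattern the note asks for is exactly what the server changes below implement: list first, create only when absent. A generic sketch with a hypothetical `ensure_queue` helper (the endpoint and region are assumptions mirroring the LocalStack setup these tests run against):

```python
import boto3


def ensure_queue(queue_name: str, endpoint_url: str = "http://localhost:4566", region: str = "us-west-2") -> None:
    """Create the SQS queue only if no queue with this name exists yet."""
    sqs = boto3.client("sqs", endpoint_url=endpoint_url, region_name=region)
    response = sqs.list_queues(QueueNamePrefix=queue_name)
    # "QueueUrls" is omitted from the response when nothing matches; prefix
    # matching is close enough for the fixed names used in these tests.
    if not response.get("QueueUrls"):
        sqs.create_queue(QueueName=queue_name)
```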
diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py
index dd1e34c6b..4760746fd 100644
--- a/contract-tests/images/applications/botocore/botocore_server.py
+++ b/contract-tests/images/applications/botocore/botocore_server.py
@@ -5,6 +5,7 @@
 import tempfile
 from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
 from threading import Thread
+from typing import List
 
 import boto3
 import requests
@@ -200,6 +201,12 @@ def _handle_kinesis_request(self) -> None:
         elif self.in_path("putrecord/my-stream"):
             set_main_status(200)
             kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key")
+        elif self.in_path("describestreamconsumer/my-consumer"):
+            set_main_status(200)
+            kinesis_client.describe_stream_consumer(
+                StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream",
+                ConsumerName="test_consumer",
+            )
         else:
             set_main_status(404)
 
@@ -212,17 +219,24 @@ def set_main_status(status: int) -> None:
     RequestHandler.main_status = status
 
 
+# pylint: disable=too-many-locals
 def prepare_aws_server() -> None:
     requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset")
     try:
         # Set up S3 so tests can access buckets and retrieve a file.
         s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION)
-        s3_client.create_bucket(
-            Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
-        )
-        s3_client.create_bucket(
-            Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
-        )
+        bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
+        put_bucket_name: str = "test-put-object-bucket-name"
+        if put_bucket_name not in bucket_names:
+            s3_client.create_bucket(
+                Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
+            )
+
+        get_bucket_name: str = "test-get-object-bucket-name"
+        if get_bucket_name not in bucket_names:
+            s3_client.create_bucket(
+                Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
+            )
         with tempfile.NamedTemporaryFile(delete=True) as temp_file:
             temp_file_name: str = temp_file.name
             temp_file.write(b"This is temp file for S3 upload")
@@ -231,22 +245,34 @@ def prepare_aws_server() -> None:
 
         # Set up DDB so tests can access a table.
         ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        ddb_client.create_table(
-            TableName="put_test_table",
-            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
-            AttributeDefinitions=[
-                {"AttributeName": "id", "AttributeType": "S"},
-            ],
-            BillingMode="PAY_PER_REQUEST",
-        )
+        table_names: List[str] = ddb_client.list_tables()["TableNames"]
+
+        table_name: str = "put_test_table"
+        if table_name not in table_names:
+            ddb_client.create_table(
+                TableName=table_name,
+                KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
+                AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
+                BillingMode="PAY_PER_REQUEST",
+            )
 
         # Set up SQS so tests can access a queue.
         sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        sqs_client.create_queue(QueueName="test_put_get_queue")
+        queue_name: str = "test_put_get_queue"
+        queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
+        queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else []
+        if not queues:
+            sqs_client.create_queue(QueueName=queue_name)
 
         # Set up Kinesis so tests can access a stream.
         kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
-        kinesis_client.create_stream(StreamName="test_stream", ShardCount=1)
+        stream_name: str = "test_stream"
+        stream_response = kinesis_client.list_streams()
+        if not stream_response["StreamNames"]:
+            kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
+            kinesis_client.register_stream_consumer(
+                StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer"
+            )
 
     except Exception as exception:
         print("Unexpected exception occurred", exception)
diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py
index 6fe278d3b..12d01684c 100644
--- a/contract-tests/tests/test/amazon/botocore/botocore_test.py
+++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py
@@ -29,6 +29,7 @@
 _AWS_QUEUE_URL: str = "aws.sqs.queue_url"
 _AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
 _AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
+_AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"
 
 
 # pylint: disable=too-many-public-methods
@@ -338,6 +339,23 @@ def test_kinesis_put_record(self):
             span_name="Kinesis.PutRecord",
         )
 
+    def test_kinesis_describe_stream_consumer(self):
+        self.do_test_requests(
+            "kinesis/describestreamconsumer/my-consumer",
+            "GET",
+            200,
+            0,
+            0,
+            remote_service="AWS::Kinesis",
+            remote_operation="DescribeStreamConsumer",
+            remote_resource_type="AWS::Kinesis::StreamConsumer",
+            remote_resource_identifier="test_consumer",
+            request_specific_attributes={
+                _AWS_STREAM_CONSUMER_NAME: "test_consumer",
+            },
+            span_name="Kinesis.DescribeStreamConsumer",
+        )
+
     def test_kinesis_error(self):
         self.do_test_requests(
             "kinesis/error",
@@ -458,6 +476,7 @@ def _assert_semantic_conventions_attributes(
             self._assert_array_value_ddb_table_name(attributes_dict, key, value)
 
     @override
+    # pylint: disable=too-many-locals
     def _assert_metric_attributes(
         self,
         resource_scope_metrics: List[ResourceScopeMetric],
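To smoke-test the new route outside the contract-test harness, a request to the sample app's Kinesis path should return 200 once `prepare_aws_server()` has registered the consumer. A hedged sketch (the host and port are assumptions about a local run of the botocore app against LocalStack, not values taken from this change):

```python
import requests

# Assumed local port mapping for the botocore sample app.
APP_URL = "http://localhost:8080"

response = requests.get(f"{APP_URL}/kinesis/describestreamconsumer/my-consumer", timeout=10)
assert response.status_code == 200
```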