From b6e9392767b531e2d5ba4721a770772784f07852 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 10:36:19 -0700 Subject: [PATCH 01/13] Add Kinesis consumer support. --- .../distro/_aws_attribute_keys.py | 1 + .../distro/_aws_metric_attribute_generator.py | 4 + .../distro/patches/_botocore_patches.py | 3 + .../test_aws_metric_attribute_generator.py | 6 + .../distro/test_instrumentation_patch.py | 5 +- .../applications/botocore/botocore_server.py | 8 + .../test/amazon/base/contract_test_base.py | 14 + .../test/amazon/botocore/botocore_test.py | 523 ++++++++++-------- 8 files changed, 317 insertions(+), 247 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index f6498ac76..4b722d91c 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -16,3 +16,4 @@ AWS_QUEUE_URL: str = "aws.sqs.queue_url" AWS_QUEUE_NAME: str = "aws.sqs.queue_name" AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index 577d28f63..784a73eee 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -16,6 +16,7 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, + AWS_CONSUMER_ARN, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( LOCAL_ROOT, @@ -361,6 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri elif is_key_present(span, AWS_STREAM_NAME): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream" remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME)) + elif is_key_present(span, AWS_CONSUMER_ARN): + remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer" + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_CONSUMER_ARN)) elif is_key_present(span, _AWS_BUCKET_NAME): remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket" remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME)) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index cf73fb345..16af472dc 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -94,3 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT): stream_name = self._call_context.params.get("StreamName") if stream_name: attributes["aws.kinesis.stream_name"] = stream_name + consumer_arn = self._call_context.params.get("ConsumerARN") + if consumer_arn: + attributes["aws.kinesis.consumer_arn"] = consumer_arn diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index 072e6eeb0..eb8b94b71 100644 --- 
a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -19,6 +19,7 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, + AWS_CONSUMER_ARN, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator from amazon.opentelemetry.distro.metric_attribute_generator import DEPENDENCY_METRIC, SERVICE_METRIC @@ -950,6 +951,11 @@ def test_sdk_client_span_with_remote_resource_attributes(self): self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name") self._mock_attribute([AWS_STREAM_NAME], [None]) + # Validate behaviour of AWS_CONSUMER_ARN attribute, then remove it. + self._mock_attribute([AWS_CONSUMER_ARN], ["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"], keys, values) + self._validate_remote_resource_attributes("AWS::Kinesis::StreamConsumer", "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789") + self._mock_attribute([AWS_CONSUMER_ARN], [None]) + # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it. self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values) self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table_name") diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index bc6e851a9..cbf337806 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -11,6 +11,7 @@ from opentelemetry.semconv.trace import SpanAttributes _STREAM_NAME: str = "streamName" +_CONSUMER_ARN: str = "consumerArn" _BUCKET_NAME: str = "bucketName" _QUEUE_NAME: str = "queueName" _QUEUE_URL: str = "queueUrl" @@ -80,6 +81,8 @@ def _validate_patched_botocore_instrumentation(self): kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes() self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes) self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME) + self.assertTrue("aws.kinesis.consumer_arn" in kinesis_attributes) + self.assertEqual(kinesis_attributes["aws.kinesis.consumer_arn"], _CONSUMER_ARN) # S3 self.assertTrue("s3" in _KNOWN_EXTENSIONS) @@ -99,7 +102,7 @@ def _validate_patched_botocore_instrumentation(self): def _do_extract_kinesis_attributes() -> Dict[str, str]: service_name: str = "kinesis" - params: Dict[str, str] = {"StreamName": _STREAM_NAME} + params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerArn": _CONSUMER_ARN} return _do_extract_attributes(service_name, params) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 192093d1f..5a93c0e24 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -6,6 +6,7 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from threading import Thread +from logging import INFO, Logger, getLogger import boto3 import requests from botocore.client import BaseClient @@ -25,6 +26,8 @@ os.environ.setdefault("AWS_SECRET_ACCESS_KEY", 
"testcontainers-localstack") _NO_RETRY_CONFIG: Config = Config(retries={"max_attempts": 0}, connect_timeout=3, read_timeout=3) +_logger: Logger = getLogger(__name__) +_logger.setLevel(INFO) # pylint: disable=broad-exception-caught class RequestHandler(BaseHTTPRequestHandler): @@ -200,6 +203,11 @@ def _handle_kinesis_request(self) -> None: elif self.in_path("putrecord/my-stream"): set_main_status(200) kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") + elif self.in_path("describestreamconsumer/my-consumer"): + set_main_status(200) + response = kinesis_client.register_stream_consumer(StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer") + consumer_arn = response['Consumer']['ConsumerARN'] + kinesis_client.describe_stream_consumer(ConsumerARN=consumer_arn) else: set_main_status(404) diff --git a/contract-tests/tests/test/amazon/base/contract_test_base.py b/contract-tests/tests/test/amazon/base/contract_test_base.py index 8364bd830..91fe45df8 100644 --- a/contract-tests/tests/test/amazon/base/contract_test_base.py +++ b/contract-tests/tests/test/amazon/base/contract_test_base.py @@ -1,5 +1,6 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import re import time from logging import INFO, Logger, getLogger from typing import Dict, List @@ -168,6 +169,12 @@ def _assert_int_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, self.assertIsNotNone(actual_value) self.assertEqual(expected_value, actual_value.int_value) + def _assert_match_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, pattern: str) -> None: + self.assertIn(key, attributes_dict) + actual_value: AnyValue = attributes_dict[key] + self.assertIsNotNone(actual_value) + self.assertRegex(actual_value.string_value, pattern) + def check_sum(self, metric_name: str, actual_sum: float, expected_sum: float) -> None: if metric_name is LATENCY_METRIC: self.assertTrue(0 < actual_sum < expected_sum) @@ -217,3 +224,10 @@ def _assert_metric_attributes( self, resource_scope_metrics: List[ResourceScopeMetric], metric_name: str, expected_sum: int, **kwargs ): self.fail("Tests must implement this function") + + def _is_valid_regex(self, pattern: str) -> bool: + try: + re.compile(pattern) + return True + except re.error: + return False \ No newline at end of file diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index f309f343b..3946ceb30 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -29,6 +29,7 @@ _AWS_QUEUE_URL: str = "aws.sqs.queue_url" _AWS_QUEUE_NAME: str = "aws.sqs.queue_name" _AWS_STREAM_NAME: str = "aws.kinesis.stream_name" +_AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn" # pylint: disable=too-many-public-methods @@ -82,259 +83,277 @@ def tear_down_dependency_container(cls): _logger.info(cls._local_stack.get_logs()[1].decode()) cls._local_stack.stop() - def test_s3_create_bucket(self): - self.do_test_requests( - "s3/createbucket/create-bucket", - "GET", - 200, - 0, - 0, - remote_service="AWS::S3", - remote_operation="CreateBucket", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="test-bucket-name", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "test-bucket-name", - }, - span_name="S3.CreateBucket", - ) - - def test_s3_create_object(self): - 
self.do_test_requests( - "s3/createobject/put-object/some-object", - "GET", - 200, - 0, - 0, - remote_service="AWS::S3", - remote_operation="PutObject", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="test-put-object-bucket-name", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name", - }, - span_name="S3.PutObject", - ) - - def test_s3_get_object(self): - self.do_test_requests( - "s3/getobject/get-object/some-object", - "GET", - 200, - 0, - 0, - remote_service="AWS::S3", - remote_operation="GetObject", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="test-get-object-bucket-name", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name", - }, - span_name="S3.GetObject", - ) - - def test_s3_error(self): - self.do_test_requests( - "s3/error", - "GET", - 400, - 1, - 0, - remote_service="AWS::S3", - remote_operation="CreateBucket", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="-", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "-", - }, - span_name="S3.CreateBucket", - ) - - def test_s3_fault(self): - self.do_test_requests( - "s3/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::S3", - remote_operation="CreateBucket", - remote_resource_type="AWS::S3::Bucket", - remote_resource_identifier="valid-bucket-name", - request_specific_attributes={ - SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name", - }, - span_name="S3.CreateBucket", - ) - - def test_dynamodb_create_table(self): - self.do_test_requests( - "ddb/createtable/some-table", - "GET", - 200, - 0, - 0, - remote_service="AWS::DynamoDB", - remote_operation="CreateTable", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="test_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"], - }, - span_name="DynamoDB.CreateTable", - ) - - def test_dynamodb_put_item(self): - self.do_test_requests( - "ddb/putitem/putitem-table/key", - "GET", - 200, - 0, - 0, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="put_test_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_dynamodb_error(self): - self.do_test_requests( - "ddb/error", - "GET", - 400, - 1, - 0, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="invalid_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_dynamodb_fault(self): - self.do_test_requests( - "ddb/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::DynamoDB", - remote_operation="PutItem", - remote_resource_type="AWS::DynamoDB::Table", - remote_resource_identifier="invalid_table", - request_specific_attributes={ - SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - }, - span_name="DynamoDB.PutItem", - ) - - def test_sqs_create_queue(self): - self.do_test_requests( - "sqs/createqueue/some-queue", - "GET", - 200, - 0, - 0, - remote_service="AWS::SQS", - remote_operation="CreateQueue", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_queue", - request_specific_attributes={ - _AWS_QUEUE_NAME: "test_queue", - }, - span_name="SQS.CreateQueue", - ) + # def 
test_s3_create_bucket(self): + # self.do_test_requests( + # "s3/createbucket/create-bucket", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::S3", + # remote_operation="CreateBucket", + # remote_resource_type="AWS::S3::Bucket", + # remote_resource_identifier="test-bucket-name", + # request_specific_attributes={ + # SpanAttributes.AWS_S3_BUCKET: "test-bucket-name", + # }, + # span_name="S3.CreateBucket", + # ) + # + # def test_s3_create_object(self): + # self.do_test_requests( + # "s3/createobject/put-object/some-object", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::S3", + # remote_operation="PutObject", + # remote_resource_type="AWS::S3::Bucket", + # remote_resource_identifier="test-put-object-bucket-name", + # request_specific_attributes={ + # SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name", + # }, + # span_name="S3.PutObject", + # ) + # + # def test_s3_get_object(self): + # self.do_test_requests( + # "s3/getobject/get-object/some-object", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::S3", + # remote_operation="GetObject", + # remote_resource_type="AWS::S3::Bucket", + # remote_resource_identifier="test-get-object-bucket-name", + # request_specific_attributes={ + # SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name", + # }, + # span_name="S3.GetObject", + # ) + # + # def test_s3_error(self): + # self.do_test_requests( + # "s3/error", + # "GET", + # 400, + # 1, + # 0, + # remote_service="AWS::S3", + # remote_operation="CreateBucket", + # remote_resource_type="AWS::S3::Bucket", + # remote_resource_identifier="-", + # request_specific_attributes={ + # SpanAttributes.AWS_S3_BUCKET: "-", + # }, + # span_name="S3.CreateBucket", + # ) + # + # def test_s3_fault(self): + # self.do_test_requests( + # "s3/fault", + # "GET", + # 500, + # 0, + # 1, + # remote_service="AWS::S3", + # remote_operation="CreateBucket", + # remote_resource_type="AWS::S3::Bucket", + # remote_resource_identifier="valid-bucket-name", + # request_specific_attributes={ + # SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name", + # }, + # span_name="S3.CreateBucket", + # ) + # + # def test_dynamodb_create_table(self): + # self.do_test_requests( + # "ddb/createtable/some-table", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::DynamoDB", + # remote_operation="CreateTable", + # remote_resource_type="AWS::DynamoDB::Table", + # remote_resource_identifier="test_table", + # request_specific_attributes={ + # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"], + # }, + # span_name="DynamoDB.CreateTable", + # ) + # + # def test_dynamodb_put_item(self): + # self.do_test_requests( + # "ddb/putitem/putitem-table/key", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::DynamoDB", + # remote_operation="PutItem", + # remote_resource_type="AWS::DynamoDB::Table", + # remote_resource_identifier="put_test_table", + # request_specific_attributes={ + # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"], + # }, + # span_name="DynamoDB.PutItem", + # ) + # + # def test_dynamodb_error(self): + # self.do_test_requests( + # "ddb/error", + # "GET", + # 400, + # 1, + # 0, + # remote_service="AWS::DynamoDB", + # remote_operation="PutItem", + # remote_resource_type="AWS::DynamoDB::Table", + # remote_resource_identifier="invalid_table", + # request_specific_attributes={ + # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], + # }, + # span_name="DynamoDB.PutItem", + # ) + # + # def test_dynamodb_fault(self): + # self.do_test_requests( + # "ddb/fault", + # "GET", + # 
500, + # 0, + # 1, + # remote_service="AWS::DynamoDB", + # remote_operation="PutItem", + # remote_resource_type="AWS::DynamoDB::Table", + # remote_resource_identifier="invalid_table", + # request_specific_attributes={ + # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], + # }, + # span_name="DynamoDB.PutItem", + # ) + # + # def test_sqs_create_queue(self): + # self.do_test_requests( + # "sqs/createqueue/some-queue", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::SQS", + # remote_operation="CreateQueue", + # remote_resource_type="AWS::SQS::Queue", + # remote_resource_identifier="test_queue", + # request_specific_attributes={ + # _AWS_QUEUE_NAME: "test_queue", + # }, + # span_name="SQS.CreateQueue", + # ) + # + # def test_sqs_send_message(self): + # self.do_test_requests( + # "sqs/publishqueue/some-queue", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::SQS", + # remote_operation="SendMessage", + # remote_resource_type="AWS::SQS::Queue", + # remote_resource_identifier="test_put_get_queue", + # request_specific_attributes={ + # _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + # }, + # span_name="SQS.SendMessage", + # ) + # + # def test_sqs_receive_message(self): + # self.do_test_requests( + # "sqs/consumequeue/some-queue", + # "GET", + # 200, + # 0, + # 0, + # remote_service="AWS::SQS", + # remote_operation="ReceiveMessage", + # remote_resource_type="AWS::SQS::Queue", + # remote_resource_identifier="test_put_get_queue", + # request_specific_attributes={ + # _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + # }, + # span_name="SQS.ReceiveMessage", + # ) + # + # def test_sqs_error(self): + # self.do_test_requests( + # "sqs/error", + # "GET", + # 400, + # 1, + # 0, + # remote_service="AWS::SQS", + # remote_operation="SendMessage", + # remote_resource_type="AWS::SQS::Queue", + # remote_resource_identifier="sqserror", + # request_specific_attributes={ + # _AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", + # }, + # span_name="SQS.SendMessage", + # ) + # + # def test_sqs_fault(self): + # self.do_test_requests( + # "sqs/fault", + # "GET", + # 500, + # 0, + # 1, + # remote_service="AWS::SQS", + # remote_operation="CreateQueue", + # remote_resource_type="AWS::SQS::Queue", + # remote_resource_identifier="invalid_test", + # request_specific_attributes={ + # _AWS_QUEUE_NAME: "invalid_test", + # }, + # span_name="SQS.CreateQueue", + # ) - def test_sqs_send_message(self): - self.do_test_requests( - "sqs/publishqueue/some-queue", - "GET", - 200, - 0, - 0, - remote_service="AWS::SQS", - remote_operation="SendMessage", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_put_get_queue", - request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", - }, - span_name="SQS.SendMessage", - ) - - def test_sqs_receive_message(self): + def test_kinesis_put_record(self): self.do_test_requests( - "sqs/consumequeue/some-queue", + "kinesis/putrecord/my-stream", "GET", 200, 0, 0, - remote_service="AWS::SQS", - remote_operation="ReceiveMessage", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="test_put_get_queue", - request_specific_attributes={ - _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", - }, - span_name="SQS.ReceiveMessage", - ) - - def test_sqs_error(self): - self.do_test_requests( - "sqs/error", - "GET", - 400, - 1, - 0, - remote_service="AWS::SQS", - remote_operation="SendMessage", - 
remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="sqserror", - request_specific_attributes={ - _AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", - }, - span_name="SQS.SendMessage", - ) - - def test_sqs_fault(self): - self.do_test_requests( - "sqs/fault", - "GET", - 500, - 0, - 1, - remote_service="AWS::SQS", - remote_operation="CreateQueue", - remote_resource_type="AWS::SQS::Queue", - remote_resource_identifier="invalid_test", + remote_service="AWS::Kinesis", + remote_operation="PutRecord", + remote_resource_type="AWS::Kinesis::Stream", + remote_resource_identifier="test_stream", request_specific_attributes={ - _AWS_QUEUE_NAME: "invalid_test", + _AWS_STREAM_NAME: "test_stream", }, - span_name="SQS.CreateQueue", + span_name="Kinesis.PutRecord", ) - def test_kinesis_put_record(self): + def test_kinesis_describe_stream_consumer(self): self.do_test_requests( - "kinesis/putrecord/my-stream", + "kinesis/describestreamconsumer/my-consumer", "GET", 200, 0, 0, remote_service="AWS::Kinesis", - remote_operation="PutRecord", - remote_resource_type="AWS::Kinesis::Stream", - remote_resource_identifier="test_stream", + remote_operation="DescribeStreamConsumer", + remote_resource_type="AWS::Kinesis::StreamConsumer", + remote_resource_identifier=r"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:\d{10}", request_specific_attributes={ - _AWS_STREAM_NAME: "test_stream", + _AWS_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:\d{10}", }, - span_name="Kinesis.PutRecord", + span_name="Kinesis.DescribeStreamConsumer", + span_length = 2, ) def test_kinesis_error(self): @@ -379,9 +398,11 @@ def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSp if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: target_spans.append(resource_scope_span.span) - self.assertEqual(len(target_spans), 1) + span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 + + self.assertEqual(len(target_spans), span_length) self._assert_aws_attributes( - target_spans[0].attributes, + target_spans[span_length-1].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), "LOCAL_ROOT", @@ -408,7 +429,10 @@ def _assert_aws_attributes( if remote_resource_type != "None": self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type) if remote_resource_identifier != "None": - self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + if self._is_valid_regex(remote_resource_identifier): + self._assert_match_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + else: + self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) # See comment above AWS_LOCAL_OPERATION self._assert_str_attribute(attributes_dict, AWS_SPAN_KIND, span_kind) @@ -422,10 +446,12 @@ def _assert_semantic_conventions_span_attributes( if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: target_spans.append(resource_scope_span.span) - self.assertEqual(len(target_spans), 1) - self.assertEqual(target_spans[0].name, kwargs.get("span_name")) + span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 + + self.assertEqual(len(target_spans), span_length) + self.assertEqual(target_spans[span_length-1].name, kwargs.get("span_name")) self._assert_semantic_conventions_attributes( - target_spans[0].attributes, + 
target_spans[span_length-1].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), status_code, @@ -449,7 +475,9 @@ def _assert_semantic_conventions_attributes( # TODO: botocore instrumentation is not respecting PEER_SERVICE # self._assert_str_attribute(attributes_dict, SpanAttributes.PEER_SERVICE, "backend:8080") for key, value in request_specific_attributes.items(): - if isinstance(value, str): + if self._is_valid_regex(value): + self._assert_match_attribute(attributes_dict, key, value) + elif isinstance(value, str): self._assert_str_attribute(attributes_dict, key, value) elif isinstance(value, int): self._assert_int_attribute(attributes_dict, key, value) @@ -468,9 +496,9 @@ def _assert_metric_attributes( for resource_scope_metric in resource_scope_metrics: if resource_scope_metric.metric.name.lower() == metric_name.lower(): target_metrics.append(resource_scope_metric.metric) - - self.assertEqual(len(target_metrics), 1) - target_metric: Metric = target_metrics[0] + span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 + self.assertEqual(len(target_metrics), span_length) + target_metric: Metric = target_metrics[span_length-1] dp_list: List[ExponentialHistogramDataPoint] = target_metric.exponential_histogram.data_points dp_list_count: int = kwargs.get("dp_count", 2) self.assertEqual(len(dp_list), dp_list_count) @@ -491,7 +519,10 @@ def _assert_metric_attributes( if remote_resource_type != "None": self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type) if remote_resource_identifier != "None": - self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + if self._is_valid_regex(remote_resource_identifier): + self._assert_match_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + else: + self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) self.check_sum(metric_name, dependency_dp.sum, expected_sum) attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(service_dp.attributes) From 8dd74b9ce9f563c97e502b89a11eb051c32dab60 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 10:37:08 -0700 Subject: [PATCH 02/13] lint --- .../distro/_aws_metric_attribute_generator.py | 2 +- .../distro/test_aws_metric_attribute_generator.py | 14 +++++++++++--- .../applications/botocore/botocore_server.py | 9 ++++++--- .../tests/test/amazon/base/contract_test_base.py | 2 +- .../tests/test/amazon/botocore/botocore_test.py | 14 ++++++++------ 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index 784a73eee..e08d1bb20 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -6,6 +6,7 @@ from urllib.parse import ParseResult, urlparse from amazon.opentelemetry.distro._aws_attribute_keys import ( + AWS_CONSUMER_ARN, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, AWS_QUEUE_NAME, @@ -16,7 +17,6 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, - AWS_CONSUMER_ARN, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( LOCAL_ROOT, diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py 
b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index eb8b94b71..75bdfad10 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -8,6 +8,7 @@ from unittest.mock import MagicMock from amazon.opentelemetry.distro._aws_attribute_keys import ( + AWS_CONSUMER_ARN, AWS_CONSUMER_PARENT_SPAN_KIND, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, @@ -19,7 +20,6 @@ AWS_REMOTE_SERVICE, AWS_SPAN_KIND, AWS_STREAM_NAME, - AWS_CONSUMER_ARN, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator from amazon.opentelemetry.distro.metric_attribute_generator import DEPENDENCY_METRIC, SERVICE_METRIC @@ -952,8 +952,16 @@ def test_sdk_client_span_with_remote_resource_attributes(self): self._mock_attribute([AWS_STREAM_NAME], [None]) # Validate behaviour of AWS_CONSUMER_ARN attribute, then remove it. - self._mock_attribute([AWS_CONSUMER_ARN], ["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"], keys, values) - self._validate_remote_resource_attributes("AWS::Kinesis::StreamConsumer", "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789") + self._mock_attribute( + [AWS_CONSUMER_ARN], + ["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"], + keys, + values, + ) + self._validate_remote_resource_attributes( + "AWS::Kinesis::StreamConsumer", + "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789", + ) self._mock_attribute([AWS_CONSUMER_ARN], [None]) # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it. 
diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 5a93c0e24..9d8ea34e8 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -4,9 +4,9 @@ import os import tempfile from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from logging import INFO, Logger, getLogger from threading import Thread -from logging import INFO, Logger, getLogger import boto3 import requests from botocore.client import BaseClient @@ -29,6 +29,7 @@ _logger: Logger = getLogger(__name__) _logger.setLevel(INFO) + # pylint: disable=broad-exception-caught class RequestHandler(BaseHTTPRequestHandler): main_status: int = 200 @@ -205,8 +206,10 @@ def _handle_kinesis_request(self) -> None: kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") elif self.in_path("describestreamconsumer/my-consumer"): set_main_status(200) - response = kinesis_client.register_stream_consumer(StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer") - consumer_arn = response['Consumer']['ConsumerARN'] + response = kinesis_client.register_stream_consumer( + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer" + ) + consumer_arn = response["Consumer"]["ConsumerARN"] kinesis_client.describe_stream_consumer(ConsumerARN=consumer_arn) else: set_main_status(404) diff --git a/contract-tests/tests/test/amazon/base/contract_test_base.py b/contract-tests/tests/test/amazon/base/contract_test_base.py index 91fe45df8..df270c242 100644 --- a/contract-tests/tests/test/amazon/base/contract_test_base.py +++ b/contract-tests/tests/test/amazon/base/contract_test_base.py @@ -230,4 +230,4 @@ def _is_valid_regex(self, pattern: str) -> bool: re.compile(pattern) return True except re.error: - return False \ No newline at end of file + return False diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index 3946ceb30..f2c7f740e 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -353,7 +353,7 @@ def test_kinesis_describe_stream_consumer(self): _AWS_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:\d{10}", }, span_name="Kinesis.DescribeStreamConsumer", - span_length = 2, + span_length=2, ) def test_kinesis_error(self): @@ -402,7 +402,7 @@ def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSp self.assertEqual(len(target_spans), span_length) self._assert_aws_attributes( - target_spans[span_length-1].attributes, + target_spans[span_length - 1].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), "LOCAL_ROOT", @@ -430,7 +430,9 @@ def _assert_aws_attributes( self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type) if remote_resource_identifier != "None": if self._is_valid_regex(remote_resource_identifier): - self._assert_match_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + self._assert_match_attribute( + attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier + ) else: self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) # See comment above 
AWS_LOCAL_OPERATION @@ -449,9 +451,9 @@ def _assert_semantic_conventions_span_attributes( span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 self.assertEqual(len(target_spans), span_length) - self.assertEqual(target_spans[span_length-1].name, kwargs.get("span_name")) + self.assertEqual(target_spans[span_length - 1].name, kwargs.get("span_name")) self._assert_semantic_conventions_attributes( - target_spans[span_length-1].attributes, + target_spans[span_length - 1].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), status_code, @@ -498,7 +500,7 @@ def _assert_metric_attributes( target_metrics.append(resource_scope_metric.metric) span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 self.assertEqual(len(target_metrics), span_length) - target_metric: Metric = target_metrics[span_length-1] + target_metric: Metric = target_metrics[span_length - 1] dp_list: List[ExponentialHistogramDataPoint] = target_metric.exponential_histogram.data_points dp_list_count: int = kwargs.get("dp_count", 2) self.assertEqual(len(dp_list), dp_list_count) From 5ec390ef4609728e563aca3d7307955892d99357 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 10:51:54 -0700 Subject: [PATCH 03/13] Fix UT. --- .../opentelemetry/distro/test_instrumentation_patch.py | 2 +- contract-tests/tests/test/amazon/botocore/botocore_test.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index cbf337806..b38f0405c 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -102,7 +102,7 @@ def _validate_patched_botocore_instrumentation(self): def _do_extract_kinesis_attributes() -> Dict[str, str]: service_name: str = "kinesis" - params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerArn": _CONSUMER_ARN} + params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerARN": _CONSUMER_ARN} return _do_extract_attributes(service_name, params) diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index f2c7f740e..cb44cd157 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -348,9 +348,11 @@ def test_kinesis_describe_stream_consumer(self): remote_service="AWS::Kinesis", remote_operation="DescribeStreamConsumer", remote_resource_type="AWS::Kinesis::StreamConsumer", - remote_resource_identifier=r"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:\d{10}", + remote_resource_identifier=r"arn:aws:kinesis:us-west-2:000000000000:" + r"stream/test_stream/consumer/test_consumer:\d{10}", request_specific_attributes={ - _AWS_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:\d{10}", + _AWS_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:" + r"stream/test_stream/consumer/test_consumer:\d{10}", }, span_name="Kinesis.DescribeStreamConsumer", span_length=2, From 2a08e025a1bf198499ebf3695db163897c5a7636 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 11:10:20 -0700 Subject: [PATCH 04/13] Fix lint. 
--- contract-tests/tests/test/amazon/botocore/botocore_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index cb44cd157..79cb37ee0 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -489,6 +489,7 @@ def _assert_semantic_conventions_attributes( self._assert_array_value_ddb_table_name(attributes_dict, key, value) @override + # pylint: disable=too-many-locals def _assert_metric_attributes( self, resource_scope_metrics: List[ResourceScopeMetric], From d636f491066ef8182e13d5ddf2d39f1f8352bd72 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 15:45:07 -0700 Subject: [PATCH 05/13] Fix contract test. --- .../applications/botocore/botocore_server.py | 23 +- .../test/amazon/botocore/botocore_test.py | 495 +++++++++--------- 2 files changed, 260 insertions(+), 258 deletions(-) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 9d8ea34e8..db4723dba 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -6,6 +6,7 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from logging import INFO, Logger, getLogger from threading import Thread +from typing import Optional, List import boto3 import requests @@ -34,6 +35,10 @@ class RequestHandler(BaseHTTPRequestHandler): main_status: int = 200 + def __init__(self, request, client_address, server, *args, consumer_arn=None, **kwargs): + self.consumer_arn = consumer_arn + super().__init__(request, client_address, server, *args, **kwargs) + @override # pylint: disable=invalid-name def do_GET(self): @@ -206,11 +211,7 @@ def _handle_kinesis_request(self) -> None: kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") elif self.in_path("describestreamconsumer/my-consumer"): set_main_status(200) - response = kinesis_client.register_stream_consumer( - StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer" - ) - consumer_arn = response["Consumer"]["ConsumerARN"] - kinesis_client.describe_stream_consumer(ConsumerARN=consumer_arn) + kinesis_client.describe_stream_consumer(ConsumerARN=self.consumer_arn) else: set_main_status(404) @@ -223,7 +224,7 @@ def set_main_status(status: int) -> None: RequestHandler.main_status = status -def prepare_aws_server() -> None: +def prepare_aws_server() -> Optional[List[str]]: requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset") try: # Set up S3 so tests can access buckets and retrieve a file. @@ -258,15 +259,21 @@ def prepare_aws_server() -> None: # Set up Kinesis so tests can access a stream. 
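        # The consumer is registered once during server startup; its ARN is
        # returned to main() and passed into each RequestHandler, so the
        # describestreamconsumer handler can call describe_stream_consumer
        # without re-registering the consumer on every request.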
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) kinesis_client.create_stream(StreamName="test_stream", ShardCount=1) + response = kinesis_client.register_stream_consumer( + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer" + ) + consumer_arn = response["Consumer"]["ConsumerARN"] + return [consumer_arn] except Exception as exception: print("Unexpected exception occurred", exception) + return [None] def main() -> None: - prepare_aws_server() + [consumer_arn] = prepare_aws_server() server_address: tuple[str, int] = ("0.0.0.0", _PORT) request_handler_class: type = RequestHandler - requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, request_handler_class) + requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs)) atexit.register(requests_server.shutdown) server_thread: Thread = Thread(target=requests_server.serve_forever) server_thread.start() diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index 79cb37ee0..54ce637a6 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -83,243 +83,243 @@ def tear_down_dependency_container(cls): _logger.info(cls._local_stack.get_logs()[1].decode()) cls._local_stack.stop() - # def test_s3_create_bucket(self): - # self.do_test_requests( - # "s3/createbucket/create-bucket", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::S3", - # remote_operation="CreateBucket", - # remote_resource_type="AWS::S3::Bucket", - # remote_resource_identifier="test-bucket-name", - # request_specific_attributes={ - # SpanAttributes.AWS_S3_BUCKET: "test-bucket-name", - # }, - # span_name="S3.CreateBucket", - # ) - # - # def test_s3_create_object(self): - # self.do_test_requests( - # "s3/createobject/put-object/some-object", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::S3", - # remote_operation="PutObject", - # remote_resource_type="AWS::S3::Bucket", - # remote_resource_identifier="test-put-object-bucket-name", - # request_specific_attributes={ - # SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name", - # }, - # span_name="S3.PutObject", - # ) - # - # def test_s3_get_object(self): - # self.do_test_requests( - # "s3/getobject/get-object/some-object", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::S3", - # remote_operation="GetObject", - # remote_resource_type="AWS::S3::Bucket", - # remote_resource_identifier="test-get-object-bucket-name", - # request_specific_attributes={ - # SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name", - # }, - # span_name="S3.GetObject", - # ) - # - # def test_s3_error(self): - # self.do_test_requests( - # "s3/error", - # "GET", - # 400, - # 1, - # 0, - # remote_service="AWS::S3", - # remote_operation="CreateBucket", - # remote_resource_type="AWS::S3::Bucket", - # remote_resource_identifier="-", - # request_specific_attributes={ - # SpanAttributes.AWS_S3_BUCKET: "-", - # }, - # span_name="S3.CreateBucket", - # ) - # - # def test_s3_fault(self): - # self.do_test_requests( - # "s3/fault", - # "GET", - # 500, - # 0, - # 1, - # remote_service="AWS::S3", - # remote_operation="CreateBucket", - # remote_resource_type="AWS::S3::Bucket", - # remote_resource_identifier="valid-bucket-name", - # 
request_specific_attributes={ - # SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name", - # }, - # span_name="S3.CreateBucket", - # ) - # - # def test_dynamodb_create_table(self): - # self.do_test_requests( - # "ddb/createtable/some-table", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::DynamoDB", - # remote_operation="CreateTable", - # remote_resource_type="AWS::DynamoDB::Table", - # remote_resource_identifier="test_table", - # request_specific_attributes={ - # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"], - # }, - # span_name="DynamoDB.CreateTable", - # ) - # - # def test_dynamodb_put_item(self): - # self.do_test_requests( - # "ddb/putitem/putitem-table/key", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::DynamoDB", - # remote_operation="PutItem", - # remote_resource_type="AWS::DynamoDB::Table", - # remote_resource_identifier="put_test_table", - # request_specific_attributes={ - # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"], - # }, - # span_name="DynamoDB.PutItem", - # ) - # - # def test_dynamodb_error(self): - # self.do_test_requests( - # "ddb/error", - # "GET", - # 400, - # 1, - # 0, - # remote_service="AWS::DynamoDB", - # remote_operation="PutItem", - # remote_resource_type="AWS::DynamoDB::Table", - # remote_resource_identifier="invalid_table", - # request_specific_attributes={ - # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - # }, - # span_name="DynamoDB.PutItem", - # ) - # - # def test_dynamodb_fault(self): - # self.do_test_requests( - # "ddb/fault", - # "GET", - # 500, - # 0, - # 1, - # remote_service="AWS::DynamoDB", - # remote_operation="PutItem", - # remote_resource_type="AWS::DynamoDB::Table", - # remote_resource_identifier="invalid_table", - # request_specific_attributes={ - # SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], - # }, - # span_name="DynamoDB.PutItem", - # ) - # - # def test_sqs_create_queue(self): - # self.do_test_requests( - # "sqs/createqueue/some-queue", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::SQS", - # remote_operation="CreateQueue", - # remote_resource_type="AWS::SQS::Queue", - # remote_resource_identifier="test_queue", - # request_specific_attributes={ - # _AWS_QUEUE_NAME: "test_queue", - # }, - # span_name="SQS.CreateQueue", - # ) - # - # def test_sqs_send_message(self): - # self.do_test_requests( - # "sqs/publishqueue/some-queue", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::SQS", - # remote_operation="SendMessage", - # remote_resource_type="AWS::SQS::Queue", - # remote_resource_identifier="test_put_get_queue", - # request_specific_attributes={ - # _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", - # }, - # span_name="SQS.SendMessage", - # ) - # - # def test_sqs_receive_message(self): - # self.do_test_requests( - # "sqs/consumequeue/some-queue", - # "GET", - # 200, - # 0, - # 0, - # remote_service="AWS::SQS", - # remote_operation="ReceiveMessage", - # remote_resource_type="AWS::SQS::Queue", - # remote_resource_identifier="test_put_get_queue", - # request_specific_attributes={ - # _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", - # }, - # span_name="SQS.ReceiveMessage", - # ) - # - # def test_sqs_error(self): - # self.do_test_requests( - # "sqs/error", - # "GET", - # 400, - # 1, - # 0, - # remote_service="AWS::SQS", - # remote_operation="SendMessage", - # remote_resource_type="AWS::SQS::Queue", - # remote_resource_identifier="sqserror", - # request_specific_attributes={ - # _AWS_QUEUE_URL: 
"http://error.test:8080/000000000000/sqserror", - # }, - # span_name="SQS.SendMessage", - # ) - # - # def test_sqs_fault(self): - # self.do_test_requests( - # "sqs/fault", - # "GET", - # 500, - # 0, - # 1, - # remote_service="AWS::SQS", - # remote_operation="CreateQueue", - # remote_resource_type="AWS::SQS::Queue", - # remote_resource_identifier="invalid_test", - # request_specific_attributes={ - # _AWS_QUEUE_NAME: "invalid_test", - # }, - # span_name="SQS.CreateQueue", - # ) + def test_s3_create_bucket(self): + self.do_test_requests( + "s3/createbucket/create-bucket", + "GET", + 200, + 0, + 0, + remote_service="AWS::S3", + remote_operation="CreateBucket", + remote_resource_type="AWS::S3::Bucket", + remote_resource_identifier="test-bucket-name", + request_specific_attributes={ + SpanAttributes.AWS_S3_BUCKET: "test-bucket-name", + }, + span_name="S3.CreateBucket", + ) + + def test_s3_create_object(self): + self.do_test_requests( + "s3/createobject/put-object/some-object", + "GET", + 200, + 0, + 0, + remote_service="AWS::S3", + remote_operation="PutObject", + remote_resource_type="AWS::S3::Bucket", + remote_resource_identifier="test-put-object-bucket-name", + request_specific_attributes={ + SpanAttributes.AWS_S3_BUCKET: "test-put-object-bucket-name", + }, + span_name="S3.PutObject", + ) + + def test_s3_get_object(self): + self.do_test_requests( + "s3/getobject/get-object/some-object", + "GET", + 200, + 0, + 0, + remote_service="AWS::S3", + remote_operation="GetObject", + remote_resource_type="AWS::S3::Bucket", + remote_resource_identifier="test-get-object-bucket-name", + request_specific_attributes={ + SpanAttributes.AWS_S3_BUCKET: "test-get-object-bucket-name", + }, + span_name="S3.GetObject", + ) + + def test_s3_error(self): + self.do_test_requests( + "s3/error", + "GET", + 400, + 1, + 0, + remote_service="AWS::S3", + remote_operation="CreateBucket", + remote_resource_type="AWS::S3::Bucket", + remote_resource_identifier="-", + request_specific_attributes={ + SpanAttributes.AWS_S3_BUCKET: "-", + }, + span_name="S3.CreateBucket", + ) + + def test_s3_fault(self): + self.do_test_requests( + "s3/fault", + "GET", + 500, + 0, + 1, + remote_service="AWS::S3", + remote_operation="CreateBucket", + remote_resource_type="AWS::S3::Bucket", + remote_resource_identifier="valid-bucket-name", + request_specific_attributes={ + SpanAttributes.AWS_S3_BUCKET: "valid-bucket-name", + }, + span_name="S3.CreateBucket", + ) + + def test_dynamodb_create_table(self): + self.do_test_requests( + "ddb/createtable/some-table", + "GET", + 200, + 0, + 0, + remote_service="AWS::DynamoDB", + remote_operation="CreateTable", + remote_resource_type="AWS::DynamoDB::Table", + remote_resource_identifier="test_table", + request_specific_attributes={ + SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["test_table"], + }, + span_name="DynamoDB.CreateTable", + ) + + def test_dynamodb_put_item(self): + self.do_test_requests( + "ddb/putitem/putitem-table/key", + "GET", + 200, + 0, + 0, + remote_service="AWS::DynamoDB", + remote_operation="PutItem", + remote_resource_type="AWS::DynamoDB::Table", + remote_resource_identifier="put_test_table", + request_specific_attributes={ + SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["put_test_table"], + }, + span_name="DynamoDB.PutItem", + ) + + def test_dynamodb_error(self): + self.do_test_requests( + "ddb/error", + "GET", + 400, + 1, + 0, + remote_service="AWS::DynamoDB", + remote_operation="PutItem", + remote_resource_type="AWS::DynamoDB::Table", + remote_resource_identifier="invalid_table", + 
request_specific_attributes={ + SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], + }, + span_name="DynamoDB.PutItem", + ) + + def test_dynamodb_fault(self): + self.do_test_requests( + "ddb/fault", + "GET", + 500, + 0, + 1, + remote_service="AWS::DynamoDB", + remote_operation="PutItem", + remote_resource_type="AWS::DynamoDB::Table", + remote_resource_identifier="invalid_table", + request_specific_attributes={ + SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: ["invalid_table"], + }, + span_name="DynamoDB.PutItem", + ) + + def test_sqs_create_queue(self): + self.do_test_requests( + "sqs/createqueue/some-queue", + "GET", + 200, + 0, + 0, + remote_service="AWS::SQS", + remote_operation="CreateQueue", + remote_resource_type="AWS::SQS::Queue", + remote_resource_identifier="test_queue", + request_specific_attributes={ + _AWS_QUEUE_NAME: "test_queue", + }, + span_name="SQS.CreateQueue", + ) + + def test_sqs_send_message(self): + self.do_test_requests( + "sqs/publishqueue/some-queue", + "GET", + 200, + 0, + 0, + remote_service="AWS::SQS", + remote_operation="SendMessage", + remote_resource_type="AWS::SQS::Queue", + remote_resource_identifier="test_put_get_queue", + request_specific_attributes={ + _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + }, + span_name="SQS.SendMessage", + ) + + def test_sqs_receive_message(self): + self.do_test_requests( + "sqs/consumequeue/some-queue", + "GET", + 200, + 0, + 0, + remote_service="AWS::SQS", + remote_operation="ReceiveMessage", + remote_resource_type="AWS::SQS::Queue", + remote_resource_identifier="test_put_get_queue", + request_specific_attributes={ + _AWS_QUEUE_URL: "http://localstack:4566/000000000000/test_put_get_queue", + }, + span_name="SQS.ReceiveMessage", + ) + + def test_sqs_error(self): + self.do_test_requests( + "sqs/error", + "GET", + 400, + 1, + 0, + remote_service="AWS::SQS", + remote_operation="SendMessage", + remote_resource_type="AWS::SQS::Queue", + remote_resource_identifier="sqserror", + request_specific_attributes={ + _AWS_QUEUE_URL: "http://error.test:8080/000000000000/sqserror", + }, + span_name="SQS.SendMessage", + ) + + def test_sqs_fault(self): + self.do_test_requests( + "sqs/fault", + "GET", + 500, + 0, + 1, + remote_service="AWS::SQS", + remote_operation="CreateQueue", + remote_resource_type="AWS::SQS::Queue", + remote_resource_identifier="invalid_test", + request_specific_attributes={ + _AWS_QUEUE_NAME: "invalid_test", + }, + span_name="SQS.CreateQueue", + ) def test_kinesis_put_record(self): self.do_test_requests( @@ -355,7 +355,6 @@ def test_kinesis_describe_stream_consumer(self): r"stream/test_stream/consumer/test_consumer:\d{10}", }, span_name="Kinesis.DescribeStreamConsumer", - span_length=2, ) def test_kinesis_error(self): @@ -400,11 +399,9 @@ def _assert_aws_span_attributes(self, resource_scope_spans: List[ResourceScopeSp if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: target_spans.append(resource_scope_span.span) - span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 - - self.assertEqual(len(target_spans), span_length) + self.assertEqual(len(target_spans), 1) self._assert_aws_attributes( - target_spans[span_length - 1].attributes, + target_spans[0].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), "LOCAL_ROOT", @@ -450,12 +447,10 @@ def _assert_semantic_conventions_span_attributes( if resource_scope_span.span.kind == Span.SPAN_KIND_CLIENT: target_spans.append(resource_scope_span.span) - span_length = kwargs.get("span_length") if 
kwargs.get("span_length") else 1 - - self.assertEqual(len(target_spans), span_length) - self.assertEqual(target_spans[span_length - 1].name, kwargs.get("span_name")) + self.assertEqual(len(target_spans), 1) + self.assertEqual(target_spans[0].name, kwargs.get("span_name")) self._assert_semantic_conventions_attributes( - target_spans[span_length - 1].attributes, + target_spans[0].attributes, kwargs.get("remote_service"), kwargs.get("remote_operation"), status_code, @@ -501,9 +496,9 @@ def _assert_metric_attributes( for resource_scope_metric in resource_scope_metrics: if resource_scope_metric.metric.name.lower() == metric_name.lower(): target_metrics.append(resource_scope_metric.metric) - span_length = kwargs.get("span_length") if kwargs.get("span_length") else 1 - self.assertEqual(len(target_metrics), span_length) - target_metric: Metric = target_metrics[span_length - 1] + + self.assertEqual(len(target_metrics), 1) + target_metric: Metric = target_metrics[0] dp_list: List[ExponentialHistogramDataPoint] = target_metric.exponential_histogram.data_points dp_list_count: int = kwargs.get("dp_count", 2) self.assertEqual(len(dp_list), dp_list_count) From fd73ad21b97827b3838344315384d9a086eb94f9 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Mon, 3 Jun 2024 17:19:08 -0700 Subject: [PATCH 06/13] Fix contract tests. --- .../applications/botocore/botocore_server.py | 68 +++++++++++++------ .../test/amazon/base/contract_test_base.py | 3 + 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index db4723dba..a1d843a64 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -6,7 +6,7 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from logging import INFO, Logger, getLogger from threading import Thread -from typing import Optional, List +from typing import List, Optional import boto3 import requests @@ -224,17 +224,24 @@ def set_main_status(status: int) -> None: RequestHandler.main_status = status +# pylint: disable=too-many-locals def prepare_aws_server() -> Optional[List[str]]: requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset") try: # Set up S3 so tests can access buckets and retrieve a file. 
s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION) - s3_client.create_bucket( - Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} - ) - s3_client.create_bucket( - Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} - ) + bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]] + put_bucket_name: str = "test-put-object-bucket-name" + if put_bucket_name not in bucket_names: + s3_client.create_bucket( + Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} + ) + + get_bucket_name: str = "test-get-object-bucket-name" + if get_bucket_name not in bucket_names: + s3_client.create_bucket( + Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} + ) with tempfile.NamedTemporaryFile(delete=True) as temp_file: temp_file_name: str = temp_file.name temp_file.write(b"This is temp file for S3 upload") @@ -243,26 +250,41 @@ def prepare_aws_server() -> Optional[List[str]]: # Set up DDB so tests can access a table. ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) - ddb_client.create_table( - TableName="put_test_table", - KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], - AttributeDefinitions=[ - {"AttributeName": "id", "AttributeType": "S"}, - ], - BillingMode="PAY_PER_REQUEST", - ) + table_names: List[str] = ddb_client.list_tables()["TableNames"] + + table_name: str = "put_test_table" + if table_name not in table_names: + ddb_client.create_table( + TableName=table_name, + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], + BillingMode="PAY_PER_REQUEST", + ) # Set up SQS so tests can access a queue. sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) - sqs_client.create_queue(QueueName="test_put_get_queue") + queue_name: str = "test_put_get_queue" + queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name) + queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else [] + if not queues: + sqs_client.create_queue(QueueName=queue_name) # Set up Kinesis so tests can access a stream. 
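        # If the stream is missing, create it and register a new consumer;
        # otherwise look up the existing consumer via list_stream_consumers.
        # Both branches return the consumer ARN used by RequestHandler.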
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) - kinesis_client.create_stream(StreamName="test_stream", ShardCount=1) - response = kinesis_client.register_stream_consumer( - StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer" - ) - consumer_arn = response["Consumer"]["ConsumerARN"] + stream_name: str = "test_stream" + stream_response = kinesis_client.list_streams() + if not stream_response["StreamNames"]: + kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) + response = kinesis_client.register_stream_consumer( + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer" + ) + consumer_arn: str = response["Consumer"]["ConsumerARN"] + else: + response = kinesis_client.list_stream_consumers( + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream" + ) + consumer_arn: str = response.get("Consumers", [])[0]["ConsumerARN"] + return [consumer_arn] except Exception as exception: print("Unexpected exception occurred", exception) @@ -273,7 +295,9 @@ def main() -> None: [consumer_arn] = prepare_aws_server() server_address: tuple[str, int] = ("0.0.0.0", _PORT) request_handler_class: type = RequestHandler - requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs)) + requests_server: ThreadingHTTPServer = ThreadingHTTPServer( + server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs) + ) atexit.register(requests_server.shutdown) server_thread: Thread = Thread(target=requests_server.serve_forever) server_thread.start() diff --git a/contract-tests/tests/test/amazon/base/contract_test_base.py b/contract-tests/tests/test/amazon/base/contract_test_base.py index df270c242..1c1555713 100644 --- a/contract-tests/tests/test/amazon/base/contract_test_base.py +++ b/contract-tests/tests/test/amazon/base/contract_test_base.py @@ -226,6 +226,9 @@ def _assert_metric_attributes( self.fail("Tests must implement this function") def _is_valid_regex(self, pattern: str) -> bool: + if not isinstance(pattern, str): + return False + try: re.compile(pattern) return True From d9ce47ba28126c416b76d4980b7c124619de5219 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Tue, 4 Jun 2024 09:38:55 -0700 Subject: [PATCH 07/13] Remove log. 
--- .../images/applications/botocore/botocore_server.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index a1d843a64..7d5140ce7 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -4,7 +4,6 @@ import os import tempfile from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from logging import INFO, Logger, getLogger from threading import Thread from typing import List, Optional @@ -27,9 +26,6 @@ os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testcontainers-localstack") _NO_RETRY_CONFIG: Config = Config(retries={"max_attempts": 0}, connect_timeout=3, read_timeout=3) -_logger: Logger = getLogger(__name__) -_logger.setLevel(INFO) - # pylint: disable=broad-exception-caught class RequestHandler(BaseHTTPRequestHandler): From fb0c647964730da2e17cad59a65e54c8504d981b Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Wed, 5 Jun 2024 10:04:52 -0700 Subject: [PATCH 08/13] Address comments. --- .../distro/_aws_attribute_keys.py | 2 +- .../distro/_aws_metric_attribute_generator.py | 6 +++--- .../distro/patches/_botocore_patches.py | 2 +- .../test_aws_metric_attribute_generator.py | 21 +++++++++++++++---- .../distro/test_instrumentation_patch.py | 4 ++-- .../applications/botocore/botocore_server.py | 2 ++ .../test/amazon/botocore/botocore_test.py | 4 ++-- 7 files changed, 28 insertions(+), 13 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index 4b722d91c..055b837a3 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -16,4 +16,4 @@ AWS_QUEUE_URL: str = "aws.sqs.queue_url" AWS_QUEUE_NAME: str = "aws.sqs.queue_name" AWS_STREAM_NAME: str = "aws.kinesis.stream_name" -AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn" +AWS_STREAM_CONSUMER_ARN: str = "aws.kinesis.stream_consumer_arn" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index e08d1bb20..8b2d27b76 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -6,7 +6,6 @@ from urllib.parse import ParseResult, urlparse from amazon.opentelemetry.distro._aws_attribute_keys import ( - AWS_CONSUMER_ARN, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, AWS_QUEUE_NAME, @@ -16,6 +15,7 @@ AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, + AWS_STREAM_CONSUMER_ARN, AWS_STREAM_NAME, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( @@ -362,9 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri elif is_key_present(span, AWS_STREAM_NAME): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream" remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME)) - elif is_key_present(span, AWS_CONSUMER_ARN): + elif is_key_present(span, AWS_STREAM_CONSUMER_ARN): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer" - remote_resource_identifier = 
_escape_delimiters(span.attributes.get(AWS_CONSUMER_ARN)) + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_ARN)) elif is_key_present(span, _AWS_BUCKET_NAME): remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket" remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME)) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index 16af472dc..df9f991d1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -96,4 +96,4 @@ def extract_attributes(self, attributes: _AttributeMapT): attributes["aws.kinesis.stream_name"] = stream_name consumer_arn = self._call_context.params.get("ConsumerARN") if consumer_arn: - attributes["aws.kinesis.consumer_arn"] = consumer_arn + attributes["aws.kinesis.stream_consumer_arn"] = consumer_arn diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index 75bdfad10..40759b846 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -8,7 +8,6 @@ from unittest.mock import MagicMock from amazon.opentelemetry.distro._aws_attribute_keys import ( - AWS_CONSUMER_ARN, AWS_CONSUMER_PARENT_SPAN_KIND, AWS_LOCAL_OPERATION, AWS_LOCAL_SERVICE, @@ -19,6 +18,7 @@ AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, + AWS_STREAM_CONSUMER_ARN, AWS_STREAM_NAME, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator @@ -951,9 +951,9 @@ def test_sdk_client_span_with_remote_resource_attributes(self): self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name") self._mock_attribute([AWS_STREAM_NAME], [None]) - # Validate behaviour of AWS_CONSUMER_ARN attribute, then remove it. + # Validate behaviour of AWS_STREAM_CONSUMER_ARN attribute, then remove it. self._mock_attribute( - [AWS_CONSUMER_ARN], + [AWS_STREAM_CONSUMER_ARN], ["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"], keys, values, @@ -962,7 +962,20 @@ def test_sdk_client_span_with_remote_resource_attributes(self): "AWS::Kinesis::StreamConsumer", "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789", ) - self._mock_attribute([AWS_CONSUMER_ARN], [None]) + self._mock_attribute([AWS_STREAM_CONSUMER_ARN], [None]) + + # Validate both AWS_STREAM_NAME and AWS_STREAM_NAME present, then remove it. + self._mock_attribute( + [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_ARN], + [ + "aws_stream_name", + "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789", + ], + keys, + values, + ) + self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name") + self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_ARN], [None, None]) # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it. 
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index b38f0405c..8b4d1a872 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -81,8 +81,8 @@ def _validate_patched_botocore_instrumentation(self): kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes() self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes) self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME) - self.assertTrue("aws.kinesis.consumer_arn" in kinesis_attributes) - self.assertEqual(kinesis_attributes["aws.kinesis.consumer_arn"], _CONSUMER_ARN) + self.assertTrue("aws.kinesis.stream_consumer_arn" in kinesis_attributes) + self.assertEqual(kinesis_attributes["aws.kinesis.stream_consumer_arn"], _CONSUMER_ARN) # S3 self.assertTrue("s3" in _KNOWN_EXTENSIONS) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 7d5140ce7..1601dbfd7 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -206,6 +206,8 @@ def _handle_kinesis_request(self) -> None: set_main_status(200) kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") elif self.in_path("describestreamconsumer/my-consumer"): + if self.consumer_arn is None: + raise ValueError("Consumer ARN is None. 
Cannot describe stream consumer.")
             set_main_status(200)
             kinesis_client.describe_stream_consumer(ConsumerARN=self.consumer_arn)
         else:
             set_main_status(404)
diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py
index 54ce637a6..b10e8a50d 100644
--- a/contract-tests/tests/test/amazon/botocore/botocore_test.py
+++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py
@@ -29,7 +29,7 @@
 _AWS_QUEUE_URL: str = "aws.sqs.queue_url"
 _AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
 _AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
-_AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn"
+_AWS_STREAM_CONSUMER_ARN: str = "aws.kinesis.stream_consumer_arn"
 
 
 # pylint: disable=too-many-public-methods
@@ -351,7 +351,7 @@ def test_kinesis_describe_stream_consumer(self):
             remote_resource_identifier=r"arn:aws:kinesis:us-west-2:000000000000:"
             r"stream/test_stream/consumer/test_consumer:\d{10}",
             request_specific_attributes={
-                _AWS_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:"
+                _AWS_STREAM_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:"
                 r"stream/test_stream/consumer/test_consumer:\d{10}",
             },
             span_name="Kinesis.DescribeStreamConsumer",

From d6706490ecc456d89295a29e034c4ae55a8116af Mon Sep 17 00:00:00 2001
From: zzhlogin
Date: Wed, 5 Jun 2024 10:10:30 -0700
Subject: [PATCH 09/13] fix typo

---
 .../opentelemetry/distro/test_aws_metric_attribute_generator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
index 40759b846..7b001a008 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py
@@ -964,7 +964,7 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
         )
         self._mock_attribute([AWS_STREAM_CONSUMER_ARN], [None])
 
-        # Validate both AWS_STREAM_NAME and AWS_STREAM_NAME present, then remove it.
+        # Validate both AWS_STREAM_NAME and AWS_STREAM_CONSUMER_ARN present, then remove it.
         self._mock_attribute(
             [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_ARN],
             [

From 846d43584aca263ebf20e16f7169777df27fe004 Mon Sep 17 00:00:00 2001
From: Zhonghao Zhao
Date: Wed, 5 Jun 2024 17:30:36 -0700
Subject: [PATCH 10/13] Update readme.

---
 contract-tests/README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/contract-tests/README.md b/contract-tests/README.md
index 7e6ff49b2..f4f0002c2 100644
--- a/contract-tests/README.md
+++ b/contract-tests/README.md
@@ -28,6 +28,9 @@ The steps to add a new test for a library or framework are:
 * The test class should be created in `contract-tests/tests/amazon/`.
 * The test class should extend `contract_test_base.py`
+Note: For botocore applications, when creating new resources in [prepare_aws_server()](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/166c4cb36da6634cb070df5a312a62f6b0136a9c/contract-tests/images/applications/botocore/botocore_server.py#L215), make sure to check whether the resource already exists before creating it.
+This is because each test pulls the "aws-application-signals-tests-botocore-app" image and starts a new container that runs `prepare_aws_server()` once; only the first creation attempt can succeed, and all subsequent attempts will fail because the resources already exist.
+
 # How to run the tests locally?
Pre-requirements: From dc468f3050e2c92fc45ab03f222164d0ff3dee35 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Tue, 25 Jun 2024 12:22:11 -0700 Subject: [PATCH 11/13] Change to use consumerName. --- .../distro/_aws_attribute_keys.py | 2 +- .../distro/_aws_metric_attribute_generator.py | 6 ++-- .../distro/patches/_botocore_patches.py | 6 ++-- .../test_aws_metric_attribute_generator.py | 16 +++++----- .../distro/test_instrumentation_patch.py | 8 ++--- .../applications/botocore/botocore_server.py | 32 ++++++------------- .../test/amazon/base/contract_test_base.py | 17 ---------- .../test/amazon/botocore/botocore_test.py | 24 ++++---------- 8 files changed, 34 insertions(+), 77 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py index 055b837a3..e3f711688 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py @@ -16,4 +16,4 @@ AWS_QUEUE_URL: str = "aws.sqs.queue_url" AWS_QUEUE_NAME: str = "aws.sqs.queue_name" AWS_STREAM_NAME: str = "aws.kinesis.stream_name" -AWS_STREAM_CONSUMER_ARN: str = "aws.kinesis.stream_consumer_arn" +AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py index 8b2d27b76..1e8f83338 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py @@ -15,7 +15,7 @@ AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, - AWS_STREAM_CONSUMER_ARN, + AWS_STREAM_CONSUMER_NAME, AWS_STREAM_NAME, ) from amazon.opentelemetry.distro._aws_span_processing_util import ( @@ -362,9 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri elif is_key_present(span, AWS_STREAM_NAME): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream" remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME)) - elif is_key_present(span, AWS_STREAM_CONSUMER_ARN): + elif is_key_present(span, AWS_STREAM_CONSUMER_NAME): remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer" - remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_ARN)) + remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_NAME)) elif is_key_present(span, _AWS_BUCKET_NAME): remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket" remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME)) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index df9f991d1..072478c8b 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -94,6 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT): stream_name = self._call_context.params.get("StreamName") if stream_name: attributes["aws.kinesis.stream_name"] = stream_name - consumer_arn = self._call_context.params.get("ConsumerARN") - 
if consumer_arn: - attributes["aws.kinesis.stream_consumer_arn"] = consumer_arn + consumer_name = self._call_context.params.get("ConsumerName") + if consumer_name: + attributes["aws.stream.consumer_name"] = consumer_name diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py index 7b001a008..e961385fa 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py @@ -18,7 +18,7 @@ AWS_REMOTE_RESOURCE_TYPE, AWS_REMOTE_SERVICE, AWS_SPAN_KIND, - AWS_STREAM_CONSUMER_ARN, + AWS_STREAM_CONSUMER_NAME, AWS_STREAM_NAME, ) from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator @@ -953,29 +953,29 @@ def test_sdk_client_span_with_remote_resource_attributes(self): # Validate behaviour of AWS_STREAM_CONSUMER_ARN attribute, then remove it. self._mock_attribute( - [AWS_STREAM_CONSUMER_ARN], - ["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"], + [AWS_STREAM_CONSUMER_NAME], + ["aws_stream_consumer_name"], keys, values, ) self._validate_remote_resource_attributes( "AWS::Kinesis::StreamConsumer", - "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789", + "aws_stream_consumer_name", ) - self._mock_attribute([AWS_STREAM_CONSUMER_ARN], [None]) + self._mock_attribute([AWS_STREAM_CONSUMER_NAME], [None]) # Validate both AWS_STREAM_NAME and AWS_STREAM_CONSUMER_ARN present, then remove it. self._mock_attribute( - [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_ARN], + [AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME], [ "aws_stream_name", - "arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789", + "aws_stream_consumer_name", ], keys, values, ) self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name") - self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_ARN], [None, None]) + self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME], [None, None]) # Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it. 
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index 8b4d1a872..27424cd5f 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -11,7 +11,7 @@ from opentelemetry.semconv.trace import SpanAttributes _STREAM_NAME: str = "streamName" -_CONSUMER_ARN: str = "consumerArn" +_CONSUMER_NAME: str = "consumerName" _BUCKET_NAME: str = "bucketName" _QUEUE_NAME: str = "queueName" _QUEUE_URL: str = "queueUrl" @@ -81,8 +81,8 @@ def _validate_patched_botocore_instrumentation(self): kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes() self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes) self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME) - self.assertTrue("aws.kinesis.stream_consumer_arn" in kinesis_attributes) - self.assertEqual(kinesis_attributes["aws.kinesis.stream_consumer_arn"], _CONSUMER_ARN) + self.assertTrue("aws.stream.consumer_name" in kinesis_attributes) + self.assertEqual(kinesis_attributes["aws.stream.consumer_name"], _CONSUMER_NAME) # S3 self.assertTrue("s3" in _KNOWN_EXTENSIONS) @@ -102,7 +102,7 @@ def _validate_patched_botocore_instrumentation(self): def _do_extract_kinesis_attributes() -> Dict[str, str]: service_name: str = "kinesis" - params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerARN": _CONSUMER_ARN} + params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerName": _CONSUMER_NAME} return _do_extract_attributes(service_name, params) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index 1601dbfd7..d7b5ee31c 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -5,7 +5,7 @@ import tempfile from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from threading import Thread -from typing import List, Optional +from typing import List import boto3 import requests @@ -31,10 +31,6 @@ class RequestHandler(BaseHTTPRequestHandler): main_status: int = 200 - def __init__(self, request, client_address, server, *args, consumer_arn=None, **kwargs): - self.consumer_arn = consumer_arn - super().__init__(request, client_address, server, *args, **kwargs) - @override # pylint: disable=invalid-name def do_GET(self): @@ -206,10 +202,11 @@ def _handle_kinesis_request(self) -> None: set_main_status(200) kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") elif self.in_path("describestreamconsumer/my-consumer"): - if self.consumer_arn is None: - raise ValueError("Consumer ARN is None. 
Cannot describe stream consumer.") set_main_status(200) - kinesis_client.describe_stream_consumer(ConsumerARN=self.consumer_arn) + kinesis_client.describe_stream_consumer( + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/", + ConsumerName="test_consumer", + ) else: set_main_status(404) @@ -223,7 +220,7 @@ def set_main_status(status: int) -> None: # pylint: disable=too-many-locals -def prepare_aws_server() -> Optional[List[str]]: +def prepare_aws_server() -> None: requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset") try: # Set up S3 so tests can access buckets and retrieve a file. @@ -273,29 +270,18 @@ def prepare_aws_server() -> Optional[List[str]]: stream_response = kinesis_client.list_streams() if not stream_response["StreamNames"]: kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) - response = kinesis_client.register_stream_consumer( + kinesis_client.register_stream_consumer( StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer" ) - consumer_arn: str = response["Consumer"]["ConsumerARN"] - else: - response = kinesis_client.list_stream_consumers( - StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream" - ) - consumer_arn: str = response.get("Consumers", [])[0]["ConsumerARN"] - - return [consumer_arn] except Exception as exception: print("Unexpected exception occurred", exception) - return [None] def main() -> None: - [consumer_arn] = prepare_aws_server() + prepare_aws_server() server_address: tuple[str, int] = ("0.0.0.0", _PORT) request_handler_class: type = RequestHandler - requests_server: ThreadingHTTPServer = ThreadingHTTPServer( - server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs) - ) + requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, request_handler_class) atexit.register(requests_server.shutdown) server_thread: Thread = Thread(target=requests_server.serve_forever) server_thread.start() diff --git a/contract-tests/tests/test/amazon/base/contract_test_base.py b/contract-tests/tests/test/amazon/base/contract_test_base.py index 1c1555713..8364bd830 100644 --- a/contract-tests/tests/test/amazon/base/contract_test_base.py +++ b/contract-tests/tests/test/amazon/base/contract_test_base.py @@ -1,6 +1,5 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
# SPDX-License-Identifier: Apache-2.0 -import re import time from logging import INFO, Logger, getLogger from typing import Dict, List @@ -169,12 +168,6 @@ def _assert_int_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, self.assertIsNotNone(actual_value) self.assertEqual(expected_value, actual_value.int_value) - def _assert_match_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, pattern: str) -> None: - self.assertIn(key, attributes_dict) - actual_value: AnyValue = attributes_dict[key] - self.assertIsNotNone(actual_value) - self.assertRegex(actual_value.string_value, pattern) - def check_sum(self, metric_name: str, actual_sum: float, expected_sum: float) -> None: if metric_name is LATENCY_METRIC: self.assertTrue(0 < actual_sum < expected_sum) @@ -224,13 +217,3 @@ def _assert_metric_attributes( self, resource_scope_metrics: List[ResourceScopeMetric], metric_name: str, expected_sum: int, **kwargs ): self.fail("Tests must implement this function") - - def _is_valid_regex(self, pattern: str) -> bool: - if not isinstance(pattern, str): - return False - - try: - re.compile(pattern) - return True - except re.error: - return False diff --git a/contract-tests/tests/test/amazon/botocore/botocore_test.py b/contract-tests/tests/test/amazon/botocore/botocore_test.py index b10e8a50d..b7e2924a1 100644 --- a/contract-tests/tests/test/amazon/botocore/botocore_test.py +++ b/contract-tests/tests/test/amazon/botocore/botocore_test.py @@ -29,7 +29,7 @@ _AWS_QUEUE_URL: str = "aws.sqs.queue_url" _AWS_QUEUE_NAME: str = "aws.sqs.queue_name" _AWS_STREAM_NAME: str = "aws.kinesis.stream_name" -_AWS_STREAM_CONSUMER_ARN: str = "aws.kinesis.stream_consumer_arn" +_AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name" # pylint: disable=too-many-public-methods @@ -348,11 +348,9 @@ def test_kinesis_describe_stream_consumer(self): remote_service="AWS::Kinesis", remote_operation="DescribeStreamConsumer", remote_resource_type="AWS::Kinesis::StreamConsumer", - remote_resource_identifier=r"arn:aws:kinesis:us-west-2:000000000000:" - r"stream/test_stream/consumer/test_consumer:\d{10}", + remote_resource_identifier="test_consumer", request_specific_attributes={ - _AWS_STREAM_CONSUMER_ARN: r"arn:aws:kinesis:us-west-2:000000000000:" - r"stream/test_stream/consumer/test_consumer:\d{10}", + _AWS_STREAM_CONSUMER_NAME: "test_consumer", }, span_name="Kinesis.DescribeStreamConsumer", ) @@ -428,12 +426,7 @@ def _assert_aws_attributes( if remote_resource_type != "None": self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type) if remote_resource_identifier != "None": - if self._is_valid_regex(remote_resource_identifier): - self._assert_match_attribute( - attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier - ) - else: - self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) # See comment above AWS_LOCAL_OPERATION self._assert_str_attribute(attributes_dict, AWS_SPAN_KIND, span_kind) @@ -474,9 +467,7 @@ def _assert_semantic_conventions_attributes( # TODO: botocore instrumentation is not respecting PEER_SERVICE # self._assert_str_attribute(attributes_dict, SpanAttributes.PEER_SERVICE, "backend:8080") for key, value in request_specific_attributes.items(): - if self._is_valid_regex(value): - self._assert_match_attribute(attributes_dict, key, value) - elif isinstance(value, str): + if isinstance(value, 
str): self._assert_str_attribute(attributes_dict, key, value) elif isinstance(value, int): self._assert_int_attribute(attributes_dict, key, value) @@ -519,10 +510,7 @@ def _assert_metric_attributes( if remote_resource_type != "None": self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type) if remote_resource_identifier != "None": - if self._is_valid_regex(remote_resource_identifier): - self._assert_match_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) - else: - self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) + self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier) self.check_sum(metric_name, dependency_dp.sum, expected_sum) attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(service_dp.attributes) From 41571fcd4150cb1023ef5dcd1136c6204f7009db Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Tue, 25 Jun 2024 13:14:14 -0700 Subject: [PATCH 12/13] Fix contract test. --- contract-tests/images/applications/botocore/botocore_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contract-tests/images/applications/botocore/botocore_server.py b/contract-tests/images/applications/botocore/botocore_server.py index e10f897dd..4760746fd 100644 --- a/contract-tests/images/applications/botocore/botocore_server.py +++ b/contract-tests/images/applications/botocore/botocore_server.py @@ -204,7 +204,7 @@ def _handle_kinesis_request(self) -> None: elif self.in_path("describestreamconsumer/my-consumer"): set_main_status(200) kinesis_client.describe_stream_consumer( - StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/", + StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream", ConsumerName="test_consumer", ) else: From 72c59b843fba9449ab73b3f923a9f85b8eaba774 Mon Sep 17 00:00:00 2001 From: Thomas Pierce Date: Fri, 23 Aug 2024 16:45:38 -0700 Subject: [PATCH 13/13] Update Dockerfile --- Dockerfile | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index e8aa35db1..23fc2c907 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,9 @@ # The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator, # one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then # update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`. -FROM python:3.11 AS build + +# Stage 1: Install ADOT Python in the /operator-build folder +FROM public.ecr.aws/docker/library/python:3.11 AS build WORKDIR /operator-build @@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro -FROM public.ecr.aws/amazonlinux/amazonlinux:minimal +# Stage 2: Build the cp-utility binary +FROM public.ecr.aws/docker/library/rust:1.75 as builder + +WORKDIR /usr/src/cp-utility +COPY ./tools/cp-utility . + +## TARGETARCH is defined by buildx +# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope +ARG TARGETARCH + +# Run validations and audit only on amd64 because it is faster and those two steps +# are only used to validate the source code and don't require anything that is +# architecture specific. 
+ +# Validations +# Validate formatting +RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi + +# Audit dependencies +RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi + + +# Cross-compile based on the target platform. +RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \ + elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \ + else false; \ + fi \ + && rustup target add ${ARCH}-unknown-linux-musl \ + && cargo test --target ${ARCH}-unknown-linux-musl \ + && cargo install --target ${ARCH}-unknown-linux-musl --path . --root . + +# Stage 3: Build the distribution image by copying the THIRD-PARTY-LICENSES, the custom built cp command from stage 2, and the installed ADOT Python from stage 1 to their respective destinations +FROM scratch # Required to copy attribute files to distributed docker images ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES +COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp COPY --from=build /operator-build/workspace /autoinstrumentation - -RUN chmod -R go+r /autoinstrumentation
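Taken together, the Kinesis changes in this series mean that a StreamName call parameter is recorded as aws.kinesis.stream_name and mapped to an AWS::Kinesis::Stream remote resource, while a ConsumerName parameter is recorded as aws.stream.consumer_name and mapped to AWS::Kinesis::StreamConsumer, with the stream name taking precedence when both are present. The self-contained sketch below illustrates that expected behaviour; it is a rough stand-in only, the helper names (extract_kinesis_attributes, remote_resource) are invented for this example, and it does not import or exercise the actual distro or botocore code.

# Illustrative sketch only: plain-Python stand-ins for the patched Kinesis extension
# and the metric attribute generator; these helpers are hypothetical, not the real API.
from typing import Dict, Optional, Tuple


def extract_kinesis_attributes(params: Dict[str, str]) -> Dict[str, str]:
    # Mimics the patched extension: copy known Kinesis call parameters into span attributes.
    attributes: Dict[str, str] = {}
    if params.get("StreamName"):
        attributes["aws.kinesis.stream_name"] = params["StreamName"]
    if params.get("ConsumerName"):
        attributes["aws.stream.consumer_name"] = params["ConsumerName"]
    return attributes


def remote_resource(attributes: Dict[str, str]) -> Optional[Tuple[str, str]]:
    # Mimics the generator's precedence: stream name is checked before consumer name.
    if "aws.kinesis.stream_name" in attributes:
        return "AWS::Kinesis::Stream", attributes["aws.kinesis.stream_name"]
    if "aws.stream.consumer_name" in attributes:
        return "AWS::Kinesis::StreamConsumer", attributes["aws.stream.consumer_name"]
    return None


if __name__ == "__main__":
    only_consumer = extract_kinesis_attributes({"ConsumerName": "test_consumer"})
    assert remote_resource(only_consumer) == ("AWS::Kinesis::StreamConsumer", "test_consumer")
    both = extract_kinesis_attributes({"StreamName": "test_stream", "ConsumerName": "test_consumer"})
    assert remote_resource(both) == ("AWS::Kinesis::Stream", "test_stream")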
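The note added to contract-tests/README.md above asks authors to make resource creation in prepare_aws_server() idempotent, because every contract test starts a fresh application container that runs the setup against the same LocalStack instance. A minimal sketch of that check-before-create pattern follows, assuming a LocalStack endpoint on port 4566 and dummy credentials; ensure_queue is a hypothetical helper name and is not code from these patches.

# Minimal sketch of idempotent test-resource setup (assumes LocalStack on localhost:4566).
# "ensure_queue" is a hypothetical helper, not part of the patch series above.
import boto3


def ensure_queue(queue_name: str) -> str:
    sqs = boto3.client(
        "sqs",
        endpoint_url="http://localhost:4566",
        region_name="us-west-2",
        aws_access_key_id="test",  # dummy credentials; LocalStack accepts any value
        aws_secret_access_key="test",
    )
    # list_queues omits "QueueUrls" entirely when nothing matches, so default to an empty list.
    existing = sqs.list_queues(QueueNamePrefix=queue_name).get("QueueUrls", [])
    if existing:
        # A previous test container already created the queue; reuse it instead of failing.
        return existing[0]
    return sqs.create_queue(QueueName=queue_name)["QueueUrl"]


if __name__ == "__main__":
    print(ensure_queue("test_put_get_queue"))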