Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance kinesis Consumer support #200

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
# SQS: URL of the queue targeted by the SDK call.
AWS_QUEUE_URL: str = "aws.sqs.queue_url"
# SQS: name of the queue targeted by the SDK call.
AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
# Kinesis: name of the data stream targeted by the SDK call.
AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
# Kinesis: ARN of a registered stream consumer (e.g. DescribeStreamConsumer).
AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn"
thpierce marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from urllib.parse import ParseResult, urlparse

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_CONSUMER_ARN,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
AWS_QUEUE_NAME,
Expand Down Expand Up @@ -361,6 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
elif is_key_present(span, AWS_STREAM_NAME):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME))
elif is_key_present(span, AWS_CONSUMER_ARN):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_CONSUMER_ARN))
thpierce marked this conversation as resolved.
Show resolved Hide resolved
elif is_key_present(span, _AWS_BUCKET_NAME):
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
stream_name = self._call_context.params.get("StreamName")
if stream_name:
attributes["aws.kinesis.stream_name"] = stream_name
consumer_arn = self._call_context.params.get("ConsumerARN")
if consumer_arn:
attributes["aws.kinesis.consumer_arn"] = consumer_arn
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest.mock import MagicMock

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_CONSUMER_ARN,
AWS_CONSUMER_PARENT_SPAN_KIND,
AWS_LOCAL_OPERATION,
AWS_LOCAL_SERVICE,
Expand Down Expand Up @@ -950,6 +951,19 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
self._mock_attribute([AWS_STREAM_NAME], [None])

# Validate behaviour of AWS_CONSUMER_ARN attribute, then remove it.
thpierce marked this conversation as resolved.
Show resolved Hide resolved
self._mock_attribute(
[AWS_CONSUMER_ARN],
["arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789"],
keys,
values,
)
self._validate_remote_resource_attributes(
"AWS::Kinesis::StreamConsumer",
"arn:aws:kinesis:us-west-2:000000000000:stream/test_stream/consumer/test_consumer:0123456789",
)
self._mock_attribute([AWS_CONSUMER_ARN], [None])

# Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it.
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values)
self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from opentelemetry.semconv.trace import SpanAttributes

_STREAM_NAME: str = "streamName"
_CONSUMER_ARN: str = "consumerArn"
_BUCKET_NAME: str = "bucketName"
_QUEUE_NAME: str = "queueName"
_QUEUE_URL: str = "queueUrl"
Expand Down Expand Up @@ -80,6 +81,8 @@ def _validate_patched_botocore_instrumentation(self):
kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME)
self.assertTrue("aws.kinesis.consumer_arn" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.kinesis.consumer_arn"], _CONSUMER_ARN)

# S3
self.assertTrue("s3" in _KNOWN_EXTENSIONS)
Expand All @@ -99,7 +102,7 @@ def _validate_patched_botocore_instrumentation(self):

def _do_extract_kinesis_attributes() -> Dict[str, str]:
    """Run attribute extraction for the kinesis extension over canned call parameters."""
    kinesis_params: Dict[str, str] = {
        "StreamName": _STREAM_NAME,
        "ConsumerARN": _CONSUMER_ARN,
    }
    return _do_extract_attributes("kinesis", kinesis_params)


Expand Down
76 changes: 57 additions & 19 deletions contract-tests/images/applications/botocore/botocore_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import tempfile
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from threading import Thread
from typing import List, Optional

import boto3
import requests
Expand All @@ -30,6 +31,10 @@
class RequestHandler(BaseHTTPRequestHandler):
main_status: int = 200

def __init__(self, request, client_address, server, *args, consumer_arn=None, **kwargs):
    # Stash the consumer ARN BEFORE delegating to the base class:
    # BaseHTTPRequestHandler.__init__ handles the incoming request
    # synchronously, and _handle_kinesis_request reads self.consumer_arn,
    # so the attribute must already be set when the request is dispatched.
    self.consumer_arn = consumer_arn
    super().__init__(request, client_address, server, *args, **kwargs)

@override
# pylint: disable=invalid-name
def do_GET(self):
Expand Down Expand Up @@ -200,6 +205,9 @@ def _handle_kinesis_request(self) -> None:
elif self.in_path("putrecord/my-stream"):
set_main_status(200)
kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key")
elif self.in_path("describestreamconsumer/my-consumer"):
set_main_status(200)
kinesis_client.describe_stream_consumer(ConsumerARN=self.consumer_arn)
else:
set_main_status(404)

Expand All @@ -212,17 +220,24 @@ def set_main_status(status: int) -> None:
RequestHandler.main_status = status


def prepare_aws_server() -> None:
# pylint: disable=too-many-locals
def prepare_aws_server() -> Optional[List[str]]:
requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset")
try:
# Set up S3 so tests can access buckets and retrieve a file.
s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION)
s3_client.create_bucket(
Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
s3_client.create_bucket(
Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
put_bucket_name: str = "test-put-object-bucket-name"
if put_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)

get_bucket_name: str = "test-get-object-bucket-name"
if get_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
Comment on lines +228 to +239
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Same with changes to DDB, SQS sections? Is it because we are not refreshing the AWS container between tests?

Copy link
Contributor Author

@zzhlogin zzhlogin Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each test, we pull the "aws-application-signals-tests-botocore-app" image and start a new container. Only the first container succeeds; every subsequent container fails on CreateBucket with the exception:
[INFO] Unexpected exception occurred An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it. The function then returns [None] immediately, without proceeding to the rest of the setup code.

The same applies to the other resources: for every service, only the first container succeeds in creating its resources, and all subsequent containers fail. That was acceptable as long as the resources had already been created earlier. But after adding the Kinesis stream consumer, this becomes a problem, because we need a distinct consumer_arn to be returned for each container.

Copy link
Contributor

@thpierce thpierce Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, thanks for investigating and improving!

Not blocking: Consider adding documentation to this logic that explains it really only runs once, to avoid confusion in future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

with tempfile.NamedTemporaryFile(delete=True) as temp_file:
temp_file_name: str = temp_file.name
temp_file.write(b"This is temp file for S3 upload")
Expand All @@ -231,31 +246,54 @@ def prepare_aws_server() -> None:

# Set up DDB so tests can access a table.
ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
ddb_client.create_table(
TableName="put_test_table",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
table_names: List[str] = ddb_client.list_tables()["TableNames"]

table_name: str = "put_test_table"
if table_name not in table_names:
ddb_client.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)

# Set up SQS so tests can access a queue.
sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
sqs_client.create_queue(QueueName="test_put_get_queue")
queue_name: str = "test_put_get_queue"
queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else []
if not queues:
sqs_client.create_queue(QueueName=queue_name)

# Set up Kinesis so tests can access a stream.
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
kinesis_client.create_stream(StreamName="test_stream", ShardCount=1)
stream_name: str = "test_stream"
stream_response = kinesis_client.list_streams()
if not stream_response["StreamNames"]:
kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
response = kinesis_client.register_stream_consumer(
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer"
)
consumer_arn: str = response["Consumer"]["ConsumerARN"]
else:
response = kinesis_client.list_stream_consumers(
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream"
)
consumer_arn: str = response.get("Consumers", [])[0]["ConsumerARN"]

return [consumer_arn]
except Exception as exception:
print("Unexpected exception occurred", exception)
return [None]


def main() -> None:
prepare_aws_server()
[consumer_arn] = prepare_aws_server()
thpierce marked this conversation as resolved.
Show resolved Hide resolved
server_address: tuple[str, int] = ("0.0.0.0", _PORT)
request_handler_class: type = RequestHandler
requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, request_handler_class)
requests_server: ThreadingHTTPServer = ThreadingHTTPServer(
server_address, lambda *args, **kwargs: request_handler_class(*args, consumer_arn=consumer_arn, **kwargs)
)
atexit.register(requests_server.shutdown)
server_thread: Thread = Thread(target=requests_server.serve_forever)
server_thread.start()
Expand Down
17 changes: 17 additions & 0 deletions contract-tests/tests/test/amazon/base/contract_test_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import re
import time
from logging import INFO, Logger, getLogger
from typing import Dict, List
Expand Down Expand Up @@ -168,6 +169,12 @@ def _assert_int_attribute(self, attributes_dict: Dict[str, AnyValue], key: str,
self.assertIsNotNone(actual_value)
self.assertEqual(expected_value, actual_value.int_value)

def _assert_match_attribute(self, attributes_dict: Dict[str, AnyValue], key: str, pattern: str) -> None:
    """Assert that *key* exists in *attributes_dict* and its string value matches the regex *pattern*."""
    self.assertIn(key, attributes_dict)
    attribute: AnyValue = attributes_dict[key]
    self.assertIsNotNone(attribute)
    self.assertRegex(attribute.string_value, pattern)

def check_sum(self, metric_name: str, actual_sum: float, expected_sum: float) -> None:
if metric_name is LATENCY_METRIC:
self.assertTrue(0 < actual_sum < expected_sum)
Expand Down Expand Up @@ -217,3 +224,13 @@ def _assert_metric_attributes(
self, resource_scope_metrics: List[ResourceScopeMetric], metric_name: str, expected_sum: int, **kwargs
):
self.fail("Tests must implement this function")

def _is_valid_regex(self, pattern: str) -> bool:
if not isinstance(pattern, str):
return False

try:
re.compile(pattern)
return True
except re.error:
return False
37 changes: 34 additions & 3 deletions contract-tests/tests/test/amazon/botocore/botocore_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# Span attribute keys the distro emits for SQS and Kinesis resources,
# duplicated here so the contract tests do not import the distro package.
_AWS_QUEUE_URL: str = "aws.sqs.queue_url"
_AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
_AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
_AWS_CONSUMER_ARN: str = "aws.kinesis.consumer_arn"


# pylint: disable=too-many-public-methods
Expand Down Expand Up @@ -337,6 +338,25 @@ def test_kinesis_put_record(self):
span_name="Kinesis.PutRecord",
)

def test_kinesis_describe_stream_consumer(self):
    """Kinesis DescribeStreamConsumer should report the consumer ARN as the remote resource."""
    # The ARN's trailing 10 digits vary per run, so the identifier is
    # matched as a regex rather than compared as an exact string.
    consumer_arn_pattern = (
        r"arn:aws:kinesis:us-west-2:000000000000:"
        r"stream/test_stream/consumer/test_consumer:\d{10}"
    )
    self.do_test_requests(
        "kinesis/describestreamconsumer/my-consumer",
        "GET",
        200,
        0,
        0,
        remote_service="AWS::Kinesis",
        remote_operation="DescribeStreamConsumer",
        remote_resource_type="AWS::Kinesis::StreamConsumer",
        remote_resource_identifier=consumer_arn_pattern,
        request_specific_attributes={_AWS_CONSUMER_ARN: consumer_arn_pattern},
        span_name="Kinesis.DescribeStreamConsumer",
    )

def test_kinesis_error(self):
self.do_test_requests(
"kinesis/error",
Expand Down Expand Up @@ -408,7 +428,12 @@ def _assert_aws_attributes(
if remote_resource_type != "None":
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type)
if remote_resource_identifier != "None":
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
if self._is_valid_regex(remote_resource_identifier):
self._assert_match_attribute(
attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier
)
else:
self._assert_str_attribute(attributes_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
# See comment above AWS_LOCAL_OPERATION
self._assert_str_attribute(attributes_dict, AWS_SPAN_KIND, span_kind)

Expand Down Expand Up @@ -449,14 +474,17 @@ def _assert_semantic_conventions_attributes(
# TODO: botocore instrumentation is not respecting PEER_SERVICE
# self._assert_str_attribute(attributes_dict, SpanAttributes.PEER_SERVICE, "backend:8080")
for key, value in request_specific_attributes.items():
if isinstance(value, str):
if self._is_valid_regex(value):
self._assert_match_attribute(attributes_dict, key, value)
elif isinstance(value, str):
self._assert_str_attribute(attributes_dict, key, value)
elif isinstance(value, int):
self._assert_int_attribute(attributes_dict, key, value)
else:
self._assert_array_value_ddb_table_name(attributes_dict, key, value)

@override
# pylint: disable=too-many-locals
def _assert_metric_attributes(
self,
resource_scope_metrics: List[ResourceScopeMetric],
Expand Down Expand Up @@ -491,7 +519,10 @@ def _assert_metric_attributes(
if remote_resource_type != "None":
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_TYPE, remote_resource_type)
if remote_resource_identifier != "None":
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
if self._is_valid_regex(remote_resource_identifier):
self._assert_match_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
else:
self._assert_str_attribute(attribute_dict, AWS_REMOTE_RESOURCE_IDENTIFIER, remote_resource_identifier)
self.check_sum(metric_name, dependency_dp.sum, expected_sum)

attribute_dict: Dict[str, AnyValue] = self._get_attributes_dict(service_dp.attributes)
Expand Down
Loading