Enhance kinesis Consumer support #200

Open · wants to merge 16 commits into base: main
41 changes: 37 additions & 4 deletions Dockerfile
@@ -3,7 +3,9 @@
# The packages are installed in the `/autoinstrumentation` directory. This is required as when instrumenting the pod by CWOperator,
# one init container will be created to copy all the content in `/autoinstrumentation` directory to app's container. Then
# update the `PYTHONPATH` environment variable accordingly. Then in the second stage, copy the directory to `/autoinstrumentation`.
FROM python:3.11 AS build

# Stage 1: Install ADOT Python in the /operator-build folder
FROM public.ecr.aws/docker/library/python:3.11 AS build

WORKDIR /operator-build

@@ -18,11 +20,42 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr

RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro

FROM public.ecr.aws/amazonlinux/amazonlinux:minimal
# Stage 2: Build the cp-utility binary
FROM public.ecr.aws/docker/library/rust:1.75 as builder

WORKDIR /usr/src/cp-utility
COPY ./tools/cp-utility .

## TARGETARCH is defined by buildx
# https://docs.docker.com/engine/reference/builder/#automatic-platform-args-in-the-global-scope
ARG TARGETARCH

# Run validations and audit only on amd64 because it is faster and those two steps
# are only used to validate the source code and don't require anything that is
# architecture specific.

# Validations
# Validate formatting
RUN if [ $TARGETARCH = "amd64" ]; then rustup component add rustfmt && cargo fmt --check ; fi

# Audit dependencies
RUN if [ $TARGETARCH = "amd64" ]; then cargo install cargo-audit && cargo audit ; fi


# Cross-compile based on the target platform.
RUN if [ $TARGETARCH = "amd64" ]; then export ARCH="x86_64" ; \
elif [ $TARGETARCH = "arm64" ]; then export ARCH="aarch64" ; \
else false; \
fi \
&& rustup target add ${ARCH}-unknown-linux-musl \
&& cargo test --target ${ARCH}-unknown-linux-musl \
&& cargo install --target ${ARCH}-unknown-linux-musl --path . --root .

# Stage 3: Build the distribution image by copying the THIRD-PARTY-LICENSES, the custom built cp command from stage 2, and the installed ADOT Python from stage 1 to their respective destinations
FROM scratch

# Required to copy attribute files to distributed docker images
ADD THIRD-PARTY-LICENSES ./THIRD-PARTY-LICENSES

COPY --from=builder /usr/src/cp-utility/bin/cp-utility /bin/cp
COPY --from=build /operator-build/workspace /autoinstrumentation

RUN chmod -R go+r /autoinstrumentation
@@ -16,3 +16,4 @@
AWS_QUEUE_URL: str = "aws.sqs.queue_url"
AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"
@@ -15,6 +15,7 @@
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SPAN_KIND,
AWS_STREAM_CONSUMER_NAME,
AWS_STREAM_NAME,
)
from amazon.opentelemetry.distro._aws_span_processing_util import (
@@ -361,6 +362,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
elif is_key_present(span, AWS_STREAM_NAME):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_NAME))
elif is_key_present(span, AWS_STREAM_CONSUMER_NAME):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STREAM_CONSUMER_NAME))
elif is_key_present(span, _AWS_BUCKET_NAME):
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
@@ -94,3 +94,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
stream_name = self._call_context.params.get("StreamName")
if stream_name:
attributes["aws.kinesis.stream_name"] = stream_name
consumer_name = self._call_context.params.get("ConsumerName")
if consumer_name:
attributes["aws.stream.consumer_name"] = consumer_name
@@ -18,6 +18,7 @@
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SPAN_KIND,
AWS_STREAM_CONSUMER_NAME,
AWS_STREAM_NAME,
)
from amazon.opentelemetry.distro._aws_metric_attribute_generator import _AwsMetricAttributeGenerator
@@ -950,6 +951,32 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
self._mock_attribute([AWS_STREAM_NAME], [None])

# Validate behaviour of AWS_STREAM_CONSUMER_NAME attribute, then remove it.
self._mock_attribute(
[AWS_STREAM_CONSUMER_NAME],
["aws_stream_consumer_name"],
keys,
values,
)
self._validate_remote_resource_attributes(
"AWS::Kinesis::StreamConsumer",
"aws_stream_consumer_name",
)
self._mock_attribute([AWS_STREAM_CONSUMER_NAME], [None])

# Validate behaviour when both AWS_STREAM_NAME and AWS_STREAM_CONSUMER_NAME are present, then remove them.
self._mock_attribute(
[AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME],
[
"aws_stream_name",
"aws_stream_consumer_name",
],
keys,
values,
)
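# When both attributes are present, AWS_STREAM_NAME takes precedence: _set_remote_type_and_identifier
# checks the stream-name attribute in an earlier elif branch, so the remote resource resolves to
# AWS::Kinesis::Stream rather than AWS::Kinesis::StreamConsumer.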
self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
self._mock_attribute([AWS_STREAM_NAME, AWS_STREAM_CONSUMER_NAME], [None, None])

# Validate behaviour of SpanAttributes.AWS_DYNAMODB_TABLE_NAMES attribute with one table name, then remove it.
self._mock_attribute([SpanAttributes.AWS_DYNAMODB_TABLE_NAMES], [["aws_table_name"]], keys, values)
self._validate_remote_resource_attributes("AWS::DynamoDB::Table", "aws_table_name")
@@ -11,6 +11,7 @@
from opentelemetry.semconv.trace import SpanAttributes

_STREAM_NAME: str = "streamName"
_CONSUMER_NAME: str = "consumerName"
_BUCKET_NAME: str = "bucketName"
_QUEUE_NAME: str = "queueName"
_QUEUE_URL: str = "queueUrl"
@@ -99,6 +100,8 @@ def _test_patched_botocore_instrumentation(self):
kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
self.assertTrue("aws.kinesis.stream_name" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.kinesis.stream_name"], _STREAM_NAME)
self.assertTrue("aws.stream.consumer_name" in kinesis_attributes)
self.assertEqual(kinesis_attributes["aws.stream.consumer_name"], _CONSUMER_NAME)

# S3
self.assertTrue("s3" in _KNOWN_EXTENSIONS)
@@ -140,7 +143,7 @@ def _reset_mocks(self):

def _do_extract_kinesis_attributes() -> Dict[str, str]:
service_name: str = "kinesis"
params: Dict[str, str] = {"StreamName": _STREAM_NAME}
params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerName": _CONSUMER_NAME}
return _do_extract_attributes(service_name, params)


3 changes: 3 additions & 0 deletions contract-tests/README.md
@@ -28,6 +28,9 @@ The steps to add a new test for a library or framework are:
* The test class should be created in `contract-tests/tests/amazon/<framework-name>`.
* The test class should extend `contract_test_base.py`

Note: For botocore applications, when creating new resources in [prepare_aws_server()](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/166c4cb36da6634cb070df5a312a62f6b0136a9c/contract-tests/images/applications/botocore/botocore_server.py#L215), make sure to check whether the resource already exists before creating it.
This is because each test pulls the "aws-application-signals-tests-botocore-app" image and starts a new container that runs `prepare_aws_server()` once; only the first attempt can succeed, and all subsequent attempts fail because the resources already exist.
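A minimal sketch of the check-before-create pattern (the endpoint, region, and queue name below are illustrative, not the ones used by the test app):

```python
import boto3

# Hypothetical client against a local endpoint; adjust to the test environment.
sqs_client = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name="us-west-2")

queue_name = "example-queue"
# Only create the queue if an earlier container has not already created it.
queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
if not queues_response.get("QueueUrls"):
    sqs_client.create_queue(QueueName=queue_name)
```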

# How to run the tests locally?

Pre-requirements:
58 changes: 42 additions & 16 deletions contract-tests/images/applications/botocore/botocore_server.py
@@ -5,6 +5,7 @@
import tempfile
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from threading import Thread
from typing import List

import boto3
import requests
@@ -200,6 +201,12 @@ def _handle_kinesis_request(self) -> None:
elif self.in_path("putrecord/my-stream"):
set_main_status(200)
kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key")
elif self.in_path("describestreamconsumer/my-consumer"):
set_main_status(200)
kinesis_client.describe_stream_consumer(
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/test_stream",
ConsumerName="test_consumer",
)
else:
set_main_status(404)

@@ -212,17 +219,24 @@ def set_main_status(status: int) -> None:
RequestHandler.main_status = status


# pylint: disable=too-many-locals
def prepare_aws_server() -> None:
requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset")
try:
# Set up S3 so tests can access buckets and retrieve a file.
s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION)
s3_client.create_bucket(
Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
s3_client.create_bucket(
Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
bucket_names: List[str] = [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]
put_bucket_name: str = "test-put-object-bucket-name"
if put_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=put_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)

get_bucket_name: str = "test-get-object-bucket-name"
if get_bucket_name not in bucket_names:
s3_client.create_bucket(
Bucket=get_bucket_name, CreateBucketConfiguration={"LocationConstraint": _AWS_REGION}
)
Review comment on lines +228 to +239:

Contributor:
Why is this needed? Same with the changes to the DDB and SQS sections? Is it because we are not refreshing the AWS container between tests?

zzhlogin (Contributor, Author) — Jun 4, 2024:
For each test we pull the "aws-application-signals-tests-botocore-app" image and start a new container. Only the first container succeeds; all subsequent containers fail on CreateBucket with:
[INFO] Unexpected exception occurred An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.
The function then returns immediately without running the rest of the setup. The same holds for the other resources, so only the first container actually creates them. That was fine as long as the resources had already been created, but after adding the Kinesis consumer it becomes a problem, because we want a different consumer ARN returned for each container.

thpierce (Contributor) — Jun 5, 2024:
Interesting, thanks for investigating and improving!
Not blocking: consider adding documentation to this logic explaining that it really only runs once, to avoid confusion in the future.

zzhlogin (Contributor, Author):
Added.

with tempfile.NamedTemporaryFile(delete=True) as temp_file:
temp_file_name: str = temp_file.name
temp_file.write(b"This is temp file for S3 upload")
@@ -231,22 +245,34 @@ def prepare_aws_server() -> None:

# Set up DDB so tests can access a table.
ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
ddb_client.create_table(
TableName="put_test_table",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
table_names: List[str] = ddb_client.list_tables()["TableNames"]

table_name: str = "put_test_table"
if table_name not in table_names:
ddb_client.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)

# Set up SQS so tests can access a queue.
sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
sqs_client.create_queue(QueueName="test_put_get_queue")
queue_name: str = "test_put_get_queue"
queues_response = sqs_client.list_queues(QueueNamePrefix=queue_name)
queues: List[str] = queues_response["QueueUrls"] if "QueueUrls" in queues_response else []
if not queues:
sqs_client.create_queue(QueueName=queue_name)

# Set up Kinesis so tests can access a stream.
kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION)
kinesis_client.create_stream(StreamName="test_stream", ShardCount=1)
stream_name: str = "test_stream"
stream_response = kinesis_client.list_streams()
if not stream_response["StreamNames"]:
kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
kinesis_client.register_stream_consumer(
StreamARN="arn:aws:kinesis:us-west-2:000000000000:stream/" + stream_name, ConsumerName="test_consumer"
)
except Exception as exception:
print("Unexpected exception occurred", exception)

19 changes: 19 additions & 0 deletions contract-tests/tests/test/amazon/botocore/botocore_test.py
@@ -29,6 +29,7 @@
_AWS_QUEUE_URL: str = "aws.sqs.queue_url"
_AWS_QUEUE_NAME: str = "aws.sqs.queue_name"
_AWS_STREAM_NAME: str = "aws.kinesis.stream_name"
_AWS_STREAM_CONSUMER_NAME: str = "aws.stream.consumer_name"


# pylint: disable=too-many-public-methods
Expand Down Expand Up @@ -338,6 +339,23 @@ def test_kinesis_put_record(self):
span_name="Kinesis.PutRecord",
)

def test_kinesis_describe_stream_consumer(self):
self.do_test_requests(
"kinesis/describestreamconsumer/my-consumer",
"GET",
200,
0,
0,
remote_service="AWS::Kinesis",
remote_operation="DescribeStreamConsumer",
remote_resource_type="AWS::Kinesis::StreamConsumer",
remote_resource_identifier="test_consumer",
request_specific_attributes={
_AWS_STREAM_CONSUMER_NAME: "test_consumer",
},
span_name="Kinesis.DescribeStreamConsumer",
)

def test_kinesis_error(self):
self.do_test_requests(
"kinesis/error",
@@ -458,6 +476,7 @@ def _assert_semantic_conventions_attributes(
self._assert_array_value_ddb_table_name(attributes_dict, key, value)

@override
# pylint: disable=too-many-locals
def _assert_metric_attributes(
self,
resource_scope_metrics: List[ResourceScopeMetric],