diff --git a/awsagentprovider/build.gradle.kts b/awsagentprovider/build.gradle.kts index 3e92ad8b1a..e052e5eb47 100644 --- a/awsagentprovider/build.gradle.kts +++ b/awsagentprovider/build.gradle.kts @@ -40,12 +40,15 @@ dependencies { implementation("com.amazonaws:aws-java-sdk-core:1.12.773") // Export configuration compileOnly("io.opentelemetry:opentelemetry-exporter-otlp") + // For Udp emitter + compileOnly("io.opentelemetry:opentelemetry-exporter-otlp-common") testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") testImplementation("io.opentelemetry:opentelemetry-extension-aws") testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators") testImplementation("com.google.guava:guava") + testRuntimeOnly("io.opentelemetry:opentelemetry-exporter-otlp-common") compileOnly("com.google.code.findbugs:jsr305:3.0.2") testImplementation("org.mockito:mockito-core:5.3.1") diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporter.java new file mode 100644 index 0000000000..fd130e54a6 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporter.java @@ -0,0 +1,97 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.concurrent.Immutable; + +/** + * Exports spans via UDP, using OpenTelemetry's protobuf model. The protobuf modelled spans are + * Base64 encoded and prefixed with AWS X-Ray specific information before being sent over to {@link + * UdpSender}. + * + *

This exporter is NOT meant for generic use since the payload is prefixed with AWS X-Ray + * specific information. + */ +@Immutable +class OtlpUdpSpanExporter implements SpanExporter { + + private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName()); + + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final UdpSender sender; + private final String payloadPrefix; + + OtlpUdpSpanExporter(UdpSender sender, String payloadPrefix) { + this.sender = sender; + this.payloadPrefix = payloadPrefix; + } + + @Override + public CompletableResultCode export(Collection spans) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + + TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + exportRequest.writeBinaryTo(baos); + String payload = payloadPrefix + Base64.getEncoder().encodeToString(baos.toByteArray()); + sender.send(payload.getBytes(StandardCharsets.UTF_8)); + return CompletableResultCode.ofSuccess(); + } catch (Exception e) { + logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e); + return CompletableResultCode.ofFailure(); + } + } + + @Override + public CompletableResultCode flush() { + // TODO: implement + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + return CompletableResultCode.ofSuccess(); + } + return sender.shutdown(); + } + + // Visible for testing + UdpSender getSender() { + return sender; + } + + // Visible for testing + String getPayloadPrefix() { + return payloadPrefix; + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporterBuilder.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporterBuilder.java new file mode 100644 index 0000000000..33cc1dca9f --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/OtlpUdpSpanExporterBuilder.java @@ -0,0 +1,77 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static java.util.Objects.requireNonNull; + +final class OtlpUdpSpanExporterBuilder { + + private static final String DEFAULT_HOST = "127.0.0.1"; + private static final int DEFAULT_PORT = 2000; + + // The protocol header and delimiter is required for sending data to X-Ray Daemon or when running + // in Lambda. + // https://docs.aws.amazon.com/xray/latest/devguide/xray-api-sendingdata.html#xray-api-daemon + private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}"; + private static final char PROTOCOL_DELIMITER = '\n'; + + // These prefixes help the backend identify if the spans payload is sampled or not. + private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"; + private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"; + + private UdpSender sender; + private String tracePayloadPrefix = FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX; + + public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) { + requireNonNull(endpoint, "endpoint must not be null"); + try { + String[] parts = endpoint.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + this.sender = new UdpSender(host, port); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e); + } + return this; + } + + public OtlpUdpSpanExporterBuilder setPayloadSampleDecision(TracePayloadSampleDecision decision) { + this.tracePayloadPrefix = + decision == TracePayloadSampleDecision.SAMPLED + ? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX + : FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX; + return this; + } + + public OtlpUdpSpanExporter build() { + if (sender == null) { + this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT); + } + return new OtlpUdpSpanExporter( + this.sender, PROTOCOL_HEADER + PROTOCOL_DELIMITER + tracePayloadPrefix); + } + + // Only for testing + OtlpUdpSpanExporterBuilder setSender(UdpSender sender) { + this.sender = sender; + return this; + } +} + +enum TracePayloadSampleDecision { + SAMPLED, + UNSAMPLED +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/UdpSender.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/UdpSender.java new file mode 100644 index 0000000000..deb9327e39 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/UdpSender.java @@ -0,0 +1,76 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This class represents a UDP sender that sends data to a specified endpoint. It is used to send + * data to a remote host and port using UDP protocol. + */ +class UdpSender { + private static final Logger logger = Logger.getLogger(UdpSender.class.getName()); + + private DatagramSocket socket; + private final InetSocketAddress endpoint; + + public UdpSender(String host, int port) { + this.endpoint = new InetSocketAddress(host, port); + try { + this.socket = new DatagramSocket(); + } catch (SocketException e) { + logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e); + } + } + + public CompletableResultCode shutdown() { + try { + if (socket == null) { + return CompletableResultCode.ofSuccess(); + } + socket.close(); + return CompletableResultCode.ofSuccess(); + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e); + return CompletableResultCode.ofFailure(); + } + } + + public void send(byte[] data) { + if (socket == null) { + logger.log(Level.WARNING, "UdpSender socket is null. Cannot send data."); + return; + } + DatagramPacket packet = new DatagramPacket(data, data.length, endpoint); + try { + socket.send(packet); + } catch (IOException e) { + logger.log(Level.SEVERE, "Exception while sending data.", e); + } + } + + // Visible for testing + InetSocketAddress getEndpoint() { + return endpoint; + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/UdpExporterTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/UdpExporterTest.java new file mode 100644 index 0000000000..1494b30c98 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/UdpExporterTest.java @@ -0,0 +1,147 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.*; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.*; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +public class UdpExporterTest { + + @Test + public void testUdpExporterWithDefaults() { + OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().build(); + UdpSender sender = exporter.getSender(); + assertThat(sender.getEndpoint().getHostName()) + .isEqualTo("localhost"); // getHostName implicitly converts 127.0.0.1 to localhost + assertThat(sender.getEndpoint().getPort()).isEqualTo(2000); + assertThat(exporter.getPayloadPrefix()).endsWith("T1S"); + } + + @Test + public void testUdpExporterWithCustomEndpointAndSample() { + OtlpUdpSpanExporter exporter = + new OtlpUdpSpanExporterBuilder() + .setEndpoint("somehost:1000") + .setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED) + .build(); + UdpSender sender = exporter.getSender(); + assertThat(sender.getEndpoint().getHostName()).isEqualTo("somehost"); + assertThat(sender.getEndpoint().getPort()).isEqualTo(1000); + assertThat(exporter.getPayloadPrefix()).endsWith("T1U"); + } + + @Test + public void testUdpExporterWithInvalidEndpoint() { + assertThatThrownBy( + () -> { + new OtlpUdpSpanExporterBuilder().setEndpoint("invalidhost"); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid endpoint, must be a valid URL: invalidhost"); + } + + @Test + public void testExportDefaultBehavior() { + UdpSender senderMock = mock(UdpSender.class); + + // mock SpanData + SpanData spanData = buildSpanDataMock(); + + OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().setSender(senderMock).build(); + exporter.export(Collections.singletonList(spanData)); + + // assert that the senderMock.send is called once + verify(senderMock, times(1)).send(any(byte[].class)); + verify(senderMock) + .send( + argThat( + (byte[] bytes) -> { + assertThat(bytes.length).isGreaterThan(0); + String payload = new String(bytes, StandardCharsets.UTF_8); + assertThat(payload) + .startsWith("{\"format\": \"json\", \"version\": 1}" + "\n" + "T1S"); + return true; + })); + } + + @Test + public void testExportWithSampledFalse() { + UdpSender senderMock = mock(UdpSender.class); + + // mock SpanData + SpanData spanData = buildSpanDataMock(); + + OtlpUdpSpanExporter exporter = + new OtlpUdpSpanExporterBuilder() + .setSender(senderMock) + .setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED) + .build(); + exporter.export(Collections.singletonList(spanData)); + + verify(senderMock, times(1)).send(any(byte[].class)); + verify(senderMock) + .send( + argThat( + (byte[] bytes) -> { + assertThat(bytes.length).isGreaterThan(0); + String payload = new String(bytes, StandardCharsets.UTF_8); + assertThat(payload) + .startsWith("{\"format\": \"json\", \"version\": 1}" + "\n" + "T1U"); + return true; + })); + } + + private SpanData buildSpanDataMock() { + SpanData mockSpanData = mock(SpanData.class); + + Attributes spanAttributes = + Attributes.of(AttributeKey.stringKey("original key"), "original value"); + when(mockSpanData.getAttributes()).thenReturn(spanAttributes); + when(mockSpanData.getTotalAttributeCount()).thenReturn(spanAttributes.size()); + when(mockSpanData.getKind()).thenReturn(SpanKind.SERVER); + + SpanContext parentSpanContextMock = mock(SpanContext.class); + when(mockSpanData.getParentSpanContext()).thenReturn(parentSpanContextMock); + + SpanContext spanContextMock = mock(SpanContext.class); + when(spanContextMock.isValid()).thenReturn(true); + when(mockSpanData.getSpanContext()).thenReturn(spanContextMock); + + TraceState traceState = TraceState.builder().build(); + when(spanContextMock.getTraceState()).thenReturn(traceState); + + when(mockSpanData.getStatus()).thenReturn(StatusData.unset()); + when(mockSpanData.getInstrumentationScopeInfo()) + .thenReturn(InstrumentationScopeInfo.create("Dummy Scope")); + + Resource testResource = Resource.empty(); + when(mockSpanData.getResource()).thenReturn(testResource); + + return mockSpanData; + } +}