From 0ac98900511b6ae28a765854c13252366bf6f92a Mon Sep 17 00:00:00 2001
From: Daniele Iasella <2861984+overbit@users.noreply.github.com>
Date: Wed, 7 Aug 2024 19:11:35 +0100
Subject: [PATCH] feat: add support for KafkaDependentProducer instrumentation
(#34)
---
.../DependentProducerBuilderExtensions.cs | 19 ++++++
.../KafkaDiagnosticsTests.cs | 59 ++++++++++++++++++-
...afkaDiagnosticsTests.DependentProduce.snap | 59 +++++++++++++++++++
...iagnosticsTests.DependentProduceAsync.snap | 59 +++++++++++++++++++
4 files changed, 195 insertions(+), 1 deletion(-)
create mode 100644 src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs
create mode 100644 tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap
create mode 100644 tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap
diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs b/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs
new file mode 100644
index 0000000..44e4ebd
--- /dev/null
+++ b/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs
@@ -0,0 +1,19 @@
+namespace Confluent.Kafka.Extensions.Diagnostics;
+
+///
+/// Extension methods for .
+///
+public static class DependentProducerBuilderExtensions
+{
+ ///
+ /// Builds a new instrumented instance of producer.
+ ///
+ public static IProducer BuildWithInstrumentation(
+ this DependentProducerBuilder producerBuilder
+ )
+ {
+ if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder));
+
+ return new InstrumentedProducer(producerBuilder.Build());
+ }
+}
diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs
index 4408464..e05bc87 100644
--- a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs
+++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs
@@ -10,6 +10,7 @@ public sealed class KafkaDiagnosticsTests : IAssemblyFixture
{
private readonly EnvironmentFixture _environmentFixture;
private readonly IProducer _producer;
+ private readonly IProducer _dependentProducer;
private readonly IConsumer _consumer;
private readonly Func _matchOptions;
@@ -20,15 +21,22 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)
var kafkaBootstrapServers = _environmentFixture.KafkaBootstrapServers;
var kafkaClientConfig = new ClientConfig { BootstrapServers = kafkaBootstrapServers };
+
_producer = new ProducerBuilder(new ProducerConfig(kafkaClientConfig))
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.BuildWithInstrumentation();
+ _dependentProducer = new DependentProducerBuilder(_producer.Handle)
+ .SetKeySerializer(Serializers.Utf8)
+ .SetValueSerializer(Serializers.Utf8)
+ .BuildWithInstrumentation();
+
_consumer = new ConsumerBuilder(
new ConsumerConfig(new ClientConfig { BootstrapServers = kafkaBootstrapServers })
{
- GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
+ GroupId = "group",
+ AutoOffsetReset = AutoOffsetReset.Earliest
})
.SetKeyDeserializer(Deserializers.Utf8)
.SetValueDeserializer(Deserializers.Utf8)
@@ -38,6 +46,55 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)
.ExcludeField("RootId").ExcludeField("TagObjects");
}
+ [Fact]
+ public async Task DependentProduceAsync()
+ {
+ // Arrange
+ var snapshotName = Snapshot.FullName();
+ using var listener = CreateActivityListener(activity =>
+ {
+ // Assert
+ activity.Should().MatchSnapshot(snapshotName, _matchOptions);
+ });
+ ActivitySource.AddActivityListener(listener);
+
+ // Act
+ await _dependentProducer.ProduceAsync("dependent_produce_async_topic",
+ new Message { Key = "test", Value = "Hello World!" });
+ }
+
+ [Fact]
+ public async Task DependentProduce()
+ {
+ // Arrange
+ Activity? reportedActivity = null;
+ using var listener = CreateActivityListener(activity =>
+ {
+ reportedActivity = activity;
+ });
+ ActivitySource.AddActivityListener(listener);
+
+ var delivered = false;
+
+ // Act
+ _dependentProducer.Produce("dependent_produce_topic",
+ new Message { Key = "test", Value = "Hello World!" }, report =>
+ {
+ delivered = true;
+ });
+
+ int leftAttempts = 10;
+ do
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(500));
+ } while (!delivered && leftAttempts-- > 0);
+
+ delivered.Should().BeTrue();
+ reportedActivity.Should().NotBeNull();
+ reportedActivity.Should().MatchSnapshot(_matchOptions);
+ }
+
+
[Fact]
public async Task ProduceAsync()
{
diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap
new file mode 100644
index 0000000..502162a
--- /dev/null
+++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap
@@ -0,0 +1,59 @@
+{
+ "Status": "Ok",
+ "StatusDescription": null,
+ "HasRemoteParent": false,
+ "Kind": "Producer",
+ "OperationName": "produce_topic publish",
+ "DisplayName": "produce_topic publish",
+ "Source": {
+ "Name": "Confluent.Kafka.Extensions.Diagnostics",
+ "Version": ""
+ },
+ "Parent": null,
+ "ParentId": null,
+ "Tags": [
+ {
+ "Key": "messaging.system",
+ "Value": "kafka"
+ },
+ {
+ "Key": "messaging.operation",
+ "Value": "publish"
+ },
+ {
+ "Key": "messaging.destination.kind",
+ "Value": "topic"
+ },
+ {
+ "Key": "messaging.destination.name",
+ "Value": "produce_topic"
+ },
+ {
+ "Key": "messaging.kafka.destination.partition",
+ "Value": "0"
+ },
+ {
+ "Key": "messaging.kafka.message.offset",
+ "Value": "0"
+ }
+ ],
+ "Events": [],
+ "Links": [],
+ "Baggage": [],
+ "Context": {
+ "TraceId": {},
+ "SpanId": {},
+ "TraceFlags": "None",
+ "TraceState": null,
+ "IsRemote": false
+ },
+ "TraceStateString": null,
+ "SpanId": {},
+ "TraceId": {},
+ "Recorded": false,
+ "IsAllDataRequested": false,
+ "ActivityTraceFlags": "None",
+ "ParentSpanId": {},
+ "IsStopped": true,
+ "IdFormat": "W3C"
+}
diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap
new file mode 100644
index 0000000..e1a35ba
--- /dev/null
+++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap
@@ -0,0 +1,59 @@
+{
+ "Status": "Ok",
+ "StatusDescription": null,
+ "HasRemoteParent": false,
+ "Kind": "Producer",
+ "OperationName": "produce_async_topic publish",
+ "DisplayName": "produce_async_topic publish",
+ "Source": {
+ "Name": "Confluent.Kafka.Extensions.Diagnostics",
+ "Version": ""
+ },
+ "Parent": null,
+ "ParentId": null,
+ "Tags": [
+ {
+ "Key": "messaging.system",
+ "Value": "kafka"
+ },
+ {
+ "Key": "messaging.operation",
+ "Value": "publish"
+ },
+ {
+ "Key": "messaging.destination.kind",
+ "Value": "topic"
+ },
+ {
+ "Key": "messaging.destination.name",
+ "Value": "produce_async_topic"
+ },
+ {
+ "Key": "messaging.kafka.destination.partition",
+ "Value": "0"
+ },
+ {
+ "Key": "messaging.kafka.message.offset",
+ "Value": "1"
+ }
+ ],
+ "Events": [],
+ "Links": [],
+ "Baggage": [],
+ "Context": {
+ "TraceId": {},
+ "SpanId": {},
+ "TraceFlags": "None",
+ "TraceState": null,
+ "IsRemote": false
+ },
+ "TraceStateString": null,
+ "SpanId": {},
+ "TraceId": {},
+ "Recorded": false,
+ "IsAllDataRequested": false,
+ "ActivityTraceFlags": "None",
+ "ParentSpanId": {},
+ "IsStopped": true,
+ "IdFormat": "W3C"
+}