diff --git a/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs b/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs new file mode 100644 index 0000000..44e4ebd --- /dev/null +++ b/src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs @@ -0,0 +1,19 @@ +namespace Confluent.Kafka.Extensions.Diagnostics; + +/// +/// Extension methods for . +/// +public static class DependentProducerBuilderExtensions +{ + /// + /// Builds a new instrumented instance of producer. + /// + public static IProducer BuildWithInstrumentation( + this DependentProducerBuilder producerBuilder + ) + { + if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder)); + + return new InstrumentedProducer(producerBuilder.Build()); + } +} diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs index 4408464..e05bc87 100644 --- a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs +++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/KafkaDiagnosticsTests.cs @@ -10,6 +10,7 @@ public sealed class KafkaDiagnosticsTests : IAssemblyFixture { private readonly EnvironmentFixture _environmentFixture; private readonly IProducer _producer; + private readonly IProducer _dependentProducer; private readonly IConsumer _consumer; private readonly Func _matchOptions; @@ -20,15 +21,22 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture) var kafkaBootstrapServers = _environmentFixture.KafkaBootstrapServers; var kafkaClientConfig = new ClientConfig { BootstrapServers = kafkaBootstrapServers }; + _producer = new ProducerBuilder(new ProducerConfig(kafkaClientConfig)) .SetKeySerializer(Serializers.Utf8) .SetValueSerializer(Serializers.Utf8) .BuildWithInstrumentation(); + _dependentProducer = new DependentProducerBuilder(_producer.Handle) + .SetKeySerializer(Serializers.Utf8) + .SetValueSerializer(Serializers.Utf8) + .BuildWithInstrumentation(); + _consumer = new ConsumerBuilder( new ConsumerConfig(new ClientConfig { BootstrapServers = kafkaBootstrapServers }) { - GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest + GroupId = "group", + AutoOffsetReset = AutoOffsetReset.Earliest }) .SetKeyDeserializer(Deserializers.Utf8) .SetValueDeserializer(Deserializers.Utf8) @@ -38,6 +46,55 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture) .ExcludeField("RootId").ExcludeField("TagObjects"); } + [Fact] + public async Task DependentProduceAsync() + { + // Arrange + var snapshotName = Snapshot.FullName(); + using var listener = CreateActivityListener(activity => + { + // Assert + activity.Should().MatchSnapshot(snapshotName, _matchOptions); + }); + ActivitySource.AddActivityListener(listener); + + // Act + await _dependentProducer.ProduceAsync("dependent_produce_async_topic", + new Message { Key = "test", Value = "Hello World!" }); + } + + [Fact] + public async Task DependentProduce() + { + // Arrange + Activity? reportedActivity = null; + using var listener = CreateActivityListener(activity => + { + reportedActivity = activity; + }); + ActivitySource.AddActivityListener(listener); + + var delivered = false; + + // Act + _dependentProducer.Produce("dependent_produce_topic", + new Message { Key = "test", Value = "Hello World!" }, report => + { + delivered = true; + }); + + int leftAttempts = 10; + do + { + await Task.Delay(TimeSpan.FromMilliseconds(500)); + } while (!delivered && leftAttempts-- > 0); + + delivered.Should().BeTrue(); + reportedActivity.Should().NotBeNull(); + reportedActivity.Should().MatchSnapshot(_matchOptions); + } + + [Fact] public async Task ProduceAsync() { diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap new file mode 100644 index 0000000..502162a --- /dev/null +++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduce.snap @@ -0,0 +1,59 @@ +{ + "Status": "Ok", + "StatusDescription": null, + "HasRemoteParent": false, + "Kind": "Producer", + "OperationName": "produce_topic publish", + "DisplayName": "produce_topic publish", + "Source": { + "Name": "Confluent.Kafka.Extensions.Diagnostics", + "Version": "" + }, + "Parent": null, + "ParentId": null, + "Tags": [ + { + "Key": "messaging.system", + "Value": "kafka" + }, + { + "Key": "messaging.operation", + "Value": "publish" + }, + { + "Key": "messaging.destination.kind", + "Value": "topic" + }, + { + "Key": "messaging.destination.name", + "Value": "produce_topic" + }, + { + "Key": "messaging.kafka.destination.partition", + "Value": "0" + }, + { + "Key": "messaging.kafka.message.offset", + "Value": "0" + } + ], + "Events": [], + "Links": [], + "Baggage": [], + "Context": { + "TraceId": {}, + "SpanId": {}, + "TraceFlags": "None", + "TraceState": null, + "IsRemote": false + }, + "TraceStateString": null, + "SpanId": {}, + "TraceId": {}, + "Recorded": false, + "IsAllDataRequested": false, + "ActivityTraceFlags": "None", + "ParentSpanId": {}, + "IsStopped": true, + "IdFormat": "W3C" +} diff --git a/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap new file mode 100644 index 0000000..e1a35ba --- /dev/null +++ b/tests/Confluent.Kafka.Extensions.Diagnostics.Tests/__snapshots__/KafkaDiagnosticsTests.DependentProduceAsync.snap @@ -0,0 +1,59 @@ +{ + "Status": "Ok", + "StatusDescription": null, + "HasRemoteParent": false, + "Kind": "Producer", + "OperationName": "produce_async_topic publish", + "DisplayName": "produce_async_topic publish", + "Source": { + "Name": "Confluent.Kafka.Extensions.Diagnostics", + "Version": "" + }, + "Parent": null, + "ParentId": null, + "Tags": [ + { + "Key": "messaging.system", + "Value": "kafka" + }, + { + "Key": "messaging.operation", + "Value": "publish" + }, + { + "Key": "messaging.destination.kind", + "Value": "topic" + }, + { + "Key": "messaging.destination.name", + "Value": "produce_async_topic" + }, + { + "Key": "messaging.kafka.destination.partition", + "Value": "0" + }, + { + "Key": "messaging.kafka.message.offset", + "Value": "1" + } + ], + "Events": [], + "Links": [], + "Baggage": [], + "Context": { + "TraceId": {}, + "SpanId": {}, + "TraceFlags": "None", + "TraceState": null, + "IsRemote": false + }, + "TraceStateString": null, + "SpanId": {}, + "TraceId": {}, + "Recorded": false, + "IsAllDataRequested": false, + "ActivityTraceFlags": "None", + "ParentSpanId": {}, + "IsStopped": true, + "IdFormat": "W3C" +}