Skip to content

Commit

Permalink
feat: add support for KafkaDependentProducer instrumentation (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
overbit authored Aug 7, 2024
1 parent e8fa90c commit 0ac9890
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Confluent.Kafka.Extensions.Diagnostics;

/// <summary>
/// Extension methods for <see cref="DependentProducerBuilder{TKey,TValue}" />.
/// </summary>
public static class DependentProducerBuilderExtensions
{
/// <summary>
/// Builds a new instrumented instance of producer.
/// </summary>
public static IProducer<TKey, TValue> BuildWithInstrumentation<TKey, TValue>(
this DependentProducerBuilder<TKey, TValue> producerBuilder
)
{
if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder));

Check warning on line 15 in src/Confluent.Kafka.Extensions.Diagnostics/DependentProducerBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / publish

Use 'ArgumentNullException.ThrowIfNull' instead of explicitly throwing a new exception instance (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1510)

return new InstrumentedProducer<TKey, TValue>(producerBuilder.Build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public sealed class KafkaDiagnosticsTests : IAssemblyFixture<EnvironmentFixture>
{
private readonly EnvironmentFixture _environmentFixture;
private readonly IProducer<string, string> _producer;
private readonly IProducer<string, string> _dependentProducer;
private readonly IConsumer<string, string> _consumer;

private readonly Func<MatchOptions, MatchOptions> _matchOptions;
Expand All @@ -20,15 +21,22 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)

var kafkaBootstrapServers = _environmentFixture.KafkaBootstrapServers;
var kafkaClientConfig = new ClientConfig { BootstrapServers = kafkaBootstrapServers };

_producer = new ProducerBuilder<string, string>(new ProducerConfig(kafkaClientConfig))
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.BuildWithInstrumentation();

_dependentProducer = new DependentProducerBuilder<string, string>(_producer.Handle)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.BuildWithInstrumentation();

_consumer = new ConsumerBuilder<string, string>(
new ConsumerConfig(new ClientConfig { BootstrapServers = kafkaBootstrapServers })
{
GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest
})
.SetKeyDeserializer(Deserializers.Utf8)
.SetValueDeserializer(Deserializers.Utf8)
Expand All @@ -38,6 +46,55 @@ public KafkaDiagnosticsTests(EnvironmentFixture environmentFixture)
.ExcludeField("RootId").ExcludeField("TagObjects");
}

[Fact]
public async Task DependentProduceAsync()
{
// Arrange
var snapshotName = Snapshot.FullName();
using var listener = CreateActivityListener(activity =>
{
// Assert
activity.Should().MatchSnapshot(snapshotName, _matchOptions);
});
ActivitySource.AddActivityListener(listener);

// Act
await _dependentProducer.ProduceAsync("dependent_produce_async_topic",
new Message<string, string> { Key = "test", Value = "Hello World!" });
}

[Fact]
public async Task DependentProduce()
{
// Arrange
Activity? reportedActivity = null;
using var listener = CreateActivityListener(activity =>
{
reportedActivity = activity;
});
ActivitySource.AddActivityListener(listener);

var delivered = false;

// Act
_dependentProducer.Produce("dependent_produce_topic",
new Message<string, string> { Key = "test", Value = "Hello World!" }, report =>
{
delivered = true;
});

int leftAttempts = 10;
do
{
await Task.Delay(TimeSpan.FromMilliseconds(500));
} while (!delivered && leftAttempts-- > 0);

delivered.Should().BeTrue();
reportedActivity.Should().NotBeNull();
reportedActivity.Should().MatchSnapshot(_matchOptions);
}


[Fact]
public async Task ProduceAsync()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"Status": "Ok",
"StatusDescription": null,
"HasRemoteParent": false,
"Kind": "Producer",
"OperationName": "produce_topic publish",
"DisplayName": "produce_topic publish",
"Source": {
"Name": "Confluent.Kafka.Extensions.Diagnostics",
"Version": ""
},
"Parent": null,
"ParentId": null,
"Tags": [
{
"Key": "messaging.system",
"Value": "kafka"
},
{
"Key": "messaging.operation",
"Value": "publish"
},
{
"Key": "messaging.destination.kind",
"Value": "topic"
},
{
"Key": "messaging.destination.name",
"Value": "produce_topic"
},
{
"Key": "messaging.kafka.destination.partition",
"Value": "0"
},
{
"Key": "messaging.kafka.message.offset",
"Value": "0"
}
],
"Events": [],
"Links": [],
"Baggage": [],
"Context": {
"TraceId": {},
"SpanId": {},
"TraceFlags": "None",
"TraceState": null,
"IsRemote": false
},
"TraceStateString": null,
"SpanId": {},
"TraceId": {},
"Recorded": false,
"IsAllDataRequested": false,
"ActivityTraceFlags": "None",
"ParentSpanId": {},
"IsStopped": true,
"IdFormat": "W3C"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"Status": "Ok",
"StatusDescription": null,
"HasRemoteParent": false,
"Kind": "Producer",
"OperationName": "produce_async_topic publish",
"DisplayName": "produce_async_topic publish",
"Source": {
"Name": "Confluent.Kafka.Extensions.Diagnostics",
"Version": ""
},
"Parent": null,
"ParentId": null,
"Tags": [
{
"Key": "messaging.system",
"Value": "kafka"
},
{
"Key": "messaging.operation",
"Value": "publish"
},
{
"Key": "messaging.destination.kind",
"Value": "topic"
},
{
"Key": "messaging.destination.name",
"Value": "produce_async_topic"
},
{
"Key": "messaging.kafka.destination.partition",
"Value": "0"
},
{
"Key": "messaging.kafka.message.offset",
"Value": "1"
}
],
"Events": [],
"Links": [],
"Baggage": [],
"Context": {
"TraceId": {},
"SpanId": {},
"TraceFlags": "None",
"TraceState": null,
"IsRemote": false
},
"TraceStateString": null,
"SpanId": {},
"TraceId": {},
"Recorded": false,
"IsAllDataRequested": false,
"ActivityTraceFlags": "None",
"ParentSpanId": {},
"IsStopped": true,
"IdFormat": "W3C"
}

0 comments on commit 0ac9890

Please sign in to comment.