Skip to content

Commit

Permalink
add instrumentation for consumer (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
vhatsura authored Aug 7, 2022
1 parent 7a0dd8d commit 935ef42
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 31 deletions.
63 changes: 62 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
![GitHub Actions Badge](https://github.com/vhatsura/confluent-kafka-extensions-diagnostics/actions/workflows/continuous.integration.yml/badge.svg)
[![NuGet Badge](https://buildstats.info/nuget/Confluent.Kafka.Extensions.Diagnostics)](https://www.nuget.org/packages/Confluent.Kafka.Extensions.Diagnostics/)

The `Confluent.Kafka.Extensions.Diagnostics` package enables instrumentation of the `Confluent.Kafka` library
via [Activity API](https://docs.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs).

## Installation

```powershell
Expand All @@ -11,7 +14,65 @@ Install-Package Confluent.Kafka.Extensions.Diagnostics

## Usage

### Producer

Producer instrumentation is implemented via a wrapper class, so existing producer code does not need to be rewritten. However,
to enable producer instrumentation, call the `BuildWithInstrumentation` method on the producer builder instead of `Build`.
After that, all produce calls (sync and async) will be instrumented.

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;


using var producer =
new ProducerBuilder<Null, string>(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }))
.SetKeySerializer(Serializers.Null)
.SetValueSerializer(Serializers.Utf8)
.BuildWithInstrumentation();

await producer.ProduceAsync("topic", new Message<Null, string> { Value = "Hello World!" });

```

## Roadmap
### Consumer

Unfortunately, the consumer interface of the `Confluent.Kafka` library is not very flexible. Therefore, the instrumentation is implemented
via an extension method on the consumer itself. For this reason, consumer usage should be rewritten as follows:

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;

using var consumer = new ConsumerBuilder<Ignore, string>(
new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })
{
GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
})
.SetValueDeserializer(Deserializers.Utf8)
.Build();

consumer.Subscribe("topic");

try
{
while (true)
{
try
{
consumer.ConsumeWithInstrumentation((result) =>
{
    Console.WriteLine(result.Message.Value);
}, 1000);
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
```
Original file line number Diff line number Diff line change
@@ -1,42 +1,30 @@
using System.Diagnostics;
using System.Diagnostics;
using System.Text;

namespace Confluent.Kafka.Extensions.Diagnostics;

internal static class ActivityDiagnosticsHelper
{
private const string ActivitySourceName = "Confluent.Kafka.Extensions.Diagnostics";
private const string TraceParentHeaderName = "traceparent";
private const string TraceStateHeaderName = "tracestate";

private static ActivitySource ActivitySource { get; } = new(ActivitySourceName);

internal static Activity? Start<TKey, TValue>(TopicPartition partition, Message<TKey, TValue> message)
internal static Activity? StartProduceActivity<TKey, TValue>(TopicPartition partition,
Message<TKey, TValue> message)
{
try
{
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Client,
default(ActivityContext),
new[]
{
new KeyValuePair<string, object>("messaging.system", "kafka"),
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
new KeyValuePair<string, object>("messaging.destination_kind", "topic"),
new KeyValuePair<string, object>("messaging.kafka.partition", partition.Partition.ToString())
}!);

if (activity == null) return null;
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Producer,
default(ActivityContext), ActivityTags(partition)!);

if (activity == null)
return null;

if (activity.IsAllDataRequested)
{
if (message.Key != null)
{
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
}

if (message.Value != null)
{
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()!);
activity.AddTag("messaging.message_payload_size_bytes", messagePayloadBytes.ToString());
}
SetActivityTags(activity, message);
}

if (message.Headers == null)
Expand All @@ -45,12 +33,12 @@ internal static class ActivityDiagnosticsHelper
}

if (activity.Id != null)
message.Headers.Add("traceparent", Encoding.UTF8.GetBytes(activity.Id));
message.Headers.Add(TraceParentHeaderName, Encoding.UTF8.GetBytes(activity.Id));

var tracestateStr = activity.Context.TraceState;
if (tracestateStr?.Length > 0)
{
message.Headers.Add("tracestate", Encoding.UTF8.GetBytes(tracestateStr));
message.Headers.Add(TraceStateHeaderName, Encoding.UTF8.GetBytes(tracestateStr));
}

return activity;
Expand All @@ -61,4 +49,60 @@ internal static class ActivityDiagnosticsHelper
return null;
}
}

internal static Activity? StartConsumeActivity<TKey, TValue>(TopicPartition partition,
Message<TKey, TValue> message)
{
var activity = ActivitySource.CreateActivity("Confluent.Kafka.Consume", ActivityKind.Consumer,
default(ActivityContext), ActivityTags(partition)!);

if (activity != null)
{
var traceParentHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceParentHeaderName);
var traceStateHeader = message.Headers?.FirstOrDefault(x => x.Key == TraceStateHeaderName);

var traceParent = traceParentHeader != null
? Encoding.UTF8.GetString(traceParentHeader.GetValueBytes())
: null;
var traceState = traceStateHeader != null
? Encoding.UTF8.GetString(traceStateHeader.GetValueBytes())
: null;

if (ActivityContext.TryParse(traceParent, traceState, out var activityContext))
{
activity.SetParentId(activityContext.TraceId, activityContext.SpanId, activityContext.TraceFlags);
activity.TraceStateString = activityContext.TraceState;
}

if (activity.IsAllDataRequested)
{
SetActivityTags(activity, message);
}

activity.Start();
}


return activity;
}

private static void SetActivityTags<TKey, TValue>(Activity activity, Message<TKey, TValue> message)
{
if (message.Key != null)
{
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
}
}

private static IEnumerable<KeyValuePair<string, object>> ActivityTags(TopicPartition partition)
{
return new[]
{
new KeyValuePair<string, object>("messaging.system", "kafka"),
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
new KeyValuePair<string, object>("messaging.destination_kind", "topic"), new KeyValuePair<string, object>(
"messaging.kafka.partition",
partition.Partition.ToString())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="[1.6.2, 2.0.0)"/>
<PackageReference Include="Confluent.Kafka" Version="[1.6.2, 2.0.0)" />
<PackageReference Include="GitVersion.MsBuild" Version="5.10.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
47 changes: 47 additions & 0 deletions src/Confluent.Kafka.Extensions.Diagnostics/ConsumerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace Confluent.Kafka.Extensions.Diagnostics;

/// <summary>
/// Extension methods for <see cref="IConsumer{TKey,TValue}" />.
/// </summary>
public static class ConsumerExtensions
{
    /// <summary>
    /// Consumes a message from the topic and invokes <paramref name="action" /> inside a
    /// consume activity that carries the propagated trace context.
    /// </summary>
    /// <param name="consumer">The consumer to read from.</param>
    /// <param name="action">Callback invoked with the consume result and the cancellation token.</param>
    /// <param name="cancellationToken">Token used to abort the blocking consume call.</param>
    public static async Task ConsumeWithInstrumentation<TKey, TValue>(this IConsumer<TKey, TValue> consumer,
        Func<ConsumeResult<TKey, TValue>, CancellationToken, Task> action, CancellationToken cancellationToken)
    {
        var result = consumer.Consume(cancellationToken);

        // Guard against results without a message (e.g. partition EOF events) so the
        // diagnostics helper is never handed a null message; the callback still runs.
        var activity = result?.Message == null
            ? null
            : ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);

        try
        {
            // Library code: no need to resume on the caller's synchronization context.
            await action(result, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            activity?.Stop();
        }
    }

    /// <summary>
    /// Consumes a message from the topic and invokes <paramref name="action" /> inside a
    /// consume activity that carries the propagated trace context.
    /// </summary>
    /// <param name="consumer">The consumer to read from.</param>
    /// <param name="action">Callback invoked with the consume result (null when the timeout elapses).</param>
    /// <param name="millisecondsTimeout">Maximum time to block waiting for a message.</param>
    public static void ConsumeWithInstrumentation<TKey, TValue>(this IConsumer<TKey, TValue> consumer,
        Action<ConsumeResult<TKey, TValue>> action, int millisecondsTimeout)
    {
        // Consume(int) returns null when the timeout expires without a message; the previous
        // code dereferenced the result unconditionally and threw NullReferenceException.
        var result = consumer.Consume(millisecondsTimeout);

        var activity = result?.Message == null
            ? null
            : ActivityDiagnosticsHelper.StartConsumeActivity(result.TopicPartition, result.Message);

        try
        {
            action(result);
        }
        finally
        {
            activity?.Stop();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
TopicPartition topicPartition, Message<TKey, TValue> message,
CancellationToken cancellationToken = new CancellationToken())
{
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);
var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message);

try
{
// todo: get delivery result and put it into the activity
return await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken);
}
finally
Expand All @@ -38,14 +39,14 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
}

public void Produce(
string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null) =>
string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) =>
Produce(new TopicPartition(topic, Partition.Any), message, deliveryHandler);

public void Produce(
TopicPartition topicPartition, Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
{
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);
var activity = ActivityDiagnosticsHelper.StartProduceActivity(topicPartition, message);

try
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
namespace Confluent.Kafka.Extensions.Diagnostics;

/// <summary>
/// Extension methods for <see cref="ProducerBuilder{TKey,TValue}" />.
/// </summary>
public static class ProducerBuilderExtensions
{
/// <summary>
/// Builds a new instrumented instance of producer.
/// </summary>
public static IProducer<TKey, TValue> BuildWithInstrumentation<TKey, TValue>(
this ProducerBuilder<TKey, TValue> producerBuilder)
{
Expand Down

0 comments on commit 935ef42

Please sign in to comment.