diff --git a/config/config.go b/config/config.go index e54281cc..a5dbc4db 100644 --- a/config/config.go +++ b/config/config.go @@ -47,9 +47,15 @@ type Sentry struct { DSN string `yaml:"dsn"` } +type Metrics struct { + Namespace string `yaml:"namespace"` + Tags []string `yaml:"tags"` +} + type Settings struct { DynamoDB *DynamoDB `yaml:"dynamodb"` Reporting *Reporting `yaml:"reporting"` + Metrics *Metrics `yaml:"metrics"` Kafka *Kafka `yaml:"kafka"` } diff --git a/lib/kafkalib/batch.go b/lib/kafkalib/batch.go index 5d252470..3de08b2f 100644 --- a/lib/kafkalib/batch.go +++ b/lib/kafkalib/batch.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/transfer/lib/jitter" + "github.com/artie-labs/transfer/lib/telemetry/metrics" "github.com/segmentio/kafka-go" "time" ) @@ -66,12 +67,18 @@ func (b *Batch) NextChunk() []kafka.Message { } func (b *Batch) Publish(ctx context.Context) error { - kafkaWriter := FromContext(ctx) for b.HasNext() { var err error + var count int64 + tags := map[string]string{ + "what": "error", + } for attempts := 0; attempts < MaxRetries; attempts++ { - err = kafkaWriter.WriteMessages(ctx, b.NextChunk()...) + chunk := b.NextChunk() + count = int64(len(chunk)) + err = FromContext(ctx).WriteMessages(ctx, chunk...) if err == nil { + tags["what"] = "success" break } @@ -83,6 +90,7 @@ func (b *Batch) Publish(ctx context.Context) error { time.Sleep(sleepDuration) } + metrics.FromContext(ctx).Count("kafka.publish", count, tags) if err != nil { return err } diff --git a/main.go b/main.go index ad7319a7..f61b160a 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" "github.com/artie-labs/reader/sources/dynamodb" + "github.com/artie-labs/transfer/lib/telemetry/metrics" + "github.com/artie-labs/transfer/lib/telemetry/metrics/datadog" "log" ) @@ -23,6 +25,19 @@ func main() { ctx := config.InjectIntoContext(context.Background(), cfg) ctx = logger.InjectLoggerIntoCtx(ctx) ctx = kafkalib.InjectIntoContext(ctx) + if cfg.Metrics != nil { + logger.FromContext(ctx).Info("injecting datadog") + client, err := datadog.NewDatadogClient(ctx, map[string]interface{}{ + datadog.Namespace: cfg.Metrics.Namespace, + datadog.Tags: cfg.Metrics.Tags, + }) + + if err != nil { + logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client") + } + + ctx = metrics.InjectMetricsClientIntoCtx(ctx, client) + } ddb := dynamodb.Load(ctx) ddb.Run(ctx)