Skip to content

Commit

Permalink
Should be done now.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 6, 2023
1 parent 8036c90 commit b0f3f80
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ type Sentry struct {
DSN string `yaml:"dsn"`
}

type Metrics struct {
Namespace string `yaml:"namespace"`
Tags []string `yaml:"tags"`
}

type Settings struct {
DynamoDB *DynamoDB `yaml:"dynamodb"`
Reporting *Reporting `yaml:"reporting"`
Metrics *Metrics `yaml:"metrics"`
Kafka *Kafka `yaml:"kafka"`
}

Expand Down
12 changes: 10 additions & 2 deletions lib/kafkalib/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/segmentio/kafka-go"
"time"
)
Expand Down Expand Up @@ -66,12 +67,18 @@ func (b *Batch) NextChunk() []kafka.Message {
}

func (b *Batch) Publish(ctx context.Context) error {
kafkaWriter := FromContext(ctx)
for b.HasNext() {
var err error
var count int64
tags := map[string]string{
"what": "error",
}
for attempts := 0; attempts < MaxRetries; attempts++ {
err = kafkaWriter.WriteMessages(ctx, b.NextChunk()...)
chunk := b.NextChunk()
count = int64(len(chunk))
err = FromContext(ctx).WriteMessages(ctx, chunk...)
if err == nil {
tags["what"] = "success"
break
}

Expand All @@ -83,6 +90,7 @@ func (b *Batch) Publish(ctx context.Context) error {
time.Sleep(sleepDuration)
}

metrics.FromContext(ctx).Count("kafka.publish", count, tags)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/sources/dynamodb"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/artie-labs/transfer/lib/telemetry/metrics/datadog"
"log"
)

Expand All @@ -23,6 +25,19 @@ func main() {
ctx := config.InjectIntoContext(context.Background(), cfg)
ctx = logger.InjectLoggerIntoCtx(ctx)
ctx = kafkalib.InjectIntoContext(ctx)
if cfg.Metrics != nil {
logger.FromContext(ctx).Info("injecting datadog")
client, err := datadog.NewDatadogClient(ctx, map[string]interface{}{
datadog.Namespace: cfg.Metrics.Namespace,
datadog.Tags: cfg.Metrics.Tags,
})

if err != nil {
logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client")
}

ctx = metrics.InjectMetricsClientIntoCtx(ctx, client)
}

ddb := dynamodb.Load(ctx)
ddb.Run(ctx)
Expand Down

0 comments on commit b0f3f80

Please sign in to comment.