Skip to content

Commit

Permalink
feat: handle errors when writing to storage
Browse files Browse the repository at this point in the history
Adds error handling when writing metrics to the storage.
  • Loading branch information
luissimas committed Jul 29, 2024
1 parent c93ced5 commit acae8be
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 11 deletions.
5 changes: 4 additions & 1 deletion internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (c *Collector) CollectMetrics(root fs.FS, collectionTime time.Time) error {
return err
}

c.storage.WriteMetrics(collected, collectionTime)
err = c.storage.WriteMetrics(collected, collectionTime)
if err != nil {
return err
}
slog.Debug("Collected metrics", slog.Duration("duration", time.Since(start)))

return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func NewFakeStorage() FakeStorage {
return FakeStorage{}
}

func (f FakeStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) {
func (f FakeStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error {
return nil
}

func (f FakeStorage) IsEmpty() bool {
Expand Down
15 changes: 10 additions & 5 deletions internal/storage/influxdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storage

import (
"context"
"log/slog"
"time"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
Expand All @@ -16,24 +18,27 @@ const totalMeasurementName = "total"

// InfluxDBStorage represents the implementation of a metric storage using InfluxDB.
type InfluxDBStorage struct {
writeAPI api.WriteAPI
writeAPI api.WriteAPIBlocking
queryAPI api.QueryAPI
}

// NewInfluxDBStorage creates a new `InfluxDBStorage`.
func NewInfluxDBStorage(url, org, bucket, token string) InfluxDBStorage {
client := influxdb2.NewClient(url, string(token))
writeAPI := client.WriteAPI(org, bucket)
writeAPI := client.WriteAPIBlocking(org, bucket)
queryAPI := client.QueryAPI(org)
return InfluxDBStorage{writeAPI: writeAPI, queryAPI: queryAPI}
}

// WriteMetric writes `metric` for `noteName` to the storage with `timestamp`.
func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) {
func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error {
points := createInfluxDBPoints(zettelkastenMetrics, timestamp)
for _, point := range points {
i.writeAPI.WritePoint(point)
slog.Debug("Writing metrics to InfluxDB", slog.Any("points", points))
err := i.writeAPI.WritePoint(context.Background(), points...)
if err != nil {
slog.Error("Error writing points to InfluxDB storage", slog.Any("error", err))
}
return err
}

// createInfluxDBPoints creates a slice of InfluxDB measurement points from `zettelkastenMetrics` with the given `timestamp`.
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
// Storage represents a storage for metrics.
type Storage interface {
// WriteMetric writes the `zettelkastenMetrics` to the storage.
WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time)
WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error
}
9 changes: 6 additions & 3 deletions internal/storage/victoriametrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@ func NewVictoriaMetricsStorage(url string) VictoriaMetricsStorage {
return VictoriaMetricsStorage{writeUrl: fmt.Sprintf("%s/api/v2/write", url)}
}

func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) {
func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error {
// NOTE: we encode the metrics in the InfluxDB line protocol and write them to the VictoriaMetrics write endpoint.
// Reference: https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
points := createInfluxDBPoints(zettelkastenMetrics, timestamp)
content, err := encodePoints(points)
if err != nil {
slog.Error("Error encoding points into line procotol", slog.Any("error", err))
return err
}
slog.Info("Writing metrics to endpoint", slog.String("content", string(content)))
slog.Debug("Writing metrics to VictoriaMetrics", slog.String("content", string(content)))
_, err = http.Post(v.writeUrl, "application/x-www-form-urlencoded", bytes.NewBuffer(content))
if err != nil {
slog.Error("Error sending POST request to endpoint", slog.Any("error", err), slog.String("url", v.writeUrl))
return err
}
return nil
}

// encodePoints encodes the given `points` into InfluxDB's line protocol.
Expand All @@ -42,7 +45,7 @@ func encodePoints(points []*write.Point) ([]byte, error) {
e.SetFieldTypeSupport(lp.UintSupport)
e.FailOnFieldErr(true)
e.SetPrecision(time.Millisecond)
slog.Info("Endcoding points", slog.Any("points", points))
slog.Debug("Encoding points", slog.Any("points", points))
for _, point := range points {
_, err := e.Encode(point)
if err != nil {
Expand Down

0 comments on commit acae8be

Please sign in to comment.