diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 4aaf65e..ec44cfd 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -41,7 +41,10 @@ func (c *Collector) CollectMetrics(root fs.FS, collectionTime time.Time) error { return err } - c.storage.WriteMetrics(collected, collectionTime) + err = c.storage.WriteMetrics(collected, collectionTime) + if err != nil { + return err + } slog.Debug("Collected metrics", slog.Duration("duration", time.Since(start))) return nil diff --git a/internal/storage/fake.go b/internal/storage/fake.go index 8be9300..42048d3 100644 --- a/internal/storage/fake.go +++ b/internal/storage/fake.go @@ -14,7 +14,8 @@ func NewFakeStorage() FakeStorage { return FakeStorage{} } -func (f FakeStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) { +func (f FakeStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error { + return nil } func (f FakeStorage) IsEmpty() bool { diff --git a/internal/storage/influxdb.go b/internal/storage/influxdb.go index 696e856..25ca572 100644 --- a/internal/storage/influxdb.go +++ b/internal/storage/influxdb.go @@ -1,6 +1,8 @@ package storage import ( + "context" + "log/slog" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -16,24 +18,27 @@ const totalMeasurementName = "total" // InfluxDBStorage represents the implementation of a metric storage using InfluxDB. type InfluxDBStorage struct { - writeAPI api.WriteAPI + writeAPI api.WriteAPIBlocking queryAPI api.QueryAPI } // NewInfluxDBStorage creates a new `InfluxDBStorage`. func NewInfluxDBStorage(url, org, bucket, token string) InfluxDBStorage { client := influxdb2.NewClient(url, string(token)) - writeAPI := client.WriteAPI(org, bucket) + writeAPI := client.WriteAPIBlocking(org, bucket) queryAPI := client.QueryAPI(org) return InfluxDBStorage{writeAPI: writeAPI, queryAPI: queryAPI} } // WriteMetric writes `metric` for `noteName` to the storage with `timestamp`. -func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) { +func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error { points := createInfluxDBPoints(zettelkastenMetrics, timestamp) - for _, point := range points { - i.writeAPI.WritePoint(point) + slog.Debug("Writing metrics to InfluxDB", slog.Any("points", points)) + err := i.writeAPI.WritePoint(context.Background(), points...) + if err != nil { + slog.Error("Error writing points to InfluxDB storage", slog.Any("error", err)) } + return err } // createInfluxDBPoints creates a slice of InfluxDB measurement points from `zettelkastenMetrics` with the given `timestamp`. diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 7861ec9..fa01529 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -9,5 +9,5 @@ import ( // Storage represents a storage for metrics. type Storage interface { // WriteMetric writes the `zettelkastenMetrics` to the storage. - WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) + WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error } diff --git a/internal/storage/victoriametrics.go b/internal/storage/victoriametrics.go index ae7859b..7c12be9 100644 --- a/internal/storage/victoriametrics.go +++ b/internal/storage/victoriametrics.go @@ -20,19 +20,22 @@ func NewVictoriaMetricsStorage(url string) VictoriaMetricsStorage { return VictoriaMetricsStorage{writeUrl: fmt.Sprintf("%s/api/v2/write", url)} } -func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) { +func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) error { // NOTE: we encode the metrics in the InfluxDB line protocol and write them to the VictoriaMetrics write endpoint. // Reference: https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf points := createInfluxDBPoints(zettelkastenMetrics, timestamp) content, err := encodePoints(points) if err != nil { slog.Error("Error encoding points into line procotol", slog.Any("error", err)) + return err } - slog.Info("Writing metrics to endpoint", slog.String("content", string(content))) + slog.Debug("Writing metrics to VictoriaMetrics", slog.String("content", string(content))) _, err = http.Post(v.writeUrl, "application/x-www-form-urlencoded", bytes.NewBuffer(content)) if err != nil { slog.Error("Error sending POST request to endpoint", slog.Any("error", err), slog.String("url", v.writeUrl)) + return err } + return nil } // encodePoints encodes the given `points` into InfluxDB's line protocol. @@ -42,7 +45,7 @@ func encodePoints(points []*write.Point) ([]byte, error) { e.SetFieldTypeSupport(lp.UintSupport) e.FailOnFieldErr(true) e.SetPrecision(time.Millisecond) - slog.Info("Endcoding points", slog.Any("points", points)) + slog.Debug("Encoding points", slog.Any("points", points)) for _, point := range points { _, err := e.Encode(point) if err != nil {