From c9ea8fe846e4e733dc41b5f0efd639df7dd9be58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Simas?= Date: Sat, 27 Jul 2024 13:27:31 -0300 Subject: [PATCH] feat: add support for VictoriaMetrics storage Adds support for using VictoriaMetrics as a storage backend. The metrics are encoded using the InfluxDB line protocol and written to the remote VictoriaMetrics write endpoint. Using the InfluxDB line protocol allows for specifying a timestamp when writing the metrics, enabling backfilling of historical metrics. --- go.mod | 2 +- internal/config/config.go | 16 ++++++--- internal/storage/influxdb.go | 17 +++++++-- internal/storage/victoriametrics.go | 53 +++++++++++++++++++++++++++++ main.go | 9 +++-- 5 files changed, 87 insertions(+), 10 deletions(-) create mode 100644 internal/storage/victoriametrics.go diff --git a/go.mod b/go.mod index 41427d6..82b690e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.2 require ( github.com/gookit/validate v1.5.2 github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 github.com/knadh/koanf v1.5.0 github.com/stretchr/testify v1.9.0 github.com/yuin/goldmark v1.7.4 @@ -19,7 +20,6 @@ require ( github.com/google/uuid v1.3.1 // indirect github.com/gookit/filter v1.2.1 // indirect github.com/gookit/goutil v0.6.15 // indirect - github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect diff --git a/internal/config/config.go b/internal/config/config.go index 7550a46..4b5b09c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,10 +22,11 @@ type Config struct { IgnoreFiles []string `koanf:"ignore_files"` CollectionInterval time.Duration `koanf:"collection_interval"` CollectHistoricalMetrics bool `koanf:"collect_historical_metrics"` - InfluxDBURL string `koanf:"influxdb_url" validate:"required|fullUrl"` - InfluxDBToken string `koanf:"influxdb_token" validate:"required"` - InfluxDBOrg string `koanf:"influxdb_org" validate:"required"` - InfluxDBBucket string `koanf:"influxdb_bucket" validate:"required"` + VictoriaMetricsURL string `koanf:"victoriametrics_url" validate:"fullUrl"` + InfluxDBURL string `koanf:"influxdb_url" validate:"fullUrl"` + InfluxDBToken string `koanf:"influxdb_token" validate:"requiredWith:InfluxDBURL"` + InfluxDBOrg string `koanf:"influxdb_org" validate:"requiredWith:InfluxDBURL"` + InfluxDBBucket string `koanf:"influxdb_bucket" validate:"requiredWith:InfluxDBURL"` } func LoadConfig() (Config, error) { @@ -75,6 +76,12 @@ func LoadConfig() (Config, error) { if cfg.ZettelkastenGitURL != "" && cfg.ZettelkastenDirectory != "" { return Config{}, errors.New("ZettelkastenGitURL and ZettelkastenDirectory cannot be provided together") } + if cfg.VictoriaMetricsURL != "" && cfg.InfluxDBURL != "" { + return Config{}, errors.New("InfluxDBURL and VictoriaMetricsURL cannot be provided together") + } + if cfg.VictoriaMetricsURL == "" && cfg.InfluxDBURL == "" { + return Config{}, errors.New("Either InfluxDBURL or VictoriaMetricsURL must be provided") + } return cfg, nil } @@ -89,6 +96,7 @@ func (c Config) LogValue() slog.Value { slog.Any("IgnoreFiles", c.IgnoreFiles), slog.Duration("CollectionInterval", c.CollectionInterval), slog.Bool("CollectHistoricalMetrics", c.CollectHistoricalMetrics), + slog.String("VictoriaMetricsURL", c.VictoriaMetricsURL), slog.String("InfluxDBURL", c.InfluxDBURL), slog.String("InfluxDBToken", "[REDACTED]"), slog.String("InfluxDBOrg", c.InfluxDBOrg), diff --git a/internal/storage/influxdb.go b/internal/storage/influxdb.go index 777325c..696e856 100644 --- a/internal/storage/influxdb.go +++ b/internal/storage/influxdb.go @@ -5,6 +5,7 @@ import ( influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/luissimas/zettelkasten-exporter/internal/metrics" ) @@ -29,6 +30,15 @@ func NewInfluxDBStorage(url, org, bucket, token string) InfluxDBStorage { // WriteMetric writes `metric` for `noteName` to the storage with `timestamp`. func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) { + points := createInfluxDBPoints(zettelkastenMetrics, timestamp) + for _, point := range points { + i.writeAPI.WritePoint(point) + } +} + +// createInfluxDBPoints creates a slice of InfluxDB measurement points from `zettelkastenMetrics` with the given `timestamp`. +func createInfluxDBPoints(zettelkastenMetrics metrics.Metrics, timestamp time.Time) []*write.Point { + points := make([]*write.Point, 0, len(zettelkastenMetrics.Notes)+1) // Aggregated metrics point := influxdb2.NewPoint( totalMeasurementName, @@ -40,11 +50,11 @@ func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, times }, timestamp, ) - i.writeAPI.WritePoint(point) + points = append(points, point) // Individual note metrics for name, metric := range zettelkastenMetrics.Notes { - point := influxdb2.NewPoint( + point = influxdb2.NewPoint( notesMeasurementName, map[string]string{"name": name}, map[string]interface{}{ @@ -54,6 +64,7 @@ func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, times }, timestamp, ) - i.writeAPI.WritePoint(point) + points = append(points, point) } + return points } diff --git a/internal/storage/victoriametrics.go b/internal/storage/victoriametrics.go new file mode 100644 index 0000000..ae7859b --- /dev/null +++ b/internal/storage/victoriametrics.go @@ -0,0 +1,53 @@ +package storage + +import ( + "bytes" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/influxdata/influxdb-client-go/v2/api/write" + lp "github.com/influxdata/line-protocol" + "github.com/luissimas/zettelkasten-exporter/internal/metrics" +) + +type VictoriaMetricsStorage struct { + writeUrl string +} + +func NewVictoriaMetricsStorage(url string) VictoriaMetricsStorage { + return VictoriaMetricsStorage{writeUrl: fmt.Sprintf("%s/api/v2/write", url)} +} + +func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) { + // NOTE: we encode the metrics in the InfluxDB line protocol and write them to the VictoriaMetrics write endpoint. + // Reference: https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf + points := createInfluxDBPoints(zettelkastenMetrics, timestamp) + content, err := encodePoints(points) + if err != nil { + slog.Error("Error encoding points into line procotol", slog.Any("error", err)) + } + slog.Info("Writing metrics to endpoint", slog.String("content", string(content))) + _, err = http.Post(v.writeUrl, "application/x-www-form-urlencoded", bytes.NewBuffer(content)) + if err != nil { + slog.Error("Error sending POST request to endpoint", slog.Any("error", err), slog.String("url", v.writeUrl)) + } +} + +// encodePoints encodes the given `points` into InfluxDB's line protocol. +func encodePoints(points []*write.Point) ([]byte, error) { + var buffer bytes.Buffer + e := lp.NewEncoder(&buffer) + e.SetFieldTypeSupport(lp.UintSupport) + e.FailOnFieldErr(true) + e.SetPrecision(time.Millisecond) + slog.Info("Endcoding points", slog.Any("points", points)) + for _, point := range points { + _, err := e.Encode(point) + if err != nil { + return make([]byte, 0), err + } + } + return buffer.Bytes(), nil +} diff --git a/main.go b/main.go index 58fc474..8e57fca 100644 --- a/main.go +++ b/main.go @@ -21,8 +21,13 @@ func main() { os.Exit(1) } slog.Debug("Loaded config", slog.Any("config", cfg)) - storage := storage.NewInfluxDBStorage(cfg.InfluxDBURL, cfg.InfluxDBOrg, cfg.InfluxDBBucket, cfg.InfluxDBToken) - collector := collector.NewCollector(cfg.IgnoreFiles, storage) + var metricsStorage storage.Storage + if cfg.VictoriaMetricsURL != "" { + metricsStorage = storage.NewVictoriaMetricsStorage(cfg.VictoriaMetricsURL) + } else { + metricsStorage = storage.NewInfluxDBStorage(cfg.InfluxDBURL, cfg.InfluxDBOrg, cfg.InfluxDBBucket, cfg.InfluxDBToken) + } + collector := collector.NewCollector(cfg.IgnoreFiles, metricsStorage) var zet zettelkasten.Zettelkasten if cfg.ZettelkastenGitURL != "" { zet = zettelkasten.NewGitZettelkasten(cfg.ZettelkastenGitURL, cfg.ZettelkastenGitBranch, cfg.ZettelkastenGitToken)