Skip to content

Commit

Permalink
feat: add support for VictoriaMetrics storage
Browse files Browse the repository at this point in the history
Adds support for using VictoriaMetrics as a storage backend. The metrics
are encoded using the InfluxDB line protocol and written to the remote
VictoriaMetrics write endpoint. Using the InfluxDB line protocol allows
for specifying a timestamp when writing the metrics, enabling
backfilling of historical metrics.
  • Loading branch information
luissimas committed Jul 27, 2024
1 parent e28369f commit c9ea8fe
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.2
require (
github.com/gookit/validate v1.5.2
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839
github.com/knadh/koanf v1.5.0
github.com/stretchr/testify v1.9.0
github.com/yuin/goldmark v1.7.4
Expand All @@ -19,7 +20,6 @@ require (
github.com/google/uuid v1.3.1 // indirect
github.com/gookit/filter v1.2.1 // indirect
github.com/gookit/goutil v0.6.15 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand Down
16 changes: 12 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type Config struct {
IgnoreFiles []string `koanf:"ignore_files"`
CollectionInterval time.Duration `koanf:"collection_interval"`
CollectHistoricalMetrics bool `koanf:"collect_historical_metrics"`
InfluxDBURL string `koanf:"influxdb_url" validate:"required|fullUrl"`
InfluxDBToken string `koanf:"influxdb_token" validate:"required"`
InfluxDBOrg string `koanf:"influxdb_org" validate:"required"`
InfluxDBBucket string `koanf:"influxdb_bucket" validate:"required"`
VictoriaMetricsURL string `koanf:"victoriametrics_url" validate:"fullUrl"`
InfluxDBURL string `koanf:"influxdb_url" validate:"fullUrl"`
InfluxDBToken string `koanf:"influxdb_token" validate:"requiredWith:InfluxDBURL"`
InfluxDBOrg string `koanf:"influxdb_org" validate:"requiredWith:InfluxDBURL"`
InfluxDBBucket string `koanf:"influxdb_bucket" validate:"requiredWith:InfluxDBURL"`
}

func LoadConfig() (Config, error) {
Expand Down Expand Up @@ -75,6 +76,12 @@ func LoadConfig() (Config, error) {
if cfg.ZettelkastenGitURL != "" && cfg.ZettelkastenDirectory != "" {
return Config{}, errors.New("ZettelkastenGitURL and ZettelkastenDirectory cannot be provided together")
}
if cfg.VictoriaMetricsURL != "" && cfg.InfluxDBURL != "" {
return Config{}, errors.New("InfluxDBURL and VictoriaMetricsURL cannot be provided together")
}
if cfg.VictoriaMetricsURL == "" && cfg.InfluxDBURL == "" {
return Config{}, errors.New("Either InfluxDBURL or VictoriaMetricsURL must be provided")
}

return cfg, nil
}
Expand All @@ -89,6 +96,7 @@ func (c Config) LogValue() slog.Value {
slog.Any("IgnoreFiles", c.IgnoreFiles),
slog.Duration("CollectionInterval", c.CollectionInterval),
slog.Bool("CollectHistoricalMetrics", c.CollectHistoricalMetrics),
slog.String("VictoriaMetricsURL", c.VictoriaMetricsURL),
slog.String("InfluxDBURL", c.InfluxDBURL),
slog.String("InfluxDBToken", "[REDACTED]"),
slog.String("InfluxDBOrg", c.InfluxDBOrg),
Expand Down
17 changes: 14 additions & 3 deletions internal/storage/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"

"github.com/luissimas/zettelkasten-exporter/internal/metrics"
)
Expand All @@ -29,6 +30,15 @@ func NewInfluxDBStorage(url, org, bucket, token string) InfluxDBStorage {

// WriteMetric writes `metric` for `noteName` to the storage with `timestamp`.
func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) {
points := createInfluxDBPoints(zettelkastenMetrics, timestamp)
for _, point := range points {
i.writeAPI.WritePoint(point)
}
}

// createInfluxDBPoints creates a slice of InfluxDB measurement points from `zettelkastenMetrics` with the given `timestamp`.
func createInfluxDBPoints(zettelkastenMetrics metrics.Metrics, timestamp time.Time) []*write.Point {
points := make([]*write.Point, 0, len(zettelkastenMetrics.Notes)+1)
// Aggregated metrics
point := influxdb2.NewPoint(
totalMeasurementName,
Expand All @@ -40,11 +50,11 @@ func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, times
},
timestamp,
)
i.writeAPI.WritePoint(point)
points = append(points, point)

// Individual note metrics
for name, metric := range zettelkastenMetrics.Notes {
point := influxdb2.NewPoint(
point = influxdb2.NewPoint(
notesMeasurementName,
map[string]string{"name": name},
map[string]interface{}{
Expand All @@ -54,6 +64,7 @@ func (i InfluxDBStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, times
},
timestamp,
)
i.writeAPI.WritePoint(point)
points = append(points, point)
}
return points
}
53 changes: 53 additions & 0 deletions internal/storage/victoriametrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package storage

import (
"bytes"
"fmt"
"log/slog"
"net/http"
"time"

"github.com/influxdata/influxdb-client-go/v2/api/write"
lp "github.com/influxdata/line-protocol"
"github.com/luissimas/zettelkasten-exporter/internal/metrics"
)

type VictoriaMetricsStorage struct {
writeUrl string
}

func NewVictoriaMetricsStorage(url string) VictoriaMetricsStorage {
return VictoriaMetricsStorage{writeUrl: fmt.Sprintf("%s/api/v2/write", url)}
}

func (v VictoriaMetricsStorage) WriteMetrics(zettelkastenMetrics metrics.Metrics, timestamp time.Time) {
// NOTE: we encode the metrics in the InfluxDB line protocol and write them to the VictoriaMetrics write endpoint.
// Reference: https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf
points := createInfluxDBPoints(zettelkastenMetrics, timestamp)
content, err := encodePoints(points)
if err != nil {
slog.Error("Error encoding points into line procotol", slog.Any("error", err))
}
slog.Info("Writing metrics to endpoint", slog.String("content", string(content)))
_, err = http.Post(v.writeUrl, "application/x-www-form-urlencoded", bytes.NewBuffer(content))
if err != nil {
slog.Error("Error sending POST request to endpoint", slog.Any("error", err), slog.String("url", v.writeUrl))
}
}

// encodePoints encodes the given `points` into InfluxDB's line protocol.
func encodePoints(points []*write.Point) ([]byte, error) {
var buffer bytes.Buffer
e := lp.NewEncoder(&buffer)
e.SetFieldTypeSupport(lp.UintSupport)
e.FailOnFieldErr(true)
e.SetPrecision(time.Millisecond)
slog.Info("Endcoding points", slog.Any("points", points))
for _, point := range points {
_, err := e.Encode(point)
if err != nil {
return make([]byte, 0), err
}
}
return buffer.Bytes(), nil
}
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ func main() {
os.Exit(1)
}
slog.Debug("Loaded config", slog.Any("config", cfg))
storage := storage.NewInfluxDBStorage(cfg.InfluxDBURL, cfg.InfluxDBOrg, cfg.InfluxDBBucket, cfg.InfluxDBToken)
collector := collector.NewCollector(cfg.IgnoreFiles, storage)
var metricsStorage storage.Storage
if cfg.VictoriaMetricsURL != "" {
metricsStorage = storage.NewVictoriaMetricsStorage(cfg.VictoriaMetricsURL)
} else {
metricsStorage = storage.NewInfluxDBStorage(cfg.InfluxDBURL, cfg.InfluxDBOrg, cfg.InfluxDBBucket, cfg.InfluxDBToken)
}
collector := collector.NewCollector(cfg.IgnoreFiles, metricsStorage)
var zet zettelkasten.Zettelkasten
if cfg.ZettelkastenGitURL != "" {
zet = zettelkasten.NewGitZettelkasten(cfg.ZettelkastenGitURL, cfg.ZettelkastenGitBranch, cfg.ZettelkastenGitToken)
Expand Down

0 comments on commit c9ea8fe

Please sign in to comment.