Skip to content

Commit

Permalink
add labels collector (#18)
Browse files Browse the repository at this point in the history
Signed-off-by: Augustin Husson <[email protected]>
  • Loading branch information
Nexucis authored Nov 7, 2024
1 parent 91d54db commit 2f714e4
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 7 deletions.
24 changes: 24 additions & 0 deletions config/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ func (c *RulesCollector) Verify() error {
return nil
}

type LabelsCollector struct {
Enable bool `yaml:"enable"`
Period model.Duration `yaml:"period,omitempty"`
// MetricUsageClient is a client to send the metrics usage to a remote metrics_usage server.
MetricUsageClient *HTTPClient `yaml:"metric_usage_client,omitempty"`
HTTPClient HTTPClient `yaml:"prometheus_client"`
}

func (c *LabelsCollector) Verify() error {
if !c.Enable {
return nil
}
if c.Period <= 0 {
c.Period = model.Duration(defaultMetricCollectorPeriodDuration)
}
if c.HTTPClient.URL == nil {
return fmt.Errorf("missing Prometheus URL for the rules collector")
}
if c.MetricUsageClient != nil && c.MetricUsageClient.URL == nil {
return fmt.Errorf("missing Metrics Usage URL for the rules collector")
}
return nil
}

type PersesCollector struct {
Enable bool `yaml:"enable"`
Period model.Duration `yaml:"period,omitempty"`
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ func (d *Database) Verify() error {
}

type Config struct {
Database Database `yaml:"database"`
MetricCollector MetricCollector `yaml:"metric_collector,omitempty"`
RulesCollectors []*RulesCollector `yaml:"rules_collectors,omitempty"`
PersesCollector PersesCollector `yaml:"perses_collector,omitempty"`
GrafanaCollector GrafanaCollector `yaml:"grafana_collector,omitempty"`
Database Database `yaml:"database"`
MetricCollector MetricCollector `yaml:"metric_collector,omitempty"`
RulesCollectors []*RulesCollector `yaml:"rules_collectors,omitempty"`
LabelsCollectors []*LabelsCollector `yaml:"labels_collectors,omitempty"`
PersesCollector PersesCollector `yaml:"perses_collector,omitempty"`
GrafanaCollector GrafanaCollector `yaml:"grafana_collector,omitempty"`
}

func Resolve(configFile string) (Config, error) {
Expand Down
28 changes: 28 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@ type Database interface {
ListMetrics() map[string]*v1.Metric
EnqueueMetricList(metrics []string)
EnqueueUsage(usages map[string]*v1.MetricUsage)
EnqueueLabels(labels map[string][]string)
}

func New(cfg config.Database) Database {
d := &db{
metrics: make(map[string]*v1.Metric),
usage: make(map[string]*v1.MetricUsage),
usageQueue: make(chan map[string]*v1.MetricUsage, 250),
labelsQueue: make(chan map[string][]string, 250),
metricsQueue: make(chan []string, 10),
path: cfg.Path,
}

go d.watchUsageQueue()
go d.watchMetricsQueue()
go d.watchLabelsQueue()
if !*cfg.InMemory {
if err := d.readMetricsInJSONFile(); err != nil {
logrus.WithError(err).Warning("failed to read metrics file")
Expand All @@ -62,6 +65,10 @@ type db struct {
// metricsQueue is the channel that should be used to send and receive the list of metric name to keep in memory.
// Based on this list, we will then collect their usage.
metricsQueue chan []string
// labelsQueue is the way to send the labels per metric to write in the database.
// There will be no other way to write in it.
// Doing that allows us to accept more HTTP requests to write data and to delay the actual writing.
labelsQueue chan map[string][]string
// usageQueue is the way to send the usage per metric to write in the database.
// There will be no other way to write in it.
// Doing that allows us to accept more HTTP requests to write data and to delay the actual writing.
Expand Down Expand Up @@ -99,6 +106,10 @@ func (d *db) EnqueueUsage(usages map[string]*v1.MetricUsage) {
d.usageQueue <- usages
}

func (d *db) EnqueueLabels(labels map[string][]string) {
d.labelsQueue <- labels
}

func (d *db) watchMetricsQueue() {
for _metrics := range d.metricsQueue {
d.mutex.Lock()
Expand Down Expand Up @@ -141,6 +152,23 @@ func (d *db) watchUsageQueue() {
}
}

func (d *db) watchLabelsQueue() {
for data := range d.labelsQueue {
d.mutex.Lock()
for metricName, labels := range data {
if _, ok := d.metrics[metricName]; !ok {
// In this case, we should add the metric, because it means the metrics has been found from another source.
d.metrics[metricName] = &v1.Metric{
Labels: labels,
}
} else {
d.metrics[metricName].Labels = utils.Merge(d.metrics[metricName].Labels, labels)
}
}
d.mutex.Unlock()
}
}

func (d *db) flush(period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()
Expand Down
5 changes: 5 additions & 0 deletions dev/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ rules_collectors:
prometheus_client:
url: "https://prometheus.demo.do.prometheus.io"

labels_collectors:
- enable: true
prometheus_client:
url: "https://prometheus.demo.do.prometheus.io"

perses_collector:
enable: true
perses_client:
Expand Down
14 changes: 13 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/perses/metrics-usage/config"
"github.com/perses/metrics-usage/database"
"github.com/perses/metrics-usage/source/grafana"
"github.com/perses/metrics-usage/source/labels"
"github.com/perses/metrics-usage/source/metric"
"github.com/perses/metrics-usage/source/perses"
"github.com/perses/metrics-usage/source/rules"
Expand Down Expand Up @@ -60,6 +61,16 @@ func main() {
}
}

for i, labelsCollectorConfig := range conf.LabelsCollectors {
if labelsCollectorConfig.Enable {
labelsCollector, collectorErr := labels.NewCollector(db, labelsCollectorConfig)
if collectorErr != nil {
logrus.WithError(collectorErr).Fatalf("unable to create the labels collector number %d", i)
}
runner.WithTimerTasks(time.Duration(labelsCollectorConfig.Period), labelsCollector)
}
}

if conf.PersesCollector.Enable {
persesCollectorConfig := conf.PersesCollector
persesCollector, collectorErr := perses.NewCollector(db, persesCollectorConfig)
Expand All @@ -81,6 +92,7 @@ func main() {
runner.HTTPServerBuilder().
ActivatePprof(*pprof).
APIRegistration(metric.NewAPI(db)).
APIRegistration(rules.NewAPI(db))
APIRegistration(rules.NewAPI(db)).
APIRegistration(labels.NewAPI(db))
runner.Start()
}
3 changes: 2 additions & 1 deletion pkg/api/v1/metric_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ type MetricUsage struct {
}

type Metric struct {
Usage *MetricUsage `json:"usage,omitempty"`
Labels []string `json:"labels,omitempty"`
Usage *MetricUsage `json:"usage,omitempty"`
}
17 changes: 17 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

type Client interface {
Usage(map[string]*modelAPIV1.MetricUsage) error
Labels(map[string][]string) error
}

func New(cfg config.HTTPClient) (Client, error) {
Expand Down Expand Up @@ -61,6 +62,22 @@ func (c *client) Usage(metrics map[string]*modelAPIV1.MetricUsage) error {
return nil
}

func (c *client) Labels(labels map[string][]string) error {
data, err := json.Marshal(labels)
if err != nil {
return err
}
body := bytes.NewBuffer(data)
resp, err := c.httpClient.Post(c.url("/api/v1/labels").String(), "application/json", body)
if err != nil {
return err
}
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
return fmt.Errorf("when sending label names, unexpected status code: %d", resp.StatusCode)
}
return nil
}

func (c *client) url(ep string) *url.URL {
p := path.Join(c.endpoint.Path, ep)
u := *c.endpoint
Expand Down
48 changes: 48 additions & 0 deletions source/labels/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 The Perses Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package labels

import (
"net/http"

"github.com/labstack/echo/v4"
persesEcho "github.com/perses/common/echo"
"github.com/perses/metrics-usage/database"
)

func NewAPI(db database.Database) persesEcho.Register {
return &endpoint{
db: db,
}
}

type endpoint struct {
db database.Database
}

func (e *endpoint) RegisterRoute(ech *echo.Echo) {
path := "/api/v1/labels"
ech.POST(path, e.PushLabels)
}

func (e *endpoint) PushLabels(ctx echo.Context) error {
data := make(map[string][]string)
if err := ctx.Bind(&data); err != nil {
return ctx.JSON(http.StatusBadRequest, echo.Map{"message": err.Error()})
}
if len(data) > 0 {
e.db.EnqueueLabels(data)
}
return ctx.JSON(http.StatusAccepted, echo.Map{"message": "OK"})
}
102 changes: 102 additions & 0 deletions source/labels/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 The Perses Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package labels

import (
"context"
"time"

"github.com/perses/common/async"
"github.com/perses/metrics-usage/config"
"github.com/perses/metrics-usage/database"
"github.com/perses/metrics-usage/pkg/client"
"github.com/perses/metrics-usage/utils/prometheus"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
)

func NewCollector(db database.Database, cfg *config.LabelsCollector) (async.SimpleTask, error) {
promClient, err := prometheus.NewClient(cfg.HTTPClient)
if err != nil {
return nil, err
}
var metricUsageClient client.Client
if cfg.MetricUsageClient != nil {
metricUsageClient, err = client.New(*cfg.MetricUsageClient)
if err != nil {
return nil, err
}
}
return &labelCollector{
promClient: promClient,
db: db,
metricUsageClient: metricUsageClient,
period: cfg.Period,
logger: logrus.StandardLogger().WithField("collector", "labels"),
}, nil
}

type labelCollector struct {
async.SimpleTask
promClient v1.API
db database.Database
metricUsageClient client.Client
period model.Duration
logger *logrus.Entry
}

func (c *labelCollector) Execute(ctx context.Context, _ context.CancelFunc) error {
now := time.Now()
start := now.Add(time.Duration(-c.period))
labelValues, _, err := c.promClient.LabelValues(ctx, "__name__", nil, start, now)
if err != nil {
c.logger.WithError(err).Error("failed to query metrics")
return nil
}
result := make(map[string][]string)
for _, metricName := range labelValues {
c.logger.Debugf("querying Prometheus to get label names for metric %s", metricName)
labels, _, queryErr := c.promClient.LabelNames(ctx, []string{string(metricName)}, start, now)
if queryErr != nil {
c.logger.WithError(queryErr).Errorf("failed to query labels for the metric %q", metricName)
continue
}
result[string(metricName)] = removeLabelName(labels)
}
if len(result) > 0 {
if c.metricUsageClient != nil {
// In this case, that means we have to send the data to a remote server.
if sendErr := c.metricUsageClient.Labels(result); sendErr != nil {
c.logger.WithError(sendErr).Error("Failed to send labels name")
}
} else {
c.db.EnqueueLabels(result)
}
}
return nil
}

func (c *labelCollector) String() string {
return "labels collector"
}

func removeLabelName(labels []string) []string {
for i, label := range labels {
if label == "__name__" {
return append(labels[:i], labels[i+1:]...)
}
}
return labels
}

0 comments on commit 2f714e4

Please sign in to comment.