diff --git a/collector/cached_scraper.go b/collector/cached_scraper.go new file mode 100644 index 0000000..c3ffdc4 --- /dev/null +++ b/collector/cached_scraper.go @@ -0,0 +1,77 @@ +package collector + +import ( + "context" + + pgx "github.com/jackc/pgx/v4" + "github.com/prometheus/client_golang/prometheus" + + "time" +) + +type dbname string + +type cachedScraper struct { + scraper Scraper + ttl time.Duration + lastScrapeAt map[dbname]time.Time + lastValues map[dbname][]prometheus.Metric +} + +func NewCachedScraper(scraper Scraper, ttl time.Duration) Scraper { + return &cachedScraper{ + scraper: scraper, + ttl: ttl, + lastScrapeAt: make(map[dbname]time.Time), + lastValues: make(map[dbname][]prometheus.Metric), + } +} +func (c *cachedScraper) Name() string { + return "Cached" + c.scraper.Name() +} + +func (c *cachedScraper) Scrape(ctx context.Context, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) error { + key := (dbname)(conn.Config().Database) + + if c.shouldScrape(key) { + c.lastScrapeAt[key] = time.Now() + var newValues []prometheus.Metric + + interceptorCh := make(chan prometheus.Metric) + go func() { + for { + v, ok := <-interceptorCh + if ok { + ch <- v + newValues = append(newValues, v) + } else { + return + } + } + }() + + err := c.scraper.Scrape(ctx, conn, version, interceptorCh) + close(interceptorCh) + c.lastValues[key] = newValues + return err + } else { + values, ok := c.lastValues[key] + if ok { + for _, metric := range values { + ch <- metric + } + } + return nil + } +} + +func (c *cachedScraper) shouldScrape(key dbname) bool { + lastScrapeAt, ok := c.lastScrapeAt[key] + if !ok { + lastScrapeAt = time.Unix(0, 0) + } + + nextScrapeAt := lastScrapeAt.Add(c.ttl) + + return nextScrapeAt.Before(time.Now()) +} diff --git a/collector/exporter.go b/collector/exporter.go index b157456..3328e14 100644 --- a/collector/exporter.go +++ b/collector/exporter.go @@ -52,13 +52,17 @@ type Scraper interface { } type Exporter struct { - ctx context.Context logger kitlog.Logger connConfig *pgx.ConnConfig scrapers []Scraper datnameScrapers []Scraper } +type ExporterRun struct { + ctx context.Context + exporter *Exporter +} + // Postgres Version type Version struct { version float64 @@ -80,15 +84,14 @@ func (v Version) String() string { return fmt.Sprintf("%g", v.version) } -// Verify our Exporter satisfies the prometheus.Collector interface -var _ prometheus.Collector = (*Exporter)(nil) +// Verify our ExporterRun satisfies the prometheus.Collector interface +var _ prometheus.Collector = ((*Exporter)(nil)).NewRun((context.Context)(nil)) // NewExporter is called every time we receive a scrape request and knows how // to collect metrics using each of the scrapers. It will live only for the // duration of the scrape request. -func NewExporter(ctx context.Context, logger kitlog.Logger, connConfig *pgx.ConnConfig) *Exporter { +func NewExporter(logger kitlog.Logger, connConfig *pgx.ConnConfig) *Exporter { return &Exporter{ - ctx: ctx, logger: logger, connConfig: connConfig, scrapers: []Scraper{ @@ -103,33 +106,37 @@ func NewExporter(ctx context.Context, logger kitlog.Logger, connConfig *pgx.Conn datnameScrapers: []Scraper{ NewStatVacuumProgressScraper(), NewStatUserTablesScraper(), - NewDiskUsageScraper(), + NewCachedScraper(NewDiskUsageScraper(), 5*time.Minute), }, } } +func (e *Exporter) NewRun(ctx context.Context) *ExporterRun { + return &ExporterRun{ctx: ctx, exporter: e} +} + // Describe implements the prometheus.Collector interface. -func (e Exporter) Describe(ch chan<- *prometheus.Desc) { +func (er ExporterRun) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc ch <- scrapeSuccessDesc } // Collect implements the prometheus.Collector interface. -func (e Exporter) Collect(ch chan<- prometheus.Metric) { - conn, err := pgx.ConnectConfig(e.ctx, e.connConfig) +func (er ExporterRun) Collect(ch chan<- prometheus.Metric) { + conn, err := pgx.ConnectConfig(er.ctx, er.exporter.connConfig) if err != nil { ch <- prometheus.MustNewConstMetric(upDesc, prometheus.GaugeValue, 0) - level.Error(e.logger).Log("error", err) + level.Error(er.exporter.logger).Log("error", err) return // cannot continue without a valid connection } - defer conn.Close(e.ctx) + defer conn.Close(er.ctx) // postgres_up ch <- prometheus.MustNewConstMetric(upDesc, prometheus.GaugeValue, 1) var version string - if err := conn.QueryRow(e.ctx, infoQuery).Scan(&version); err != nil { - level.Error(e.logger).Log("error", err) + if err := conn.QueryRow(er.ctx, infoQuery).Scan(&version); err != nil { + level.Error(er.exporter.logger).Log("error", err) return // cannot continue without a version } @@ -140,9 +147,9 @@ func (e Exporter) Collect(ch chan<- prometheus.Metric) { // discovery databases var dbnames []string - rows, err := conn.Query(e.ctx, listDatnameQuery) + rows, err := conn.Query(er.ctx, listDatnameQuery) if err != nil { - level.Error(e.logger).Log("error", err) + level.Error(er.exporter.logger).Log("error", err) return } defer rows.Close() @@ -151,46 +158,46 @@ func (e Exporter) Collect(ch chan<- prometheus.Metric) { var dbname string err := rows.Scan(&dbname) if err != nil { - level.Error(e.logger).Log("error", err) + level.Error(er.exporter.logger).Log("error", err) return } dbnames = append(dbnames, dbname) } // run global scrapers - for _, scraper := range e.scrapers { - e.scrape(scraper, conn, v, ch) + for _, scraper := range er.exporter.scrapers { + scraper.Scrape(er.ctx, conn, v, ch) } // run datname scrapers for _, dbname := range dbnames { // update connection dbname - e.connConfig.Config.Database = dbname + er.exporter.connConfig.Config.Database = dbname // establish a new connection - conn, err := pgx.ConnectConfig(e.ctx, e.connConfig) + conn, err := pgx.ConnectConfig(er.ctx, er.exporter.connConfig) if err != nil { - level.Error(e.logger).Log("error", err) + level.Error(er.exporter.logger).Log("error", err) return // cannot continue without a valid connection } // scrape - for _, scraper := range e.datnameScrapers { - e.scrape(scraper, conn, v, ch) + for _, scraper := range er.exporter.datnameScrapers { + scraper.Scrape(er.ctx, conn, v, ch) } - conn.Close(e.ctx) + conn.Close(er.ctx) } } -func (e Exporter) scrape(scraper Scraper, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) { +func (er ExporterRun) scrape(scraper Scraper, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) { start := time.Now() - err := scraper.Scrape(e.ctx, conn, version, ch) + err := scraper.Scrape(er.ctx, conn, version, ch) duration := time.Since(start) var success float64 - logger := kitlog.With(e.logger, "scraper", scraper.Name(), "duration", duration.Seconds()) + logger := kitlog.With(er.exporter.logger, "scraper", scraper.Name(), "duration", duration.Seconds()) if err != nil { logger.Log("error", err) success = 0 @@ -199,7 +206,7 @@ func (e Exporter) scrape(scraper Scraper, conn *pgx.Conn, version Version, ch ch success = 1 } - datname := e.connConfig.Config.Database + datname := er.exporter.connConfig.Config.Database ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), scraper.Name(), datname) ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, scraper.Name(), datname) } diff --git a/postgres_exporter.go b/postgres_exporter.go index 8c42e12..f74940e 100644 --- a/postgres_exporter.go +++ b/postgres_exporter.go @@ -102,13 +102,15 @@ func catchHandler(meticsPath *string) http.Handler { } func metricsHandler(logger kitlog.Logger, connConfig *pgx.ConnConfig) http.Handler { + exporter := collector.NewExporter(logger, connConfig) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handlerLock.Lock() defer handlerLock.Unlock() registry := prometheus.NewRegistry() registry.MustRegister(version.NewCollector("postgres_exporter")) - registry.MustRegister(collector.NewExporter(r.Context(), logger, connConfig)) + registry.MustRegister(exporter.NewRun(r.Context())) gatherers := prometheus.Gatherers{ prometheus.DefaultGatherer,