From 4aaf53555bb5f742f0cca749b1249b0e2e2b8ec1 Mon Sep 17 00:00:00 2001
From: Andrew Morton
Date: Thu, 28 Apr 2022 16:31:59 +0100
Subject: [PATCH] Re-work standby side metrics to be based around the `pg_stat_wal_receiver` view

---
 README.md                      |  3 +-
 collector/exporter.go          |  1 +
 collector/stat_replication.go  | 91 +++++++++++++++-------------------
 collector/stat_wal_receiver.go | 84 +++++++++++++++++++++++++++++++
 4 files changed, 127 insertions(+), 52 deletions(-)
 create mode 100644 collector/stat_wal_receiver.go

diff --git a/README.md b/README.md
index 5c64777..b55a011 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,6 @@ Prometheus exporter for PostgreSQL server metrics.
 | postgres_stat_database_xact_commit_total | Number of transactions in this database that have been committed | datname |
 | postgres_stat_database_xact_rollback_total | Number of transactions in this database that have been rolled back | datname |
 | postgres_stat_replication_lag_bytes | Replication Lag in bytes | application_name, client_addr, state, sync_state |
-| postgres_standby_wal_replay_lag_seconds | Replication lag measured in seconds on the standby. Measured as `EXTRACT (EPOCH FROM now()) - pg_last_xact_replay_timestamp()` | application_name, client_addr, state, sync_state |
 | postgres_stat_replication_flush_lag_seconds | Elapsed time during committed WALs from primary to the standby (WAL's has already been flushed but not yet applied). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
 | postgres_stat_replication_replay_lag_seconds | Elapsed time during committed WALs from primary to the standby (fully committed in standby node). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
 | postgres_stat_replication_write_lag_seconds | Elapsed time during committed WALs from primary to the standby (but not yet committed in the standby). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
@@ -79,8 +78,10 @@ Prometheus exporter for PostgreSQL server metrics.
 | postgres_stat_user_indexes_scan_total | Number of times this index has been scanned | datname, schemaname, tablename, indexname |
 | postgres_stat_user_indexes_tuple_read_total | Number of times tuples have been returned from scanning this index | datname, schemaname, tablename, indexname |
 | postgres_stat_user_indexes_tuple_fetch_total | Number of live tuples fetched by scans on this index | datname, schemaname, tablename, indexname |
+| postgres_wal_receiver_replay_lag_seconds | Replication lag measured in seconds on the standby. Measured as `EXTRACT (EPOCH FROM now()) - pg_last_xact_replay_timestamp()` | status |
 | postgres_up | Whether the Postgres server is up | |
 
+
 ### Run
 
 #### Passing in a libpq connection string
diff --git a/collector/exporter.go b/collector/exporter.go
index f30c271..4db1b87 100644
--- a/collector/exporter.go
+++ b/collector/exporter.go
@@ -101,6 +101,7 @@ func NewExporter(ctx context.Context, logger kitlog.Logger, connConfig *pgx.Conn
             NewStatBgwriterScraper(),
             NewStatDatabaseScraper(),
             NewStatReplicationScraper(),
+            NewWalReceiverScraper(),
         },
         datnameScrapers: []Scraper{
             NewStatVacuumProgressScraper(),
diff --git a/collector/stat_replication.go b/collector/stat_replication.go
index 42ede6e..31a9421 100644
--- a/collector/stat_replication.go
+++ b/collector/stat_replication.go
@@ -2,6 +2,7 @@ package collector
 
 import (
     "context"
+    "database/sql"
     "net"
 
     pgx "github.com/jackc/pgx/v4"
@@ -25,11 +26,6 @@ WITH pg_replication AS (
         ELSE pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float
         END
     ) AS pg_xlog_location_diff
-    , ( CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
-        THEN 0
-        ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
-        END
-    ) as pg_standby_wal_replay_lag
     , EXTRACT (EPOCH FROM write_lag) as write_lag_seconds
     , EXTRACT (EPOCH FROM flush_lag) as flush_lag_seconds
     , EXTRACT (EPOCH FROM replay_lag) as replay_lag_seconds
@@ -39,11 +35,10 @@ SELECT * FROM pg_replication WHERE pg_xlog_location_diff IS NOT NULL /*postgres_
 )
 
 type statReplicationScraper struct {
-    lagBytes            *prometheus.Desc
-    standbyWalReplayLag *prometheus.Desc
-    writeLag            *prometheus.Desc
-    flushLag            *prometheus.Desc
-    replayLag           *prometheus.Desc
+    lagBytes  *prometheus.Desc
+    writeLag  *prometheus.Desc
+    flushLag  *prometheus.Desc
+    replayLag *prometheus.Desc
 }
 
 // NewStatReplicationScraper returns a new Scraper exposing postgres pg_stat_replication
@@ -55,12 +50,6 @@ func NewStatReplicationScraper() Scraper {
             []string{"application_name", "client_addr", "state", "sync_state"},
             nil,
         ),
-        standbyWalReplayLag: prometheus.NewDesc(
-            "postgres_standby_wal_replay_lag_seconds",
-            "delay in standby wal replay seconds EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()",
-            []string{"application_name", "client_addr", "state", "sync_state"},
-            nil,
-        ),
         writeLag: prometheus.NewDesc(
             "postgres_stat_replication_write_lag_seconds",
             "write_lag as reported by the pg_stat_replication view converted to seconds",
@@ -99,7 +88,11 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver
 
     var applicationName, state, syncState string
     var clientAddr net.IP
-    var pgXlogLocationDiff, pgStandbyWalReplayLag, writeLagSeconds, flushLagSeconds, replayLagSeconds float64
+    var pgXlogLocationDiff float64
+    /* When querying pg_stat_replication it may be that we don't have
+    values for the various lags as they vanish after a period of inactivity
+    */
+    var writeLagSeconds, flushLagSeconds, replayLagSeconds sql.NullFloat64
 
     for rows.Next() {
 
@@ -108,7 +101,6 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver
             &state,
             &syncState,
             &pgXlogLocationDiff,
-            &pgStandbyWalReplayLag,
             &writeLagSeconds,
             &flushLagSeconds,
             &replayLagSeconds); err != nil {
@@ -124,41 +116,38 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver
             state,
             syncState)
 
-        // postgres_standby_wal_replay_lag_seconds
-        ch <- prometheus.MustNewConstMetric(c.standbyWalReplayLag,
-            prometheus.GaugeValue,
-            pgStandbyWalReplayLag,
-            applicationName,
-            clientAddr.String(),
-            state,
-            syncState)
-
-        // postgres_stat_replication_write_lag_seconds
-        ch <- prometheus.MustNewConstMetric(c.writeLag,
-            prometheus.GaugeValue,
-            writeLagSeconds,
-            applicationName,
-            clientAddr.String(),
-            state,
-            syncState)
+        if writeLagSeconds.Valid {
+            // postgres_stat_replication_write_lag_seconds
+            ch <- prometheus.MustNewConstMetric(c.writeLag,
+                prometheus.GaugeValue,
+                writeLagSeconds.Float64,
+                applicationName,
+                clientAddr.String(),
+                state,
+                syncState)
+        }
 
-        // postgres_stat_replication_flush_lag_seconds
-        ch <- prometheus.MustNewConstMetric(c.flushLag,
-            prometheus.GaugeValue,
-            flushLagSeconds,
-            applicationName,
-            clientAddr.String(),
-            state,
-            syncState)
+        if flushLagSeconds.Valid {
+            // postgres_stat_replication_flush_lag_seconds
+            ch <- prometheus.MustNewConstMetric(c.flushLag,
+                prometheus.GaugeValue,
+                flushLagSeconds.Float64,
+                applicationName,
+                clientAddr.String(),
+                state,
+                syncState)
+        }
 
-        // postgres_stat_replication_replay_lag_seconds
-        ch <- prometheus.MustNewConstMetric(c.replayLag,
-            prometheus.GaugeValue,
-            replayLagSeconds,
-            applicationName,
-            clientAddr.String(),
-            state,
-            syncState)
+        if replayLagSeconds.Valid {
+            // postgres_stat_replication_replay_lag_seconds
+            ch <- prometheus.MustNewConstMetric(c.replayLag,
+                prometheus.GaugeValue,
+                replayLagSeconds.Float64,
+                applicationName,
+                clientAddr.String(),
+                state,
+                syncState)
+        }
     }
     err = rows.Err()
     if err != nil {
diff --git a/collector/stat_wal_receiver.go b/collector/stat_wal_receiver.go
new file mode 100644
index 0000000..01b2ede
--- /dev/null
+++ b/collector/stat_wal_receiver.go
@@ -0,0 +1,84 @@
+package collector
+
+import (
+    "context"
+
+    pgx "github.com/jackc/pgx/v4"
+    "github.com/prometheus/client_golang/prometheus"
+)
+
+/* When pg_basebackup is running in stream mode, it opens a second connection
+to the server and starts streaming the transaction log in parallel while
+running the backup.
+In both connections (state=backup and state=streaming) the pg_log_location_diff is NULL and needs to be excluded */
+const (
+    // Scrape query
+    statWalReceiver = `
+WITH pg_wal_receiver AS (
+    SELECT status
+    , (
+        CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
+        THEN 0
+        ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
+        END
+    ) as postgres_wal_receiver_replay_lag
+    FROM pg_stat_wal_receiver
+)
+SELECT * FROM pg_wal_receiver WHERE postgres_wal_receiver_replay_lag IS NOT NULL /*postgres_exporter*/`
+)
+
+type statWalReceiverScraper struct {
+    walReceiverReplayLag *prometheus.Desc
+}
+
+// NewWalReceiverScraper returns a new Scraper exposing postgres pg_stat_wal_receiver
+func NewWalReceiverScraper() Scraper {
+    return &statWalReceiverScraper{
+        walReceiverReplayLag: prometheus.NewDesc(
+            "postgres_wal_receiver_replay_lag_seconds",
+            "delay in standby WAL replay in seconds, measured as EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())",
+            []string{"status"},
+            nil,
+        ),
+    }
+}
+
+func (c *statWalReceiverScraper) Name() string {
+    return "StatWalReceiverScraper"
+}
+
+func (c *statWalReceiverScraper) Scrape(ctx context.Context, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) error {
+    var rows pgx.Rows
+    var err error
+
+    rows, err = conn.Query(ctx, statWalReceiver)
+
+    if err != nil {
+        return err
+    }
+    defer rows.Close()
+
+    var status string
+    var pgWalReceiverReplayLag float64
+
+    for rows.Next() {
+
+        if err := rows.Scan(&status,
+            &pgWalReceiverReplayLag); err != nil {
+
+            return err
+        }
+        // postgres_wal_receiver_replay_lag_seconds
+        ch <- prometheus.MustNewConstMetric(c.walReceiverReplayLag,
+            prometheus.GaugeValue,
+            pgWalReceiverReplayLag,
+            status)
+    }
+
+    err = rows.Err()
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
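
For reference, the query introduced in collector/stat_wal_receiver.go can be exercised by hand against a standby before relying on the exported metric. Below is a minimal standalone sketch, not part of the patch itself; the DSN is a placeholder and the file name in the comment is purely hypothetical.

// walreceiver_lag_check.go - illustrative only, not included in the patch.
package main

import (
    "context"
    "fmt"
    "log"

    pgx "github.com/jackc/pgx/v4"
)

// Same shape as the statWalReceiver query added above: rows where the computed
// replay lag is NULL (e.g. the extra connections opened by pg_basebackup) are
// filtered out by the WHERE clause.
const walReceiverQuery = `
WITH pg_wal_receiver AS (
    SELECT status
    , (
        CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
        THEN 0
        ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
        END
    ) as postgres_wal_receiver_replay_lag
    FROM pg_stat_wal_receiver
)
SELECT * FROM pg_wal_receiver WHERE postgres_wal_receiver_replay_lag IS NOT NULL`

func main() {
    ctx := context.Background()
    // Placeholder DSN: point this at a standby, not the primary.
    conn, err := pgx.Connect(ctx, "postgres://postgres@standby:5432/postgres")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    rows, err := conn.Query(ctx, walReceiverQuery)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var status string
        var lagSeconds float64
        if err := rows.Scan(&status, &lagSeconds); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("status=%s replay_lag_seconds=%.3f\n", status, lagSeconds)
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}

Run against a primary, pg_stat_wal_receiver has no rows, so the sketch prints nothing, which mirrors how the scraper above emits no sample in that case.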
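
The stat_replication rework above depends on the lag columns being nullable: pg_stat_replication reports NULL for write_lag, flush_lag and replay_lag once replication has been idle for a while, and pgx will not scan a NULL into a plain float64. A minimal sketch of that guard pattern in isolation, illustrative only and not part of the patch:

package main

import (
    "database/sql"
    "fmt"
)

// emitIfValid mirrors the guard used in Scrape above: a NULL lag is skipped
// rather than being exported as a misleading zero.
func emitIfValid(metric string, lag sql.NullFloat64) {
    if !lag.Valid {
        return
    }
    fmt.Printf("%s %f\n", metric, lag.Float64)
}

func main() {
    emitIfValid("postgres_stat_replication_write_lag_seconds", sql.NullFloat64{Float64: 0.042, Valid: true})
    emitIfValid("postgres_stat_replication_replay_lag_seconds", sql.NullFloat64{}) // NULL -> no sample
}

This is why the patch switches the scan targets from float64 to sql.NullFloat64 and wraps each MustNewConstMetric call in a Valid check.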