
Commit

Re-work standby side metrics to be based around the `pg_stat_wal_receiver` view
Andrew Morton committed Apr 28, 2022
1 parent 458791c commit 4aaf535
Showing 4 changed files with 127 additions and 52 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -58,7 +58,6 @@ Prometheus exporter for PostgreSQL server metrics.
| postgres_stat_database_xact_commit_total | Number of transactions in this database that have been committed | datname |
| postgres_stat_database_xact_rollback_total | Number of transactions in this database that have been rolled back | datname |
| postgres_stat_replication_lag_bytes | Replication Lag in bytes | application_name, client_addr, state, sync_state |
| postgres_standby_wal_replay_lag_seconds | Replication lag measured in seconds on the standby. Measured as `EXTRACT (EPOCH FROM now()) - pg_last_xact_replay_timestamp()` | application_name, client_addr, state, sync_state |
| postgres_stat_replication_flush_lag_seconds | Elapsed time between committing WAL on the primary and the standby flushing it (WAL has already been flushed but not yet applied). Reported from the primary node. *Only available on Postgres versions > 9.x*. | application_name, client_addr, state, sync_state |
| postgres_stat_replication_replay_lag_seconds | Elapsed time between committing WAL on the primary and the standby applying it (fully committed on the standby node). Reported from the primary node. *Only available on Postgres versions > 9.x*. | application_name, client_addr, state, sync_state |
| postgres_stat_replication_write_lag_seconds | Elapsed time between committing WAL on the primary and the standby writing it (but not yet committed on the standby). Reported from the primary node. *Only available on Postgres versions > 9.x*. | application_name, client_addr, state, sync_state |
@@ -79,8 +78,10 @@ Prometheus exporter for PostgreSQL server metrics.
| postgres_stat_user_indexes_scan_total | Number of times this index has been scanned | datname, schemaname, tablename, indexname |
| postgres_stat_user_indexes_tuple_read_total | Number of times tuples have been returned from scanning this index | datname, schemaname, tablename, indexname |
| postgres_stat_user_indexes_tuple_fetch_total | Number of live tuples fetched by scans on this index | datname, schemaname, tablename, indexname |
| postgres_wal_receiver_replay_lag_seconds | Replication lag measured in seconds on the standby. Measured as `EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())` (see the sketch after this table) | status |
| postgres_up | Whether the Postgres server is up | |
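
As a rough illustration of what the new `postgres_wal_receiver_replay_lag_seconds` metric reports, the same expression the scraper runs can be issued directly against a standby with pgx. This is a minimal sketch and not part of the exporter; the standalone `main` package and the `DATABASE_URL` environment variable are assumptions for the example.

```go
// Minimal sketch (not part of this exporter): compute the same standby
// replay lag that postgres_wal_receiver_replay_lag_seconds exposes.
package main

import (
	"context"
	"fmt"
	"os"

	pgx "github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	// DATABASE_URL is assumed to point at a standby server.
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer conn.Close(ctx)

	var lagSeconds float64
	err = conn.QueryRow(ctx, `
		SELECT CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
		            THEN 0
		            ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
		       END`).Scan(&lagSeconds)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("standby replay lag: %.1fs\n", lagSeconds)
}
```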


### Run

#### Passing in a libpq connection string
1 change: 1 addition & 0 deletions collector/exporter.go
@@ -101,6 +101,7 @@ func NewExporter(ctx context.Context, logger kitlog.Logger, connConfig *pgx.Conn
NewStatBgwriterScraper(),
NewStatDatabaseScraper(),
NewStatReplicationScraper(),
NewWalReceiverScraper(),
},
datnameScrapers: []Scraper{
NewStatVacuumProgressScraper(),
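
For context, the `NewWalReceiverScraper()` registered above has to satisfy the collector's `Scraper` interface, which is not shown in this diff. The sketch below is inferred from the two methods the new scraper implements (`Name` and `Scrape`); the empty `Version` struct is only a placeholder for the collector's real version type.

```go
// Sketch only: the Scraper contract inferred from collector/stat_wal_receiver.go below.
package collector

import (
	"context"

	pgx "github.com/jackc/pgx/v4"
	"github.com/prometheus/client_golang/prometheus"
)

// Version is a placeholder for the collector's own Postgres version type,
// which is not part of this diff.
type Version struct{}

// Scraper is implemented by every per-connection metric scraper,
// including the wal-receiver scraper added in this commit.
type Scraper interface {
	Name() string
	Scrape(ctx context.Context, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) error
}
```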
91 changes: 40 additions & 51 deletions collector/stat_replication.go
@@ -2,6 +2,7 @@ package collector

import (
"context"
"database/sql"
"net"

pgx "github.com/jackc/pgx/v4"
@@ -25,11 +26,6 @@ WITH pg_replication AS (
ELSE pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float
END
) AS pg_xlog_location_diff
, ( CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
THEN 0
ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
END
) as pg_standby_wal_replay_lag
, EXTRACT (EPOCH FROM write_lag) as write_lag_seconds
, EXTRACT (EPOCH FROM flush_lag) as flush_lag_seconds
, EXTRACT (EPOCH FROM replay_lag) as replay_lag_seconds
@@ -39,11 +35,10 @@ SELECT * FROM pg_replication WHERE pg_xlog_location_diff IS NOT NULL /*postgres_
)

type statReplicationScraper struct {
lagBytes *prometheus.Desc
standbyWalReplayLag *prometheus.Desc
writeLag *prometheus.Desc
flushLag *prometheus.Desc
replayLag *prometheus.Desc
lagBytes *prometheus.Desc
writeLag *prometheus.Desc
flushLag *prometheus.Desc
replayLag *prometheus.Desc
}

// NewStatReplicationScraper returns a new Scraper exposing postgres pg_stat_replication
@@ -55,12 +50,6 @@ func NewStatReplicationScraper() Scraper {
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
standbyWalReplayLag: prometheus.NewDesc(
"postgres_standby_wal_replay_lag_seconds",
"delay in standby wal replay seconds EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()",
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
writeLag: prometheus.NewDesc(
"postgres_stat_replication_write_lag_seconds",
"write_lag as reported by the pg_stat_replication view converted to seconds",
@@ -99,7 +88,11 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver

var applicationName, state, syncState string
var clientAddr net.IP
var pgXlogLocationDiff, pgStandbyWalReplayLag, writeLagSeconds, flushLagSeconds, replayLagSeconds float64
var pgXlogLocationDiff float64
/* When querying pg_stat_replication it may be that we don't have
values for the various lags, as they vanish after a period of inactivity
*/
var writeLagSeconds, flushLagSeconds, replayLagSeconds sql.NullFloat64

for rows.Next() {

@@ -108,7 +101,6 @@
&state,
&syncState,
&pgXlogLocationDiff,
&pgStandbyWalReplayLag,
&writeLagSeconds,
&flushLagSeconds,
&replayLagSeconds); err != nil {
@@ -124,41 +116,38 @@
state,
syncState)

// postgres_standby_wal_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.standbyWalReplayLag,
prometheus.GaugeValue,
pgStandbyWalReplayLag,
applicationName,
clientAddr.String(),
state,
syncState)

// postgres_stat_replication_write_lag_seconds
ch <- prometheus.MustNewConstMetric(c.writeLag,
prometheus.GaugeValue,
writeLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)
if writeLagSeconds.Valid {
// postgres_stat_replication_write_lag_seconds
ch <- prometheus.MustNewConstMetric(c.writeLag,
prometheus.GaugeValue,
writeLagSeconds.Float64,
applicationName,
clientAddr.String(),
state,
syncState)
}

// postgres_stat_replication_flush_lag_seconds
ch <- prometheus.MustNewConstMetric(c.flushLag,
prometheus.GaugeValue,
flushLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)
if flushLagSeconds.Valid {
// postgres_stat_replication_flush_lag_seconds
ch <- prometheus.MustNewConstMetric(c.flushLag,
prometheus.GaugeValue,
flushLagSeconds.Float64,
applicationName,
clientAddr.String(),
state,
syncState)
}

// postgres_stat_replication_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.replayLag,
prometheus.GaugeValue,
replayLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)
if replayLagSeconds.Valid {
// postgres_stat_replication_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.replayLag,
prometheus.GaugeValue,
replayLagSeconds.Float64,
applicationName,
clientAddr.String(),
state,
syncState)
}
}
err = rows.Err()
if err != nil {
84 changes: 84 additions & 0 deletions collector/stat_wal_receiver.go
@@ -0,0 +1,84 @@
package collector

import (
"context"

pgx "github.com/jackc/pgx/v4"
"github.com/prometheus/client_golang/prometheus"
)

/* When pg_basebackup is running in stream mode, it opens a second connection
to the server and starts streaming the transaction log in parallel while
running the backup. In both connections (state=backup and state=streaming) the
pg_log_location_diff is null and needs to be excluded */
const (
// Scrape query
statWalReceiver = `
WITH pg_wal_receiver AS (
SELECT status
, (
CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
THEN 0
ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
END
) as postgres_wal_receiver_replay_lag
FROM pg_stat_wal_receiver
)
SELECT * FROM pg_wal_receiver WHERE postgres_wal_receiver_replay_lag IS NOT NULL /*postgres_exporter*/`
)

type statWalReceiverScraper struct {
walReceiverReplayLag *prometheus.Desc
}

// NewWalReceiverScraper returns a new Scraper exposing postgres pg_stat_wal_receiver
func NewWalReceiverScraper() Scraper {
return &statWalReceiverScraper{
walReceiverReplayLag: prometheus.NewDesc(
"postgres_wal_receiver_replay_lag_seconds",
"delay in standby wal replay seconds EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()",
[]string{"status"},
nil,
),
}
}

func (c *statWalReceiverScraper) Name() string {
return "StatWalReceiverScraperr"
}

func (c *statWalReceiverScraper) Scrape(ctx context.Context, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) error {
var rows pgx.Rows
var err error

rows, err = conn.Query(ctx, statWalReceiver)

if err != nil {
return err
}
defer rows.Close()

var status string
var pgWalReceiverReplayLag float64

for rows.Next() {

if err := rows.Scan(&status,
&pgWalReceiverReplayLag); err != nil {

return err
}
// postgres_wal_receiver_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.walReceiverReplayLag,
prometheus.GaugeValue,
pgWalReceiverReplayLag,
status)
}

err = rows.Err()
if err != nil {
return err
}

return nil
}
