From a1b6aa34729836113d95b8d8148c4605691d6013 Mon Sep 17 00:00:00 2001 From: Michael Todorovic Date: Wed, 27 Nov 2024 18:20:39 +0100 Subject: [PATCH 1/2] fix: handle pg_replication_slots on pg<13 Signed-off-by: Michael Todorovic --- collector/pg_replication_slot.go | 57 ++++++++++++++++++++------- collector/pg_replication_slot_test.go | 8 ++-- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 27ccddefd..802c6063c 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -16,8 +16,11 @@ package collector import ( "context" "database/sql" + "fmt" "log/slog" + "strings" + "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" ) @@ -81,26 +84,35 @@ var ( "availability of WAL files claimed by this slot", []string{"slot_name", "slot_type", "wal_status"}, nil, ) +) + +func replicationSlotQuery(columns []string) string { + return fmt.Sprintf("SELECT %s FROM pg_replication_slots;", strings.Join(columns, ",")) +} - pgReplicationSlotQuery = `SELECT - slot_name, - slot_type, - CASE WHEN pg_is_in_recovery() THEN +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + + columns := []string{ + "slot_name", + "slot_type", + `CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() - '0/0' ELSE pg_current_wal_lsn() - '0/0' - END AS current_wal_lsn, - COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, - active, - safe_wal_size, - wal_status - FROM pg_replication_slots;` -) + END AS current_wal_lsn`, + "COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn", + "active", + } + + abovePG13 := instance.version.GTE(semver.MustParse("13.0.0")) + if abovePG13 { + columns = append(columns, "safe_wal_size") + columns = append(columns, "wal_status") + } -func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { - db := instance.getDB() rows, err := db.QueryContext(ctx, - pgReplicationSlotQuery) + replicationSlotQuery(columns)) if err != nil { return err } @@ -114,7 +126,22 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var isActive sql.NullBool var safeWalSize sql.NullInt64 var walStatus sql.NullString - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil { + + r := []any{ + &slotName, + &slotType, + &walLSN, + &flushLSN, + &isActive, + } + + if abovePG13 { + r = append(r, &safeWalSize) + r = append(r, &walStatus) + } + + err := rows.Scan(r...) + if err != nil { return err } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 174743ac3..68bab1cc4 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -34,7 +34,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") - mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -77,7 +77,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") - mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -120,7 +120,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") - mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -161,7 +161,7 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, true, nil, nil) - mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { From 79eb34e983e8da73af5f0251eb1850ade4cb90ba Mon Sep 17 00:00:00 2001 From: Michael Todorovic Date: Wed, 27 Nov 2024 19:31:58 +0100 Subject: [PATCH 2/2] fix: tests Signed-off-by: Michael Todorovic --- collector/pg_replication_slot.go | 49 +++++++++++++++------------ collector/pg_replication_slot_test.go | 17 +++++----- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 802c6063c..e6c9773eb 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -16,9 +16,7 @@ package collector import ( "context" "database/sql" - "fmt" "log/slog" - "strings" "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" @@ -84,35 +82,42 @@ var ( "availability of WAL files claimed by this slot", []string{"slot_name", "slot_type", "wal_status"}, nil, ) -) - -func replicationSlotQuery(columns []string) string { - return fmt.Sprintf("SELECT %s FROM pg_replication_slots;", strings.Join(columns, ",")) -} - -func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { - db := instance.getDB() - - columns := []string{ - "slot_name", - "slot_type", - `CASE WHEN pg_is_in_recovery() THEN + pgReplicationSlotQuery = `SELECT + slot_name, + slot_type, + CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() - '0/0' ELSE pg_current_wal_lsn() - '0/0' - END AS current_wal_lsn`, - "COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn", - "active", - } + END AS current_wal_lsn, + COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, + active + FROM pg_replication_slots;` + pgReplicationSlotNewQuery = `SELECT + slot_name, + slot_type, + CASE WHEN pg_is_in_recovery() THEN + pg_last_wal_receive_lsn() - '0/0' + ELSE + pg_current_wal_lsn() - '0/0' + END AS current_wal_lsn, + COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, + active, + safe_wal_size, + wal_status + FROM pg_replication_slots;` +) +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + query := pgReplicationSlotQuery abovePG13 := instance.version.GTE(semver.MustParse("13.0.0")) if abovePG13 { - columns = append(columns, "safe_wal_size") - columns = append(columns, "wal_status") + query = pgReplicationSlotNewQuery } + db := instance.getDB() rows, err := db.QueryContext(ctx, - replicationSlotQuery(columns)) + query) if err != nil { return err } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 68bab1cc4..981b5db62 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -29,12 +30,12 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") - mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -72,12 +73,12 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") - mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -115,12 +116,12 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") - mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -156,12 +157,12 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, true, nil, nil) - mock.ExpectQuery(sanitizeQuery(replicationSlotQuery(columns))).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() {