Skip to content

Commit

Permalink
Add db metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Dec 31, 2020
1 parent da5087c commit 9d0e79c
Show file tree
Hide file tree
Showing 5 changed files with 590 additions and 20 deletions.
13 changes: 12 additions & 1 deletion cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,25 @@ The commands are:
`[1:])
}

// Default configuration settings.
const (
DefaultAddr = ":9090"
)

// Config represents a configuration file for the litestream daemon.
type Config struct {
// Bind address for serving metrics.
Addr string `yaml:"addr"`

// List of databases to manage.
DBs []*DBConfig `yaml:"databases"`
}

// DefaultConfig returns a new instance of Config with defaults set.
func DefaultConfig() Config {
return Config{}
return Config{
Addr: DefaultAddr,
}
}

func (c *Config) DBConfig(path string) *DBConfig {
Expand Down
15 changes: 14 additions & 1 deletion cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"errors"
"flag"
"fmt"
"net"
"net/http"
"os"
"os/signal"

"github.com/benbjohnson/litestream"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type ReplicateCommand struct {
Expand Down Expand Up @@ -38,7 +41,7 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
}

// Setup signal handler.
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
go func() { <-ch; cancel() }()
Expand Down Expand Up @@ -66,6 +69,16 @@ func (c *ReplicateCommand) Run(ctx context.Context, args []string) (err error) {
// Notify user that initialization is done.
fmt.Printf("Initialized with %d databases.\n", len(c.DBs))

// Serve metrics over HTTP if enabled.
if config.Addr != "" {
_, port, _ := net.SplitHostPort(config.Addr)
fmt.Printf("Serving metrics on http://localhost:%s/metrics\n", port)
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(config.Addr, nil)
}()
}

// Wait for signal to stop program.
<-ctx.Done()
signal.Reset()
Expand Down
184 changes: 166 additions & 18 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Default DB settings.
Expand All @@ -31,19 +34,30 @@ const (

// DB represents a managed instance of a SQLite database in the file system.
type DB struct {
mu sync.RWMutex
path string // part to database
db *sql.DB // target database
rtx *sql.Tx // long running read transaction
pageSize int // page size, in bytes

notify chan struct{} // closes on WAL change
mu sync.RWMutex
path string // part to database
db *sql.DB // target database
rtx *sql.Tx // long running read transaction
pageSize int // page size, in bytes
notify chan struct{} // closes on WAL change
lastCheckpointAt time.Time // last checkpoint time

ctx context.Context
cancel func()
wg sync.WaitGroup

lastCheckpointAt time.Time // last checkpoint time
// Metrics
dbSizeGauge prometheus.Gauge
walSizeGauge prometheus.Gauge
totalWALBytesCounter prometheus.Counter
shadowWALIndexGauge prometheus.Gauge
shadowWALSizeGauge prometheus.Gauge
syncNCounter prometheus.Counter
syncErrorNCounter prometheus.Counter
syncSecondsCounter prometheus.Counter
checkpointNCounterVec *prometheus.CounterVec
checkpointErrorNCounterVec *prometheus.CounterVec
checkpointSecondsCounterVec *prometheus.CounterVec

// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
Expand Down Expand Up @@ -81,7 +95,21 @@ func NewDB(path string) *DB {
CheckpointInterval: DefaultCheckpointInterval,
MonitorInterval: DefaultMonitorInterval,
}

db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)
db.walSizeGauge = walSizeGaugeVec.WithLabelValues(db.path)
db.totalWALBytesCounter = totalWALBytesCounterVec.WithLabelValues(db.path)
db.shadowWALIndexGauge = shadowWALIndexGaugeVec.WithLabelValues(db.path)
db.shadowWALSizeGauge = shadowWALSizeGaugeVec.WithLabelValues(db.path)
db.syncNCounter = syncNCounterVec.WithLabelValues(db.path)
db.syncErrorNCounter = syncErrorNCounterVec.WithLabelValues(db.path)
db.syncSecondsCounter = syncSecondsCounterVec.WithLabelValues(db.path)
db.checkpointNCounterVec = checkpointNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointErrorNCounterVec = checkpointErrorNCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})
db.checkpointSecondsCounterVec = checkpointSecondsCounterVec.MustCurryWith(prometheus.Labels{"db": db.path})

db.ctx, db.cancel = context.WithCancel(context.Background())

return db
}

Expand Down Expand Up @@ -124,24 +152,23 @@ func (db *DB) ShadowWALPath(generation string, index int) string {

// CurrentShadowWALPath returns the path to the last shadow WAL in a generation.
func (db *DB) CurrentShadowWALPath(generation string) (string, error) {
index, err := db.CurrentShadowWALIndex(generation)
index, _, err := db.CurrentShadowWALIndex(generation)
if err != nil {
return "", err
}
return db.ShadowWALPath(generation, index), nil
}

// CurrentShadowWALIndex returns the current WAL index for a given generation.
func (db *DB) CurrentShadowWALIndex(generation string) (int, error) {
// CurrentShadowWALIndex returns the current WAL index & total size.
func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, err error) {
fis, err := ioutil.ReadDir(filepath.Join(db.GenerationPath(generation), "wal"))
if os.IsNotExist(err) {
return 0, nil // no wal files written for generation
return 0, 0, nil // no wal files written for generation
} else if err != nil {
return 0, err
return 0, 0, err
}

// Find highest wal index.
var index int
for _, fi := range fis {
if !strings.HasSuffix(fi.Name(), WALExt) {
continue
Expand All @@ -151,8 +178,10 @@ func (db *DB) CurrentShadowWALIndex(generation string) (int, error) {
} else if v > index {
index = v
}

size += fi.Size()
}
return index, nil
return index, size, nil
}

// Replica returns a replica by name.
Expand All @@ -174,7 +203,7 @@ func (db *DB) Pos() (Pos, error) {
return Pos{}, nil
}

index, err := db.CurrentShadowWALIndex(generation)
index, _, err := db.CurrentShadowWALIndex(generation)
if err != nil {
return Pos{}, err
}
Expand Down Expand Up @@ -571,6 +600,16 @@ func (db *DB) Sync() (err error) {
return nil
}

// Track total sync metrics.
t := time.Now()
defer func() {
db.syncNCounter.Inc()
if err != nil {
db.syncErrorNCounter.Inc()
}
db.syncSecondsCounter.Add(float64(time.Since(t).Seconds()))
}()

// TODO: Force "-wal" file if it doesn't exist.

// Ensure WAL has at least one frame in it.
Expand Down Expand Up @@ -663,6 +702,12 @@ func (db *DB) Sync() (err error) {
return fmt.Errorf("cannot clean: %w", err)
}

// Compute current index and total shadow WAL size.
// This is only for metrics so we ignore any errors that occur.
index, size, _ := db.CurrentShadowWALIndex(info.generation)
db.shadowWALIndexGauge.Set(float64(index))
db.shadowWALSizeGauge.Set(float64(size))

// Notify replicas of WAL changes.
if changed {
close(db.notify)
Expand Down Expand Up @@ -698,13 +743,21 @@ func (db *DB) verify() (info syncInfo, err error) {
}
info.generation = generation

// Determine total bytes of real DB for metrics.
if fi, err := os.Stat(db.Path()); err != nil {
return info, err
} else {
db.dbSizeGauge.Set(float64(fi.Size()))
}

// Determine total bytes of real WAL.
fi, err := os.Stat(db.WALPath())
if err != nil {
return info, err
}
info.walSize = fi.Size()
info.walModTime = fi.ModTime()
db.walSizeGauge.Set(float64(fi.Size()))

// Open shadow WAL to copy append to.
info.shadowWALPath, err = db.CurrentShadowWALPath(info.generation)
Expand Down Expand Up @@ -881,7 +934,8 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
// TODO: Optimize to use bufio on reader & writer to minimize syscalls.

// Loop over each page, verify checksum, & copy to writer.
newSize = fi.Size()
origSize := fi.Size()
newSize = origSize
buf := make([]byte, db.pageSize+WALFrameHeaderSize)
for {
// Read next page from WAL file.
Expand Down Expand Up @@ -923,6 +977,9 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
return newSize, err
}

// Track total number of bytes written to WAL.
db.totalWALBytesCounter.Add(float64(newSize - origSize))

return newSize, nil
}

Expand Down Expand Up @@ -1058,7 +1115,18 @@ func readLastChecksumFrom(f *os.File, pageSize int) (uint32, uint32, error) {
}

// checkpoint performs a checkpoint on the WAL file.
func (db *DB) checkpoint(mode string) error {
func (db *DB) checkpoint(mode string) (err error) {
// Track checkpoint metrics.
t := time.Now()
defer func() {
labels := prometheus.Labels{"mode": mode}
db.checkpointNCounterVec.With(labels).Inc()
if err != nil {
db.checkpointErrorNCounterVec.With(labels).Inc()
}
db.checkpointSecondsCounterVec.With(labels).Add(float64(time.Since(t).Seconds()))
}()

// Ensure the read lock has been removed before issuing a checkpoint.
// We defer the re-acquire to ensure it occurs even on an early return.
if err := db.releaseReadLock(); err != nil {
Expand Down Expand Up @@ -1569,6 +1637,86 @@ func NewRestoreOptions() RestoreOptions {
}
}

// Database metrics.
var (
dbSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "db_size",
Help: "The current size of the real DB",
}, []string{"db"})

walSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "wal_size",
Help: "The current size of the real WAL",
}, []string{"db"})

totalWALBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "total_wal_bytes",
Help: "Total number of bytes written to shadow WAL",
}, []string{"db"})

shadowWALIndexGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "shadow_wal_index",
Help: "The current index of the shadow WAL",
}, []string{"db"})

shadowWALSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "shadow_wal_size",
Help: "Current size of shadow WAL, in bytes",
}, []string{"db"})

syncNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "sync_count",
Help: "Number of sync operations performed",
}, []string{"db"})

syncErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "sync_error_count",
Help: "Number of sync errors that have occurred",
}, []string{"db"})

syncSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "sync_seconds",
Help: "Time spent syncing shadow WAL, in seconds",
}, []string{"db"})

checkpointNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "checkpoint_count",
Help: "Number of checkpoint operations performed",
}, []string{"db", "mode"})

checkpointErrorNCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "checkpoint_error_count",
Help: "Number of checkpoint errors that have occurred",
}, []string{"db", "mode"})

checkpointSecondsCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "litestream",
Subsystem: "db",
Name: "checkpoint_seconds",
Help: "Time spent checkpointing WAL, in seconds",
}, []string{"db", "mode"})
)

func headerByteOrder(hdr []byte) (binary.ByteOrder, error) {
magic := binary.BigEndian.Uint32(hdr[0:])
switch magic {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.15

require (
github.com/mattn/go-sqlite3 v1.14.5
github.com/prometheus/client_golang v1.9.0
gopkg.in/yaml.v2 v2.4.0
)
Loading

0 comments on commit 9d0e79c

Please sign in to comment.