From f345045e19dbe1015cc856054381dc4b47d8cab9 Mon Sep 17 00:00:00 2001 From: Gui Iribarren Date: Thu, 13 Jun 2024 11:58:08 +0200 Subject: [PATCH] indexer: backups are now slower but deterministic they are a (zstd compressed) set of SQL statements also, relevant methods now use io.Reader and io.Writer --- data/compressor/compression.go | 12 +++ go.mod | 1 - go.sum | 2 - service/indexer.go | 44 +---------- vochain/indexer/indexer.go | 125 ++++++++++++++++++++++++++------ vochain/indexer/indexer_test.go | 8 +- 6 files changed, 119 insertions(+), 73 deletions(-) diff --git a/data/compressor/compression.go b/data/compressor/compression.go index 105718104..82d225fad 100644 --- a/data/compressor/compression.go +++ b/data/compressor/compression.go @@ -1,12 +1,24 @@ package compressor import ( + "io" "time" "github.com/klauspost/compress/zstd" "go.vocdoni.io/dvote/log" ) +// NewWriter creates a new writer that uses zstd +func NewWriter(w io.Writer) (io.WriteCloser, error) { + return zstd.NewWriter(w) +} + +// NewReader creates a new reader that uses zstd +func NewReader(r io.Reader) (io.ReadCloser, error) { + zr, err := zstd.NewReader(r) + return zr.IOReadCloser(), err +} + // Compressor is a data compressor that uses zstd. type Compressor struct { encoder *zstd.Encoder diff --git a/go.mod b/go.mod index 54dc601ce..a4f5f17ca 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,6 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.11.0 github.com/libp2p/go-reuseport v0.4.0 github.com/manifoldco/promptui v0.9.0 - github.com/mattn/go-sqlite3 v1.14.23 github.com/multiformats/go-multiaddr v0.12.4 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 diff --git a/go.sum b/go.sum index 53ef2d806..27fd08d87 100644 --- a/go.sum +++ b/go.sum @@ -970,8 +970,6 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= -github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0= -github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= diff --git a/service/indexer.go b/service/indexer.go index f19f67836..da267baae 100644 --- a/service/indexer.go +++ b/service/indexer.go @@ -1,12 +1,7 @@ package service import ( - "context" - "fmt" - "io" - "os" "path/filepath" - "time" "go.vocdoni.io/dvote/log" "go.vocdoni.io/dvote/snapshot" @@ -29,43 +24,8 @@ func (vs *VocdoniService) VochainIndexer() error { // launch the indexer after sync routine (executed when the blockchain is ready) go vs.Indexer.AfterSyncBootstrap(false) - snapshot.SetFnImportIndexer(func(r io.Reader) error { - log.Debugf("restoring indexer backup") - - file, err := os.CreateTemp("", "indexer.sqlite3") - if err != nil { - return fmt.Errorf("creating tmpfile: %w", err) - } - defer func() { - if err := file.Close(); err != nil { - log.Warnw("error closing tmpfile", "path", file.Name(), "err", err) - } - if err := os.Remove(file.Name()); err != nil { - log.Warnw("error removing tmpfile", "path", file.Name(), "err", err) - } - }() - - if _, err := io.Copy(file, r); err != nil { - return fmt.Errorf("writing tmpfile: %w", err) - } - - return vs.Indexer.RestoreBackup(file.Name()) - }) - - snapshot.SetFnExportIndexer(func(w io.Writer) error { - log.Debugf("saving indexer backup") - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - data, err := vs.Indexer.ExportBackupAsBytes(ctx) - if err != nil { - return fmt.Errorf("creating indexer backup: %w", err) - } - if _, err := w.Write(data); err != nil { - return fmt.Errorf("writing data: %w", err) - } - return nil - }) + snapshot.SetFnImportIndexer(vs.Indexer.ImportBackup) + snapshot.SetFnExportIndexer(vs.Indexer.ExportBackup) return nil } diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index 37fea31a2..87f8db5d9 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -1,6 +1,7 @@ package indexer import ( + "bufio" "bytes" "context" "database/sql" @@ -18,6 +19,7 @@ import ( "sync" "time" + "go.vocdoni.io/dvote/data/compressor" "go.vocdoni.io/dvote/log" "go.vocdoni.io/dvote/statedb" "go.vocdoni.io/dvote/types" @@ -31,6 +33,7 @@ import ( "github.com/pressly/goose/v3" + "github.com/schollz/sqlite3dump" // modernc is a pure-Go version, but its errors have less useful info. // We use mattn while developing and testing, and we can swap them later. // _ "modernc.org/sqlite" @@ -240,7 +243,8 @@ func (idx *Indexer) Close() error { return nil } -// BackupPath restores the database from a backup created via SaveBackup. +// RestoreBackup restores the indexer by copying the file (raw binary format) +// from the passed path. // Note that this must be called with ExpectBackupRestore set to true, // and before any indexing or queries happen. func (idx *Indexer) RestoreBackup(path string) error { @@ -277,36 +281,111 @@ func gooseMigrationsPending(db *sql.DB, dir string) bool { return len(migrations) > 0 } -// SaveBackup backs up the database to a file on disk. -// Note that writes to the database may be blocked until the backup finishes, -// and an error may occur if a file at path already exists. -// -// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database. -func (idx *Indexer) SaveBackup(ctx context.Context, path string) error { - _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path) - return err +// ImportBackup restores the database from a backup created via ExportBackup. +// Note that this must be called with ExpectBackupRestore set to true, +// and before any indexing or queries happen. +func (idx *Indexer) ImportBackup(r io.Reader) error { + if idx.readWriteDB != nil { + panic("Indexer.RestoreBackup called after the database was initialized") + } + log.Debugf("restoring indexer backup") + + zr, err := compressor.NewReader(r) + if err != nil { + return fmt.Errorf("could not init decompressor: %w", err) + } + defer zr.Close() + + if err := restoreDBFromSQLDump(idx.dbPath, zr); err != nil { + return fmt.Errorf("could not restore indexer backup: %w", err) + } + if err := idx.startDB(); err != nil { + return err + } + return nil +} + +func restoreDBFromSQLDump(dbPath string, r io.Reader) error { + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal&_foreign_keys=true", dbPath)) + if err != nil { + return fmt.Errorf("could not open indexer db: %w", err) + } + defer db.Close() + + scanner := bufio.NewScanner(r) + var statement strings.Builder + for scanner.Scan() { + line := scanner.Text() + statement.WriteString(line) + statement.WriteString("\n") + + if strings.HasSuffix(line, ";") { + _, err := db.Exec(statement.String()) + if err != nil { + return fmt.Errorf("failed to execute statement: %s (error: %w)", statement.String(), err) + } + statement.Reset() + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error during restore: %w", err) + } + + return nil +} + +// ExportBackup exports a (compressed) deterministic set of SQL statements. +// Note that writes to the database may be blocked until the method finishes. +func (idx *Indexer) ExportBackup(w io.Writer) error { + log.Debugf("exporting indexer backup") + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + tmpDB, err := os.CreateTemp("", "indexer*.sqlite3") + if err != nil { + return fmt.Errorf("could not create tmpdb file: %w", err) + } + defer os.Remove(tmpDB.Name()) + + if _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, tmpDB.Name()); err != nil { + return fmt.Errorf("could not vacuum into tmpdb: %w", err) + } + + db, err := sql.Open("sqlite3", tmpDB.Name()) + if err != nil { + return fmt.Errorf("could not open tmpDB: %w", err) + } + defer db.Close() + + // first drop stats table + if _, err := db.ExecContext(ctx, `DROP TABLE IF EXISTS sqlite_stat1;`); err != nil { + return fmt.Errorf("could not drop table sqlite_stat1: %w", err) + } + + // make goose_db_version table deterministic + if _, err := db.ExecContext(ctx, `UPDATE goose_db_version SET tstamp = '1970-01-01 00:00:00';`); err != nil { + return fmt.Errorf("could not update goose_db_version: %w", err) + } + + zw, err := compressor.NewWriter(w) + if err != nil { + return fmt.Errorf("could not init compressor: %w", err) + } + defer zw.Close() + + return sqlite3dump.DumpDB(db, zw) } // ExportBackupAsBytes backs up the database, and returns the contents as []byte. // // Note that writes to the database may be blocked until the backup finishes. -// -// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database. func (idx *Indexer) ExportBackupAsBytes(ctx context.Context) ([]byte, error) { - tmpDir, err := os.MkdirTemp("", "indexer") - if err != nil { - return nil, fmt.Errorf("error creating tmpDir: %w", err) - } - tmpFilePath := filepath.Join(tmpDir, "indexer.sqlite3") - if err := idx.SaveBackup(ctx, tmpFilePath); err != nil { + var buf bytes.Buffer + if err := idx.ExportBackup(&buf); err != nil { return nil, fmt.Errorf("error saving indexer backup: %w", err) } - defer func() { - if err := os.Remove(tmpFilePath); err != nil { - log.Warnw("error removing indexer backup file", "path", tmpFilePath, "err", err) - } - }() - return os.ReadFile(tmpFilePath) + return buf.Bytes(), nil } // blockTxQueries assumes that lockPool is locked. diff --git a/vochain/indexer/indexer_test.go b/vochain/indexer/indexer_test.go index f3dc00ed3..25588ed98 100644 --- a/vochain/indexer/indexer_test.go +++ b/vochain/indexer/indexer_test.go @@ -2,13 +2,11 @@ package indexer import ( "bytes" - "context" "encoding/hex" "fmt" "io" stdlog "log" "math/big" - "path/filepath" "strings" "testing" @@ -89,8 +87,8 @@ func TestBackup(t *testing.T) { wantTotalVotes(10) // Back up the database. - backupPath := filepath.Join(t.TempDir(), "backup") - err = idx.SaveBackup(context.TODO(), backupPath) + var bkp bytes.Buffer + err = idx.ExportBackup(&bkp) qt.Assert(t, err, qt.IsNil) // Add another 5 votes which aren't in the backup. @@ -111,7 +109,7 @@ func TestBackup(t *testing.T) { idx.Close() idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true}) qt.Assert(t, err, qt.IsNil) - err = idx.RestoreBackup(backupPath) + err = idx.ImportBackup(&bkp) qt.Assert(t, err, qt.IsNil) wantTotalVotes(10)