diff --git a/go.mod b/go.mod index 9b2450d8f..aa3fcc31a 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/pressly/goose/v3 v3.20.0 github.com/prometheus/client_golang v1.19.0 github.com/rs/zerolog v1.31.0 + github.com/schollz/sqlite3dump v1.3.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.18.2 github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a diff --git a/go.sum b/go.sum index 1e20c5cc5..9b26395ec 100644 --- a/go.sum +++ b/go.sum @@ -970,6 +970,7 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= @@ -1326,6 +1327,8 @@ github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXn github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= +github.com/schollz/sqlite3dump v1.3.1 h1:QXizJ7XEJ7hggjqjZ3YRtF3+javm8zKtzNByYtEkPRA= +github.com/schollz/sqlite3dump v1.3.1/go.mod h1:mzSTjZpJH4zAb1FN3iNlhWPbbdyeBpOaTW0hukyMHyI= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= diff --git a/service/indexer.go b/service/indexer.go index 5ce214e71..e647ca481 100644 --- a/service/indexer.go +++ b/service/indexer.go @@ -1,12 +1,7 @@ package service import ( - "context" - "fmt" - "io" - "os" "path/filepath" - "time" "go.vocdoni.io/dvote/log" "go.vocdoni.io/dvote/snapshot" @@ -29,43 +24,8 @@ func (vs *VocdoniService) VochainIndexer() error { // launch the indexer after sync routine (executed when the blockchain is ready) go vs.Indexer.AfterSyncBootstrap(false) - snapshot.SetFnImportIndexer(func(r io.Reader) error { - log.Debugf("restoring indexer backup") - - file, err := os.CreateTemp("", "indexer.sqlite3") - if err != nil { - return fmt.Errorf("creating tmpfile: %w", err) - } - defer func() { - if err := file.Close(); err != nil { - log.Warnw("error closing tmpfile", "path", file.Name(), "err", err) - } - if err := os.Remove(file.Name()); err != nil { - log.Warnw("error removing tmpfile", "path", file.Name(), "err", err) - } - }() - - if _, err := io.Copy(file, r); err != nil { - return fmt.Errorf("writing tmpfile: %w", err) - } - - return vs.Indexer.RestoreBackup(file.Name()) - }) - - snapshot.SetFnExportIndexer(func(w io.Writer) error { - log.Debugf("saving indexer backup") - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - data, err := vs.Indexer.ExportBackupAsBytes(ctx) - if err != nil { - return fmt.Errorf("creating indexer backup: %w", err) - } - if _, err := w.Write(data); err != nil { - return fmt.Errorf("writing data: %w", err) - } - return nil - }) + snapshot.SetFnImportIndexer(vs.Indexer.ImportBackup) + snapshot.SetFnExportIndexer(vs.Indexer.ExportBackup) if vs.Config.Indexer.ArchiveURL != "" && vs.Config.Indexer.ArchiveURL != "none" { log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL) diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index 562609cec..95f987aef 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -1,6 +1,9 @@ package indexer import ( + "bufio" + "bytes" + "compress/gzip" "context" "database/sql" "embed" @@ -30,6 +33,7 @@ import ( "github.com/pressly/goose/v3" "golang.org/x/exp/maps" + "github.com/schollz/sqlite3dump" // modernc is a pure-Go version, but its errors have less useful info. // We use mattn while developing and testing, and we can swap them later. // _ "modernc.org/sqlite" @@ -233,7 +237,8 @@ func (idx *Indexer) Close() error { return nil } -// BackupPath restores the database from a backup created via SaveBackup. +// RestoreBackup restores the indexer by copying the file (raw binary format) +// from the passed path. // Note that this must be called with ExpectBackupRestore set to true, // and before any indexing or queries happen. func (idx *Indexer) RestoreBackup(path string) error { @@ -249,37 +254,112 @@ func (idx *Indexer) RestoreBackup(path string) error { return nil } -// SaveBackup backs up the database to a file on disk. +// ImportBackup restores the database from a backup created via ExportBackup. +// Note that this must be called with ExpectBackupRestore set to true, +// and before any indexing or queries happen. +func (idx *Indexer) ImportBackup(r io.Reader) error { + if idx.readWriteDB != nil { + panic("Indexer.RestoreBackup called after the database was initialized") + } + log.Debugf("restoring indexer backup") + gzipReader, err := gzip.NewReader(r) + if err != nil { + return fmt.Errorf("could not create gzip reader: %w", err) + } + defer gzipReader.Close() + if err := restoreDBFromSQLDump(idx.dbPath, gzipReader); err != nil { + return fmt.Errorf("could not restore indexer backup: %w", err) + } + if err := idx.startDB(); err != nil { + return err + } + return nil +} + +func restoreDBFromSQLDump(dbPath string, r io.Reader) error { + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal&_foreign_keys=true", dbPath)) + if err != nil { + return fmt.Errorf("could not open indexer db: %w", err) + } + defer db.Close() + + scanner := bufio.NewScanner(r) + var statement strings.Builder + for scanner.Scan() { + line := scanner.Text() + statement.WriteString(line) + statement.WriteString("\n") + + if strings.HasSuffix(line, ";") { + _, err := db.Exec(statement.String()) + if err != nil { + return fmt.Errorf("failed to execute statement: %s (error: %w)", statement.String(), err) + } + statement.Reset() + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error during restore: %w", err) + } + + return nil +} + +// ExportBackup backs up the database to a file on disk. // Note that writes to the database may be blocked until the backup finishes, // and an error may occur if a file at path already exists. // // For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database. -func (idx *Indexer) SaveBackup(ctx context.Context, path string) error { - _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path) - return err +func (idx *Indexer) ExportBackup(w io.Writer) error { + log.Debugf("exporting indexer backup") + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + tmpDB, err := os.CreateTemp("", "indexer*.sqlite3") + if err != nil { + return fmt.Errorf("could not create tmpdb file: %w", err) + } + defer func() { + if err := os.Remove(tmpDB.Name()); err != nil { + log.Warnw("error removing tmpdb file", "path", tmpDB.Name(), "err", err) + } + }() + + if _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, tmpDB.Name()); err != nil { + return fmt.Errorf("could not vacuum into tmpdb: %w", err) + } + + db, err := sql.Open("sqlite3", tmpDB.Name()) + if err != nil { + return fmt.Errorf("could not open tmpDB: %w", err) + } + defer db.Close() + + // first drop stats table + if _, err := db.ExecContext(ctx, `DROP TABLE IF EXISTS sqlite_stat1;`); err != nil { + return fmt.Errorf("could not drop table sqlite_stat1: %w", err) + } + + // make goose_db_version table deterministic + if _, err := db.ExecContext(ctx, `UPDATE goose_db_version SET tstamp = '1970-01-01 00:00:00';`); err != nil { + return fmt.Errorf("could not update goose_db_version: %w", err) + } + + gzw := gzip.NewWriter(w) + defer gzw.Close() + return sqlite3dump.DumpDB(db, gzw) } // ExportBackupAsBytes backs up the database, and returns the contents as []byte. // // Note that writes to the database may be blocked until the backup finishes. -// -// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database. func (idx *Indexer) ExportBackupAsBytes(ctx context.Context) ([]byte, error) { - tmpDir, err := os.MkdirTemp("", "indexer") - if err != nil { - return nil, fmt.Errorf("error creating tmpDir: %w", err) - - } - tmpFilePath := filepath.Join(tmpDir, "indexer.sqlite3") - if err := idx.SaveBackup(ctx, tmpFilePath); err != nil { + var buf bytes.Buffer + if err := idx.ExportBackup(&buf); err != nil { return nil, fmt.Errorf("error saving indexer backup: %w", err) } - defer func() { - if err := os.Remove(tmpFilePath); err != nil { - log.Warnw("error removing indexer backup file", "path", tmpFilePath, "err", err) - } - }() - return os.ReadFile(tmpFilePath) + return buf.Bytes(), nil } // blockTxQueries assumes that lockPool is locked. diff --git a/vochain/indexer/indexer_test.go b/vochain/indexer/indexer_test.go index 901769c0e..3e2e939ee 100644 --- a/vochain/indexer/indexer_test.go +++ b/vochain/indexer/indexer_test.go @@ -2,13 +2,11 @@ package indexer import ( "bytes" - "context" "encoding/hex" "fmt" "io" stdlog "log" "math/big" - "path/filepath" "testing" qt "github.com/frankban/quicktest" @@ -88,8 +86,8 @@ func TestBackup(t *testing.T) { wantTotalVotes(10) // Back up the database. - backupPath := filepath.Join(t.TempDir(), "backup") - err = idx.SaveBackup(context.TODO(), backupPath) + var bkp bytes.Buffer + err = idx.ExportBackup(&bkp) qt.Assert(t, err, qt.IsNil) // Add another 5 votes which aren't in the backup. @@ -110,7 +108,7 @@ func TestBackup(t *testing.T) { idx.Close() idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true}) qt.Assert(t, err, qt.IsNil) - err = idx.RestoreBackup(backupPath) + err = idx.ImportBackup(&bkp) qt.Assert(t, err, qt.IsNil) wantTotalVotes(10)