diff --git a/stores/sql.go b/stores/sql.go index 4ec04f393..8cc86be62 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -204,7 +204,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) { } // Get latest consensus change ID or init db. - ci, ccid, err := initConsensusInfo(db) + ci, ccid, err := initConsensusInfo(ctx, dbMain) if err != nil { return nil, modules.ConsensusChangeID{}, err } @@ -249,7 +249,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) { walletAddress: cfg.WalletAddress, chainIndex: types.ChainIndex{ Height: ci.Height, - ID: types.BlockID(ci.BlockID), + ID: types.BlockID(ci.ID), }, lastPrunedAt: time.Now(), @@ -510,47 +510,28 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er return fmt.Errorf("retryTransaction failed: %w", err) } -func initConsensusInfo(db *gorm.DB) (dbConsensusInfo, modules.ConsensusChangeID, error) { - var ci dbConsensusInfo - if err := db. - Where(&dbConsensusInfo{Model: Model{ID: consensusInfoID}}). - Attrs(dbConsensusInfo{ - Model: Model{ID: consensusInfoID}, - CCID: modules.ConsensusChangeBeginning[:], - }). - FirstOrCreate(&ci). - Error; err != nil { - return dbConsensusInfo{}, modules.ConsensusChangeID{}, err - } - var ccid modules.ConsensusChangeID - copy(ccid[:], ci.CCID) - return ci, ccid, nil +func initConsensusInfo(ctx context.Context, db sql.Database) (ci types.ChainIndex, ccid modules.ConsensusChangeID, err error) { + err = db.Transaction(ctx, func(tx sql.DatabaseTx) error { + ci, ccid, err = tx.InitConsensusInfo(ctx) + return err + }) + return } func (s *SQLStore) ResetConsensusSubscription(ctx context.Context) error { - // empty tables and reinit consensus_infos - var ci dbConsensusInfo - err := s.retryTransaction(ctx, func(tx *gorm.DB) error { - if err := s.db.Exec("DELETE FROM consensus_infos").Error; err != nil { - return err - } else if err := s.db.Exec("DELETE FROM siacoin_elements").Error; err != nil { - return err - } else if err := s.db.Exec("DELETE FROM transactions").Error; err != nil { - return err - } else if ci, _, err = initConsensusInfo(tx); err != nil { - return err - } - return nil + // reset db + var ci types.ChainIndex + var err error + err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + ci, err = tx.ResetConsensusSubscription(ctx) + return err }) if err != nil { return err } // reset in-memory state. s.persistMu.Lock() - s.chainIndex = types.ChainIndex{ - Height: ci.Height, - ID: types.BlockID(ci.BlockID), - } + s.chainIndex = ci s.persistMu.Unlock() return nil } diff --git a/stores/sql/database.go b/stores/sql/database.go index 44533409b..b8679ce1b 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -9,6 +9,7 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" + "go.sia.tech/siad/modules" ) // The database interfaces define all methods that a SQL database must implement @@ -141,6 +142,10 @@ type ( // HostBlocklist returns the list of host addresses on the blocklist. HostBlocklist(ctx context.Context) ([]string, error) + // InitConsensusInfo initializes the consensus info in the database or + // returns the latest one. + InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) + // InsertObject inserts a new object into the database. InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error @@ -211,6 +216,10 @@ type ( // returned. RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error + // ResetConsenusSubscription resets the consensus subscription in the + // database. + ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) + // ResetLostSectors resets the lost sector count for the given host. ResetLostSectors(ctx context.Context, hk types.PublicKey) error diff --git a/stores/sql/main.go b/stores/sql/main.go index 7d84b0298..b79c6a3b9 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -20,9 +20,12 @@ import ( "go.sia.tech/renterd/internal/sql" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" + "go.sia.tech/siad/modules" "lukechampine.com/frand" ) +const consensuInfoID = 1 + var ErrNegativeOffset = errors.New("offset can not be negative") func AbortMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, uploadID string) error { @@ -1294,6 +1297,39 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64 return res.RowsAffected() } +func InitConsensusInfo(ctx context.Context, tx sql.Tx) (types.ChainIndex, modules.ConsensusChangeID, error) { + // try fetch existing + var ccid modules.ConsensusChangeID + var ci types.ChainIndex + err := tx.QueryRow(ctx, "SELECT cc_id, height, block_id FROM consensus_infos WHERE id = ?", consensuInfoID). + Scan((*CCID)(&ccid), &ci.Height, (*Hash256)(&ci.ID)) + if err != nil && !errors.Is(err, dsql.ErrNoRows) { + return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to fetch consensus info: %w", err) + } else if err == nil { + return ci, ccid, nil + } + // otherwise init + ci = types.ChainIndex{} + if _, err := tx.Exec(ctx, "INSERT INTO consensus_infos (id, created_at, cc_id, height, block_id) VALUES (?, ?, ?, ?, ?)", + consensuInfoID, time.Now(), (CCID)(modules.ConsensusChangeBeginning), ci.Height, (Hash256)(ci.ID)); err != nil { + return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to init consensus infos: %w", err) + } + return types.ChainIndex{}, modules.ConsensusChangeBeginning, nil +} + +func ResetConsensusSubscription(ctx context.Context, tx sql.Tx) (ci types.ChainIndex, err error) { + if _, err := tx.Exec(ctx, "DELETE FROM consensus_infos"); err != nil { + return types.ChainIndex{}, fmt.Errorf("failed to delete consensus infos: %w", err) + } else if _, err := tx.Exec(ctx, "DELETE FROM siacoin_elements"); err != nil { + return types.ChainIndex{}, fmt.Errorf("failed to delete siacoin elements: %w", err) + } else if _, err := tx.Exec(ctx, "DELETE FROM transactions"); err != nil { + return types.ChainIndex{}, fmt.Errorf("failed to delete transactions: %w", err) + } else if ci, _, err = InitConsensusInfo(ctx, tx); err != nil { + return types.ChainIndex{}, fmt.Errorf("failed to initialize consensus info: %w", err) + } + return ci, nil +} + func ResetLostSectors(ctx context.Context, tx sql.Tx, hk types.PublicKey) error { _, err := tx.Exec(ctx, "UPDATE hosts SET lost_sectors = 0 WHERE public_key = ?", PublicKey(hk)) if err != nil { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 93a979d5a..fcb19302c 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/renterd/object" ssql "go.sia.tech/renterd/stores/sql" "go.sia.tech/renterd/webhooks" + "go.sia.tech/siad/modules" "lukechampine.com/frand" "go.sia.tech/renterd/internal/sql" @@ -355,6 +356,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit) } +func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) { + return ssql.InitConsensusInfo(ctx, tx) +} + func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { // get bucket id var bucketID int64 @@ -604,6 +609,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, return nil } +func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) { + return ssql.ResetConsensusSubscription(ctx, tx) +} + func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error { return ssql.ResetLostSectors(ctx, tx, hk) } diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 80b2edefd..92ac7f084 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -17,6 +17,7 @@ import ( "go.sia.tech/renterd/object" ssql "go.sia.tech/renterd/stores/sql" "go.sia.tech/renterd/webhooks" + "go.sia.tech/siad/modules" "lukechampine.com/frand" "go.uber.org/zap" @@ -344,6 +345,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit) } +func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) { + return ssql.InitConsensusInfo(ctx, tx) +} + func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { // get bucket id var bucketID int64 @@ -602,6 +607,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, return nil } +func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) { + return ssql.ResetConsensusSubscription(ctx, tx) +} + func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error { return ssql.ResetLostSectors(ctx, tx, hk) } diff --git a/stores/sql/types.go b/stores/sql/types.go index 2ab9c6012..00242be2f 100644 --- a/stores/sql/types.go +++ b/stores/sql/types.go @@ -15,6 +15,7 @@ import ( rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/siad/modules" ) const ( @@ -24,6 +25,7 @@ const ( type ( AutopilotConfig api.AutopilotConfig BigInt big.Int + CCID modules.ConsensusChangeID Currency types.Currency FileContractID types.FileContractID Hash256 types.Hash256 @@ -46,6 +48,7 @@ var ( _ scannerValuer = (*AutopilotConfig)(nil) _ scannerValuer = (*BigInt)(nil) _ scannerValuer = (*BusSetting)(nil) + _ scannerValuer = (*CCID)(nil) _ scannerValuer = (*Currency)(nil) _ scannerValuer = (*FileContractID)(nil) _ scannerValuer = (*Hash256)(nil) @@ -99,6 +102,22 @@ func (b BigInt) Value() (driver.Value, error) { return (*big.Int)(&b).String(), nil } +// Scan scan value into CCID, implements sql.Scanner interface. +func (c *CCID) Scan(value interface{}) error { + switch value := value.(type) { + case []byte: + copy(c[:], value) + default: + return fmt.Errorf("failed to unmarshal CCID value: %v %t", value, value) + } + return nil +} + +// Value returns a publicKey value, implements driver.Valuer interface. +func (c CCID) Value() (driver.Value, error) { + return c[:], nil +} + // Scan scan value into Currency, implements sql.Scanner interface. func (c *Currency) Scan(value interface{}) error { var s string