Skip to content

Commit

Permalink
Migrate ResetConsensusSubscription to raw SQL (#1348)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 27, 2024
1 parent 74ed4bf commit 65ea0cf
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 34 deletions.
49 changes: 15 additions & 34 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
}

// Get latest consensus change ID or init db.
ci, ccid, err := initConsensusInfo(db)
ci, ccid, err := initConsensusInfo(ctx, dbMain)
if err != nil {
return nil, modules.ConsensusChangeID{}, err
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
walletAddress: cfg.WalletAddress,
chainIndex: types.ChainIndex{
Height: ci.Height,
ID: types.BlockID(ci.BlockID),
ID: types.BlockID(ci.ID),
},

lastPrunedAt: time.Now(),
Expand Down Expand Up @@ -510,47 +510,28 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er
return fmt.Errorf("retryTransaction failed: %w", err)
}

func initConsensusInfo(db *gorm.DB) (dbConsensusInfo, modules.ConsensusChangeID, error) {
var ci dbConsensusInfo
if err := db.
Where(&dbConsensusInfo{Model: Model{ID: consensusInfoID}}).
Attrs(dbConsensusInfo{
Model: Model{ID: consensusInfoID},
CCID: modules.ConsensusChangeBeginning[:],
}).
FirstOrCreate(&ci).
Error; err != nil {
return dbConsensusInfo{}, modules.ConsensusChangeID{}, err
}
var ccid modules.ConsensusChangeID
copy(ccid[:], ci.CCID)
return ci, ccid, nil
func initConsensusInfo(ctx context.Context, db sql.Database) (ci types.ChainIndex, ccid modules.ConsensusChangeID, err error) {
err = db.Transaction(ctx, func(tx sql.DatabaseTx) error {
ci, ccid, err = tx.InitConsensusInfo(ctx)
return err
})
return
}

func (s *SQLStore) ResetConsensusSubscription(ctx context.Context) error {
// empty tables and reinit consensus_infos
var ci dbConsensusInfo
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
if err := s.db.Exec("DELETE FROM consensus_infos").Error; err != nil {
return err
} else if err := s.db.Exec("DELETE FROM siacoin_elements").Error; err != nil {
return err
} else if err := s.db.Exec("DELETE FROM transactions").Error; err != nil {
return err
} else if ci, _, err = initConsensusInfo(tx); err != nil {
return err
}
return nil
// reset db
var ci types.ChainIndex
var err error
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
ci, err = tx.ResetConsensusSubscription(ctx)
return err
})
if err != nil {
return err
}
// reset in-memory state.
s.persistMu.Lock()
s.chainIndex = types.ChainIndex{
Height: ci.Height,
ID: types.BlockID(ci.BlockID),
}
s.chainIndex = ci
s.persistMu.Unlock()
return nil
}
9 changes: 9 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
)

// The database interfaces define all methods that a SQL database must implement
Expand Down Expand Up @@ -141,6 +142,10 @@ type (
// HostBlocklist returns the list of host addresses on the blocklist.
HostBlocklist(ctx context.Context) ([]string, error)

// InitConsensusInfo initializes the consensus info in the database or
// returns the latest one.
InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error)

// InsertObject inserts a new object into the database.
InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error

Expand Down Expand Up @@ -211,6 +216,10 @@ type (
// returned.
RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error

// ResetConsenusSubscription resets the consensus subscription in the
// database.
ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error)

// ResetLostSectors resets the lost sector count for the given host.
ResetLostSectors(ctx context.Context, hk types.PublicKey) error

Expand Down
36 changes: 36 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"
)

const consensuInfoID = 1

var ErrNegativeOffset = errors.New("offset can not be negative")

func AbortMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, uploadID string) error {
Expand Down Expand Up @@ -1294,6 +1297,39 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64
return res.RowsAffected()
}

func InitConsensusInfo(ctx context.Context, tx sql.Tx) (types.ChainIndex, modules.ConsensusChangeID, error) {
// try fetch existing
var ccid modules.ConsensusChangeID
var ci types.ChainIndex
err := tx.QueryRow(ctx, "SELECT cc_id, height, block_id FROM consensus_infos WHERE id = ?", consensuInfoID).
Scan((*CCID)(&ccid), &ci.Height, (*Hash256)(&ci.ID))
if err != nil && !errors.Is(err, dsql.ErrNoRows) {
return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to fetch consensus info: %w", err)
} else if err == nil {
return ci, ccid, nil
}
// otherwise init
ci = types.ChainIndex{}
if _, err := tx.Exec(ctx, "INSERT INTO consensus_infos (id, created_at, cc_id, height, block_id) VALUES (?, ?, ?, ?, ?)",
consensuInfoID, time.Now(), (CCID)(modules.ConsensusChangeBeginning), ci.Height, (Hash256)(ci.ID)); err != nil {
return types.ChainIndex{}, modules.ConsensusChangeID{}, fmt.Errorf("failed to init consensus infos: %w", err)
}
return types.ChainIndex{}, modules.ConsensusChangeBeginning, nil
}

func ResetConsensusSubscription(ctx context.Context, tx sql.Tx) (ci types.ChainIndex, err error) {
if _, err := tx.Exec(ctx, "DELETE FROM consensus_infos"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete consensus infos: %w", err)
} else if _, err := tx.Exec(ctx, "DELETE FROM siacoin_elements"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete siacoin elements: %w", err)
} else if _, err := tx.Exec(ctx, "DELETE FROM transactions"); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to delete transactions: %w", err)
} else if ci, _, err = InitConsensusInfo(ctx, tx); err != nil {
return types.ChainIndex{}, fmt.Errorf("failed to initialize consensus info: %w", err)
}
return ci, nil
}

func ResetLostSectors(ctx context.Context, tx sql.Tx, hk types.PublicKey) error {
_, err := tx.Exec(ctx, "UPDATE hosts SET lost_sectors = 0 WHERE public_key = ?", PublicKey(hk))
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"

"go.sia.tech/renterd/internal/sql"
Expand Down Expand Up @@ -355,6 +356,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) {
return ssql.InitConsensusInfo(ctx, tx)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -604,6 +609,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
return nil
}

func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) {
return ssql.ResetConsensusSubscription(ctx, tx)
}

func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error {
return ssql.ResetLostSectors(ctx, tx, hk)
}
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"lukechampine.com/frand"

"go.uber.org/zap"
Expand Down Expand Up @@ -344,6 +345,10 @@ func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InitConsensusInfo(ctx context.Context) (types.ChainIndex, modules.ConsensusChangeID, error) {
return ssql.InitConsensusInfo(ctx, tx)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -602,6 +607,10 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
return nil
}

func (tx *MainDatabaseTx) ResetConsensusSubscription(ctx context.Context) (types.ChainIndex, error) {
return ssql.ResetConsensusSubscription(ctx, tx)
}

func (tx *MainDatabaseTx) ResetLostSectors(ctx context.Context, hk types.PublicKey) error {
return ssql.ResetLostSectors(ctx, tx, hk)
}
Expand Down
19 changes: 19 additions & 0 deletions stores/sql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/siad/modules"
)

const (
Expand All @@ -24,6 +25,7 @@ const (
type (
AutopilotConfig api.AutopilotConfig
BigInt big.Int
CCID modules.ConsensusChangeID
Currency types.Currency
FileContractID types.FileContractID
Hash256 types.Hash256
Expand All @@ -46,6 +48,7 @@ var (
_ scannerValuer = (*AutopilotConfig)(nil)
_ scannerValuer = (*BigInt)(nil)
_ scannerValuer = (*BusSetting)(nil)
_ scannerValuer = (*CCID)(nil)
_ scannerValuer = (*Currency)(nil)
_ scannerValuer = (*FileContractID)(nil)
_ scannerValuer = (*Hash256)(nil)
Expand Down Expand Up @@ -99,6 +102,22 @@ func (b BigInt) Value() (driver.Value, error) {
return (*big.Int)(&b).String(), nil
}

// Scan scan value into CCID, implements sql.Scanner interface.
func (c *CCID) Scan(value interface{}) error {
switch value := value.(type) {
case []byte:
copy(c[:], value)
default:
return fmt.Errorf("failed to unmarshal CCID value: %v %t", value, value)
}
return nil
}

// Value returns a publicKey value, implements driver.Valuer interface.
func (c CCID) Value() (driver.Value, error) {
return c[:], nil
}

// Scan scan value into Currency, implements sql.Scanner interface.
func (c *Currency) Scan(value interface{}) error {
var s string
Expand Down

0 comments on commit 65ea0cf

Please sign in to comment.