Skip to content

Commit

Permalink
Merge pull request #308 from SiaFoundation/nate/refactor-sector-prune
Browse files Browse the repository at this point in the history
Refactor sector prune
  • Loading branch information
n8maninger authored Feb 17, 2024
2 parents 6f0a7b6 + 5cf05b7 commit 503ffa7
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 191 deletions.
185 changes: 88 additions & 97 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,26 +137,43 @@ func (u *updateContractsTxn) ContractRelevant(id types.FileContractID) (bool, er
return err == nil, err
}

func (s *Store) batchExpireContractSectors(height uint64) (removed []contractSectorRef, pruned int, err error) {
func deleteExpiredContractSectors(tx txn, height uint64) (sectorIDs []int64, err error) {
const query = `DELETE FROM contract_sector_roots
WHERE id IN (SELECT csr.id FROM contract_sector_roots csr
INNER JOIN contracts c ON (csr.contract_id=c.id)
-- past proof window or not confirmed and past the rebroadcast height
WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3)
RETURNING sector_id;`
rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
return nil, err
}
sectorIDs = append(sectorIDs, id)
}
return sectorIDs, nil
}

func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed []types.Hash256, err error) {
err = s.transaction(func(tx txn) (err error) {
removed, err = expiredContractSectors(tx, height, sqlSectorBatchSize)
sectorIDs, err := deleteExpiredContractSectors(tx, height)
if err != nil {
return fmt.Errorf("failed to select sectors: %w", err)
return fmt.Errorf("failed to delete contract sectors: %w", err)
}
expired = len(sectorIDs)

refs := make([]contractSectorRootRef, 0, len(removed))
for _, sector := range removed {
refs = append(refs, contractSectorRootRef{
dbID: sector.ID,
sectorID: sector.SectorID,
})
// decrement the contract metrics
if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil {
return fmt.Errorf("failed to decrement contract sectors: %w", err)
}

pruned, err = deleteContractSectors(tx, refs)
if err != nil {
return fmt.Errorf("failed to prune sectors: %w", err)
}
return nil
removed, err = pruneSectors(tx, sectorIDs)
return err
})
return
}
Expand Down Expand Up @@ -298,9 +315,8 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, roots []types.
return fmt.Errorf("failed to trim sectors: %w", err)
}
sectors -= change.A
removed := roots[len(roots)-int(change.A):]
for _, root := range removed {
if !trimmed[root] {
for i, root := range roots[len(roots)-int(change.A):] {
if trimmed[i] != root {
return fmt.Errorf("inconsistent sector trim: expected %s to be trimmed", root)
}
}
Expand Down Expand Up @@ -519,28 +535,16 @@ func (s *Store) UpdateContractState(ccID modules.ConsensusChangeID, height uint6
// ExpireContractSectors expires all sectors that are no longer covered by an
// active contract.
func (s *Store) ExpireContractSectors(height uint64) error {
var totalRemoved int
contractExpired := make(map[types.FileContractID]int)
defer func() {
for contractID, removed := range contractExpired {
s.log.Debug("expired contract sectors", zap.Stringer("contractID", contractID), zap.Uint64("height", height), zap.Int("expired", removed))
}
if totalRemoved > 0 {
s.log.Debug("removed contract sectors", zap.Uint64("height", height), zap.Int("removed", totalRemoved))
}
}()
log := s.log.Named("ExpireContractSectors").With(zap.Uint64("height", height))
// delete in batches to avoid holding a lock on the database for too long
for i := 0; ; i++ {
expired, removed, err := s.batchExpireContractSectors(height)
if err != nil {
return fmt.Errorf("failed to prune sectors: %w", err)
} else if len(expired) == 0 {
} else if expired == 0 {
return nil
}
for _, ref := range expired {
contractExpired[ref.ContractID]++
}
totalRemoved += removed
log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i))
jitterSleep(time.Millisecond) // allow other transactions to run
}
}
Expand All @@ -561,6 +565,7 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) {
return contract, err
}

// appendSector appends a new sector root to a contract.
func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error {
var sectorID int64
err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(&sectorID)
Expand All @@ -572,6 +577,7 @@ func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) er
return nil
}

// updateSector updates a contract sector root in place and returns the old sector root
func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) (types.Hash256, error) {
row := tx.QueryRow(`SELECT csr.id, csr.sector_id, ss.sector_root
FROM contract_sector_roots csr
Expand All @@ -582,26 +588,28 @@ WHERE contract_id=$1 AND root_index=$2`, contractID, index)
return types.Hash256{}, fmt.Errorf("failed to get old sector id: %w", err)
}

// update the sector ID
var newSectorID int64
err = tx.QueryRow(`WITH sector AS (
SELECT id FROM stored_sectors WHERE sector_root=$1
)
UPDATE contract_sector_roots
SET sector_id=sector.id
FROM sector
WHERE contract_sector_roots.id=$2
RETURNING sector_id;`, sqlHash256(root), ref.dbID).Scan(&newSectorID)
err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&newSectorID)
if err != nil {
return types.Hash256{}, fmt.Errorf("failed to get new sector id: %w", err)
}

// update the sector ID
err = tx.QueryRow(`UPDATE contract_sector_roots
SET sector_id=$1
WHERE id=$2
RETURNING sector_id;`, newSectorID, ref.dbID).Scan(&newSectorID)
if err != nil {
return types.Hash256{}, fmt.Errorf("failed to update sector ID: %w", err)
}
// prune the old sector ID
if _, err := pruneSectorRef(tx, ref.sectorID); err != nil {
if _, err := pruneSectors(tx, []int64{ref.sectorID}); err != nil {
return types.Hash256{}, fmt.Errorf("failed to prune old sector: %w", err)
}
return ref.root, nil
}

// swapSectors swaps two sector roots in a contract and returns the sector roots
func swapSectors(tx txn, contractID int64, i, j uint64) (map[types.Hash256]bool, error) {
if i == j {
return nil, nil
Expand Down Expand Up @@ -656,11 +664,14 @@ ORDER BY root_index ASC;`, contractID, i, j)
}, nil
}

// lastContractSectors returns the last n sector IDs for a contract.
func lastContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
// lastNContractSectors returns the last n sector IDs for a contract.
func lastNContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id)
WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
WHERE csr.contract_id=$1
ORDER BY root_index DESC
LIMIT $2;`

rows, err := tx.Query(query, contractID, n)
if err != nil {
return nil, err
Expand All @@ -677,68 +688,48 @@ WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
return
}

// deleteContractSectors deletes sector roots from a contract. Sectors that are
// still referenced will not be removed. Returns the number of sectors deleted.
func deleteContractSectors(tx txn, refs []contractSectorRootRef) (int, error) {
var rootIDs []int64
for _, ref := range refs {
rootIDs = append(rootIDs, ref.dbID)
// deleteContractSectorRoots deletes the contract sector roots with the given IDs.
func deleteContractSectorRoots(tx txn, ids []int64) error {
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(ids)) + `);`
res, err := tx.Exec(query, queryArgs(ids)...)
if err != nil {
return fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n != int64(len(ids)) {
return fmt.Errorf("expected %v rows affected, got %v", len(ids), n)
}
return nil
}

// delete the sector roots
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(rootIDs)) + `) RETURNING id;`
rows, err := tx.Query(query, queryArgs(rootIDs)...)
// trimSectors deletes the last n sector roots for a contract and returns the
// deleted sector roots in order.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) {
refs, err := lastNContractSectors(tx, contractID, n)
if err != nil {
return 0, fmt.Errorf("failed to delete sectors: %w", err)
}
deleted := make(map[int64]bool)
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
return 0, fmt.Errorf("failed to scan deleted sector: %w", err)
}
deleted[id] = true
}
if len(deleted) != len(rootIDs) {
return 0, errors.New("failed to delete all sectors")
}
for _, rootID := range rootIDs {
if !deleted[rootID] {
return 0, errors.New("failed to delete all sectors")
}
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}

// decrement the contract metrics
if err := incrementNumericStat(tx, metricContractSectors, -len(refs), time.Now()); err != nil {
return 0, fmt.Errorf("failed to decrement contract sectors: %w", err)
var contractSectorRootIDs []int64
roots := make([]types.Hash256, len(refs))
var sectorIDs []int64
for i, ref := range refs {
contractSectorRootIDs = append(contractSectorRootIDs, ref.dbID)
roots[len(roots)-i-1] = ref.root // reverse the order to match the contract sector roots
sectorIDs = append(sectorIDs, ref.sectorID)
}

// attempt to prune the deleted sectors
var pruned int
for _, ref := range refs {
deleted, err := pruneSectorRef(tx, ref.sectorID)
if err != nil {
return 0, fmt.Errorf("failed to prune sector ref: %w", err)
} else if deleted {
pruned++
}
if err := deleteContractSectorRoots(tx, contractSectorRootIDs); err != nil {
return nil, fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorRootIDs), time.Now()); err != nil {
return nil, fmt.Errorf("failed to decrement contract sectors: %w", err)
}
return pruned, nil
}

// trimSectors deletes the last n sector roots for a contract.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) (map[types.Hash256]bool, error) {
refs, err := lastContractSectors(tx, contractID, n)
removed, err := pruneSectors(tx, sectorIDs)
if err != nil {
return nil, fmt.Errorf("failed to get sector IDs: %w", err)
} else if _, err = deleteContractSectors(tx, refs); err != nil {
return nil, fmt.Errorf("failed to delete sectors: %w", err)
}

roots := make(map[types.Hash256]bool)
for _, ref := range refs {
roots[ref.root] = true
return nil, fmt.Errorf("failed to prune sectors: %w", err)
}
log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed))
return roots, nil
}

Expand Down
5 changes: 3 additions & 2 deletions persist/sqlite/contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ func TestReviseContract(t *testing.T) {
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
func() {
t.Log("revising contract:", test.name)
oldRoots := append([]types.Hash256(nil), roots...)
// update the expected roots
for i, change := range test.changes {
Expand Down Expand Up @@ -295,7 +296,7 @@ func TestReviseContract(t *testing.T) {
} else if err := checkConsistency(roots, test.sectors); err != nil {
t.Fatal(err)
}
})
}()
}
}

Expand Down
45 changes: 44 additions & 1 deletion persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,51 @@ import (
"go.uber.org/zap"
)

// migrateVersion26 recalculates the contract and physical sectors metrics
func migrateVersion26(tx txn, log *zap.Logger) error {
// recalculate the contract sectors metric
var contractSectorCount int64
if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil {
return fmt.Errorf("failed to query contract sector count: %w", err)
} else if err := setNumericStat(tx, metricContractSectors, uint64(contractSectorCount), time.Now()); err != nil {
return fmt.Errorf("failed to set contract sectors metric: %w", err)
}

// recalculate the physical sectors metric
var physicalSectorsCount int64
volumePhysicalSectorCount := make(map[int64]int64)
rows, err := tx.Query(`SELECT volume_id, COUNT(*) FROM volume_sectors WHERE sector_id IS NOT NULL GROUP BY volume_id`)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to query volume sector count: %w", err)
}
defer rows.Close()

for rows.Next() {
var volumeID, count int64
if err := rows.Scan(&volumeID, &count); err != nil {
return fmt.Errorf("failed to scan volume sector count: %w", err)
}
volumePhysicalSectorCount[volumeID] = count
physicalSectorsCount += count
}

// update the physical sectors metric
if err := setNumericStat(tx, metricPhysicalSectors, uint64(physicalSectorsCount), time.Now()); err != nil {
return fmt.Errorf("failed to set contract sectors metric: %w", err)
}

// update the volume stats
for volumeID, count := range volumePhysicalSectorCount {
err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors = $1 WHERE id = $2 RETURNING id`, count, volumeID).Scan(&volumeID)
if err != nil {
return fmt.Errorf("failed to update volume stats: %w", err)
}
}
return nil
}

// migrateVersion25 is a no-op migration to trigger foreign key checks
func migrateVersion25(tx txn, log *zap.Logger) error {
// no-op migration to trigger foreign key checks
return nil
}

Expand Down
Loading

0 comments on commit 503ffa7

Please sign in to comment.