Skip to content

Commit

Permalink
storage,sqlite: dedupe, fix metrics test
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Oct 4, 2023
1 parent b4870fb commit 96ba098
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
7 changes: 2 additions & 5 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,8 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhp2.SectorSize]byte)
return nil
}
start := time.Now()
vol, err := vm.getVolume(loc.Volume)
if err != nil {
return fmt.Errorf("failed to get volume %v: %w", loc.Volume, err)
} else if err := vol.WriteSector(data, loc.Index); err != nil {
return fmt.Errorf("failed to write sector %v: %w", root, err)
if err := vm.writeSector(data, loc, false); err != nil {
return err
}
vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start)))
// Add newly written sector to cache
Expand Down
32 changes: 16 additions & 16 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,21 @@ func contractSectorRoots(tx txn, contractID int64) (uint64, error) {
// ReviseContract atomically updates a contract's revision and sectors
func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contracts.Usage, sectorChanges []contracts.SectorChange) error {
return s.transaction(func(tx txn) error {
// revise the contract
contractID, err := reviseContract(tx, revision)
if err != nil {
return fmt.Errorf("failed to revise contract: %w", err)
} else if err := incrementContractUsage(tx, contractID, usage); err != nil {
}
// update the contract usage and metrics
if err := incrementContractUsage(tx, contractID, usage); err != nil {
return fmt.Errorf("failed to update contract usage: %w", err)
} else if err := incrementCurrencyStat(tx, metricRiskedCollateral, usage.RiskedCollateral, false, time.Now()); err != nil {
return fmt.Errorf("failed to track risked collateral: %w", err)
} else if err := incrementPotentialRevenueMetrics(tx, usage, false); err != nil {
return fmt.Errorf("failed to track potential revenue: %w", err)
}

var delta int
// update the sector roots
sectors, err := contractSectorRoots(tx, contractID)
if err != nil {
return fmt.Errorf("failed to get sector index: %w", err)
Expand All @@ -290,7 +297,6 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contract
return fmt.Errorf("failed to append sector: %w", err)
}
sectors++
delta++
case contracts.SectorActionTrim:
if sectors < change.A {
return fmt.Errorf("cannot trim %v sectors from contract with %v sectors", change.A, sectors)
Expand All @@ -300,7 +306,6 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contract
return fmt.Errorf("failed to trim sectors: %w", err)
}
sectors -= change.A
delta -= int(change.A)
case contracts.SectorActionUpdate:
if err := updateSector(tx, contractID, change.Root, change.A); err != nil {
return fmt.Errorf("failed to update sector: %w", err)
Expand All @@ -311,21 +316,11 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contract
}
}
}

// update global stats
if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil {
return fmt.Errorf("failed to track contract sectors: %w", err)
} else if err := incrementCurrencyStat(tx, metricRiskedCollateral, usage.RiskedCollateral, false, time.Now()); err != nil {
return fmt.Errorf("failed to track risked collateral: %w", err)
} else if err := incrementPotentialRevenueMetrics(tx, usage, false); err != nil {
return fmt.Errorf("failed to track potential revenue: %w", err)
}
return nil
})
}

// SectorRoots returns the sector roots for a contract. If limit is 0, all roots
// are returned.
// SectorRoots returns the sector roots for a contract.
func (s *Store) SectorRoots(contractID types.FileContractID) (roots []types.Hash256, err error) {
err = s.transaction(func(tx txn) error {
var dbID int64
Expand Down Expand Up @@ -536,7 +531,12 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) {
func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error {
var sectorID int64
err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(&sectorID)
return err
if err != nil {
return err
} else if err := incrementNumericStat(tx, metricContractSectors, 1, time.Now()); err != nil {
return fmt.Errorf("failed to track contract sectors: %w", err)
}
return nil
}

func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) error {
Expand Down

0 comments on commit 96ba098

Please sign in to comment.