Skip to content

Commit

Permalink
move known contracts into txn
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Dec 12, 2024
1 parent 6d7ca5e commit 1a51275
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 85 deletions.
112 changes: 40 additions & 72 deletions internal/bus/chainsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
Expand Down Expand Up @@ -91,9 +90,8 @@ type (
syncSig chan struct{}
wg sync.WaitGroup

mu sync.Mutex
knownContracts map[types.FileContractID]bool
unsubscribeFn func()
mu sync.Mutex

Check failure on line 93 in internal/bus/chainsubscriber.go

View workflow job for this annotation

GitHub Actions / analyze

field `mu` is unused (unused)
unsubscribeFn func()
}
)

Expand Down Expand Up @@ -123,8 +121,6 @@ func NewChainSubscriber(whm WebhookManager, cm ChainManager, cs ChainStore, s Sy
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
syncSig: make(chan struct{}, 1),

knownContracts: make(map[types.FileContractID]bool),
}

// start the subscriber
Expand Down Expand Up @@ -212,6 +208,11 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply
cau.ForEachFileContractElement(func(fce types.FileContractElement, _ bool, rev *types.FileContractElement, resolved, valid bool) {
if err != nil {
return
} else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil {
err = lookupErr
return
} else if !known {
return // only consider known contracts
}
err = s.applyV1ContractUpdate(tx, cau.State.Index, fce, rev, resolved, valid)
})
Expand All @@ -224,25 +225,29 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply
cau.ForEachV2FileContractElement(func(fce types.V2FileContractElement, _ bool, rev *types.V2FileContractElement, res types.V2FileContractResolutionType) {
if err != nil {
return
} else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil {
err = lookupErr
return
} else if !known {
return // only consider known contracts
}
if rev == nil {
revisedContracts = append(revisedContracts, fce)
} else {
revisedContracts = append(revisedContracts, *rev)
}
err = s.applyV2ContractUpdate(tx, cau.State.Index, fce, rev, res)
})
if err != nil {
return fmt.Errorf("failed to apply v2 contract update: %w", err)
}

// new contracts - only consider the ones we are interested in
filtered := revisedContracts[:0]
for _, fce := range revisedContracts {
if s.isKnownContract(fce.ID) {
filtered = append(filtered, fce)
}
}
if err := tx.UpdateFileContractElements(filtered); err != nil {
// update revised contracts
if err := tx.UpdateFileContractElements(revisedContracts); err != nil {
return fmt.Errorf("failed to insert v2 file contract elements: %w", err)
}

// contract proofs
// update contract proofs
if err := tx.UpdateFileContractElementProofs(cau); err != nil {
return fmt.Errorf("failed to update file contract element proofs: %w", err)
}
Expand Down Expand Up @@ -283,25 +288,29 @@ func (s *chainSubscriber) revertChainUpdate(tx sql.ChainUpdateTx, cru chain.Reve
cru.ForEachV2FileContractElement(func(fce types.V2FileContractElement, created bool, rev *types.V2FileContractElement, res types.V2FileContractResolutionType) {
if err != nil {
return
} else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil {
err = lookupErr
return
} else if !known {
return // only consider known contracts
}
if rev == nil {
revertedContracts = append(revertedContracts, fce)
} else {
revertedContracts = append(revertedContracts, *rev)
}
err = s.revertV2ContractUpdate(tx, fce, rev, res)
})
if err != nil {
return fmt.Errorf("failed to revert v2 contract update: %w", err)
}

// reverted contracts - only consider the ones that we are interested in
filtered := revertedContracts[:0]
for _, fce := range revertedContracts {
if s.isKnownContract(fce.ID) {
filtered = append(filtered, fce)
}
}
if err := tx.UpdateFileContractElements(filtered); err != nil {
// update reverted contracts
if err := tx.UpdateFileContractElements(revertedContracts); err != nil {
return fmt.Errorf("failed to remove v2 file contract elements: %w", err)
}

// contract proofs
// update contract proofs
if err := tx.UpdateFileContractElementProofs(cru); err != nil {
return fmt.Errorf("failed to update file contract element proofs: %w", err)
}
Expand Down Expand Up @@ -441,17 +450,9 @@ func (s *chainSubscriber) applyV1ContractUpdate(tx sql.ChainUpdateTx, index type
fcid = rev.ID
}

// ignore unknown contracts
if !s.isKnownContract(fcid) {
return nil
}

// fetch contract state
state, err := tx.ContractState(fcid)
if err != nil && utils.IsErr(err, api.ErrContractNotFound) {
s.updateKnownContracts(fcid, false) // ignore unknown contracts
return nil
} else if err != nil {
if err != nil {
return fmt.Errorf("failed to get contract state: %w", err)
}

Expand Down Expand Up @@ -516,17 +517,9 @@ func (s *chainSubscriber) revertV1ContractUpdate(tx sql.ChainUpdateTx, fce types
fcid = rev.ID
}

// ignore unknown contracts
if !s.isKnownContract(fcid) {
return nil
}

// fetch contract state to see if contract is known
_, err := tx.ContractState(fcid)
if err != nil && utils.IsErr(err, api.ErrContractNotFound) {
s.updateKnownContracts(fcid, false) // ignore unknown contracts
return nil
} else if err != nil {
if err != nil {
return fmt.Errorf("failed to get contract state: %w", err)
}

Expand Down Expand Up @@ -574,17 +567,9 @@ func (s *chainSubscriber) applyV2ContractUpdate(tx sql.ChainUpdateTx, index type
fcid = rev.ID
}

// ignore unknown contracts
if !s.isKnownContract(fcid) {
return nil
}

// fetch contract state
state, err := tx.ContractState(fcid)
if err != nil && utils.IsErr(err, api.ErrContractNotFound) {
s.updateKnownContracts(fcid, false) // ignore unknown contracts
return nil
} else if err != nil {
if err != nil {
return fmt.Errorf("failed to get contract state: %w", err)
}

Expand Down Expand Up @@ -661,16 +646,15 @@ func (s *chainSubscriber) revertV2ContractUpdate(tx sql.ChainUpdateTx, fce types
}

// ignore unknown contracts
if !s.isKnownContract(fcid) {
if known, err := tx.IsKnownContract(fcid); err != nil {
return err
} else if !known {
return nil
}

// fetch contract state to see if contract is known
state, err := tx.ContractState(fcid)
if err != nil && utils.IsErr(err, api.ErrContractNotFound) {
s.updateKnownContracts(fcid, false) // ignore unknown contracts
return nil
} else if err != nil {
if err != nil {
return fmt.Errorf("failed to get contract state: %w", err)
}

Expand Down Expand Up @@ -713,19 +697,3 @@ func (s *chainSubscriber) isClosed() bool {
}
return false
}

func (s *chainSubscriber) isKnownContract(fcid types.FileContractID) bool {
s.mu.Lock()
defer s.mu.Unlock()
known, ok := s.knownContracts[fcid]
if !ok {
return true // assume known
}
return known
}

func (s *chainSubscriber) updateKnownContracts(fcid types.FileContractID, known bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.knownContracts[fcid] = known
}
16 changes: 13 additions & 3 deletions stores/sql/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,22 @@ func FileContractElement(ctx context.Context, tx sql.Tx, fcid types.FileContract
}, nil
}

func RecordContractRenewal(ctx context.Context, tx sql.Tx, old, new types.FileContractID) error {
_, err := tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_to = ? WHERE contracts.fcid = ?", FileContractID(new), FileContractID(old))
func IsKnownContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID) (known bool, _ error) {
err := tx.QueryRow(ctx, "SELECT 1 FROM contracts WHERE fcid = ?", FileContractID(fcid)).Scan(&known)
if errors.Is(err, dsql.ErrNoRows) {
return false, nil
} else if err != nil {
return false, err
}
return known, nil
}

func RecordContractRenewal(ctx context.Context, tx sql.Tx, oldFCID, newFCID types.FileContractID) error {
_, err := tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_to = ? WHERE contracts.fcid = ?", FileContractID(newFCID), FileContractID(oldFCID))
if err != nil {
return fmt.Errorf("failed to update renewed_to of old contract: %w", err)
}
_, err = tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_from = ? WHERE contracts.fcid = ?", FileContractID(old), FileContractID(new))
_, err = tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_from = ? WHERE contracts.fcid = ?", FileContractID(oldFCID), FileContractID(newFCID))
if err != nil {
return fmt.Errorf("failed to update renewed_to of new contract: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type (
ContractState(fcid types.FileContractID) (api.ContractState, error)
ExpiredFileContractElements(bh uint64) ([]types.V2FileContractElement, error)
FileContractElement(fcid types.FileContractID) (types.V2FileContractElement, error)
IsKnownContract(fcid types.FileContractID) (bool, error)
PruneFileContractElements(threshold uint64) error
RecordContractRenewal(old, new types.FileContractID) error
UpdateFileContractElements([]types.V2FileContractElement) error
Expand Down
28 changes: 23 additions & 5 deletions stores/sql/mysql/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ var (
)

type chainUpdateTx struct {
ctx context.Context
tx isql.Tx
l *zap.SugaredLogger
ctx context.Context
tx isql.Tx
l *zap.SugaredLogger
known map[types.FileContractID]bool // map to prevent rare duplicate selects
}

func (c chainUpdateTx) WalletApplyIndex(index types.ChainIndex, created, spent []types.SiacoinElement, events []wallet.Event, timestamp time.Time) error {
Expand Down Expand Up @@ -210,12 +211,29 @@ func (c chainUpdateTx) FileContractElement(fcid types.FileContractID) (types.V2F
return ssql.FileContractElement(c.ctx, c.tx, fcid)
}

func (c chainUpdateTx) IsKnownContract(fcid types.FileContractID) (bool, error) {
if c.known == nil {
c.known = make(map[types.FileContractID]bool)
}

if relevant, ok := c.known[fcid]; ok {
return relevant, nil
}

known, err := ssql.IsKnownContract(c.ctx, c.tx, fcid)
if err != nil {
return false, err
}
c.known[fcid] = known
return known, nil
}

func (c chainUpdateTx) PruneFileContractElements(threshold uint64) error {
return ssql.PruneFileContractElements(c.ctx, c.tx, threshold)
}

func (c chainUpdateTx) RecordContractRenewal(old, new types.FileContractID) error {
return ssql.RecordContractRenewal(c.ctx, c.tx, old, new)
func (c chainUpdateTx) RecordContractRenewal(oldFCID, newFCID types.FileContractID) error {
return ssql.RecordContractRenewal(c.ctx, c.tx, oldFCID, newFCID)
}

func (c chainUpdateTx) UpdateFileContractElements(fces []types.V2FileContractElement) error {
Expand Down
28 changes: 23 additions & 5 deletions stores/sql/sqlite/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var (
)

type chainUpdateTx struct {
ctx context.Context
tx isql.Tx
l *zap.SugaredLogger
ctx context.Context
tx isql.Tx
l *zap.SugaredLogger
known map[types.FileContractID]bool // map to prevent rare duplicate selects
}

func (c chainUpdateTx) WalletApplyIndex(index types.ChainIndex, created, spent []types.SiacoinElement, events []wallet.Event, timestamp time.Time) error {
Expand Down Expand Up @@ -211,12 +212,29 @@ func (c chainUpdateTx) FileContractElement(fcid types.FileContractID) (types.V2F
return ssql.FileContractElement(c.ctx, c.tx, fcid)
}

func (c chainUpdateTx) IsKnownContract(fcid types.FileContractID) (bool, error) {
if c.known == nil {
c.known = make(map[types.FileContractID]bool)
}

if relevant, ok := c.known[fcid]; ok {
return relevant, nil
}

known, err := ssql.IsKnownContract(c.ctx, c.tx, fcid)
if err != nil {
return false, err
}
c.known[fcid] = known
return known, nil
}

func (c chainUpdateTx) PruneFileContractElements(threshold uint64) error {
return ssql.PruneFileContractElements(c.ctx, c.tx, threshold)
}

func (c chainUpdateTx) RecordContractRenewal(old, new types.FileContractID) error {
return ssql.RecordContractRenewal(c.ctx, c.tx, old, new)
func (c chainUpdateTx) RecordContractRenewal(oldFCID, newFCID types.FileContractID) error {
return ssql.RecordContractRenewal(c.ctx, c.tx, oldFCID, newFCID)
}

func (c chainUpdateTx) UpdateFileContractElements(fces []types.V2FileContractElement) error {
Expand Down

0 comments on commit 1a51275

Please sign in to comment.