From 7278bd06a1c76637f514f3f1dbe241992a681f78 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 12 Dec 2024 16:12:31 +0100 Subject: [PATCH] move known contracts into txn --- internal/bus/chainsubscriber.go | 117 ++++++++++++-------------------- stores/sql/chain.go | 16 ++++- stores/sql/database.go | 1 + stores/sql/mysql/chain.go | 28 ++++++-- stores/sql/sqlite/chain.go | 28 ++++++-- 5 files changed, 105 insertions(+), 85 deletions(-) diff --git a/internal/bus/chainsubscriber.go b/internal/bus/chainsubscriber.go index 48aa491c7..1e18685e8 100644 --- a/internal/bus/chainsubscriber.go +++ b/internal/bus/chainsubscriber.go @@ -13,7 +13,6 @@ import ( rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/api" - "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/stores/sql" "go.sia.tech/renterd/webhooks" "go.uber.org/zap" @@ -91,9 +90,8 @@ type ( syncSig chan struct{} wg sync.WaitGroup - mu sync.Mutex - knownContracts map[types.FileContractID]bool - unsubscribeFn func() + mu sync.Mutex + unsubscribeFn func() } ) @@ -123,8 +121,6 @@ func NewChainSubscriber(whm WebhookManager, cm ChainManager, cs ChainStore, s Sy shutdownCtx: ctx, shutdownCtxCancel: cancel, syncSig: make(chan struct{}, 1), - - knownContracts: make(map[types.FileContractID]bool), } // start the subscriber @@ -212,6 +208,11 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply cau.ForEachFileContractElement(func(fce types.FileContractElement, _ bool, rev *types.FileContractElement, resolved, valid bool) { if err != nil { return + } else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil { + err = lookupErr + return + } else if !known { + return // only consider known contracts } err = s.applyV1ContractUpdate(tx, cau.State.Index, fce, rev, resolved, valid) }) @@ -224,6 +225,16 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply cau.ForEachV2FileContractElement(func(fce types.V2FileContractElement, _ bool, rev *types.V2FileContractElement, res types.V2FileContractResolutionType) { if err != nil { return + } else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil { + err = lookupErr + return + } else if !known { + return // only consider known contracts + } + if rev == nil { + revisedContracts = append(revisedContracts, fce) + } else { + revisedContracts = append(revisedContracts, *rev) } err = s.applyV2ContractUpdate(tx, cau.State.Index, fce, rev, res) }) @@ -231,18 +242,12 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply return fmt.Errorf("failed to apply v2 contract update: %w", err) } - // new contracts - only consider the ones we are interested in - filtered := revisedContracts[:0] - for _, fce := range revisedContracts { - if s.isKnownContract(fce.ID) { - filtered = append(filtered, fce) - } - } - if err := tx.UpdateFileContractElements(filtered); err != nil { + // update revised contracts + if err := tx.UpdateFileContractElements(revisedContracts); err != nil { return fmt.Errorf("failed to insert v2 file contract elements: %w", err) } - // contract proofs + // update contract proofs if err := tx.UpdateFileContractElementProofs(cau); err != nil { return fmt.Errorf("failed to update file contract element proofs: %w", err) } @@ -271,6 +276,11 @@ func (s *chainSubscriber) revertChainUpdate(tx sql.ChainUpdateTx, cru chain.Reve cru.ForEachFileContractElement(func(fce types.FileContractElement, _ bool, rev *types.FileContractElement, resolved, valid bool) { if err != nil { return + } else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil { + err = lookupErr + return + } else if !known { + return // only consider known contracts } err = s.revertV1ContractUpdate(tx, fce, rev, resolved, valid) }) @@ -283,6 +293,16 @@ func (s *chainSubscriber) revertChainUpdate(tx sql.ChainUpdateTx, cru chain.Reve cru.ForEachV2FileContractElement(func(fce types.V2FileContractElement, created bool, rev *types.V2FileContractElement, res types.V2FileContractResolutionType) { if err != nil { return + } else if known, lookupErr := tx.IsKnownContract(fce.ID); err != nil { + err = lookupErr + return + } else if !known { + return // only consider known contracts + } + if rev == nil { + revertedContracts = append(revertedContracts, fce) + } else { + revertedContracts = append(revertedContracts, *rev) } err = s.revertV2ContractUpdate(tx, fce, rev, res) }) @@ -290,18 +310,12 @@ func (s *chainSubscriber) revertChainUpdate(tx sql.ChainUpdateTx, cru chain.Reve return fmt.Errorf("failed to revert v2 contract update: %w", err) } - // reverted contracts - only consider the ones that we are interested in - filtered := revertedContracts[:0] - for _, fce := range revertedContracts { - if s.isKnownContract(fce.ID) { - filtered = append(filtered, fce) - } - } - if err := tx.UpdateFileContractElements(filtered); err != nil { + // update reverted contracts + if err := tx.UpdateFileContractElements(revertedContracts); err != nil { return fmt.Errorf("failed to remove v2 file contract elements: %w", err) } - // contract proofs + // update contract proofs if err := tx.UpdateFileContractElementProofs(cru); err != nil { return fmt.Errorf("failed to update file contract element proofs: %w", err) } @@ -441,17 +455,9 @@ func (s *chainSubscriber) applyV1ContractUpdate(tx sql.ChainUpdateTx, index type fcid = rev.ID } - // ignore unknown contracts - if !s.isKnownContract(fcid) { - return nil - } - // fetch contract state state, err := tx.ContractState(fcid) - if err != nil && utils.IsErr(err, api.ErrContractNotFound) { - s.updateKnownContracts(fcid, false) // ignore unknown contracts - return nil - } else if err != nil { + if err != nil { return fmt.Errorf("failed to get contract state: %w", err) } @@ -516,17 +522,9 @@ func (s *chainSubscriber) revertV1ContractUpdate(tx sql.ChainUpdateTx, fce types fcid = rev.ID } - // ignore unknown contracts - if !s.isKnownContract(fcid) { - return nil - } - // fetch contract state to see if contract is known _, err := tx.ContractState(fcid) - if err != nil && utils.IsErr(err, api.ErrContractNotFound) { - s.updateKnownContracts(fcid, false) // ignore unknown contracts - return nil - } else if err != nil { + if err != nil { return fmt.Errorf("failed to get contract state: %w", err) } @@ -574,17 +572,9 @@ func (s *chainSubscriber) applyV2ContractUpdate(tx sql.ChainUpdateTx, index type fcid = rev.ID } - // ignore unknown contracts - if !s.isKnownContract(fcid) { - return nil - } - // fetch contract state state, err := tx.ContractState(fcid) - if err != nil && utils.IsErr(err, api.ErrContractNotFound) { - s.updateKnownContracts(fcid, false) // ignore unknown contracts - return nil - } else if err != nil { + if err != nil { return fmt.Errorf("failed to get contract state: %w", err) } @@ -661,16 +651,15 @@ func (s *chainSubscriber) revertV2ContractUpdate(tx sql.ChainUpdateTx, fce types } // ignore unknown contracts - if !s.isKnownContract(fcid) { + if known, err := tx.IsKnownContract(fcid); err != nil { + return err + } else if !known { return nil } // fetch contract state to see if contract is known state, err := tx.ContractState(fcid) - if err != nil && utils.IsErr(err, api.ErrContractNotFound) { - s.updateKnownContracts(fcid, false) // ignore unknown contracts - return nil - } else if err != nil { + if err != nil { return fmt.Errorf("failed to get contract state: %w", err) } @@ -713,19 +702,3 @@ func (s *chainSubscriber) isClosed() bool { } return false } - -func (s *chainSubscriber) isKnownContract(fcid types.FileContractID) bool { - s.mu.Lock() - defer s.mu.Unlock() - known, ok := s.knownContracts[fcid] - if !ok { - return true // assume known - } - return known -} - -func (s *chainSubscriber) updateKnownContracts(fcid types.FileContractID, known bool) { - s.mu.Lock() - defer s.mu.Unlock() - s.knownContracts[fcid] = known -} diff --git a/stores/sql/chain.go b/stores/sql/chain.go index 9b3808f6b..990c2c4cf 100644 --- a/stores/sql/chain.go +++ b/stores/sql/chain.go @@ -204,12 +204,22 @@ func FileContractElement(ctx context.Context, tx sql.Tx, fcid types.FileContract }, nil } -func RecordContractRenewal(ctx context.Context, tx sql.Tx, old, new types.FileContractID) error { - _, err := tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_to = ? WHERE contracts.fcid = ?", FileContractID(new), FileContractID(old)) +func IsKnownContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID) (known bool, _ error) { + err := tx.QueryRow(ctx, "SELECT 1 FROM contracts WHERE fcid = ?", FileContractID(fcid)).Scan(&known) + if errors.Is(err, dsql.ErrNoRows) { + return false, nil + } else if err != nil { + return false, err + } + return known, nil +} + +func RecordContractRenewal(ctx context.Context, tx sql.Tx, oldFCID, newFCID types.FileContractID) error { + _, err := tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_to = ? WHERE contracts.fcid = ?", FileContractID(newFCID), FileContractID(oldFCID)) if err != nil { return fmt.Errorf("failed to update renewed_to of old contract: %w", err) } - _, err = tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_from = ? WHERE contracts.fcid = ?", FileContractID(old), FileContractID(new)) + _, err = tx.Exec(ctx, "UPDATE contracts SET contracts.renewed_from = ? WHERE contracts.fcid = ?", FileContractID(oldFCID), FileContractID(newFCID)) if err != nil { return fmt.Errorf("failed to update renewed_to of new contract: %w", err) } diff --git a/stores/sql/database.go b/stores/sql/database.go index 1490e7957..1c5d5935a 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -21,6 +21,7 @@ type ( ContractState(fcid types.FileContractID) (api.ContractState, error) ExpiredFileContractElements(bh uint64) ([]types.V2FileContractElement, error) FileContractElement(fcid types.FileContractID) (types.V2FileContractElement, error) + IsKnownContract(fcid types.FileContractID) (bool, error) PruneFileContractElements(threshold uint64) error RecordContractRenewal(old, new types.FileContractID) error UpdateFileContractElements([]types.V2FileContractElement) error diff --git a/stores/sql/mysql/chain.go b/stores/sql/mysql/chain.go index 1c1702dd1..1d2683075 100644 --- a/stores/sql/mysql/chain.go +++ b/stores/sql/mysql/chain.go @@ -24,9 +24,10 @@ var ( ) type chainUpdateTx struct { - ctx context.Context - tx isql.Tx - l *zap.SugaredLogger + ctx context.Context + tx isql.Tx + l *zap.SugaredLogger + known map[types.FileContractID]bool // map to prevent rare duplicate selects } func (c chainUpdateTx) WalletApplyIndex(index types.ChainIndex, created, spent []types.SiacoinElement, events []wallet.Event, timestamp time.Time) error { @@ -210,12 +211,29 @@ func (c chainUpdateTx) FileContractElement(fcid types.FileContractID) (types.V2F return ssql.FileContractElement(c.ctx, c.tx, fcid) } +func (c chainUpdateTx) IsKnownContract(fcid types.FileContractID) (bool, error) { + if c.known == nil { + c.known = make(map[types.FileContractID]bool) + } + + if relevant, ok := c.known[fcid]; ok { + return relevant, nil + } + + known, err := ssql.IsKnownContract(c.ctx, c.tx, fcid) + if err != nil { + return false, err + } + c.known[fcid] = known + return known, nil +} + func (c chainUpdateTx) PruneFileContractElements(threshold uint64) error { return ssql.PruneFileContractElements(c.ctx, c.tx, threshold) } -func (c chainUpdateTx) RecordContractRenewal(old, new types.FileContractID) error { - return ssql.RecordContractRenewal(c.ctx, c.tx, old, new) +func (c chainUpdateTx) RecordContractRenewal(oldFCID, newFCID types.FileContractID) error { + return ssql.RecordContractRenewal(c.ctx, c.tx, oldFCID, newFCID) } func (c chainUpdateTx) UpdateFileContractElements(fces []types.V2FileContractElement) error { diff --git a/stores/sql/sqlite/chain.go b/stores/sql/sqlite/chain.go index ee50ad664..f3abf5353 100644 --- a/stores/sql/sqlite/chain.go +++ b/stores/sql/sqlite/chain.go @@ -25,9 +25,10 @@ var ( ) type chainUpdateTx struct { - ctx context.Context - tx isql.Tx - l *zap.SugaredLogger + ctx context.Context + tx isql.Tx + l *zap.SugaredLogger + known map[types.FileContractID]bool // map to prevent rare duplicate selects } func (c chainUpdateTx) WalletApplyIndex(index types.ChainIndex, created, spent []types.SiacoinElement, events []wallet.Event, timestamp time.Time) error { @@ -211,12 +212,29 @@ func (c chainUpdateTx) FileContractElement(fcid types.FileContractID) (types.V2F return ssql.FileContractElement(c.ctx, c.tx, fcid) } +func (c chainUpdateTx) IsKnownContract(fcid types.FileContractID) (bool, error) { + if c.known == nil { + c.known = make(map[types.FileContractID]bool) + } + + if relevant, ok := c.known[fcid]; ok { + return relevant, nil + } + + known, err := ssql.IsKnownContract(c.ctx, c.tx, fcid) + if err != nil { + return false, err + } + c.known[fcid] = known + return known, nil +} + func (c chainUpdateTx) PruneFileContractElements(threshold uint64) error { return ssql.PruneFileContractElements(c.ctx, c.tx, threshold) } -func (c chainUpdateTx) RecordContractRenewal(old, new types.FileContractID) error { - return ssql.RecordContractRenewal(c.ctx, c.tx, old, new) +func (c chainUpdateTx) RecordContractRenewal(oldFCID, newFCID types.FileContractID) error { + return ssql.RecordContractRenewal(c.ctx, c.tx, oldFCID, newFCID) } func (c chainUpdateTx) UpdateFileContractElements(fces []types.V2FileContractElement) error {