From b56b9d2813d304e9d8eb8a81cd3831aa8d7d2a6d Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 18 Dec 2024 13:19:48 +0100 Subject: [PATCH] use goroutine to broadcast file contract revision and refactor test host creation --- internal/accounts/accounts.go | 5 +- internal/bus/chainsubscriber.go | 64 ++++++++++++++------------ internal/test/e2e/host.go | 81 +++++---------------------------- 3 files changed, 49 insertions(+), 101 deletions(-) diff --git a/internal/accounts/accounts.go b/internal/accounts/accounts.go index 72d35ee9d..fe19669c7 100644 --- a/internal/accounts/accounts.go +++ b/internal/accounts/accounts.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "strings" "sync" "time" @@ -341,7 +342,9 @@ func (a *Manager) refillAccounts() { // fetch all usable hosts hosts, err := a.hs.UsableHosts(a.shutdownCtx) - if err != nil { + if utils.IsErr(err, context.Canceled) { + return + } else if err != nil { a.logger.Errorw(fmt.Sprintf("failed to fetch usable hosts for refill: %v", err)) return } diff --git a/internal/bus/chainsubscriber.go b/internal/bus/chainsubscriber.go index 0224ab0f3..325a486c4 100644 --- a/internal/bus/chainsubscriber.go +++ b/internal/bus/chainsubscriber.go @@ -415,38 +415,42 @@ func (s *chainSubscriber) broadcastExpiredFileContractResolutions(tx sql.ChainUp s.logger.Errorf("failed to get expired file contract elements: %v", err) return } + for _, fce := range expiredFCEs { - txn := types.V2Transaction{ - MinerFee: s.cm.RecommendedFee().Mul64(ContractResolutionTxnWeight), - FileContractResolutions: []types.V2FileContractResolution{ - { - Parent: fce, - Resolution: &types.V2FileContractExpiration{}, + go func(fce types.V2FileContractElement) { + txn := types.V2Transaction{ + MinerFee: s.cm.RecommendedFee().Mul64(ContractResolutionTxnWeight), + FileContractResolutions: []types.V2FileContractResolution{ + { + Parent: fce, + Resolution: &types.V2FileContractExpiration{}, + }, }, - }, - } - // fund and sign txn - basis, toSign, err := s.wallet.FundV2Transaction(&txn, txn.MinerFee, true) - if err != nil { - s.logger.Errorf("failed to fund contract expiration txn: %v", err) - continue - } - s.wallet.SignV2Inputs(&txn, toSign) - - // verify txn and broadcast it - _, err = s.cm.AddV2PoolTransactions(basis, []types.V2Transaction{txn}) - if err != nil && - (strings.Contains(err.Error(), "has already been resolved") || - strings.Contains(err.Error(), "not present in the accumulator")) { - s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) - s.logger.With(zap.Error(err)).Debug("failed to broadcast contract expiration txn") - continue - } else if err != nil { - s.logger.With(zap.Error(err)).Error("failed to broadcast contract expiration txn") - s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) - continue - } - s.s.BroadcastV2TransactionSet(basis, []types.V2Transaction{txn}) + } + + // fund and sign txn + basis, toSign, err := s.wallet.FundV2Transaction(&txn, txn.MinerFee, true) + if err != nil { + s.logger.Errorf("failed to fund contract expiration txn: %v", err) + return + } + s.wallet.SignV2Inputs(&txn, toSign) + + // verify txn and broadcast it + _, err = s.cm.AddV2PoolTransactions(basis, []types.V2Transaction{txn}) + if err != nil && + (strings.Contains(err.Error(), "has already been resolved") || + strings.Contains(err.Error(), "not present in the accumulator")) { + s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) + s.logger.With(zap.Error(err)).Debug("failed to broadcast contract expiration txn") + return + } else if err != nil { + s.logger.With(zap.Error(err)).Error("failed to broadcast contract expiration txn") + s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn}) + return + } + s.s.BroadcastV2TransactionSet(basis, []types.V2Transaction{txn}) + }(fce) } } diff --git a/internal/test/e2e/host.go b/internal/test/e2e/host.go index bc1adb8f1..45c1efd30 100644 --- a/internal/test/e2e/host.go +++ b/internal/test/e2e/host.go @@ -6,7 +6,6 @@ import ( "net" "os" "path/filepath" - "sync" "time" "go.sia.tech/core/consensus" @@ -37,73 +36,6 @@ const ( blocksPerMonth = blocksPerDay * 30 ) -type ephemeralPeerStore struct { - peers map[string]syncer.PeerInfo - bans map[string]time.Time - mu sync.Mutex -} - -func (eps *ephemeralPeerStore) AddPeer(addr string) error { - eps.mu.Lock() - defer eps.mu.Unlock() - eps.peers[addr] = syncer.PeerInfo{Address: addr} - return nil -} - -func (eps *ephemeralPeerStore) Peers() ([]syncer.PeerInfo, error) { - eps.mu.Lock() - defer eps.mu.Unlock() - var peers []syncer.PeerInfo - for _, peer := range eps.peers { - peers = append(peers, peer) - } - return peers, nil -} - -func (eps *ephemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { - eps.mu.Lock() - defer eps.mu.Unlock() - peer, ok := eps.peers[addr] - if !ok { - return syncer.PeerInfo{}, syncer.ErrPeerNotFound - } - return peer, nil -} - -func (eps *ephemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { - eps.mu.Lock() - defer eps.mu.Unlock() - peer, ok := eps.peers[addr] - if !ok { - return syncer.ErrPeerNotFound - } - fn(&peer) - eps.peers[addr] = peer - return nil -} - -func (eps *ephemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error { - eps.mu.Lock() - defer eps.mu.Unlock() - eps.bans[addr] = time.Now().Add(duration) - return nil -} - -// Banned returns true, nil if the peer is banned. -func (eps *ephemeralPeerStore) Banned(addr string) (bool, error) { - eps.mu.Lock() - defer eps.mu.Unlock() - t, ok := eps.bans[addr] - return ok && time.Now().Before(t), nil -} - -func newEphemeralPeerStore() syncer.PeerStore { - return &ephemeralPeerStore{ - peers: make(map[string]syncer.PeerInfo), - bans: make(map[string]time.Time), - } -} - // A Host is an ephemeral host that can be used for testing. type Host struct { dir string @@ -235,11 +167,20 @@ func NewHost(privKey types.PrivateKey, cm *chain.Manager, dir string, network *c if err != nil { return nil, fmt.Errorf("failed to create syncer listener: %w", err) } - s := syncer.New(l, cm, newEphemeralPeerStore(), gateway.Header{ + s := syncer.New(l, cm, testutil.NewEphemeralPeerStore(), gateway.Header{ GenesisID: genesisBlock.ID(), UniqueID: gateway.GenerateUniqueID(), NetAddress: l.Addr().String(), - }, syncer.WithPeerDiscoveryInterval(100*time.Millisecond), syncer.WithSyncInterval(100*time.Millisecond)) + }, syncer.WithPeerDiscoveryInterval(100*time.Millisecond), + syncer.WithSyncInterval(100*time.Millisecond), + syncer.WithConnectTimeout(100*time.Millisecond), + syncer.WithPeerDiscoveryInterval(100*time.Millisecond), + syncer.WithMaxSendBlocks(1000), + syncer.WithRelayBlockOutlineTimeout(time.Second), + syncer.WithRelayHeaderTimeout(time.Second), + syncer.WithRelayTransactionSetTimeout(time.Second), + syncer.WithSendTransactionsTimeout(time.Second), + syncer.WithBanDuration(time.Second)) syncErrChan := make(chan error, 1) syncerCtx, syncerCancel := context.WithCancel(context.Background()) defer func() {