Skip to content

Commit

Permalink
use goroutine to broadcast file contract revision and refactor test h…
Browse files Browse the repository at this point in the history
…ost creation
  • Loading branch information
ChrisSchinnerl committed Dec 18, 2024
1 parent fba07f8 commit b56b9d2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 101 deletions.
5 changes: 4 additions & 1 deletion internal/accounts/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -341,7 +342,9 @@ func (a *Manager) refillAccounts() {

// fetch all usable hosts
hosts, err := a.hs.UsableHosts(a.shutdownCtx)
if err != nil {
if utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
a.logger.Errorw(fmt.Sprintf("failed to fetch usable hosts for refill: %v", err))
return
}
Expand Down
64 changes: 34 additions & 30 deletions internal/bus/chainsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,38 +415,42 @@ func (s *chainSubscriber) broadcastExpiredFileContractResolutions(tx sql.ChainUp
s.logger.Errorf("failed to get expired file contract elements: %v", err)
return
}

for _, fce := range expiredFCEs {
txn := types.V2Transaction{
MinerFee: s.cm.RecommendedFee().Mul64(ContractResolutionTxnWeight),
FileContractResolutions: []types.V2FileContractResolution{
{
Parent: fce,
Resolution: &types.V2FileContractExpiration{},
go func(fce types.V2FileContractElement) {
txn := types.V2Transaction{
MinerFee: s.cm.RecommendedFee().Mul64(ContractResolutionTxnWeight),
FileContractResolutions: []types.V2FileContractResolution{
{
Parent: fce,
Resolution: &types.V2FileContractExpiration{},
},
},
},
}
// fund and sign txn
basis, toSign, err := s.wallet.FundV2Transaction(&txn, txn.MinerFee, true)
if err != nil {
s.logger.Errorf("failed to fund contract expiration txn: %v", err)
continue
}
s.wallet.SignV2Inputs(&txn, toSign)

// verify txn and broadcast it
_, err = s.cm.AddV2PoolTransactions(basis, []types.V2Transaction{txn})
if err != nil &&
(strings.Contains(err.Error(), "has already been resolved") ||
strings.Contains(err.Error(), "not present in the accumulator")) {
s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn})
s.logger.With(zap.Error(err)).Debug("failed to broadcast contract expiration txn")
continue
} else if err != nil {
s.logger.With(zap.Error(err)).Error("failed to broadcast contract expiration txn")
s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn})
continue
}
s.s.BroadcastV2TransactionSet(basis, []types.V2Transaction{txn})
}

// fund and sign txn
basis, toSign, err := s.wallet.FundV2Transaction(&txn, txn.MinerFee, true)
if err != nil {
s.logger.Errorf("failed to fund contract expiration txn: %v", err)
return
}
s.wallet.SignV2Inputs(&txn, toSign)

// verify txn and broadcast it
_, err = s.cm.AddV2PoolTransactions(basis, []types.V2Transaction{txn})
if err != nil &&
(strings.Contains(err.Error(), "has already been resolved") ||
strings.Contains(err.Error(), "not present in the accumulator")) {
s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn})
s.logger.With(zap.Error(err)).Debug("failed to broadcast contract expiration txn")
return
} else if err != nil {
s.logger.With(zap.Error(err)).Error("failed to broadcast contract expiration txn")
s.wallet.ReleaseInputs(nil, []types.V2Transaction{txn})
return
}
s.s.BroadcastV2TransactionSet(basis, []types.V2Transaction{txn})
}(fce)
}
}

Expand Down
81 changes: 11 additions & 70 deletions internal/test/e2e/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net"
"os"
"path/filepath"
"sync"
"time"

"go.sia.tech/core/consensus"
Expand Down Expand Up @@ -37,73 +36,6 @@ const (
blocksPerMonth = blocksPerDay * 30
)

type ephemeralPeerStore struct {
peers map[string]syncer.PeerInfo
bans map[string]time.Time
mu sync.Mutex
}

func (eps *ephemeralPeerStore) AddPeer(addr string) error {
eps.mu.Lock()
defer eps.mu.Unlock()
eps.peers[addr] = syncer.PeerInfo{Address: addr}
return nil
}

func (eps *ephemeralPeerStore) Peers() ([]syncer.PeerInfo, error) {
eps.mu.Lock()
defer eps.mu.Unlock()
var peers []syncer.PeerInfo
for _, peer := range eps.peers {
peers = append(peers, peer)
}
return peers, nil
}

func (eps *ephemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
eps.mu.Lock()
defer eps.mu.Unlock()
peer, ok := eps.peers[addr]
if !ok {
return syncer.PeerInfo{}, syncer.ErrPeerNotFound
}
return peer, nil
}

func (eps *ephemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
eps.mu.Lock()
defer eps.mu.Unlock()
peer, ok := eps.peers[addr]
if !ok {
return syncer.ErrPeerNotFound
}
fn(&peer)
eps.peers[addr] = peer
return nil
}

func (eps *ephemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error {
eps.mu.Lock()
defer eps.mu.Unlock()
eps.bans[addr] = time.Now().Add(duration)
return nil
}

// Banned returns true, nil if the peer is banned.
func (eps *ephemeralPeerStore) Banned(addr string) (bool, error) {
eps.mu.Lock()
defer eps.mu.Unlock()
t, ok := eps.bans[addr]
return ok && time.Now().Before(t), nil
}

func newEphemeralPeerStore() syncer.PeerStore {
return &ephemeralPeerStore{
peers: make(map[string]syncer.PeerInfo),
bans: make(map[string]time.Time),
}
}

// A Host is an ephemeral host that can be used for testing.
type Host struct {
dir string
Expand Down Expand Up @@ -235,11 +167,20 @@ func NewHost(privKey types.PrivateKey, cm *chain.Manager, dir string, network *c
if err != nil {
return nil, fmt.Errorf("failed to create syncer listener: %w", err)
}
s := syncer.New(l, cm, newEphemeralPeerStore(), gateway.Header{
s := syncer.New(l, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesisBlock.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l.Addr().String(),
}, syncer.WithPeerDiscoveryInterval(100*time.Millisecond), syncer.WithSyncInterval(100*time.Millisecond))
}, syncer.WithPeerDiscoveryInterval(100*time.Millisecond),
syncer.WithSyncInterval(100*time.Millisecond),
syncer.WithConnectTimeout(100*time.Millisecond),
syncer.WithPeerDiscoveryInterval(100*time.Millisecond),
syncer.WithMaxSendBlocks(1000),
syncer.WithRelayBlockOutlineTimeout(time.Second),
syncer.WithRelayHeaderTimeout(time.Second),
syncer.WithRelayTransactionSetTimeout(time.Second),
syncer.WithSendTransactionsTimeout(time.Second),
syncer.WithBanDuration(time.Second))
syncErrChan := make(chan error, 1)
syncerCtx, syncerCancel := context.WithCancel(context.Background())
defer func() {
Expand Down

0 comments on commit b56b9d2

Please sign in to comment.