Skip to content

Commit

Permalink
Merge pull request #387 from SiaFoundation/nate/fix-tpool-race
Browse files Browse the repository at this point in the history
Merge chain manager and tpool in SingleAddressWallet
  • Loading branch information
n8maninger authored May 7, 2024
2 parents f9c899f + bc2a34c commit 00ff075
Show file tree
Hide file tree
Showing 18 changed files with 297 additions and 208 deletions.
4 changes: 2 additions & 2 deletions cmd/hostd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func newNode(ctx context.Context, walletKey types.PrivateKey, ex *explorer.Explo
// load the host identity
hostKey := db.HostKey()

cm, err := chain.NewManager(cs)
cm, err := chain.NewManager(cs, tp)
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to create chain manager: %w", err)
}

w, err := wallet.NewSingleAddressWallet(walletKey, cm, tp, db, logger.Named("wallet"))
w, err := wallet.NewSingleAddressWallet(walletKey, cm, db, logger.Named("wallet"))
if err != nil {
return nil, types.PrivateKey{}, fmt.Errorf("failed to create wallet: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions host/accounts/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func TestCredit(t *testing.T) {
tp := chain.NewTPool(stp)
defer tp.Close()

cm, err := chain.NewManager(cs)
cm, err := chain.NewManager(cs, tp)
if err != nil {
t.Fatal(err)
}
defer cm.Close()

w, err := wallet.NewSingleAddressWallet(types.NewPrivateKeyFromSeed(frand.Bytes(32)), cm, tp, db, log.Named("wallet"))
w, err := wallet.NewSingleAddressWallet(types.NewPrivateKeyFromSeed(frand.Bytes(32)), cm, db, log.Named("wallet"))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions host/accounts/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ func TestBudget(t *testing.T) {
tp := chain.NewTPool(stp)
defer tp.Close()

cm, err := chain.NewManager(cs)
cm, err := chain.NewManager(cs, tp)
if err != nil {
t.Fatal(err)
}
defer cm.Close()

w, err := wallet.NewSingleAddressWallet(types.NewPrivateKeyFromSeed(frand.Bytes(32)), cm, tp, db, log.Named("wallet"))
w, err := wallet.NewSingleAddressWallet(types.NewPrivateKeyFromSeed(frand.Bytes(32)), cm, db, log.Named("wallet"))
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 7 additions & 5 deletions host/contracts/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,17 @@ func (cm *ContractManager) handleContractAction(id types.FileContractID, height

fee := cm.tpool.RecommendedFee().Mul64(1000)
revisionTxn.MinerFees = append(revisionTxn.MinerFees, fee)
toSign, discard, err := cm.wallet.FundTransaction(&revisionTxn, fee)
toSign, release, err := cm.wallet.FundTransaction(&revisionTxn, fee)
if err != nil {
log.Error("failed to fund revision transaction", zap.Error(err))
return
}
defer discard()
if err := cm.wallet.SignTransaction(cs, &revisionTxn, toSign, types.CoveredFields{WholeTransaction: true}); err != nil {
release()
log.Error("failed to sign revision transaction", zap.Error(err))
return
} else if err := cm.tpool.AcceptTransactionSet([]types.Transaction{revisionTxn}); err != nil {
release()
log.Error("failed to broadcast revision transaction", zap.Error(err))
return
}
Expand Down Expand Up @@ -215,14 +216,12 @@ func (cm *ContractManager) handleContractAction(id types.FileContractID, height
StorageProofs: []types.StorageProof{sp},
},
}
intermediateToSign, discard, err := cm.wallet.FundTransaction(&resolutionTxnSet[0], fee)
intermediateToSign, release, err := cm.wallet.FundTransaction(&resolutionTxnSet[0], fee)
if err != nil {
log.Error("failed to fund resolution transaction", zap.Error(err))
registerContractAlert(alerts.SeverityError, "Failed to fund resolution transaction", err)
return
}
defer discard()

// add the intermediate output to the proof transaction
resolutionTxnSet[1].SiacoinInputs = append(resolutionTxnSet[1].SiacoinInputs, types.SiacoinInput{
ParentID: resolutionTxnSet[0].SiacoinOutputID(0),
Expand All @@ -231,12 +230,15 @@ func (cm *ContractManager) handleContractAction(id types.FileContractID, height
proofToSign := []types.Hash256{types.Hash256(resolutionTxnSet[1].SiacoinInputs[0].ParentID)}
start = time.Now()
if err := cm.wallet.SignTransaction(cs, &resolutionTxnSet[0], intermediateToSign, types.CoveredFields{WholeTransaction: true}); err != nil { // sign the intermediate transaction
release()
log.Error("failed to sign resolution intermediate transaction", zap.Error(err))
return
} else if err := cm.wallet.SignTransaction(cs, &resolutionTxnSet[1], proofToSign, types.CoveredFields{WholeTransaction: true}); err != nil { // sign the proof transaction
release()
log.Error("failed to sign resolution transaction", zap.Error(err))
return
} else if err := cm.tpool.AcceptTransactionSet(resolutionTxnSet); err != nil { // broadcast the transaction set
release()
buf, _ := json.Marshal(resolutionTxnSet)
log.Error("failed to broadcast resolution transaction set", zap.Error(err), zap.ByteString("transactionSet", buf))
registerContractAlert(alerts.SeverityError, "Failed to broadcast resolution transaction set", err)
Expand Down
5 changes: 3 additions & 2 deletions host/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ func formContract(renterKey, hostKey types.PrivateKey, start, end uint64, renter
txn := types.Transaction{
FileContracts: []types.FileContract{contract},
}
toSign, discard, err := w.FundTransaction(&txn, formationCost.Add(hostPayout)) // we're funding both sides of the payout
toSign, release, err := w.FundTransaction(&txn, formationCost.Add(hostPayout)) // we're funding both sides of the payout
if err != nil {
return contracts.SignedRevision{}, fmt.Errorf("failed to fund transaction: %w", err)
}
defer discard()
if err := w.SignTransaction(state, &txn, toSign, types.CoveredFields{WholeTransaction: true}); err != nil {
release()
return contracts.SignedRevision{}, fmt.Errorf("failed to sign transaction: %w", err)
} else if err := tp.AcceptTransactionSet([]types.Transaction{txn}); err != nil {
release()
return contracts.SignedRevision{}, fmt.Errorf("failed to accept transaction set: %w", err)
}
revision := types.FileContractRevision{
Expand Down
3 changes: 2 additions & 1 deletion host/settings/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ func (m *ConfigManager) Announce() error {
if err != nil {
return fmt.Errorf("failed to fund transaction: %w", err)
}
defer release()
// sign the transaction
err = m.wallet.SignTransaction(m.cm.TipState(), &txn, toSign, types.CoveredFields{WholeTransaction: true})
if err != nil {
release()
return fmt.Errorf("failed to sign transaction: %w", err)
}
// broadcast the transaction
err = m.tp.AcceptTransactionSet([]types.Transaction{txn})
if err != nil {
release()
return fmt.Errorf("failed to broadcast transaction: %w", err)
}
m.log.Debug("broadcast announcement", zap.String("transactionID", txn.ID().String()), zap.String("netaddress", settings.NetAddress), zap.String("cost", minerFee.ExactString()))
Expand Down
134 changes: 113 additions & 21 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.sia.tech/hostd/webhooks"
"go.sia.tech/siad/modules/consensus"
"go.sia.tech/siad/modules/gateway"
"go.sia.tech/siad/modules/transactionpool"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)
Expand Down Expand Up @@ -60,11 +61,17 @@ func TestVolumeLoad(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
Expand Down Expand Up @@ -170,11 +177,17 @@ func TestAddVolume(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
Expand Down Expand Up @@ -243,11 +256,17 @@ func TestRemoveVolume(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

// initialize the storage manager
Expand Down Expand Up @@ -411,11 +430,17 @@ func TestRemoveCorrupt(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

// initialize the storage manager
Expand Down Expand Up @@ -611,12 +636,18 @@ func TestRemoveMissing(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

// initialize the storage manager
webhookReporter, err := webhooks.NewManager(db, log.Named("webhooks"))
Expand Down Expand Up @@ -788,11 +819,17 @@ func TestVolumeConcurrency(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

// initialize the storage manager
Expand Down Expand Up @@ -948,11 +985,17 @@ func TestVolumeGrow(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
defer cm.Close()
defer cm.Close()

// initialize the storage manager
Expand Down Expand Up @@ -1067,7 +1110,14 @@ func TestVolumeShrink(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1229,7 +1279,14 @@ func TestVolumeManagerReadWrite(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1348,7 +1405,14 @@ func TestSectorCache(t *testing.T) {
}
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
t.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1477,7 +1541,14 @@ func BenchmarkVolumeManagerWrite(b *testing.B) {
b.Fatal(err)
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
b.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1552,7 +1623,14 @@ func BenchmarkNewVolume(b *testing.B) {
b.Fatal(err)
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
b.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1610,7 +1688,14 @@ func BenchmarkVolumeManagerRead(b *testing.B) {
b.Fatal(err)
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
b.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1687,7 +1772,14 @@ func BenchmarkVolumeRemove(b *testing.B) {
b.Fatal(err)
default:
}
cm, err := chain.NewManager(cs)

tp, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool"))
if err != nil {
b.Fatal(err)
}
defer tp.Close()

cm, err := chain.NewManager(cs, chain.NewTPool(tp))
if err != nil {
b.Fatal(err)
}
Expand Down
Loading

0 comments on commit 00ff075

Please sign in to comment.