Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use SingleAddressWallet from coreutils #928

Merged
merged 69 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
b5b539b
all: use single address wallet from coreutils
peterjan Feb 5, 2024
32641ab
wallet: make sure to return the old types for the time being
peterjan Feb 5, 2024
2e79b52
Merge branch 'pj/peer-store' into pj/single-address-wallet
peterjan Feb 5, 2024
0de27fc
Merge branch 'pj/peer-store' into pj/single-address-wallet
peterjan Feb 6, 2024
f33f6b5
internal: upgrade coreutils
peterjan Feb 6, 2024
5b88ac9
internal: subscribe wallet store
peterjan Feb 6, 2024
ddc1131
all: upgrade coreutils
peterjan Feb 14, 2024
3ec50bc
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Feb 15, 2024
0b10798
Merge branch 'pj/upgrade-coreutils' into pj/single-address-wallet
peterjan Feb 15, 2024
5fec9dc
stores: migrate wallet tables to wallet_infos and wallet_events
peterjan Feb 16, 2024
00a1a3c
Merge branch 'pj/upgrade-coreutils' into pj/single-address-wallet
peterjan Feb 16, 2024
00771dc
store: fix subscriber deadlock
peterjan Feb 16, 2024
2aec884
stores: remove old subscriber code
peterjan Feb 16, 2024
9186202
bus: fix japecheck
peterjan Feb 22, 2024
dee01c3
stores: fix subscriber close
peterjan Feb 22, 2024
7ead5e6
stores: remove CCID
peterjan Feb 22, 2024
b2ac9ea
stores: fix unit tests
peterjan Feb 22, 2024
7643137
Merge branch 'dev' of github.com:SiaFoundation/renterd into pj/single…
peterjan Feb 27, 2024
cfea189
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Feb 27, 2024
47bec50
testing: fix unit tests
peterjan Feb 27, 2024
76b42f6
stores: fix key length
peterjan Feb 27, 2024
20f2ab6
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Feb 27, 2024
0bf1f5a
stores: fix key length
peterjan Feb 27, 2024
53b1ebf
testing: fix TestUploadDownloadBasic
peterjan Feb 28, 2024
93b9597
stores: fix TestTypeMerkleProof
peterjan Feb 28, 2024
14043a0
stores: handle TODOs
peterjan Feb 28, 2024
e5b55d4
testing: fix TestEphemeralAccounts
peterjan Feb 28, 2024
014ffc9
stores: record wallet metrics
peterjan Feb 28, 2024
5cd6577
testing: fix deadlock
peterjan Feb 28, 2024
47bcefb
stores: fix TestWalletTransactions
peterjan Feb 28, 2024
7b37f99
testing: skip TestWalletFormUnconfirmed
peterjan Feb 28, 2024
d0bc5ae
bus: add TODO for wallet metrics
peterjan Feb 29, 2024
c5dcbcd
testing: enable TestWallet
peterjan Feb 29, 2024
21064dd
Merge branch 'dev' of github.com:SiaFoundation/renterd into pj/single…
peterjan Feb 29, 2024
6edee81
autopilot: revert ap changes
peterjan Feb 29, 2024
eb35fc9
all: cleanup PR
peterjan Feb 29, 2024
de6345c
stores: use syncer.PeerNotFound
peterjan Feb 29, 2024
aabb47c
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Feb 29, 2024
751b6dd
node: close chain store
peterjan Feb 29, 2024
89f4335
stores: update newChainSubscriber
peterjan Feb 29, 2024
036839a
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Mar 5, 2024
23e1ad9
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Mar 5, 2024
d7922cb
testing: remove TODO
peterjan Mar 6, 2024
f9ff9da
testing: fix TestWalletFormUnconfirmed
peterjan Mar 7, 2024
ec29f86
testing: mine an extra block when funding the cluster
peterjan Mar 7, 2024
8eee325
coreutils: upgrade deps to have wallet logging
peterjan Mar 7, 2024
0ca4490
testing: trigger autopilot until host gets pruned
peterjan Mar 7, 2024
bc2fdae
test: mine extra blocks
peterjan Mar 7, 2024
f5637a5
testing: add logging
peterjan Mar 7, 2024
a35be42
testing: add logging
peterjan Mar 7, 2024
b9e9d20
testing: add logging
peterjan Mar 7, 2024
921c219
testing: fix TestHostPruning NDF
peterjan Mar 7, 2024
afeea3f
Merge branch 'its-happening' into pj/single-address-wallet
peterjan Mar 11, 2024
3b8cc6b
all: implement CR remarks
peterjan Mar 11, 2024
25f4f0e
all: update toolchain and key string
peterjan Mar 12, 2024
0e46f01
stores: implement PR remarks
peterjan Mar 12, 2024
bc1eb41
bus: add missing interfaces
peterjan Mar 12, 2024
24dc421
bus: add missing interfaces
peterjan Mar 12, 2024
92e8214
modules: revert toolchain upgrade
peterjan Mar 12, 2024
0a1f609
alerts: fix TestAlerts
peterjan Mar 12, 2024
6e0a5c1
autopilot: try fix TestHostPruning NDF
peterjan Mar 12, 2024
7f2a8d7
testing: add logging
peterjan Mar 12, 2024
e3ce6e4
all: upgrade coreutils
peterjan Mar 14, 2024
a156e4e
bus: revert alert change
peterjan Mar 14, 2024
80d98b9
internal: add comment
peterjan Mar 14, 2024
b9ae383
stores: pass rev number & filesize from the fce
peterjan Mar 14, 2024
9618fbf
autopilog: fix race
peterjan Mar 14, 2024
8a9ed02
testing: add TestContractApplyChainUpdates
peterjan Mar 15, 2024
1346f09
stores: update processContract
peterjan Mar 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func (c *contractor) renewContract(ctx context.Context, w Worker, ci contractInf
"renterFunds", renterFunds,
"expectedNewStorage", expectedNewStorage,
)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if isErr(err, wallet.ErrNotEnoughFunds) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func (c *contractor) refreshContract(ctx context.Context, w Worker, ci contractI
return api.ContractMetadata{}, true, err
}
c.logger.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if isErr(err, wallet.ErrNotEnoughFunds) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func (c *contractor) formContract(ctx context.Context, w Worker, host hostdb.Hos
if err != nil {
// TODO: keep track of consecutive failures and break at some point
c.logger.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if isErr(err, wallet.ErrNotEnoughFunds) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down
176 changes: 104 additions & 72 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/gofakes3"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
Expand All @@ -25,7 +28,7 @@ import (
"go.sia.tech/renterd/bus/client"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
rwallet "go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
"go.sia.tech/siad/modules"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,8 +88,8 @@ type (
ReleaseInputs(txn ...types.Transaction)
SignTransaction(cs consensus.State, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error
Tip() (types.ChainIndex, error)
Transactions(offset, limit int) ([]wallet.Transaction, error)
UnspentOutputs() ([]wallet.SiacoinElement, error)
Transactions(offset, limit int) ([]rwallet.Transaction, error)
UnspentOutputs() ([]rwallet.SiacoinElement, error)
}

// A HostDB stores information about hosts.
Expand Down Expand Up @@ -211,17 +214,16 @@ type (
type bus struct {
startTime time.Time

cm ChainManager
s Syncer
tp TransactionPool
cm *chain.Manager
s *syncer.Syncer
w *wallet.SingleAddressWallet

as AutopilotStore
eas EphemeralAccountStore
hdb HostDB
ms MetadataStore
ss SettingStore
mtrcs MetricsStore
w Wallet

accounts *accounts
contractLocks *contractLocks
Expand Down Expand Up @@ -400,17 +402,18 @@ func (b *bus) consensusAcceptBlock(jc jape.Context) {
if jc.Decode(&block) != nil {
return
}
if jc.Check("failed to accept block", b.cm.AcceptBlock(block)) != nil {

// TODO: should we extend the API with a way to accept multiple blocks at once?
// TODO: should we deprecate this route in favor of /addblocks

if jc.Check("failed to accept block", b.cm.AddBlocks([]types.Block{block})) != nil {
return
}
}

func (b *bus) syncerAddrHandler(jc jape.Context) {
addr, err := b.s.SyncerAddress(jc.Request.Context())
if jc.Check("failed to fetch syncer's address", err) != nil {
return
}
jc.Encode(addr)
// TODO: have syncer accept contexts
jc.Encode(b.s.Addr())
}

func (b *bus) syncerPeersHandler(jc jape.Context) {
Expand All @@ -420,7 +423,8 @@ func (b *bus) syncerPeersHandler(jc jape.Context) {
func (b *bus) syncerConnectHandler(jc jape.Context) {
var addr string
if jc.Decode(&addr) == nil {
jc.Check("couldn't connect to peer", b.s.Connect(addr))
_, err := b.s.Connect(addr)
jc.Check("couldn't connect to peer", err)
}
}

Expand All @@ -435,18 +439,20 @@ func (b *bus) consensusNetworkHandler(jc jape.Context) {
}

func (b *bus) txpoolFeeHandler(jc jape.Context) {
fee := b.tp.RecommendedFee()
jc.Encode(fee)
// TODO: have chain manager accept contexts
jc.Encode(b.cm.RecommendedFee())
}

func (b *bus) txpoolTransactionsHandler(jc jape.Context) {
jc.Encode(b.tp.Transactions())
jc.Encode(b.cm.PoolTransactions())
}

func (b *bus) txpoolBroadcastHandler(jc jape.Context) {
var txnSet []types.Transaction
if jc.Decode(&txnSet) == nil {
jc.Check("couldn't broadcast transaction set", b.tp.AcceptTransactionSet(txnSet))
// TODO: should we handle 'known' return value
_, err := b.cm.AddPoolTransactions(txnSet)
jc.Check("couldn't broadcast transaction set", err)
}
}

Expand Down Expand Up @@ -551,7 +557,7 @@ func (b *bus) walletTransactionsHandler(jc jape.Context) {
if before.IsZero() && since.IsZero() {
txns, err := b.w.Transactions(offset, limit)
if jc.Check("couldn't load transactions", err) == nil {
jc.Encode(txns)
jc.Encode(rwallet.ConvertToTransactions(txns))
}
return
}
Expand All @@ -571,15 +577,15 @@ func (b *bus) walletTransactionsHandler(jc jape.Context) {
}
txns = filtered
if limit == 0 || limit == -1 {
jc.Encode(txns[offset:])
jc.Encode(rwallet.ConvertToTransactions(txns[offset:]))
} else {
jc.Encode(txns[offset : offset+limit])
jc.Encode(rwallet.ConvertToTransactions(txns[offset : offset+limit]))
}
return
}

func (b *bus) walletOutputsHandler(jc jape.Context) {
utxos, err := b.w.UnspentOutputs()
utxos, err := b.w.SpendableOutputs()
if jc.Check("couldn't load outputs", err) == nil {
jc.Encode(utxos)
}
Expand All @@ -593,22 +599,21 @@ func (b *bus) walletFundHandler(jc jape.Context) {
txn := wfr.Transaction
if len(txn.MinerFees) == 0 {
// if no fees are specified, we add some
fee := b.tp.RecommendedFee().Mul64(b.cm.TipState().TransactionWeight(txn))
fee := b.cm.RecommendedFee().Mul64(b.cm.TipState().TransactionWeight(txn))
txn.MinerFees = []types.Currency{fee}
}
toSign, err := b.w.FundTransaction(b.cm.TipState(), &txn, wfr.Amount.Add(txn.MinerFees[0]), wfr.UseUnconfirmedTxns)

toSign, err := b.w.FundTransaction(&txn, wfr.Amount.Add(txn.MinerFees[0]), wfr.UseUnconfirmedTxns)
if jc.Check("couldn't fund transaction", err) != nil {
return
}
parents, err := b.tp.UnconfirmedParents(txn)
if jc.Check("couldn't load transaction dependencies", err) != nil {
b.w.ReleaseInputs(txn)
return
}

// TODO: UnconfirmedParents needs a ctx (be sure to release inputs on err)

jc.Encode(api.WalletFundResponse{
Transaction: txn,
ToSign: toSign,
DependsOn: parents,
DependsOn: b.cm.UnconfirmedParents(txn),
})
}

Expand All @@ -617,10 +622,8 @@ func (b *bus) walletSignHandler(jc jape.Context) {
if jc.Decode(&wsr) != nil {
return
}
err := b.w.SignTransaction(b.cm.TipState(), &wsr.Transaction, wsr.ToSign, wsr.CoveredFields)
if jc.Check("couldn't sign transaction", err) == nil {
jc.Encode(wsr.Transaction)
}
b.w.SignTransaction(&wsr.Transaction, wsr.ToSign, wsr.CoveredFields)
jc.Encode(wsr.Transaction)
}

func (b *bus) walletRedistributeHandler(jc jape.Context) {
Expand All @@ -633,23 +636,20 @@ func (b *bus) walletRedistributeHandler(jc jape.Context) {
return
}

cs := b.cm.TipState()
txns, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions())
txns, toSign, err := b.w.Redistribute(wfr.Outputs, wfr.Amount, b.cm.RecommendedFee())
if jc.Check("couldn't redistribute money in the wallet into the desired outputs", err) != nil {
return
}

var ids []types.TransactionID
for i := 0; i < len(txns); i++ {
err = b.w.SignTransaction(cs, &txns[i], toSign, types.CoveredFields{WholeTransaction: true})
if jc.Check("couldn't sign the transaction", err) != nil {
b.w.ReleaseInputs(txns...)
return
}
b.w.SignTransaction(&txns[i], toSign, types.CoveredFields{WholeTransaction: true})
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
ids = append(ids, txns[i].ID())
}

if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet(txns)) != nil {
// TODO: should we handle 'known' return parameter here
_, err = b.cm.AddPoolTransactions(txns)
if jc.Check("couldn't broadcast the transaction", err) != nil {
b.w.ReleaseInputs(txns...)
return
}
Expand Down Expand Up @@ -684,23 +684,15 @@ func (b *bus) walletPrepareFormHandler(jc jape.Context) {
txn := types.Transaction{
FileContracts: []types.FileContract{fc},
}
txn.MinerFees = []types.Currency{b.tp.RecommendedFee().Mul64(cs.TransactionWeight(txn))}
toSign, err := b.w.FundTransaction(cs, &txn, cost.Add(txn.MinerFees[0]), true)
txn.MinerFees = []types.Currency{b.cm.RecommendedFee().Mul64(cs.TransactionWeight(txn))}
toSign, err := b.w.FundTransaction(&txn, cost.Add(txn.MinerFees[0]), true)
if jc.Check("couldn't fund transaction", err) != nil {
return
}
cf := wallet.ExplicitCoveredFields(txn)
err = b.w.SignTransaction(cs, &txn, toSign, cf)
if jc.Check("couldn't sign transaction", err) != nil {
b.w.ReleaseInputs(txn)
return
}
parents, err := b.tp.UnconfirmedParents(txn)
if jc.Check("couldn't load transaction dependencies", err) != nil {
b.w.ReleaseInputs(txn)
return
}
jc.Encode(append(parents, txn))
b.w.SignTransaction(&txn, toSign, ExplicitCoveredFields(txn))
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved

// TODO: UnconfirmedParents needs a ctx (be sure to release inputs on err)
jc.Encode(append(b.cm.UnconfirmedParents(txn), txn))
}

func (b *bus) walletPrepareRenewHandler(jc jape.Context) {
Expand Down Expand Up @@ -741,20 +733,16 @@ func (b *bus) walletPrepareRenewHandler(jc jape.Context) {
// Fund the txn. We are not signing it yet since it's not complete. The host
// still needs to complete it and the revision + contract are signed with
// the renter key by the worker.
toSign, err := b.w.FundTransaction(cs, &txn, cost, true)
toSign, err := b.w.FundTransaction(&txn, cost, true)
if jc.Check("couldn't fund transaction", err) != nil {
return
}

// Add any required parents.
parents, err := b.tp.UnconfirmedParents(txn)
if jc.Check("couldn't load transaction dependencies", err) != nil {
b.w.ReleaseInputs(txn)
return
}
// TODO: UnconfirmedParents needs a ctx (be sure to release inputs on err)

jc.Encode(api.WalletPrepareRenewResponse{
ToSign: toSign,
TransactionSet: append(parents, txn),
TransactionSet: append(b.cm.UnconfirmedParents(txn), txn),
})
}

Expand All @@ -774,7 +762,7 @@ func (b *bus) walletPendingHandler(jc jape.Context) {
return false
}

txns := b.tp.Transactions()
txns := b.cm.PoolTransactions()
relevant := txns[:0]
for _, txn := range txns {
if isRelevant(txn) {
Expand Down Expand Up @@ -1706,10 +1694,19 @@ func (b *bus) paramsHandlerUploadGET(jc jape.Context) {
}

func (b *bus) consensusState() api.ConsensusState {
// TODO: this should be a method on the syncer
var n int
for _, peer := range b.s.Peers() {
if peer.Synced() {
n++
}
}
synced := float64(n)/float64(len(b.s.Peers())) >= .3

return api.ConsensusState{
BlockHeight: b.cm.TipState().Index.Height,
LastBlockTime: api.TimeRFC3339(b.cm.LastBlockTime()),
Synced: b.cm.Synced(),
LastBlockTime: api.TimeRFC3339(b.cm.TipState().PrevTimestamps[0]),
Synced: synced,
}
}

Expand Down Expand Up @@ -1742,7 +1739,7 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) {
ConsensusState: cs,
GougingSettings: gs,
RedundancySettings: rs,
TransactionFee: b.tp.RecommendedFee(),
TransactionFee: b.cm.RecommendedFee(),
}, nil
}

Expand Down Expand Up @@ -2287,15 +2284,50 @@ func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) {
jc.Encode(resp)
}

// ExplicitCoveredFields returns a CoveredFields that covers all elements
// present in txn.
//
// TODO: where should this live
func ExplicitCoveredFields(txn types.Transaction) (cf types.CoveredFields) {
peterjan marked this conversation as resolved.
Show resolved Hide resolved
for i := range txn.SiacoinInputs {
cf.SiacoinInputs = append(cf.SiacoinInputs, uint64(i))
}
for i := range txn.SiacoinOutputs {
cf.SiacoinOutputs = append(cf.SiacoinOutputs, uint64(i))
}
for i := range txn.FileContracts {
cf.FileContracts = append(cf.FileContracts, uint64(i))
}
for i := range txn.FileContractRevisions {
cf.FileContractRevisions = append(cf.FileContractRevisions, uint64(i))
}
for i := range txn.StorageProofs {
cf.StorageProofs = append(cf.StorageProofs, uint64(i))
}
for i := range txn.SiafundInputs {
cf.SiafundInputs = append(cf.SiafundInputs, uint64(i))
}
for i := range txn.SiafundOutputs {
cf.SiafundOutputs = append(cf.SiafundOutputs, uint64(i))
}
for i := range txn.MinerFees {
cf.MinerFees = append(cf.MinerFees, uint64(i))
}
for i := range txn.ArbitraryData {
cf.ArbitraryData = append(cf.ArbitraryData, uint64(i))
}
for i := range txn.Signatures {
cf.Signatures = append(cf.Signatures, uint64(i))
}
return
}

// New returns a new Bus.
func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, mtrcs MetricsStore, l *zap.Logger) (*bus, error) {
func New(am *alerts.Manager, hm *webhooks.Manager, cm *chain.Manager, s *syncer.Syncer, w *wallet.SingleAddressWallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, mtrcs MetricsStore, l *zap.Logger) (*bus, error) {
peterjan marked this conversation as resolved.
Show resolved Hide resolved
b := &bus{
alerts: alerts.WithOrigin(am, "bus"),
alertMgr: am,
hooks: hm,
s: s,
cm: cm,
tp: tp,
w: w,
hdb: hdb,
as: as,
Expand Down
Loading
Loading