diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 43b54023d..b4ab8cc5e 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -87,7 +87,7 @@ type Bus interface { WalletDiscard(ctx context.Context, txn types.Transaction) error WalletOutputs(ctx context.Context) (resp []wallet.SiacoinElement, err error) WalletPending(ctx context.Context) (resp []types.Transaction, err error) - WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error) + WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (ids []types.TransactionID, err error) } type Autopilot struct { diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 16666d16c..04b68c640 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -91,7 +91,7 @@ type ( resolver *ipResolver logger *zap.SugaredLogger - maintenanceTxnID types.TransactionID + maintenanceTxnIDs []types.TransactionID revisionBroadcastInterval time.Duration revisionLastBroadcast map[types.FileContractID]time.Time @@ -579,9 +579,11 @@ func (c *contractor) performWalletMaintenance(ctx context.Context) error { return nil } for _, txn := range pending { - if c.maintenanceTxnID == txn.ID() { - l.Debugf("wallet maintenance skipped, pending transaction found with id %v", c.maintenanceTxnID) - return nil + for _, mTxnID := range c.maintenanceTxnIDs { + if mTxnID == txn.ID() { + l.Debugf("wallet maintenance skipped, pending transaction found with id %v", mTxnID) + return nil + } } } @@ -607,13 +609,13 @@ func (c *contractor) performWalletMaintenance(ctx context.Context) error { } // redistribute outputs - id, err := b.WalletRedistribute(ctx, int(outputs), amount) + ids, err := b.WalletRedistribute(ctx, int(outputs), amount) if err != nil { return fmt.Errorf("failed to redistribute wallet into %d outputs of amount %v, balance %v, err %v", outputs, amount, balance, err) } - l.Debugf("wallet maintenance succeeded, tx %v", id) - c.maintenanceTxnID = id + l.Debugf("wallet maintenance succeeded, txns %v", ids) + c.maintenanceTxnIDs = ids return nil } diff --git a/bus/bus.go b/bus/bus.go index 9689f307c..7fc7e0c8b 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -83,8 +83,8 @@ type ( Balance() (spendable, confirmed, unconfirmed types.Currency, _ error) FundTransaction(cs consensus.State, txn *types.Transaction, amount types.Currency, useUnconfirmedTxns bool) ([]types.Hash256, error) Height() uint64 - Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) (types.Transaction, []types.Hash256, error) - ReleaseInputs(txn types.Transaction) + Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) ([]types.Transaction, []types.Hash256, error) + ReleaseInputs(txn ...types.Transaction) SignTransaction(cs consensus.State, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error Transactions(before, since time.Time, offset, limit int) ([]wallet.Transaction, error) UnspentOutputs() ([]wallet.SiacoinElement, error) @@ -602,22 +602,27 @@ func (b *bus) walletRedistributeHandler(jc jape.Context) { } cs := b.cm.TipState() - txn, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions()) + txns, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions()) if jc.Check("couldn't redistribute money in the wallet into the desired outputs", err) != nil { return } - err = b.w.SignTransaction(cs, &txn, toSign, types.CoveredFields{WholeTransaction: true}) - if jc.Check("couldn't sign the transaction", err) != nil { - return + var ids []types.TransactionID + for i := 0; i < len(txns); i++ { + err = b.w.SignTransaction(cs, &txns[i], toSign, types.CoveredFields{WholeTransaction: true}) + if jc.Check("couldn't sign the transaction", err) != nil { + b.w.ReleaseInputs(txns...) + return + } + ids = append(ids, txns[i].ID()) } - if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet([]types.Transaction{txn})) != nil { - b.w.ReleaseInputs(txn) + if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet(txns)) != nil { + b.w.ReleaseInputs(txns...) return } - jc.Encode(txn.ID()) + jc.Encode(ids) } func (b *bus) walletDiscardHandler(jc jape.Context) { diff --git a/bus/client/wallet.go b/bus/client/wallet.go index 7630654f6..0d4761e51 100644 --- a/bus/client/wallet.go +++ b/bus/client/wallet.go @@ -116,13 +116,13 @@ func (c *Client) WalletPrepareRenew(ctx context.Context, revision types.FileCont // WalletRedistribute broadcasts a transaction that redistributes the money in // the wallet in the desired number of outputs of given amount. If the // transaction was successfully broadcasted it will return the transaction ID. -func (c *Client) WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error) { +func (c *Client) WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (ids []types.TransactionID, err error) { req := api.WalletRedistributeRequest{ Amount: amount, Outputs: outputs, } - err = c.c.WithContext(ctx).POST("/wallet/redistribute", req, &id) + err = c.c.WithContext(ctx).POST("/wallet/redistribute", req, &ids) return } diff --git a/wallet/wallet.go b/wallet/wallet.go index 6882d9163..ea2859aa0 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "reflect" "sort" "sync" "time" @@ -16,12 +15,17 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/siad/modules" "go.uber.org/zap" - "lukechampine.com/frand" ) -// BytesPerInput is the encoded size of a SiacoinInput and corresponding -// TransactionSignature, assuming standard UnlockConditions. -const BytesPerInput = 241 +const ( + // BytesPerInput is the encoded size of a SiacoinInput and corresponding + // TransactionSignature, assuming standard UnlockConditions. + BytesPerInput = 241 + + // redistributeBatchSize is the number of outputs to redistribute per txn to + // avoid creating a txn that is too large. + redistributeBatchSize = 10 +) // ErrInsufficientBalance is returned when there aren't enough unused outputs to // cover the requested amount. @@ -223,14 +227,22 @@ func (w *SingleAddressWallet) FundTransaction(cs consensus.State, txn *types.Tra return nil, err } - // choose outputs randomly - frand.Shuffle(len(utxos), reflect.Swapper(utxos)) + // desc sort + sort.Slice(utxos, func(i, j int) bool { + return utxos[i].Value.Cmp(utxos[j].Value) > 0 + }) // add all unconfirmed outputs to the end of the slice as a last resort if useUnconfirmedTxns { + var tpoolUtxos []SiacoinElement for _, sco := range w.tpoolUtxos { - utxos = append(utxos, sco) + tpoolUtxos = append(tpoolUtxos, sco) } + // desc sort + sort.Slice(tpoolUtxos, func(i, j int) bool { + return tpoolUtxos[i].Value.Cmp(tpoolUtxos[j].Value) > 0 + }) + utxos = append(utxos, tpoolUtxos...) } var outputSum types.Currency @@ -270,9 +282,17 @@ func (w *SingleAddressWallet) FundTransaction(cs consensus.State, txn *types.Tra // ReleaseInputs is a helper function that releases the inputs of txn for use in // other transactions. It should only be called on transactions that are invalid // or will never be broadcast. -func (w *SingleAddressWallet) ReleaseInputs(txn types.Transaction) { - for _, in := range txn.SiacoinInputs { - delete(w.lastUsed, types.Hash256(in.ParentID)) +func (w *SingleAddressWallet) ReleaseInputs(txns ...types.Transaction) { + w.mu.Lock() + defer w.mu.Unlock() + w.releaseInputs(txns...) +} + +func (w *SingleAddressWallet) releaseInputs(txns ...types.Transaction) { + for _, txn := range txns { + for _, in := range txn.SiacoinInputs { + delete(w.lastUsed, types.Hash256(in.ParentID)) + } } } @@ -300,10 +320,7 @@ func (w *SingleAddressWallet) SignTransaction(cs consensus.State, txn *types.Tra // Redistribute returns a transaction that redistributes money in the wallet by // selecting a minimal set of inputs to cover the creation of the requested // outputs. It also returns a list of output IDs that need to be signed. -// -// NOTE: we can not reuse 'FundTransaction' because it randomizes the unspent -// transaction outputs it uses and we need a minimal set of inputs -func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) (types.Transaction, []types.Hash256, error) { +func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) ([]types.Transaction, []types.Hash256, error) { w.mu.Lock() defer w.mu.Unlock() @@ -318,7 +335,7 @@ func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amou // fetch unspent transaction outputs utxos, err := w.store.UnspentSiacoinElements(false) if err != nil { - return types.Transaction{}, nil, err + return nil, nil, err } // check whether a redistribution is necessary, adjust number of desired @@ -332,16 +349,7 @@ func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amou } } if outputs <= 0 { - return types.Transaction{}, nil, nil - } - - // prepare all outputs - var txn types.Transaction - for i := 0; i < int(outputs); i++ { - txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{ - Value: amount, - Address: w.Address(), - }) + return nil, nil, nil } // desc sort @@ -349,67 +357,85 @@ func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amou return utxos[i].Value.Cmp(utxos[j].Value) > 0 }) - // estimate the fees - outputFees := feePerByte.Mul64(uint64(len(encoding.Marshal(txn.SiacoinOutputs)))) - feePerInput := feePerByte.Mul64(BytesPerInput) + // prepare all outputs + var txns []types.Transaction + var toSign []types.Hash256 + + for outputs > 0 { + var txn types.Transaction + for i := 0; i < outputs && i < redistributeBatchSize; i++ { + txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{ + Value: amount, + Address: w.Address(), + }) + } + outputs -= len(txn.SiacoinOutputs) + + // estimate the fees + outputFees := feePerByte.Mul64(uint64(len(encoding.Marshal(txn.SiacoinOutputs)))) + feePerInput := feePerByte.Mul64(BytesPerInput) + + // collect outputs that cover the total amount + var inputs []SiacoinElement + want := amount.Mul64(uint64(len(txn.SiacoinOutputs))) + var amtInUse, amtSameValue, amtNotMatured types.Currency + for _, sce := range utxos { + inUse := w.isOutputUsed(sce.ID) || inPool[sce.ID] + matured := cs.Index.Height >= sce.MaturityHeight + sameValue := sce.Value.Equals(amount) + if inUse { + amtInUse = amtInUse.Add(sce.Value) + continue + } else if sameValue { + amtSameValue = amtSameValue.Add(sce.Value) + continue + } else if !matured { + amtNotMatured = amtNotMatured.Add(sce.Value) + continue + } - // collect outputs that cover the total amount - var inputs []SiacoinElement - want := amount.Mul64(uint64(outputs)) - var amtInUse, amtSameValue, amtNotMatured types.Currency - for _, sce := range utxos { - inUse := w.isOutputUsed(sce.ID) || inPool[sce.ID] - matured := cs.Index.Height >= sce.MaturityHeight - sameValue := sce.Value.Equals(amount) - if inUse { - amtInUse = amtInUse.Add(sce.Value) - continue - } else if sameValue { - amtSameValue = amtSameValue.Add(sce.Value) - continue - } else if !matured { - amtNotMatured = amtNotMatured.Add(sce.Value) - continue + inputs = append(inputs, sce) + fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees) + if SumOutputs(inputs).Cmp(want.Add(fee)) > 0 { + break + } } - inputs = append(inputs, sce) + // not enough outputs found fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees) - if SumOutputs(inputs).Cmp(want.Add(fee)) > 0 { - break + if sumOut := SumOutputs(inputs); sumOut.Cmp(want.Add(fee)) < 0 { + // in case of an error we need to free all inputs + w.releaseInputs(txns...) + return nil, nil, fmt.Errorf("%w: inputs %v < needed %v + txnFee %v (usable: %v, inUse: %v, sameValue: %v, notMatured: %v)", + ErrInsufficientBalance, sumOut.String(), want.String(), fee.String(), sumOut.String(), amtInUse.String(), amtSameValue.String(), amtNotMatured.String()) } - } - // not enough outputs found - fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees) - if sumOut := SumOutputs(inputs); sumOut.Cmp(want.Add(fee)) < 0 { - return types.Transaction{}, nil, fmt.Errorf("%w: inputs %v < needed %v + txnFee %v (usable: %v, inUse: %v, sameValue: %v, notMatured: %v)", - ErrInsufficientBalance, sumOut.String(), want.String(), fee.String(), sumOut.String(), amtInUse.String(), amtSameValue.String(), amtNotMatured.String()) - } + // set the miner fee + txn.MinerFees = []types.Currency{fee} - // set the miner fee - txn.MinerFees = []types.Currency{fee} + // add the change output + change := SumOutputs(inputs).Sub(want.Add(fee)) + if !change.IsZero() { + txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{ + Value: change, + Address: w.addr, + }) + } - // add the change output - change := SumOutputs(inputs).Sub(want.Add(fee)) - if !change.IsZero() { - txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{ - Value: change, - Address: w.addr, - }) - } + // add the inputs + for _, sce := range inputs { + txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{ + ParentID: types.SiacoinOutputID(sce.ID), + UnlockConditions: StandardUnlockConditions(w.priv.PublicKey()), + }) + toSign = append(toSign, sce.ID) + w.lastUsed[sce.ID] = time.Now() + } - // add the inputs - toSign := make([]types.Hash256, len(inputs)) - for i, sce := range inputs { - txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{ - ParentID: types.SiacoinOutputID(sce.ID), - UnlockConditions: StandardUnlockConditions(w.priv.PublicKey()), - }) - toSign[i] = sce.ID - w.lastUsed[sce.ID] = time.Now() + txns = append(txns, txn) } - return txn, toSign, nil + return txns, toSign, nil } func (w *SingleAddressWallet) isOutputUsed(id types.Hash256) bool { diff --git a/wallet/wallet_test.go b/wallet/wallet_test.go index a6b02fcc0..0538d50af 100644 --- a/wallet/wallet_test.go +++ b/wallet/wallet_test.go @@ -1,4 +1,4 @@ -package wallet_test +package wallet import ( "context" @@ -9,7 +9,6 @@ import ( "go.sia.tech/core/consensus" "go.sia.tech/core/types" "go.sia.tech/renterd/api" - "go.sia.tech/renterd/wallet" "go.uber.org/zap" "lukechampine.com/frand" ) @@ -17,15 +16,15 @@ import ( // mockStore implements wallet.SingleAddressStore and allows to manipulate the // wallet's utxos type mockStore struct { - utxos []wallet.SiacoinElement + utxos []SiacoinElement } func (s *mockStore) Balance() (types.Currency, error) { return types.ZeroCurrency, nil } func (s *mockStore) Height() uint64 { return 0 } -func (s *mockStore) UnspentSiacoinElements(bool) ([]wallet.SiacoinElement, error) { +func (s *mockStore) UnspentSiacoinElements(bool) ([]SiacoinElement, error) { return s.utxos, nil } -func (s *mockStore) Transactions(before, since time.Time, offset, limit int) ([]wallet.Transaction, error) { +func (s *mockStore) Transactions(before, since time.Time, offset, limit int) ([]Transaction, error) { return nil, nil } func (s *mockStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { @@ -47,16 +46,16 @@ func TestWalletRedistribute(t *testing.T) { // create a wallet with one output priv := types.GeneratePrivateKey() pub := priv.PublicKey() - utxo := wallet.SiacoinElement{ + utxo := SiacoinElement{ types.SiacoinOutput{ Value: oneSC.Mul64(20), - Address: wallet.StandardAddress(pub), + Address: StandardAddress(pub), }, randomOutputID(), 0, } - s := &mockStore{utxos: []wallet.SiacoinElement{utxo}} - w := wallet.NewSingleAddressWallet(priv, s, 0, zap.NewNop().Sugar()) + s := &mockStore{utxos: []SiacoinElement{utxo}} + w := NewSingleAddressWallet(priv, s, 0, zap.NewNop().Sugar()) numOutputsWithValue := func(v types.Currency) (c uint64) { utxos, _ := w.UnspentOutputs() @@ -78,7 +77,7 @@ func TestWalletRedistribute(t *testing.T) { } } for _, output := range txn.SiacoinOutputs { - s.utxos = append(s.utxos, wallet.SiacoinElement{output, randomOutputID(), 0}) + s.utxos = append(s.utxos, SiacoinElement{output, randomOutputID(), 0}) } } @@ -91,10 +90,12 @@ func TestWalletRedistribute(t *testing.T) { // split into 3 outputs of 6SC each amount := oneSC.Mul64(6) - if txn, _, err := w.Redistribute(cs, 3, amount, types.NewCurrency64(1), nil); err != nil { + if txns, _, err := w.Redistribute(cs, 3, amount, types.NewCurrency64(1), nil); err != nil { t.Fatal(err) + } else if len(txns) != 1 { + t.Fatalf("unexpected number of txns, %v != 1", len(txns)) } else { - applyTxn(txn) + applyTxn(txns[0]) } // assert number of outputs @@ -117,10 +118,12 @@ func TestWalletRedistribute(t *testing.T) { // split into 2 outputs of 9SC amount = oneSC.Mul64(9) - if txn, _, err := w.Redistribute(cs, 2, amount, types.NewCurrency64(1), nil); err != nil { + if txns, _, err := w.Redistribute(cs, 2, amount, types.NewCurrency64(1), nil); err != nil { t.Fatal(err) + } else if len(txns) != 1 { + t.Fatalf("unexpected number of txns, %v != 1", len(txns)) } else { - applyTxn(txn) + applyTxn(txns[0]) } // assert number of outputs @@ -137,10 +140,12 @@ func TestWalletRedistribute(t *testing.T) { // split into 5 outputs of 3SC amount = oneSC.Mul64(3) - if txn, _, err := w.Redistribute(cs, 5, amount, types.NewCurrency64(1), nil); err != nil { + if txns, _, err := w.Redistribute(cs, 5, amount, types.NewCurrency64(1), nil); err != nil { t.Fatal(err) + } else if len(txns) != 1 { + t.Fatalf("unexpected number of txns, %v != 1", len(txns)) } else { - applyTxn(txn) + applyTxn(txns[0]) } // assert number of outputs that hold 3SC @@ -154,16 +159,34 @@ func TestWalletRedistribute(t *testing.T) { } // split into 6 outputs of 3SC - if txn, _, err := w.Redistribute(cs, 6, amount, types.NewCurrency64(1), nil); err != nil { + if txns, _, err := w.Redistribute(cs, 6, amount, types.NewCurrency64(1), nil); err != nil { t.Fatal(err) + } else if len(txns) != 1 { + t.Fatalf("unexpected number of txns, %v != 1", len(txns)) } else { - applyTxn(txn) + applyTxn(txns[0]) } // assert number of outputs that hold 3SC if cnt := numOutputsWithValue(amount); cnt != 6 { t.Fatalf("unexpected number of 3SC outputs, %v != 6", cnt) } + + // split into 2 times the redistributeBatchSize + amount = oneSC.Div64(10) + if txns, _, err := w.Redistribute(cs, 2*redistributeBatchSize, amount, types.NewCurrency64(1), nil); err != nil { + t.Fatal(err) + } else if len(txns) != 2 { + t.Fatalf("unexpected number of txns, %v != 2", len(txns)) + } else { + applyTxn(txns[0]) + applyTxn(txns[1]) + } + + // assert number of outputs that hold 0.1SC + if cnt := numOutputsWithValue(amount); cnt != 2*redistributeBatchSize { + t.Fatalf("unexpected number of 0.1SC outputs, %v != 20", cnt) + } } func randomOutputID() (t types.Hash256) {