diff --git a/api/bus.go b/api/bus.go index 44725db19..f42e50030 100644 --- a/api/bus.go +++ b/api/bus.go @@ -238,7 +238,6 @@ type WalletPrepareRenewRequest struct { Revision types.FileContractRevision `json:"revision"` EndHeight uint64 `json:"endHeight"` HostAddress types.Address `json:"hostAddress"` - HostKey types.PublicKey `json:"hostKey"` PriceTable rhpv3.HostPriceTable `json:"priceTable"` NewCollateral types.Currency `json:"newCollateral"` RenterAddress types.Address `json:"renterAddress"` diff --git a/bus/bus.go b/bus/bus.go index d91044f4f..16293fb7f 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -28,6 +28,7 @@ import ( "go.sia.tech/renterd/tracing" "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/webhooks" + "go.sia.tech/siad/modules" "go.uber.org/zap" ) @@ -49,10 +50,13 @@ func NewClient(addr, password string) *Client { type ( // A ChainManager manages blockchain state. ChainManager interface { - AcceptBlock(context.Context, types.Block) error + AcceptBlock(types.Block) error + BlockAtHeight(height uint64) (types.Block, bool) + IndexAtHeight(height uint64) (types.ChainIndex, error) LastBlockTime() time.Time - Synced(ctx context.Context) bool - TipState(ctx context.Context) consensus.State + Subscribe(s modules.ConsensusSetSubscriber, ccID modules.ConsensusChangeID, cancel <-chan struct{}) error + Synced() bool + TipState() consensus.State } // A Syncer can connect to other peers and synchronize the blockchain. @@ -65,9 +69,11 @@ type ( // A TransactionPool can validate and relay unconfirmed transactions. TransactionPool interface { + AcceptTransactionSet(txns []types.Transaction) error + Close() error RecommendedFee() types.Currency + Subscribe(subscriber modules.TransactionPoolSubscriber) Transactions() []types.Transaction - AddTransactionSet(txns []types.Transaction) error UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) } @@ -216,7 +222,7 @@ func (b *bus) consensusAcceptBlock(jc jape.Context) { if jc.Decode(&block) != nil { return } - if jc.Check("failed to accept block", b.cm.AcceptBlock(jc.Request.Context(), block)) != nil { + if jc.Check("failed to accept block", b.cm.AcceptBlock(block)) != nil { return } } @@ -241,12 +247,12 @@ func (b *bus) syncerConnectHandler(jc jape.Context) { } func (b *bus) consensusStateHandler(jc jape.Context) { - jc.Encode(b.consensusState(jc.Request.Context())) + jc.Encode(b.consensusState()) } func (b *bus) consensusNetworkHandler(jc jape.Context) { jc.Encode(api.ConsensusNetwork{ - Name: b.cm.TipState(jc.Request.Context()).Network.Name, + Name: b.cm.TipState().Network.Name, }) } @@ -262,7 +268,7 @@ func (b *bus) txpoolTransactionsHandler(jc jape.Context) { func (b *bus) txpoolBroadcastHandler(jc jape.Context) { var txnSet []types.Transaction if jc.Decode(&txnSet) == nil { - jc.Check("couldn't broadcast transaction set", b.tp.AddTransactionSet(txnSet)) + jc.Check("couldn't broadcast transaction set", b.tp.AcceptTransactionSet(txnSet)) } } @@ -374,10 +380,10 @@ func (b *bus) walletFundHandler(jc jape.Context) { txn := wfr.Transaction if len(txn.MinerFees) == 0 { // if no fees are specified, we add some - fee := b.tp.RecommendedFee().Mul64(uint64(types.EncodedLen(txn))) + fee := b.tp.RecommendedFee().Mul64(b.cm.TipState().TransactionWeight(txn)) txn.MinerFees = []types.Currency{fee} } - toSign, err := b.w.FundTransaction(b.cm.TipState(jc.Request.Context()), &txn, wfr.Amount.Add(txn.MinerFees[0]), b.tp.Transactions()) + toSign, err := b.w.FundTransaction(b.cm.TipState(), &txn, wfr.Amount.Add(txn.MinerFees[0]), b.tp.Transactions()) if jc.Check("couldn't fund transaction", err) != nil { return } @@ -398,7 +404,7 @@ func (b *bus) walletSignHandler(jc jape.Context) { if jc.Decode(&wsr) != nil { return } - err := b.w.SignTransaction(b.cm.TipState(jc.Request.Context()), &wsr.Transaction, wsr.ToSign, wsr.CoveredFields) + err := b.w.SignTransaction(b.cm.TipState(), &wsr.Transaction, wsr.ToSign, wsr.CoveredFields) if jc.Check("couldn't sign transaction", err) == nil { jc.Encode(wsr.Transaction) } @@ -414,7 +420,7 @@ func (b *bus) walletRedistributeHandler(jc jape.Context) { return } - cs := b.cm.TipState(jc.Request.Context()) + cs := b.cm.TipState() txn, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions()) if jc.Check("couldn't redistribute money in the wallet into the desired outputs", err) != nil { return @@ -425,7 +431,7 @@ func (b *bus) walletRedistributeHandler(jc jape.Context) { return } - if jc.Check("couldn't broadcast the transaction", b.tp.AddTransactionSet([]types.Transaction{txn})) != nil { + if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet([]types.Transaction{txn})) != nil { b.w.ReleaseInputs(txn) return } @@ -441,7 +447,6 @@ func (b *bus) walletDiscardHandler(jc jape.Context) { } func (b *bus) walletPrepareFormHandler(jc jape.Context) { - ctx := jc.Request.Context() var wpfr api.WalletPrepareFormRequest if jc.Decode(&wpfr) != nil { return @@ -454,14 +459,14 @@ func (b *bus) walletPrepareFormHandler(jc jape.Context) { jc.Error(errors.New("no renter key provided"), http.StatusBadRequest) return } - cs := b.cm.TipState(ctx) + cs := b.cm.TipState() fc := rhpv2.PrepareContractFormation(wpfr.RenterKey, wpfr.HostKey, wpfr.RenterFunds, wpfr.HostCollateral, wpfr.EndHeight, wpfr.HostSettings, wpfr.RenterAddress) cost := rhpv2.ContractFormationCost(cs, fc, wpfr.HostSettings.ContractPrice) txn := types.Transaction{ FileContracts: []types.FileContract{fc}, } - txn.MinerFees = []types.Currency{b.tp.RecommendedFee().Mul64(uint64(types.EncodedLen(txn)))} + txn.MinerFees = []types.Currency{b.tp.RecommendedFee().Mul64(cs.TransactionWeight(txn))} toSign, err := b.w.FundTransaction(cs, &txn, cost.Add(txn.MinerFees[0]), b.tp.Transactions()) if jc.Check("couldn't fund transaction", err) != nil { return @@ -485,15 +490,11 @@ func (b *bus) walletPrepareRenewHandler(jc jape.Context) { if jc.Decode(&wprr) != nil { return } - if wprr.HostKey == (types.PublicKey{}) { - jc.Error(errors.New("no host key provided"), http.StatusBadRequest) - return - } if wprr.RenterKey == nil { jc.Error(errors.New("no renter key provided"), http.StatusBadRequest) return } - cs := b.cm.TipState(jc.Request.Context()) + cs := b.cm.TipState() // Create the final revision from the provided revision. finalRevision := wprr.Revision @@ -503,7 +504,7 @@ func (b *bus) walletPrepareRenewHandler(jc jape.Context) { finalRevision.RevisionNumber = math.MaxUint64 // Prepare the new contract. - fc, basePrice := rhpv3.PrepareContractRenewal(wprr.Revision, wprr.HostAddress, wprr.RenterAddress, wprr.RenterFunds, wprr.NewCollateral, wprr.HostKey, wprr.PriceTable, wprr.EndHeight) + fc, basePrice := rhpv3.PrepareContractRenewal(wprr.Revision, wprr.HostAddress, wprr.RenterAddress, wprr.RenterFunds, wprr.NewCollateral, wprr.PriceTable, wprr.EndHeight) // Create the transaction containing both the final revision and new // contract. @@ -1442,17 +1443,17 @@ func (b *bus) paramsHandlerUploadGET(jc jape.Context) { jc.Encode(api.UploadParams{ ContractSet: contractSet, - CurrentHeight: b.cm.TipState(jc.Request.Context()).Index.Height, + CurrentHeight: b.cm.TipState().Index.Height, GougingParams: gp, UploadPacking: uploadPacking, }) } -func (b *bus) consensusState(ctx context.Context) api.ConsensusState { +func (b *bus) consensusState() api.ConsensusState { return api.ConsensusState{ - BlockHeight: b.cm.TipState(ctx).Index.Height, + BlockHeight: b.cm.TipState().Index.Height, LastBlockTime: b.cm.LastBlockTime(), - Synced: b.cm.Synced(ctx), + Synced: b.cm.Synced(), } } @@ -1479,7 +1480,7 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) { b.logger.Panicf("failed to unmarshal redundancy settings '%s': %v", rss, err) } - cs := b.consensusState(ctx) + cs := b.consensusState() return api.GougingParams{ ConsensusState: cs, @@ -1691,7 +1692,7 @@ func (b *bus) contractTaxHandlerGET(jc jape.Context) { if jc.DecodeParam("payout", (*api.ParamCurrency)(&payout)) != nil { return } - cs := b.cm.TipState(jc.Request.Context()) + cs := b.cm.TipState() jc.Encode(cs.FileContractTax(types.FileContract{Payout: payout})) } diff --git a/bus/client/wallet.go b/bus/client/wallet.go index 2fa88e2ce..a430d7dbd 100644 --- a/bus/client/wallet.go +++ b/bus/client/wallet.go @@ -94,12 +94,11 @@ func (c *Client) WalletPrepareForm(ctx context.Context, renterAddress types.Addr } // WalletPrepareRenew funds and signs a contract renewal transaction. -func (c *Client) WalletPrepareRenew(ctx context.Context, revision types.FileContractRevision, hostAddress, renterAddress types.Address, renterKey types.PrivateKey, renterFunds, newCollateral types.Currency, hostKey types.PublicKey, pt rhpv3.HostPriceTable, endHeight, windowSize uint64) (api.WalletPrepareRenewResponse, error) { +func (c *Client) WalletPrepareRenew(ctx context.Context, revision types.FileContractRevision, hostAddress, renterAddress types.Address, renterKey types.PrivateKey, renterFunds, newCollateral types.Currency, pt rhpv3.HostPriceTable, endHeight, windowSize uint64) (api.WalletPrepareRenewResponse, error) { req := api.WalletPrepareRenewRequest{ Revision: revision, EndHeight: endHeight, HostAddress: hostAddress, - HostKey: hostKey, PriceTable: pt, NewCollateral: newCollateral, RenterAddress: renterAddress, diff --git a/go.mod b/go.mod index 4365f1a9d..2bd89c03c 100644 --- a/go.mod +++ b/go.mod @@ -17,9 +17,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 - go.sia.tech/core v0.1.12-0.20230529164041-6347a98003be + go.sia.tech/core v0.1.12-0.20231011172826-6ca0ac7b3b6b go.sia.tech/gofakes3 v0.0.0-20231003090232-776c144c0a19 - go.sia.tech/hostd v0.1.4 + go.sia.tech/hostd v0.2.1-0.20231013174940-920057ff41c8 go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb go.sia.tech/mux v1.2.0 go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca diff --git a/go.sum b/go.sum index 652291e21..abbd619f1 100644 --- a/go.sum +++ b/go.sum @@ -305,12 +305,12 @@ go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZE go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= -go.sia.tech/core v0.1.12-0.20230529164041-6347a98003be h1:fKfwYsCF5ua3Z/NNdLU+4sS9MM6M4EetMA0V4Y8zPKg= -go.sia.tech/core v0.1.12-0.20230529164041-6347a98003be/go.mod h1:D17UWSn99SEfQnEaR9G9n6Kz9+BwqMoUgZ6Cl424LsQ= +go.sia.tech/core v0.1.12-0.20231011172826-6ca0ac7b3b6b h1:gHnhRiY1SMWCEFu+1Xo0a967RVzHg+g+0grMbHNuLfE= +go.sia.tech/core v0.1.12-0.20231011172826-6ca0ac7b3b6b/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= go.sia.tech/gofakes3 v0.0.0-20231003090232-776c144c0a19 h1:qCJHxn1RgRdmltGl3jehgbJcpTSTaWhzdDuOiWkjvm0= go.sia.tech/gofakes3 v0.0.0-20231003090232-776c144c0a19/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg= -go.sia.tech/hostd v0.1.4 h1:r6PSo8ed0hmxjf4pvFQfRCfGk/DVRd/R/T9K9B0JhJ4= -go.sia.tech/hostd v0.1.4/go.mod h1:o+Z9ZGJZRY31MvRNXyyAlmBHjjiTE/3TgAB4pCHgr2c= +go.sia.tech/hostd v0.2.1-0.20231013174940-920057ff41c8 h1:0kVIAauXG2+C5EZWlnJDVtf6TrLLgB+EcObuCJyDNfY= +go.sia.tech/hostd v0.2.1-0.20231013174940-920057ff41c8/go.mod h1:B+jY+eJ2jlcowcXwYOb28N/A6/cy2dblX/WUec9pFM8= go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb h1:yLDEqkqC19E/HgBoq2Uhw9oH3SMNRyeRjZ7Ep4dPKR8= go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4= go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU= diff --git a/internal/node/chainmanager.go b/internal/node/chainmanager.go new file mode 100644 index 000000000..d0f27bed3 --- /dev/null +++ b/internal/node/chainmanager.go @@ -0,0 +1,158 @@ +package node + +import ( + "errors" + "fmt" + "strings" + "sync" + "time" + + "go.sia.tech/core/consensus" + "go.sia.tech/core/types" + "go.sia.tech/siad/modules" + stypes "go.sia.tech/siad/types" +) + +const ( + maxSyncTime = time.Hour +) + +var ( + ErrBlockNotFound = errors.New("block not found") + ErrInvalidChangeID = errors.New("invalid change id") +) + +type chainManager struct { + cs modules.ConsensusSet + network *consensus.Network + + close chan struct{} + mu sync.Mutex + tip consensus.State + synced bool +} + +// ProcessConsensusChange implements the modules.ConsensusSetSubscriber interface. +func (m *chainManager) ProcessConsensusChange(cc modules.ConsensusChange) { + m.mu.Lock() + defer m.mu.Unlock() + m.tip = consensus.State{ + Network: m.network, + Index: types.ChainIndex{ + ID: types.BlockID(cc.AppliedBlocks[len(cc.AppliedBlocks)-1].ID()), + Height: uint64(cc.BlockHeight), + }, + } + m.synced = synced(cc.AppliedBlocks[len(cc.AppliedBlocks)-1].Timestamp) +} + +// Network returns the network name. +func (m *chainManager) Network() string { + switch m.network.Name { + case "zen": + return "Zen Testnet" + case "mainnet": + return "Mainnet" + default: + return m.network.Name + } +} + +// Close closes the chain manager. +func (m *chainManager) Close() error { + select { + case <-m.close: + return nil + default: + } + close(m.close) + return m.cs.Close() +} + +// Synced returns true if the chain manager is synced with the consensus set. +func (m *chainManager) Synced() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.synced +} + +// BlockAtHeight returns the block at the given height. +func (m *chainManager) BlockAtHeight(height uint64) (types.Block, bool) { + sb, ok := m.cs.BlockAtHeight(stypes.BlockHeight(height)) + var c types.Block + convertToCore(sb, (*types.V1Block)(&c)) + return types.Block(c), ok +} + +func (m *chainManager) LastBlockTime() time.Time { + return time.Unix(int64(m.cs.CurrentBlock().Timestamp), 0) +} + +// IndexAtHeight return the chain index at the given height. +func (m *chainManager) IndexAtHeight(height uint64) (types.ChainIndex, error) { + block, ok := m.cs.BlockAtHeight(stypes.BlockHeight(height)) + if !ok { + return types.ChainIndex{}, ErrBlockNotFound + } + return types.ChainIndex{ + ID: types.BlockID(block.ID()), + Height: height, + }, nil +} + +// TipState returns the current chain state. +func (m *chainManager) TipState() consensus.State { + m.mu.Lock() + defer m.mu.Unlock() + return m.tip +} + +// AcceptBlock adds b to the consensus set. +func (m *chainManager) AcceptBlock(b types.Block) error { + var sb stypes.Block + convertToSiad(types.V1Block(b), &sb) + return m.cs.AcceptBlock(sb) +} + +// Subscribe subscribes to the consensus set. +func (m *chainManager) Subscribe(s modules.ConsensusSetSubscriber, ccID modules.ConsensusChangeID, cancel <-chan struct{}) error { + if err := m.cs.ConsensusSetSubscribe(s, ccID, cancel); err != nil { + if strings.Contains(err.Error(), "consensus subscription has invalid id") { + return ErrInvalidChangeID + } + return err + } + return nil +} + +func synced(timestamp stypes.Timestamp) bool { + return time.Since(time.Unix(int64(timestamp), 0)) <= maxSyncTime +} + +// NewManager creates a new chain manager. +func NewChainManager(cs modules.ConsensusSet, network *consensus.Network) (*chainManager, error) { + height := cs.Height() + block, ok := cs.BlockAtHeight(height) + if !ok { + return nil, fmt.Errorf("failed to get block at height %d", height) + } + + m := &chainManager{ + cs: cs, + network: network, + tip: consensus.State{ + Network: network, + Index: types.ChainIndex{ + ID: types.BlockID(block.ID()), + Height: uint64(height), + }, + }, + synced: synced(block.Timestamp), + close: make(chan struct{}), + } + + if err := cs.ConsensusSetSubscribe(m, modules.ConsensusChangeRecent, m.close); err != nil { + return nil, fmt.Errorf("failed to subscribe to consensus set: %w", err) + } + return m, nil +} diff --git a/internal/node/convert.go b/internal/node/convert.go new file mode 100644 index 000000000..8fcc01eed --- /dev/null +++ b/internal/node/convert.go @@ -0,0 +1,28 @@ +package node + +import ( + "bytes" + + "gitlab.com/NebulousLabs/encoding" + "go.sia.tech/core/types" +) + +func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) { + var buf bytes.Buffer + e := types.NewEncoder(&buf) + core.EncodeTo(e) + e.Flush() + if err := siad.UnmarshalSia(&buf); err != nil { + panic(err) + } +} + +func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) { + var buf bytes.Buffer + siad.MarshalSia(&buf) + d := types.NewBufDecoder(buf.Bytes()) + core.DecodeFrom(d) + if d.Err() != nil { + panic(d.Err()) + } +} diff --git a/internal/node/miner.go b/internal/node/miner.go index 003405b17..9043196b4 100644 --- a/internal/node/miner.go +++ b/internal/node/miner.go @@ -120,8 +120,8 @@ func (m *Miner) mineBlock(addr stypes.UnlockHash) error { } var b types.Block - convertToCore(&block, &b) - if err := m.consensus.AcceptBlock(context.Background(), b); err != nil { + convertToCore(&block, (*types.V1Block)(&b)) + if err := m.consensus.AcceptBlock(context.Background(), types.Block(b)); err != nil { return fmt.Errorf("failed to get block accepted: %w", err) } return nil diff --git a/internal/node/node.go b/internal/node/node.go index 0cdf8d116..b81d4739a 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -1,7 +1,6 @@ package node import ( - "bytes" "context" "errors" "fmt" @@ -9,9 +8,7 @@ import ( "net/http" "os" "path/filepath" - "time" - "gitlab.com/NebulousLabs/encoding" "go.sia.tech/core/consensus" "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" @@ -27,7 +24,6 @@ import ( "go.sia.tech/siad/modules/gateway" "go.sia.tech/siad/modules/transactionpool" "go.sia.tech/siad/sync" - stypes "go.sia.tech/siad/types" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/crypto/blake2b" @@ -52,137 +48,6 @@ type ( ShutdownFn = func(context.Context) error ) -func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) { - var buf bytes.Buffer - e := types.NewEncoder(&buf) - core.EncodeTo(e) - e.Flush() - if err := siad.UnmarshalSia(&buf); err != nil { - panic(err) - } -} - -func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) { - var buf bytes.Buffer - siad.MarshalSia(&buf) - d := types.NewBufDecoder(buf.Bytes()) - core.DecodeFrom(d) - if d.Err() != nil { - panic(d.Err()) - } -} - -type chainManager struct { - cs modules.ConsensusSet - network *consensus.Network -} - -func (cm chainManager) AcceptBlock(ctx context.Context, b types.Block) error { - var sb stypes.Block - convertToSiad(b, &sb) - return cm.cs.AcceptBlock(sb) -} - -func (cm chainManager) LastBlockTime() time.Time { - return time.Unix(int64(cm.cs.CurrentBlock().Timestamp), 0) -} - -func (cm chainManager) Synced(ctx context.Context) bool { - return cm.cs.Synced() -} - -func (cm chainManager) TipState(ctx context.Context) consensus.State { - return consensus.State{ - Network: cm.network, - Index: types.ChainIndex{ - Height: uint64(cm.cs.Height()), - ID: types.BlockID(cm.cs.CurrentBlock().ID()), - }, - } -} - -type syncer struct { - g modules.Gateway - tp modules.TransactionPool -} - -func (s syncer) Addr() string { - return string(s.g.Address()) -} - -func (s syncer) Peers() []string { - var peers []string - for _, p := range s.g.Peers() { - peers = append(peers, string(p.NetAddress)) - } - return peers -} - -func (s syncer) Connect(addr string) error { - return s.g.Connect(modules.NetAddress(addr)) -} - -func (s syncer) BroadcastTransaction(txn types.Transaction, dependsOn []types.Transaction) { - txnSet := make([]stypes.Transaction, len(dependsOn)+1) - for i, txn := range dependsOn { - convertToSiad(txn, &txnSet[i]) - } - convertToSiad(txn, &txnSet[len(txnSet)-1]) - s.tp.Broadcast(txnSet) -} - -func (s syncer) SyncerAddress(ctx context.Context) (string, error) { - return string(s.g.Address()), nil -} - -type txpool struct { - tp modules.TransactionPool -} - -func (tp txpool) RecommendedFee() (fee types.Currency) { - _, max := tp.tp.FeeEstimation() - convertToCore(&max, &fee) - return -} - -func (tp txpool) Transactions() []types.Transaction { - stxns := tp.tp.Transactions() - txns := make([]types.Transaction, len(stxns)) - for i := range txns { - convertToCore(&stxns[i], &txns[i]) - } - return txns -} - -func (tp txpool) AddTransactionSet(txns []types.Transaction) error { - stxns := make([]stypes.Transaction, len(txns)) - for i := range stxns { - convertToSiad(&txns[i], &stxns[i]) - } - return tp.tp.AcceptTransactionSet(stxns) -} - -func (tp txpool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) { - pool := tp.Transactions() - outputToParent := make(map[types.SiacoinOutputID]*types.Transaction) - for i, txn := range pool { - for j := range txn.SiacoinOutputs { - outputToParent[txn.SiacoinOutputID(j)] = &pool[i] - } - } - var parents []types.Transaction - seen := make(map[types.TransactionID]bool) - for _, sci := range txn.SiacoinInputs { - if parent, ok := outputToParent[sci.ParentID]; ok { - if txid := parent.ID(); !seen[txid] { - seen[txid] = true - parents = append(parents, *parent) - } - } - } - return parents, nil -} - func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { gatewayDir := filepath.Join(dir, "gateway") if err := os.MkdirAll(gatewayDir, 0700); err != nil { @@ -272,7 +137,12 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht tp.TransactionPoolSubscribe(m) } - b, err := bus.New(syncer{g, tp}, alertsMgr, hooksMgr, chainManager{cs: cs, network: cfg.Network}, txpool{tp}, w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, l) + cm, err := NewChainManager(cs, cfg.Network) + if err != nil { + return nil, nil, err + } + + b, err := bus.New(syncer{g, tp}, alertsMgr, hooksMgr, cm, NewTransactionPool(tp), w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, l) if err != nil { return nil, nil, err } diff --git a/internal/node/syncer.go b/internal/node/syncer.go new file mode 100644 index 000000000..6a4e80c98 --- /dev/null +++ b/internal/node/syncer.go @@ -0,0 +1,43 @@ +package node + +import ( + "context" + + "go.sia.tech/core/types" + "go.sia.tech/siad/modules" + stypes "go.sia.tech/siad/types" +) + +type syncer struct { + g modules.Gateway + tp modules.TransactionPool +} + +func (s syncer) Addr() string { + return string(s.g.Address()) +} + +func (s syncer) Peers() []string { + var peers []string + for _, p := range s.g.Peers() { + peers = append(peers, string(p.NetAddress)) + } + return peers +} + +func (s syncer) Connect(addr string) error { + return s.g.Connect(modules.NetAddress(addr)) +} + +func (s syncer) BroadcastTransaction(txn types.Transaction, dependsOn []types.Transaction) { + txnSet := make([]stypes.Transaction, len(dependsOn)+1) + for i, txn := range dependsOn { + convertToSiad(txn, &txnSet[i]) + } + convertToSiad(txn, &txnSet[len(txnSet)-1]) + s.tp.Broadcast(txnSet) +} + +func (s syncer) SyncerAddress(ctx context.Context) (string, error) { + return string(s.g.Address()), nil +} diff --git a/internal/node/transactionpool.go b/internal/node/transactionpool.go new file mode 100644 index 000000000..b2226bfb5 --- /dev/null +++ b/internal/node/transactionpool.go @@ -0,0 +1,74 @@ +package node + +import ( + "errors" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/bus" + "go.sia.tech/siad/modules" + stypes "go.sia.tech/siad/types" +) + +type txpool struct { + tp modules.TransactionPool +} + +func (tp txpool) RecommendedFee() (fee types.Currency) { + _, max := tp.tp.FeeEstimation() + convertToCore(&max, &fee) + return +} + +func (tp txpool) Transactions() []types.Transaction { + stxns := tp.tp.Transactions() + txns := make([]types.Transaction, len(stxns)) + for i := range txns { + convertToCore(&stxns[i], &txns[i]) + } + return txns +} + +func (tp txpool) AcceptTransactionSet(txns []types.Transaction) error { + stxns := make([]stypes.Transaction, len(txns)) + for i := range stxns { + convertToSiad(&txns[i], &stxns[i]) + } + err := tp.tp.AcceptTransactionSet(stxns) + if errors.Is(err, modules.ErrDuplicateTransactionSet) { + err = nil + } + return err +} + +func (tp txpool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) { + pool := tp.Transactions() + outputToParent := make(map[types.SiacoinOutputID]*types.Transaction) + for i, txn := range pool { + for j := range txn.SiacoinOutputs { + outputToParent[txn.SiacoinOutputID(j)] = &pool[i] + } + } + var parents []types.Transaction + seen := make(map[types.TransactionID]bool) + for _, sci := range txn.SiacoinInputs { + if parent, ok := outputToParent[sci.ParentID]; ok { + if txid := parent.ID(); !seen[txid] { + seen[txid] = true + parents = append(parents, *parent) + } + } + } + return parents, nil +} + +func (tp txpool) Subscribe(subscriber modules.TransactionPoolSubscriber) { + tp.tp.TransactionPoolSubscribe(subscriber) +} + +func (tp txpool) Close() error { + return tp.tp.Close() +} + +func NewTransactionPool(tp modules.TransactionPool) bus.TransactionPool { + return &txpool{tp: tp} +} diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index a856cf3cc..1d80672d3 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -152,14 +152,15 @@ type TestCluster struct { autopilotShutdownFns []func(context.Context) error s3ShutdownFns []func(context.Context) error - miner *node.Miner - apID string - dbName string - dir string - logger *zap.Logger - tt *TT - wk types.PrivateKey - wg sync.WaitGroup + network *consensus.Network + miner *node.Miner + apID string + dbName string + dir string + logger *zap.Logger + tt *TT + wk types.PrivateKey + wg sync.WaitGroup } func (tc *TestCluster) ShutdownAutopilot(ctx context.Context) { @@ -450,13 +451,14 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { autopilotShutdownFns = append(autopilotShutdownFns, aStopFn) cluster := &TestCluster{ - apID: apCfg.ID, - dir: dir, - dbName: dbName, - logger: logger, - miner: busCfg.Miner, - tt: tt, - wk: wk, + apID: apCfg.ID, + dir: dir, + dbName: dbName, + logger: logger, + network: busCfg.Network, + miner: busCfg.Miner, + tt: tt, + wk: wk, Autopilot: autopilotClient, Bus: busClient, @@ -551,14 +553,14 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { } // addStorageFolderToHosts adds a single storage folder to each host. -func addStorageFolderToHost(hosts []*Host) error { +func addStorageFolderToHost(ctx context.Context, hosts []*Host) error { for _, host := range hosts { sectors := uint64(10) volumeDir := filepath.Join(host.dir, "volumes") if err := os.MkdirAll(volumeDir, 0777); err != nil { return err } - if err := host.AddVolume(filepath.Join(volumeDir, "volume.dat"), sectors); err != nil { + if err := host.AddVolume(ctx, filepath.Join(volumeDir, "volume.dat"), sectors); err != nil { return err } } @@ -712,7 +714,7 @@ func (c *TestCluster) NewHost() *Host { c.tt.Helper() // Create host. hostDir := filepath.Join(c.dir, "hosts", fmt.Sprint(len(c.hosts)+1)) - h, err := NewHost(types.GeneratePrivateKey(), hostDir, false) + h, err := NewHost(types.GeneratePrivateKey(), hostDir, c.network, false) c.tt.OK(err) // Connect gateways. @@ -747,7 +749,9 @@ func (c *TestCluster) AddHost(h *Host) { c.sync(hosts) // Announce hosts. - c.tt.OK(addStorageFolderToHost(hosts)) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + c.tt.OK(addStorageFolderToHost(ctx, hosts)) c.tt.OK(announceHosts(hosts)) // Mine a few blocks. The host should show up eventually. @@ -909,8 +913,12 @@ func testNetwork() *consensus.Network { n.HardforkASIC.OakTarget = types.BlockID{255, 255} n.HardforkFoundation.Height = 50 - n.HardforkFoundation.PrimaryAddress = types.GeneratePrivateKey().PublicKey().StandardAddress() - n.HardforkFoundation.FailsafeAddress = types.GeneratePrivateKey().PublicKey().StandardAddress() + n.HardforkFoundation.PrimaryAddress = types.StandardUnlockHash(types.GeneratePrivateKey().PublicKey()) + n.HardforkFoundation.FailsafeAddress = types.StandardUnlockHash(types.GeneratePrivateKey().PublicKey()) + + // make it difficult to reach v2 in most tests + n.HardforkV2.AllowHeight = 1000 + n.HardforkV2.RequireHeight = 1020 return n } diff --git a/internal/testing/host.go b/internal/testing/host.go index edfc4162b..f94aa6696 100644 --- a/internal/testing/host.go +++ b/internal/testing/host.go @@ -1,18 +1,16 @@ package testing import ( - "bytes" - "errors" + "context" "fmt" "net" "path/filepath" "time" - "gitlab.com/NebulousLabs/encoding" + "go.sia.tech/core/consensus" crhpv2 "go.sia.tech/core/rhp/v2" crhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" - "go.sia.tech/hostd/chain" "go.sia.tech/hostd/host/accounts" "go.sia.tech/hostd/host/alerts" "go.sia.tech/hostd/host/contracts" @@ -20,14 +18,16 @@ import ( "go.sia.tech/hostd/host/settings" "go.sia.tech/hostd/host/storage" "go.sia.tech/hostd/persist/sqlite" + "go.sia.tech/hostd/rhp" rhpv2 "go.sia.tech/hostd/rhp/v2" rhpv3 "go.sia.tech/hostd/rhp/v3" "go.sia.tech/hostd/wallet" + "go.sia.tech/renterd/bus" + "go.sia.tech/renterd/internal/node" "go.sia.tech/siad/modules" mconsensus "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" "go.sia.tech/siad/modules/transactionpool" - stypes "go.sia.tech/siad/types" "go.uber.org/zap" ) @@ -35,7 +35,12 @@ const blocksPerMonth = 144 * 30 type stubMetricReporter struct{} -func (stubMetricReporter) Report(any) (_ error) { return } +func (stubMetricReporter) StartSession(conn *rhp.Conn, proto string, version int) (rhp.UID, func()) { + return rhp.UID{}, func() {} +} +func (stubMetricReporter) StartRPC(rhp.UID, types.Specifier) (rhp.UID, func(contracts.Usage, error)) { + return rhp.UID{}, func(contracts.Usage, error) {} +} type stubDataMonitor struct{} @@ -49,7 +54,7 @@ type Host struct { g modules.Gateway cs modules.ConsensusSet - tp *TXPool + tp bus.TransactionPool store *sqlite.Store wallet *wallet.SingleAddressWallet @@ -87,88 +92,6 @@ var defaultHostSettings = settings.Settings{ MaxRegistryEntries: 1e3, } -func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) { - var buf bytes.Buffer - e := types.NewEncoder(&buf) - core.EncodeTo(e) - e.Flush() - if err := siad.UnmarshalSia(&buf); err != nil { - panic(err) - } -} - -func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) { - var buf bytes.Buffer - siad.MarshalSia(&buf) - d := types.NewBufDecoder(buf.Bytes()) - core.DecodeFrom(d) - if d.Err() != nil { - panic(d.Err()) - } -} - -// TXPool wraps a siad transaction pool with core types. -type TXPool struct { - tp modules.TransactionPool -} - -// RecommendedFee returns the recommended fee for a transaction. -func (tp TXPool) RecommendedFee() (fee types.Currency) { - _, max := tp.tp.FeeEstimation() - convertToCore(&max, &fee) - return -} - -// Transactions returns all transactions in the pool. -func (tp TXPool) Transactions() []types.Transaction { - stxns := tp.tp.Transactions() - txns := make([]types.Transaction, len(stxns)) - for i := range txns { - convertToCore(&stxns[i], &txns[i]) - } - return txns -} - -// AcceptTransactionSet adds a transaction set to the pool. -func (tp TXPool) AcceptTransactionSet(txns []types.Transaction) error { - stxns := make([]stypes.Transaction, len(txns)) - for i := range stxns { - convertToSiad(&txns[i], &stxns[i]) - } - err := tp.tp.AcceptTransactionSet(stxns) - if errors.Is(err, modules.ErrDuplicateTransactionSet) { - err = nil - } - return err -} - -// UnconfirmedParents returns the parents of a transaction in the pool. -func (tp TXPool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error) { - pool := tp.Transactions() - outputToParent := make(map[types.SiacoinOutputID]*types.Transaction) - for i, txn := range pool { - for j := range txn.SiacoinOutputs { - outputToParent[txn.SiacoinOutputID(j)] = &pool[i] - } - } - var parents []types.Transaction - seen := make(map[types.TransactionID]bool) - for _, sci := range txn.SiacoinInputs { - if parent, ok := outputToParent[sci.ParentID]; ok { - if txid := parent.ID(); !seen[txid] { - seen[txid] = true - parents = append(parents, *parent) - } - } - } - return parents, nil -} - -// Subscribe subscribes to the transaction pool. -func (tp TXPool) Subscribe(subscriber modules.TransactionPoolSubscriber) { - tp.tp.TransactionPoolSubscribe(subscriber) -} - // Close shutsdown the host func (h *Host) Close() error { h.rhpv2.Close() @@ -178,7 +101,7 @@ func (h *Host) Close() error { h.contracts.Close() h.storage.Close() h.store.Close() - h.tp.tp.Close() + h.tp.Close() h.cs.Close() h.g.Close() return nil @@ -195,9 +118,9 @@ func (h *Host) RHPv3Addr() string { } // AddVolume adds a new volume to the host -func (h *Host) AddVolume(path string, size uint64) error { +func (h *Host) AddVolume(ctx context.Context, path string, size uint64) error { result := make(chan error) - _, err := h.storage.AddVolume(path, size, result) + _, err := h.storage.AddVolume(ctx, path, size, result) if err != nil { return err } @@ -240,7 +163,7 @@ func (h *Host) GatewayAddr() string { } // NewHost initializes a new test host -func NewHost(privKey types.PrivateKey, dir string, debugLogging bool) (*Host, error) { +func NewHost(privKey types.PrivateKey, dir string, network *consensus.Network, debugLogging bool) (*Host, error) { g, err := gateway.New("localhost:0", false, filepath.Join(dir, "gateway")) if err != nil { return nil, fmt.Errorf("failed to create gateway: %w", err) @@ -249,15 +172,16 @@ func NewHost(privKey types.PrivateKey, dir string, debugLogging bool) (*Host, er if err := <-errCh; err != nil { return nil, fmt.Errorf("failed to create consensus set: %w", err) } - cm, err := chain.NewManager(cs) + cm, err := node.NewChainManager(cs, network) if err != nil { return nil, err } + tpool, err := transactionpool.New(cs, g, filepath.Join(dir, "transactionpool")) if err != nil { return nil, fmt.Errorf("failed to create transaction pool: %w", err) } - tp := &TXPool{tpool} + tp := node.NewTransactionPool(tpool) log := zap.NewNop() db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite")) diff --git a/stores/hostdb.go b/stores/hostdb.go index c0227e108..3ad2d5a26 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -921,8 +921,8 @@ func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) { for _, sb := range cc.AppliedBlocks { // Fetch announcements and add them to the queue. var b types.Block - convertToCore(sb, &b) - hostdb.ForEachAnnouncement(b, height, func(hostKey types.PublicKey, ha hostdb.Announcement) { + convertToCore(sb, (*types.V1Block)(&b)) + hostdb.ForEachAnnouncement(types.Block(b), height, func(hostKey types.PublicKey, ha hostdb.Announcement) { newAnnouncements = append(newAnnouncements, announcement{ hostKey: publicKey(hostKey), announcement: ha, diff --git a/wallet/wallet.go b/wallet/wallet.go index 754377215..42eaeb241 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -13,7 +13,6 @@ import ( "go.sia.tech/core/consensus" "go.sia.tech/core/types" "go.sia.tech/siad/modules" - stypes "go.sia.tech/siad/types" "go.uber.org/zap" "lukechampine.com/frand" ) @@ -273,16 +272,6 @@ func (w *SingleAddressWallet) ReleaseInputs(txn types.Transaction) { // SignTransaction adds a signature to each of the specified inputs. func (w *SingleAddressWallet) SignTransaction(cs consensus.State, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error { - // NOTE: siad uses different hardfork heights when -tags=testing is set, - // so we have to alter cs accordingly. - // TODO: remove this - switch { - case cs.Index.Height >= uint64(stypes.FoundationHardforkHeight): - cs.Index.Height = 298000 - case cs.Index.Height >= uint64(stypes.ASICHardforkHeight): - cs.Index.Height = 179000 - } - for _, id := range toSign { ts := types.TransactionSignature{ ParentID: id, diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 6b849fbb1..65356ca58 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -506,8 +506,8 @@ func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevis } // check funds - cost := rhpv2.RPCSectorRootsCost(settings, n) - if rev.RenterFunds().Cmp(cost) < 0 { + price, _ := settings.RPCSectorRootsCost(offset, n).Total() + if rev.RenterFunds().Cmp(price) < 0 { return nil, ErrInsufficientFunds } @@ -518,7 +518,7 @@ func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevis rev.Revision.RevisionNumber++ // update the revision outputs - newValid, newMissed, err := updateRevisionOutputs(&rev.Revision, cost, types.ZeroCurrency) + newValid, newMissed, err := updateRevisionOutputs(&rev.Revision, price, types.ZeroCurrency) if err != nil { return nil, err } @@ -563,7 +563,7 @@ func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevis offset += n // record spending - w.contractSpendingRecorder.Record(rev.ID(), rev.Revision.RevisionNumber, rev.Revision.Filesize, api.ContractSpending{SectorRoots: cost}) + w.contractSpendingRecorder.Record(rev.ID(), rev.Revision.RevisionNumber, rev.Revision.Filesize, api.ContractSpending{SectorRoots: price}) } return } diff --git a/worker/rhpv3.go b/worker/rhpv3.go index efc8fa700..f20c40d19 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -1375,7 +1375,7 @@ func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transpor // Prepare the signed transaction that contains the final revision as well // as the new contract - wprr, err := bus.WalletPrepareRenew(ctx, rev, rrr.HostAddress, rrr.RenterAddress, renterKey, rrr.RenterFunds, rrr.NewCollateral, rrr.HostKey, *pt, rrr.EndHeight, rrr.WindowSize) + wprr, err := bus.WalletPrepareRenew(ctx, rev, rrr.HostAddress, rrr.RenterAddress, renterKey, rrr.RenterFunds, rrr.NewCollateral, *pt, rrr.EndHeight, rrr.WindowSize) if err != nil { return rhpv2.ContractRevision{}, nil, fmt.Errorf("failed to prepare renew: %w", err) } diff --git a/worker/worker.go b/worker/worker.go index 7ca73736e..d73c6f1d3 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -168,7 +168,7 @@ type Bus interface { WalletDiscard(ctx context.Context, txn types.Transaction) error WalletFund(ctx context.Context, txn *types.Transaction, amount types.Currency) ([]types.Hash256, []types.Transaction, error) WalletPrepareForm(ctx context.Context, renterAddress types.Address, renterKey types.PublicKey, renterFunds, hostCollateral types.Currency, hostKey types.PublicKey, hostSettings rhpv2.HostSettings, endHeight uint64) (txns []types.Transaction, err error) - WalletPrepareRenew(ctx context.Context, revision types.FileContractRevision, hostAddress, renterAddress types.Address, renterKey types.PrivateKey, renterFunds, newCollateral types.Currency, hostKey types.PublicKey, pt rhpv3.HostPriceTable, endHeight, windowSize uint64) (api.WalletPrepareRenewResponse, error) + WalletPrepareRenew(ctx context.Context, revision types.FileContractRevision, hostAddress, renterAddress types.Address, renterKey types.PrivateKey, renterFunds, newCollateral types.Currency, pt rhpv3.HostPriceTable, endHeight, windowSize uint64) (api.WalletPrepareRenewResponse, error) WalletSign(ctx context.Context, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error Bucket(_ context.Context, bucket string) (api.Bucket, error)