From 570b9fbf905a3807aa0eca78517749b73c294f6f Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 12 Sep 2024 22:50:08 -0700 Subject: [PATCH] rhp4: combine client and server packages --- rhp/v4/host/host.go | 262 --------------------------- rhp/v4/{host => }/options.go | 12 +- rhp/v4/rpc.go | 24 +-- rhp/v4/rpc_test.go | 5 +- rhp/v4/{host/rpc.go => server.go} | 286 +++++++++++++++++++++++++++--- testutil/host.go | 10 +- 6 files changed, 288 insertions(+), 311 deletions(-) delete mode 100644 rhp/v4/host/host.go rename rhp/v4/{host => }/options.go (62%) rename rhp/v4/{host/rpc.go => server.go} (77%) diff --git a/rhp/v4/host/host.go b/rhp/v4/host/host.go deleted file mode 100644 index da0165c..0000000 --- a/rhp/v4/host/host.go +++ /dev/null @@ -1,262 +0,0 @@ -package host - -import ( - "encoding/hex" - "errors" - "fmt" - "net" - "time" - - "go.sia.tech/core/consensus" - rhp4 "go.sia.tech/core/rhp/v4" - "go.sia.tech/core/types" - "go.sia.tech/coreutils/chain" - "go.uber.org/zap" - "lukechampine.com/frand" -) - -var protocolVersion = [3]byte{0, 0, 1} - -type ( - // Usage contains the revenue and risked collateral for a contract. - Usage struct { - RPCRevenue types.Currency `json:"rpc"` - StorageRevenue types.Currency `json:"storage"` - EgressRevenue types.Currency `json:"egress"` - IngressRevenue types.Currency `json:"ingress"` - AccountFunding types.Currency `json:"accountFunding"` - RiskedCollateral types.Currency `json:"riskedCollateral"` - } - - // A TransactionSet contains the transaction set and basis for a v2 contract. - TransactionSet struct { - TransactionSet []types.V2Transaction - Basis types.ChainIndex - } -) - -type ( - // A Transport is a generic multiplexer for incoming streams. - Transport interface { - AcceptStream() (net.Conn, error) - Close() error - } - - // ChainManager defines the interface required by the contract manager to - // interact with the consensus set. - ChainManager interface { - Tip() types.ChainIndex - TipState() consensus.State - - // V2TransactionSet returns the full transaction set and basis necessary for - // broadcasting a transaction. If the provided basis does not match the current - // tip, the transaction will be updated. The transaction set includes the parents - // and the transaction itself in an order valid for broadcasting. - V2TransactionSet(basis types.ChainIndex, txn types.V2Transaction) (types.ChainIndex, []types.V2Transaction, error) - // AddV2PoolTransactions validates a transaction set and adds it to the - // transaction pool. - AddV2PoolTransactions(types.ChainIndex, []types.V2Transaction) (known bool, err error) - // RecommendedFee returns the recommended fee per weight - RecommendedFee() types.Currency - // UpdatesSince returns at most max updates on the path between index and the - // Manager's current tip. - UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []chain.RevertUpdate, aus []chain.ApplyUpdate, err error) - } - - // A Syncer broadcasts transactions to its peers - Syncer interface { - // BroadcastV2TransactionSet broadcasts a transaction set to the network. - BroadcastV2TransactionSet(types.ChainIndex, []types.V2Transaction) - } - - // A Wallet manages Siacoins and funds transactions - Wallet interface { - // Address returns the host's address - Address() types.Address - - // FundTransaction funds a transaction with the specified amount of - // Siacoins. If useUnconfirmed is true, the transaction may spend - // unconfirmed outputs. The outputs spent by the transaction are locked - // until they are released by ReleaseInputs. - FundV2Transaction(txn *types.V2Transaction, amount types.Currency, useUnconfirmed bool) (types.ChainIndex, []int, error) - // SignV2Inputs signs the inputs of a transaction. - SignV2Inputs(txn *types.V2Transaction, toSign []int) - // ReleaseInputs releases the inputs of a transaction. It should only - // be used if the transaction is not going to be broadcast - ReleaseInputs(txns []types.Transaction, v2txns []types.V2Transaction) - } - - // A SectorStore is an interface for reading and writing sectors - SectorStore interface { - ReadSector(types.Hash256) ([rhp4.SectorSize]byte, error) - // StoreSector stores a sector and returns its root hash. - StoreSector(root types.Hash256, data *[rhp4.SectorSize]byte, expiration uint64) error - } - - // A RevisionState pairs a contract revision with its sector roots - RevisionState struct { - Revision types.V2FileContract - Roots []types.Hash256 - } - - // Contractor is an interface for managing a host's contracts - Contractor interface { - // LockV2Contract locks a contract and returns its current state. - // The returned function must be called to release the lock. - LockV2Contract(types.FileContractID) (RevisionState, func(), error) - // AddV2Contract adds a new contract to the host - AddV2Contract(TransactionSet, Usage) error - // RenewV2Contract finalizes an existing contract and adds its renewal - RenewV2Contract(TransactionSet, Usage) error - // ReviseV2Contract atomically revises a contract and updates its sector - // roots and usage - ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error - // ContractElement returns the contract state element for the given - // contract ID - ContractElement(types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error) - - AccountBalance(rhp4.Account) (types.Currency, error) - CreditAccountsWithContract([]rhp4.AccountDeposit, types.FileContractID, types.V2FileContract) ([]types.Currency, error) - DebitAccount(rhp4.Account, types.Currency) error - } - - // SettingsReporter reports the host's current settings - SettingsReporter interface { - RHP4Settings() rhp4.HostSettings - } - - // A Server handles incoming RHP4 RPC - Server struct { - hostKey types.PrivateKey - priceTableValidity time.Duration - contractProofWindowBuffer uint64 - - log *zap.Logger - - chain ChainManager - syncer Syncer - wallet Wallet - sectors SectorStore - contractor Contractor - settings SettingsReporter - } -) - -func (s *Server) lockContractForRevision(contractID types.FileContractID) (RevisionState, func(), error) { - rev, unlock, err := s.contractor.LockV2Contract(contractID) - switch { - case err != nil: - return RevisionState{}, nil, err - case rev.Revision.ProofHeight-s.contractProofWindowBuffer <= s.chain.Tip().Height: - unlock() - return RevisionState{}, nil, errorBadRequest("contract too close to proof window") - case rev.Revision.RevisionNumber >= types.MaxRevisionNumber: - unlock() - return RevisionState{}, nil, errorBadRequest("contract is locked for revision") - } - return rev, unlock, nil -} - -func (s *Server) handleHostStream(stream net.Conn, log *zap.Logger) { - defer stream.Close() - - stream.SetDeadline(time.Now().Add(30 * time.Second)) // set an initial timeout - rpcStart := time.Now() - id, err := rhp4.ReadID(stream) - if err != nil { - log.Debug("failed to read RPC ID", zap.Error(err)) - return - } - - switch id { - case rhp4.RPCSettingsID: - err = s.handleRPCSettings(stream) - case rhp4.RPCAccountBalanceID: - err = s.handleRPCAccountBalance(stream) - case rhp4.RPCFormContractID: - err = s.handleRPCFormContract(stream) - case rhp4.RPCFundAccountsID: - err = s.handleRPCFundAccounts(stream) - case rhp4.RPCLatestRevisionID: - err = s.handleRPCLatestRevision(stream) - case rhp4.RPCModifySectorsID: - err = s.handleRPCModifySectors(stream) - case rhp4.RPCReadSectorID: - err = s.handleRPCReadSector(stream) - case rhp4.RPCRenewContractID: - err = s.handleRPCRenewContract(stream) - case rhp4.RPCSectorRootsID: - err = s.handleRPCSectorRoots(stream) - case rhp4.RPCWriteSectorID: - err = s.handleRPCWriteSector(stream) - default: - log.Debug("unrecognized RPC", zap.Stringer("rpc", id)) - rhp4.WriteResponse(stream, &rhp4.RPCError{Code: rhp4.ErrorCodeBadRequest, Description: "unrecognized RPC"}) - return - } - if err != nil { - var re *rhp4.RPCError - if ok := errors.As(err, &re); ok { - rhp4.WriteResponse(stream, re) - log.Debug("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) - } else { - rhp4.WriteResponse(stream, rhp4.ErrHostInternalError.(*rhp4.RPCError)) - log.Error("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) - } - return - } - log.Info("RPC success", zap.Duration("elapsed", time.Since(rpcStart))) -} - -// HostKey returns the host's private key -func (s *Server) HostKey() types.PrivateKey { - return s.hostKey -} - -// Serve accepts incoming streams on the provided multiplexer and handles them -func (s *Server) Serve(t Transport, log *zap.Logger) error { - defer t.Close() - - for { - stream, err := t.AcceptStream() - if errors.Is(err, net.ErrClosed) { - return nil - } else if err != nil { - return fmt.Errorf("failed to accept connection: %w", err) - } - log := log.With(zap.String("streamID", hex.EncodeToString(frand.Bytes(4)))) - log.Debug("accepted stream") - go func() { - defer func() { - if err := stream.Close(); err != nil { - log.Debug("failed to close stream", zap.Error(err)) - } else { - log.Debug("closed stream") - } - }() - s.handleHostStream(stream, log) - }() - } -} - -// NewServer creates a new RHP4 server -func NewServer(pk types.PrivateKey, cm ChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings SettingsReporter, sectors SectorStore, opts ...ServerOption) *Server { - s := &Server{ - hostKey: pk, - priceTableValidity: 30 * time.Minute, - contractProofWindowBuffer: 10, - - log: zap.NewNop(), - - chain: cm, - syncer: syncer, - wallet: wallet, - sectors: sectors, - contractor: contracts, - settings: settings, - } - for _, opt := range opts { - opt(s) - } - return s -} diff --git a/rhp/v4/host/options.go b/rhp/v4/options.go similarity index 62% rename from rhp/v4/host/options.go rename to rhp/v4/options.go index afc011b..7b90e0d 100644 --- a/rhp/v4/host/options.go +++ b/rhp/v4/options.go @@ -1,4 +1,4 @@ -package host +package rhp import ( "time" @@ -6,18 +6,18 @@ import ( "go.uber.org/zap" ) -// A ServerOption sets an option on a Server. -type ServerOption func(*Server) +// An Option sets an option on a Server. +type Option func(*Server) // WithLog sets the logger for the server. -func WithLog(log *zap.Logger) ServerOption { +func WithLog(log *zap.Logger) Option { return func(s *Server) { s.log = log } } // WithPriceTableValidity sets the duration for which a price table is valid. -func WithPriceTableValidity(validity time.Duration) ServerOption { +func WithPriceTableValidity(validity time.Duration) Option { return func(s *Server) { s.priceTableValidity = validity } @@ -25,7 +25,7 @@ func WithPriceTableValidity(validity time.Duration) ServerOption { // WithContractProofWindowBuffer sets the buffer for revising a contract before // its proof window starts. -func WithContractProofWindowBuffer(buffer uint64) ServerOption { +func WithContractProofWindowBuffer(buffer uint64) Option { return func(s *Server) { s.contractProofWindowBuffer = buffer } diff --git a/rhp/v4/rpc.go b/rhp/v4/rpc.go index a52271c..e77580d 100644 --- a/rhp/v4/rpc.go +++ b/rhp/v4/rpc.go @@ -11,8 +11,8 @@ import ( ) type ( - // A Transport is a generic multiplexer for incoming streams. - Transport interface { + // A TransportClient is a generic multiplexer for outgoing streams. + TransportClient interface { DialStream() net.Conn Close() error } @@ -50,7 +50,7 @@ type ContractRevision struct { } // RPCSettings returns the current settings of the host -func RPCSettings(t Transport) (rhp4.HostSettings, error) { +func RPCSettings(t TransportClient) (rhp4.HostSettings, error) { s := t.DialStream() defer s.Close() @@ -66,7 +66,7 @@ func RPCSettings(t Transport) (rhp4.HostSettings, error) { } // RPCReadSector reads a sector from the host -func RPCReadSector(t Transport, prices rhp4.HostPrices, token rhp4.AccountToken, root types.Hash256, offset, length uint64) ([]byte, error) { +func RPCReadSector(t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, root types.Hash256, offset, length uint64) ([]byte, error) { if offset+length > rhp4.SectorSize { return nil, fmt.Errorf("read exceeds sector bounds") } @@ -93,7 +93,7 @@ func RPCReadSector(t Transport, prices rhp4.HostPrices, token rhp4.AccountToken, } // RPCWriteSector writes a sector to the host -func RPCWriteSector(t Transport, prices rhp4.HostPrices, token rhp4.AccountToken, data []byte, duration uint64) (types.Hash256, error) { +func RPCWriteSector(t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, data []byte, duration uint64) (types.Hash256, error) { if len(data) > rhp4.SectorSize { return types.Hash256{}, fmt.Errorf("sector must be less than %d bytes", rhp4.SectorSize) } else if len(data)%rhp4.LeafSize != 0 { @@ -128,7 +128,7 @@ func RPCWriteSector(t Transport, prices rhp4.HostPrices, token rhp4.AccountToken } // RPCModifySectors modifies sectors on the host -func RPCModifySectors(t Transport, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract ContractRevision, actions []rhp4.WriteAction) (types.V2FileContract, error) { +func RPCModifySectors(t TransportClient, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract ContractRevision, actions []rhp4.WriteAction) (types.V2FileContract, error) { s := t.DialStream() defer s.Close() @@ -176,7 +176,7 @@ func RPCModifySectors(t Transport, cs consensus.State, prices rhp4.HostPrices, s } // RPCFundAccounts funds accounts on the host -func RPCFundAccounts(t Transport, cs consensus.State, sk types.PrivateKey, contract ContractRevision, deposits []rhp4.AccountDeposit) (types.V2FileContract, []types.Currency, error) { +func RPCFundAccounts(t TransportClient, cs consensus.State, sk types.PrivateKey, contract ContractRevision, deposits []rhp4.AccountDeposit) (types.V2FileContract, []types.Currency, error) { var total types.Currency for _, deposit := range deposits { total = total.Add(deposit.Amount) @@ -215,7 +215,7 @@ func RPCFundAccounts(t Transport, cs consensus.State, sk types.PrivateKey, contr } // RPCLatestRevision returns the latest revision of a contract -func RPCLatestRevision(t Transport, contractID types.FileContractID) (types.V2FileContract, error) { +func RPCLatestRevision(t TransportClient, contractID types.FileContractID) (types.V2FileContract, error) { s := t.DialStream() defer s.Close() @@ -231,7 +231,7 @@ func RPCLatestRevision(t Transport, contractID types.FileContractID) (types.V2Fi } // RPCSectorRoots returns the sector roots for a contract -func RPCSectorRoots(t Transport, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract ContractRevision, offset, length uint64) (types.V2FileContract, []types.Hash256, error) { +func RPCSectorRoots(t TransportClient, cs consensus.State, prices rhp4.HostPrices, sk types.PrivateKey, contract ContractRevision, offset, length uint64) (types.V2FileContract, []types.Hash256, error) { revision, err := rhp4.ReviseForSectorRoots(contract.Revision, prices, length) if err != nil { return types.V2FileContract{}, nil, fmt.Errorf("failed to revise contract: %w", err) @@ -269,7 +269,7 @@ func RPCSectorRoots(t Transport, cs consensus.State, prices rhp4.HostPrices, sk } // RPCAccountBalance returns the balance of an account -func RPCAccountBalance(t Transport, account rhp4.Account) (types.Currency, error) { +func RPCAccountBalance(t TransportClient, account rhp4.Account) (types.Currency, error) { s := t.DialStream() defer s.Close() @@ -285,7 +285,7 @@ func RPCAccountBalance(t Transport, account rhp4.Account) (types.Currency, error } // RPCFormContract forms a contract with a host -func RPCFormContract(t Transport, cm ChainManager, signer FundAndSign, prices rhp4.HostPrices, fc types.V2FileContract) (ContractRevision, TransactionSet, error) { +func RPCFormContract(t TransportClient, cm ChainManager, signer FundAndSign, prices rhp4.HostPrices, fc types.V2FileContract) (ContractRevision, TransactionSet, error) { formationTxn := types.V2Transaction{ MinerFee: types.Siacoins(1), FileContracts: []types.V2FileContract{fc}, @@ -393,7 +393,7 @@ func RPCFormContract(t Transport, cm ChainManager, signer FundAndSign, prices rh } // RPCRenewContract renews a contract with a host -func RPCRenewContract(t Transport, cm ChainManager, signer FundAndSign, prices rhp4.HostPrices, sk types.PrivateKey, contractID types.FileContractID, existing types.V2FileContract, renewal types.V2FileContractRenewal) (ContractRevision, TransactionSet, error) { +func RPCRenewContract(t TransportClient, cm ChainManager, signer FundAndSign, prices rhp4.HostPrices, sk types.PrivateKey, contractID types.FileContractID, existing types.V2FileContract, renewal types.V2FileContractRenewal) (ContractRevision, TransactionSet, error) { renewalTxn := types.V2Transaction{ MinerFee: types.Siacoins(1), FileContractResolutions: []types.V2FileContractResolution{ diff --git a/rhp/v4/rpc_test.go b/rhp/v4/rpc_test.go index 1005d98..aca1c25 100644 --- a/rhp/v4/rpc_test.go +++ b/rhp/v4/rpc_test.go @@ -15,7 +15,6 @@ import ( "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" rhp4 "go.sia.tech/coreutils/rhp/v4" - "go.sia.tech/coreutils/rhp/v4/host" "go.sia.tech/coreutils/syncer" "go.sia.tech/coreutils/testutil" "go.sia.tech/coreutils/wallet" @@ -47,8 +46,8 @@ func (fs *fundAndSign) SignV2Inputs(txn *types.V2Transaction, toSign []int) { fs.w.SignV2Inputs(txn, toSign) } -func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm host.ChainManager, s host.Syncer, w host.Wallet, c host.Contractor, sr host.SettingsReporter, ss host.SectorStore, log *zap.Logger) rhp4.Transport { - rs := host.NewServer(hostKey, cm, s, c, w, sr, ss, host.WithContractProofWindowBuffer(10), host.WithPriceTableValidity(2*time.Minute), host.WithLog(log.Named("rhp4"))) +func testRenterHostPair(tb testing.TB, hostKey types.PrivateKey, cm rhp4.ServerChainManager, s rhp4.Syncer, w rhp4.Wallet, c rhp4.Contractor, sr rhp4.SettingsReporter, ss rhp4.SectorStore, log *zap.Logger) rhp4.TransportClient { + rs := rhp4.NewServer(hostKey, cm, s, c, w, sr, ss, rhp4.WithContractProofWindowBuffer(10), rhp4.WithPriceTableValidity(2*time.Minute), rhp4.WithLog(log.Named("rhp4"))) hostAddr := testutil.ServeSiaMux(tb, rs, log.Named("siamux")) conn, err := net.Dial("tcp", hostAddr) diff --git a/rhp/v4/host/rpc.go b/rhp/v4/server.go similarity index 77% rename from rhp/v4/host/rpc.go rename to rhp/v4/server.go index 6fc1e99..42dc980 100644 --- a/rhp/v4/host/rpc.go +++ b/rhp/v4/server.go @@ -1,26 +1,158 @@ -package host +package rhp import ( + "encoding/hex" "errors" "fmt" "io" "net" "time" + "go.sia.tech/core/consensus" rhp4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" "go.sia.tech/coreutils/wallet" "go.uber.org/zap" + "lukechampine.com/frand" ) -const maxBasisDiff = 18 +const maxBasisDiff = 20 -func errorBadRequest(f string, p ...any) error { - return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, fmt.Sprintf(f, p...)) -} +var protocolVersion = [3]byte{0, 0, 1} -func errorDecodingError(f string, p ...any) error { - return rhp4.NewRPCError(rhp4.ErrorCodeDecoding, fmt.Sprintf(f, p...)) +type ( + // Usage contains the revenue and risked collateral for a contract. + Usage struct { + RPCRevenue types.Currency `json:"rpc"` + StorageRevenue types.Currency `json:"storage"` + EgressRevenue types.Currency `json:"egress"` + IngressRevenue types.Currency `json:"ingress"` + AccountFunding types.Currency `json:"accountFunding"` + RiskedCollateral types.Currency `json:"riskedCollateral"` + } +) + +type ( + // A TransportListener is a generic multiplexer for incoming streams. + TransportListener interface { + AcceptStream() (net.Conn, error) + Close() error + } + + // ServerChainManager defines the interface required by the contract manager to + // interact with the consensus set. + ServerChainManager interface { + Tip() types.ChainIndex + TipState() consensus.State + + // V2TransactionSet returns the full transaction set and basis necessary for + // broadcasting a transaction. If the provided basis does not match the current + // tip, the transaction will be updated. The transaction set includes the parents + // and the transaction itself in an order valid for broadcasting. + V2TransactionSet(basis types.ChainIndex, txn types.V2Transaction) (types.ChainIndex, []types.V2Transaction, error) + // AddV2PoolTransactions validates a transaction set and adds it to the + // transaction pool. + AddV2PoolTransactions(types.ChainIndex, []types.V2Transaction) (known bool, err error) + // RecommendedFee returns the recommended fee per weight + RecommendedFee() types.Currency + // UpdatesSince returns at most max updates on the path between index and the + // Manager's current tip. + UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []chain.RevertUpdate, aus []chain.ApplyUpdate, err error) + } + + // A Syncer broadcasts transactions to its peers + Syncer interface { + // BroadcastV2TransactionSet broadcasts a transaction set to the network. + BroadcastV2TransactionSet(types.ChainIndex, []types.V2Transaction) + } + + // A Wallet manages Siacoins and funds transactions + Wallet interface { + // Address returns the host's address + Address() types.Address + + // FundTransaction funds a transaction with the specified amount of + // Siacoins. If useUnconfirmed is true, the transaction may spend + // unconfirmed outputs. The outputs spent by the transaction are locked + // until they are released by ReleaseInputs. + FundV2Transaction(txn *types.V2Transaction, amount types.Currency, useUnconfirmed bool) (types.ChainIndex, []int, error) + // SignV2Inputs signs the inputs of a transaction. + SignV2Inputs(txn *types.V2Transaction, toSign []int) + // ReleaseInputs releases the inputs of a transaction. It should only + // be used if the transaction is not going to be broadcast + ReleaseInputs(txns []types.Transaction, v2txns []types.V2Transaction) + } + + // A SectorStore is an interface for reading and writing sectors + SectorStore interface { + ReadSector(types.Hash256) ([rhp4.SectorSize]byte, error) + // StoreSector stores a sector and returns its root hash. + StoreSector(root types.Hash256, data *[rhp4.SectorSize]byte, expiration uint64) error + } + + // A RevisionState pairs a contract revision with its sector roots + RevisionState struct { + Revision types.V2FileContract + Roots []types.Hash256 + } + + // Contractor is an interface for managing a host's contracts + Contractor interface { + // LockV2Contract locks a contract and returns its current state. + // The returned function must be called to release the lock. + LockV2Contract(types.FileContractID) (RevisionState, func(), error) + // AddV2Contract adds a new contract to the host + AddV2Contract(TransactionSet, Usage) error + // RenewV2Contract finalizes an existing contract and adds its renewal + RenewV2Contract(TransactionSet, Usage) error + // ReviseV2Contract atomically revises a contract and updates its sector + // roots and usage + ReviseV2Contract(contractID types.FileContractID, revision types.V2FileContract, roots []types.Hash256, usage Usage) error + // ContractElement returns the contract state element for the given + // contract ID + ContractElement(types.FileContractID) (types.ChainIndex, types.V2FileContractElement, error) + + AccountBalance(rhp4.Account) (types.Currency, error) + CreditAccountsWithContract([]rhp4.AccountDeposit, types.FileContractID, types.V2FileContract) ([]types.Currency, error) + DebitAccount(rhp4.Account, types.Currency) error + } + + // SettingsReporter reports the host's current settings + SettingsReporter interface { + RHP4Settings() rhp4.HostSettings + } + + // A Server handles incoming RHP4 RPC + Server struct { + hostKey types.PrivateKey + priceTableValidity time.Duration + contractProofWindowBuffer uint64 + + log *zap.Logger + + chain ServerChainManager + syncer Syncer + wallet Wallet + sectors SectorStore + contractor Contractor + settings SettingsReporter + } +) + +func (s *Server) lockContractForRevision(contractID types.FileContractID) (RevisionState, func(), error) { + rev, unlock, err := s.contractor.LockV2Contract(contractID) + switch { + case err != nil: + return RevisionState{}, nil, err + case rev.Revision.ProofHeight-s.contractProofWindowBuffer <= s.chain.Tip().Height: + unlock() + return RevisionState{}, nil, errorBadRequest("contract too close to proof window") + case rev.Revision.RevisionNumber >= types.MaxRevisionNumber: + unlock() + return RevisionState{}, nil, errorBadRequest("contract is locked for revision") + } + return rev, unlock, nil } func (s *Server) handleRPCSettings(stream net.Conn) error { @@ -496,8 +628,8 @@ func (s *Server) handleRPCFormContract(stream net.Conn) error { // add the contract to the contractor err = s.contractor.AddV2Contract(TransactionSet{ - TransactionSet: formationSet, - Basis: basis, + Transactions: formationSet, + Basis: basis, }, Usage{ RPCRevenue: formationRevenue, }) @@ -734,34 +866,28 @@ func (s *Server) handleRPCRenewContract(stream net.Conn) error { return errorBadRequest("failed to add formation parents to transaction pool: %v", err) } } - // add our setup parents to the transaction pool + + // add the setup parents to the transaction pool if len(renewalParents) > 0 { if _, err := s.chain.AddV2PoolTransactions(basis, renewalParents); err != nil { return errorBadRequest("failed to add setup parents to transaction pool: %v", err) } } - // get the full updated transaction set for the setup transaction - basis, setupSet, err := s.chain.V2TransactionSet(basis, renewalTxn) - if err != nil { - return fmt.Errorf("failed to get transaction set: %w", err) - } else if _, err = s.chain.AddV2PoolTransactions(basis, setupSet); err != nil { - return errorBadRequest("failed to broadcast setup transaction: %v", err) - } - // get the full updated transaction set for the renewal transaction basis, renewalSet, err := s.chain.V2TransactionSet(basis, renewalTxn) if err != nil { return fmt.Errorf("failed to get transaction set: %w", err) } else if _, err = s.chain.AddV2PoolTransactions(basis, renewalSet); err != nil { - return errorBadRequest("failed to broadcast formation transaction: %v", err) + return errorBadRequest("failed to broadcast setup transaction: %v", err) } + // broadcast the transaction set s.syncer.BroadcastV2TransactionSet(basis, renewalSet) // add the contract to the contractor err = s.contractor.RenewV2Contract(TransactionSet{ - TransactionSet: renewalSet, - Basis: basis, + Transactions: renewalSet, + Basis: basis, }, usage) if err != nil { return fmt.Errorf("failed to add contract: %w", err) @@ -774,7 +900,7 @@ func (s *Server) handleRPCRenewContract(stream net.Conn) error { }) } -func updateStateElementBasis(cm ChainManager, base, target types.ChainIndex, element *types.StateElement) error { +func updateStateElementBasis(cm ServerChainManager, base, target types.ChainIndex, element *types.StateElement) error { reverted, applied, err := cm.UpdatesSince(base, 144) if err != nil { return err @@ -821,7 +947,7 @@ func updateStateElementBasis(cm ChainManager, base, target types.ChainIndex, ele // updateSiacoinElementBasis is a helper to update a transaction's siacoin elements // to the target basis. If an error is returned, inputs must be considered invalid. -func updateSiacoinElementBasis(cm ChainManager, base, target types.ChainIndex, inputs []types.V2SiacoinInput) error { +func updateSiacoinElementBasis(cm ServerChainManager, base, target types.ChainIndex, inputs []types.V2SiacoinInput) error { reverted, applied, err := cm.UpdatesSince(base, 144) if err != nil { return err @@ -897,3 +1023,117 @@ func updateSiacoinElementBasis(cm ChainManager, base, target types.ChainIndex, i } return nil } + +func (s *Server) handleHostStream(stream net.Conn, log *zap.Logger) { + defer stream.Close() + + stream.SetDeadline(time.Now().Add(30 * time.Second)) // set an initial timeout + rpcStart := time.Now() + id, err := rhp4.ReadID(stream) + if err != nil { + log.Debug("failed to read RPC ID", zap.Error(err)) + return + } + + switch id { + case rhp4.RPCSettingsID: + err = s.handleRPCSettings(stream) + case rhp4.RPCAccountBalanceID: + err = s.handleRPCAccountBalance(stream) + case rhp4.RPCFormContractID: + err = s.handleRPCFormContract(stream) + case rhp4.RPCFundAccountsID: + err = s.handleRPCFundAccounts(stream) + case rhp4.RPCLatestRevisionID: + err = s.handleRPCLatestRevision(stream) + case rhp4.RPCModifySectorsID: + err = s.handleRPCModifySectors(stream) + case rhp4.RPCReadSectorID: + err = s.handleRPCReadSector(stream) + case rhp4.RPCRenewContractID: + err = s.handleRPCRenewContract(stream) + case rhp4.RPCSectorRootsID: + err = s.handleRPCSectorRoots(stream) + case rhp4.RPCWriteSectorID: + err = s.handleRPCWriteSector(stream) + default: + log.Debug("unrecognized RPC", zap.Stringer("rpc", id)) + rhp4.WriteResponse(stream, &rhp4.RPCError{Code: rhp4.ErrorCodeBadRequest, Description: "unrecognized RPC"}) + return + } + if err != nil { + var re *rhp4.RPCError + if ok := errors.As(err, &re); ok { + rhp4.WriteResponse(stream, re) + log.Debug("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) + } else { + rhp4.WriteResponse(stream, rhp4.ErrHostInternalError.(*rhp4.RPCError)) + log.Error("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) + } + return + } + log.Info("RPC success", zap.Duration("elapsed", time.Since(rpcStart))) +} + +// HostKey returns the host's private key +func (s *Server) HostKey() types.PrivateKey { + return s.hostKey +} + +// Serve accepts incoming streams on the provided multiplexer and handles them +func (s *Server) Serve(t TransportListener, log *zap.Logger) error { + defer t.Close() + + for { + stream, err := t.AcceptStream() + if errors.Is(err, net.ErrClosed) { + return nil + } else if err != nil { + return fmt.Errorf("failed to accept connection: %w", err) + } + log := log.With(zap.String("streamID", hex.EncodeToString(frand.Bytes(4)))) + log.Debug("accepted stream") + go func() { + defer func() { + if err := stream.Close(); err != nil { + log.Debug("failed to close stream", zap.Error(err)) + } else { + log.Debug("closed stream") + } + }() + s.handleHostStream(stream, log) + }() + } +} + +// errorBadRequest is a helper to create an rpc BadRequest error +func errorBadRequest(f string, p ...any) error { + return rhp4.NewRPCError(rhp4.ErrorCodeBadRequest, fmt.Sprintf(f, p...)) +} + +// errorDecodingError is a helper to create an rpc Decoding error +func errorDecodingError(f string, p ...any) error { + return rhp4.NewRPCError(rhp4.ErrorCodeDecoding, fmt.Sprintf(f, p...)) +} + +// NewServer creates a new RHP4 server +func NewServer(pk types.PrivateKey, cm ServerChainManager, syncer Syncer, contracts Contractor, wallet Wallet, settings SettingsReporter, sectors SectorStore, opts ...Option) *Server { + s := &Server{ + hostKey: pk, + priceTableValidity: 30 * time.Minute, + contractProofWindowBuffer: 10, + + log: zap.NewNop(), + + chain: cm, + syncer: syncer, + wallet: wallet, + sectors: sectors, + contractor: contracts, + settings: settings, + } + for _, opt := range opts { + opt(s) + } + return s +} diff --git a/testutil/host.go b/testutil/host.go index db7cddc..4d47fd5 100644 --- a/testutil/host.go +++ b/testutil/host.go @@ -10,7 +10,7 @@ import ( proto4 "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" - rhp4 "go.sia.tech/coreutils/rhp/v4/host" + rhp4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/mux" "go.uber.org/zap" ) @@ -102,10 +102,10 @@ func (ec *EphemeralContractor) AddV2Contract(formationSet rhp4.TransactionSet, _ ec.mu.Lock() defer ec.mu.Unlock() - if len(formationSet.TransactionSet) == 0 { + if len(formationSet.Transactions) == 0 { return errors.New("expected at least one transaction") } - formationTxn := formationSet.TransactionSet[len(formationSet.TransactionSet)-1] + formationTxn := formationSet.Transactions[len(formationSet.Transactions)-1] if len(formationTxn.FileContracts) != 1 { return errors.New("expected exactly one contract") } @@ -126,10 +126,10 @@ func (ec *EphemeralContractor) RenewV2Contract(renewalSet rhp4.TransactionSet, _ ec.mu.Lock() defer ec.mu.Unlock() - if len(renewalSet.TransactionSet) == 0 { + if len(renewalSet.Transactions) == 0 { return errors.New("expected at least one transaction") } - renewalTxn := renewalSet.TransactionSet[len(renewalSet.TransactionSet)-1] + renewalTxn := renewalSet.Transactions[len(renewalSet.Transactions)-1] if len(renewalTxn.FileContractResolutions) != 1 { return errors.New("expected exactly one resolution") }