Skip to content

Commit

Permalink
feat(rhp4): Add RHP4 listener
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Dec 10, 2024
1 parent 7c59837 commit 820302a
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 40 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type (
SetReadOnly(id int64, readOnly bool) error
RemoveSector(root types.Hash256) error
ResizeCache(size uint32)
Read(types.Hash256) (*[rhp2.SectorSize]byte, error)
ReadSector(types.Hash256) (*[rhp2.SectorSize]byte, error)

// SectorReferences returns the references to a sector
SectorReferences(root types.Hash256) (storage.SectorReference, error)
Expand Down
2 changes: 1 addition & 1 deletion api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (a *api) handleGETVerifySector(jc jape.Context) {
}

// try to read the sector data and verify the root
data, err := a.volumes.Read(root)
data, err := a.volumes.ReadSector(root)
if err != nil {
resp.Error = err.Error()
} else if calc := rhp2.SectorRoot(data); calc != root {
Expand Down
8 changes: 8 additions & 0 deletions cmd/hostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ var (
RHP3: config.RHP3{
TCPAddress: ":9983",
},
RHP4: config.RHP4{
ListenAddresses: []config.RHP4ListenAddress{
{
Protocol: "tcp",
Address: ":9984",
},
},
},
Log: config.Log{
Path: os.Getenv(logFileEnvVar), // deprecated. included for compatibility.
Level: "info",
Expand Down
25 changes: 25 additions & 0 deletions cmd/hostd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/hostd/alerts"
Expand Down Expand Up @@ -333,6 +334,30 @@ func runRootCmd(ctx context.Context, cfg config.Config, walletKey types.PrivateK
go rhp3.Serve()
defer rhp3.Close()

rhp4 := rhp4.NewServer(hostKey, cm, s, contractManager, wm, sm, vm, rhp4.WithPriceTableValidity(30*time.Minute), rhp4.WithContractProofWindowBuffer(72))

var stopListenerFuncs []func() error
defer func() {
for _, f := range stopListenerFuncs {
if err := f(); err != nil {
log.Error("failed to stop listener", zap.Error(err))
}
}
}()
for _, addr := range cfg.RHP4.ListenAddresses {
switch addr.Protocol {
case "tcp", "tcp4", "tcp6":
l, err := rhp.Listen(addr.Protocol, addr.Address, rhp.WithDataMonitor(dr), rhp.WithReadLimit(rl), rhp.WithWriteLimit(wl))
if err != nil {
return fmt.Errorf("failed to listen on rhp4 addr: %w", err)
}
stopListenerFuncs = append(stopListenerFuncs, l.Close)
go rhp.ServeRHP4SiaMux(l, rhp4, log.Named("rhp4"))
default:
return fmt.Errorf("unsupported protocol: %s", addr.Protocol)
}
}

apiOpts := []api.ServerOption{
api.WithAlerts(am),
api.WithLogger(log.Named("api")),
Expand Down
22 changes: 17 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@ type (
IndexBatchSize int `yaml:"indexBatchSize,omitempty"`
}

// RHP2 contains the configuration for the RHP2 server.
RHP2 struct {
Address string `yaml:"address,omitempty"`
}

// ExplorerData contains the configuration for using an external explorer.
ExplorerData struct {
Disable bool `yaml:"disable,omitempty"`
URL string `yaml:"url,omitempty"`
}

// RHP2 contains the configuration for the RHP2 server.
RHP2 struct {
Address string `yaml:"address,omitempty"`
}

// RHP3 contains the configuration for the RHP3 server.
RHP3 struct {
TCPAddress string `yaml:"tcp,omitempty"`
}

// RHP4ListenAddress contains the configuration for an RHP4 listen address.
RHP4ListenAddress struct {
Protocol string `yaml:"protocol,omitempty"`
Address string `yaml:"address,omitempty"`
}

// RHP4 contains the configuration for the RHP4 server.
RHP4 struct {
ListenAddresses []RHP4ListenAddress `yaml:"listenAddresses,omitempty"`
}

// LogFile configures the file output of the logger.
LogFile struct {
Enabled bool `yaml:"enabled,omitempty"`
Expand Down Expand Up @@ -77,6 +88,7 @@ type (
Explorer ExplorerData `yaml:"explorer,omitempty"`
RHP2 RHP2 `yaml:"rhp2,omitempty"`
RHP3 RHP3 `yaml:"rhp3,omitempty"`
RHP4 RHP4 `yaml:"rhp4,omitempty"`
Log Log `yaml:"log,omitempty"`
}
)
4 changes: 2 additions & 2 deletions host/contracts/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func (cm *Manager) AccountBalance(account proto4.Account) (types.Currency, error

// CreditAccountsWithContract atomically revises a contract and credits the accounts
// returning the new balance of each account.
func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) ([]types.Currency, error) {
return cm.store.RHP4CreditAccounts(deposits, contractID, revision)
func (cm *Manager) CreditAccountsWithContract(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) ([]types.Currency, error) {
return cm.store.RHP4CreditAccounts(deposits, contractID, revision, usage)
}

// DebitAccount debits an account.
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cm *Manager) CheckIntegrity(ctx context.Context, contractID types.FileCont
default:
}
// read each sector from disk and verify its Merkle root
sector, err := cm.storage.Read(root)
sector, err := cm.storage.ReadSector(root)
if err != nil { // sector read failed
log.Error("missing sector", zap.String("root", root.String()), zap.Error(err))
missing++
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type (
// A StorageManager stores and retrieves sectors.
StorageManager interface {
// Read reads a sector from the store
Read(root types.Hash256) (*[rhp2.SectorSize]byte, error)
ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error)
}

// Alerts registers and dismisses global alerts.
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type (
// RHP4AccountBalance returns the balance of an account.
RHP4AccountBalance(proto4.Account) (types.Currency, error)
// RHP4CreditAccounts atomically revises a contract and credits the accounts
RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract) (balances []types.Currency, err error)
RHP4CreditAccounts([]proto4.AccountDeposit, types.FileContractID, types.V2FileContract, proto4.Usage) (balances []types.Currency, err error)
// RHP4DebitAccount debits an account.
RHP4DebitAccount(proto4.Account, proto4.Usage) error
}
Expand Down
4 changes: 2 additions & 2 deletions host/contracts/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (cm *Manager) buildStorageProof(revision types.FileContractRevision, index
}

sectorRoot := roots[sectorIndex]
sector, err := cm.storage.Read(sectorRoot)
sector, err := cm.storage.ReadSector(sectorRoot)
if err != nil {
log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot))
return types.StorageProof{}, fmt.Errorf("failed to read sector data")
Expand Down Expand Up @@ -158,7 +158,7 @@ func (cm *Manager) buildV2StorageProof(cs consensus.State, fce types.V2FileContr
}

sectorRoot := roots[sectorIndex]
sector, err := cm.storage.Read(sectorRoot)
sector, err := cm.storage.ReadSector(sectorRoot)
if err != nil {
log.Error("failed to read sector data", zap.Error(err), zap.Stringer("sectorRoot", sectorRoot))
return types.V2StorageProof{}, fmt.Errorf("failed to read sector data")
Expand Down
4 changes: 2 additions & 2 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ func (vm *VolumeManager) readLocation(loc SectorLocation) (*[proto2.SectorSize]b
return sector, nil
}

// Read reads the sector with the given root
func (vm *VolumeManager) Read(root types.Hash256) (*[proto2.SectorSize]byte, error) {
// ReadSector reads the sector with the given root from disk
func (vm *VolumeManager) ReadSector(root types.Hash256) (*[proto2.SectorSize]byte, error) {
done, err := vm.tg.Add()
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestVolumeLoad(t *testing.T) {
}

// check that the sector is still there
sector2, err := vm.Read(root)
sector2, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if *sector2 != sector {
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestRemoveVolume(t *testing.T) {

checkRoots := func(roots []types.Hash256) error {
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
return fmt.Errorf("failed to read sector: %w", err)
} else if rhp2.SectorRoot(sector) != root {
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestVolumeConcurrency(t *testing.T) {

// read the sectors back
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if rhp2.SectorRoot(sector) != root {
Expand All @@ -646,7 +646,7 @@ func TestVolumeConcurrency(t *testing.T) {

// read the sectors back
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
} else if rhp2.SectorRoot(sector) != root {
Expand Down Expand Up @@ -1003,7 +1003,7 @@ func TestVolumeManagerReadWrite(t *testing.T) {
// read the sectors back
frand.Shuffle(len(roots), func(i, j int) { roots[i], roots[j] = roots[j], roots[i] })
for _, root := range roots {
sector, err := vm.Read(root)
sector, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func TestSectorCache(t *testing.T) {

// read the last 5 sectors all sectors should be cached
for i, root := range roots[5:] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1108,7 +1108,7 @@ func TestSectorCache(t *testing.T) {

// read the first 5 sectors all sectors should be missed
for i, root := range roots[:5] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1123,7 +1123,7 @@ func TestSectorCache(t *testing.T) {

// read the first 5 sectors again all sectors should be cached
for i, root := range roots[:5] {
_, err := vm.Read(root)
_, err := vm.ReadSector(root)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func BenchmarkVolumeManagerRead(b *testing.B) {
b.SetBytes(rhp2.SectorSize)
// read the sectors back
for _, root := range written {
if _, err := vm.Read(root); err != nil {
if _, err := vm.ReadSector(root); err != nil {
b.Fatal(err)
}
}
Expand Down
4 changes: 1 addition & 3 deletions persist/sqlite/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *Store) RHP4DebitAccount(account proto4.Account, usage proto4.Usage) err

// RHP4CreditAccounts credits the accounts with the given deposits and revises
// the contract.
func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract) (balances []types.Currency, err error) {
func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID types.FileContractID, revision types.V2FileContract, usage proto4.Usage) (balances []types.Currency, err error) {
err = s.transaction(func(tx *txn) error {
getBalanceStmt, err := tx.Prepare(`SELECT balance FROM accounts WHERE account_id=$1`)
if err != nil {
Expand Down Expand Up @@ -92,7 +92,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID
return fmt.Errorf("failed to get contract ID: %w", err)
}

var usage proto4.Usage
var createdAccounts int
for _, deposit := range deposits {
var balance types.Currency
Expand Down Expand Up @@ -120,7 +119,6 @@ func (s *Store) RHP4CreditAccounts(deposits []proto4.AccountDeposit, contractID
if _, err := updateFundingAmountStmt.Exec(contractDBID, accountDBID, encode(fundAmount)); err != nil {
return fmt.Errorf("failed to update funding amount: %w", err)
}
usage.AccountFunding = usage.AccountFunding.Add(deposit.Amount)
}

_, err = reviseV2Contract(tx, contractID, revision, usage)
Expand Down
6 changes: 3 additions & 3 deletions persist/sqlite/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestRHP4Accounts(t *testing.T) {
// deposit funds
balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(10)},
}, contract.ID, contract.V2FileContract)
}, contract.ID, contract.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(10)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestRHP4AccountsDistribution(t *testing.T) {

balances, err := db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(3)},
}, c1.ID, c1.V2FileContract)
}, c1.ID, c1.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand All @@ -318,7 +318,7 @@ func TestRHP4AccountsDistribution(t *testing.T) {

balances, err = db.RHP4CreditAccounts([]proto4.AccountDeposit{
{Account: account, Amount: types.Siacoins(3)},
}, c2.ID, c2.V2FileContract)
}, c2.ID, c2.V2FileContract, proto4.Usage{AccountFunding: types.Siacoins(3)})
if err != nil {
t.Fatal(err)
} else if len(balances) != 1 {
Expand Down
47 changes: 47 additions & 0 deletions rhp/siamux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rhp

import (
"crypto/ed25519"
"net"

rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/mux/v2"
"go.uber.org/zap"
)

// A muxTransport is a rhp4.Transport that wraps a mux.Mux.
type muxTransport struct {
m *mux.Mux
}

// Close implements the rhp4.Transport interface.
func (mt *muxTransport) Close() error {
return mt.m.Close()
}

// AcceptStream implements the rhp4.Transport interface.
func (mt *muxTransport) AcceptStream() (net.Conn, error) {
return mt.m.AcceptStream()
}

// ServeRHP4SiaMux serves RHP4 connections on l using the provided server and logger.
func ServeRHP4SiaMux(l net.Listener, s *rhp4.Server, log *zap.Logger) {
for {
conn, err := l.Accept()
if err != nil {
log.Error("failed to accept connection", zap.Error(err))
return
}
log := log.With(zap.String("peerAddress", conn.RemoteAddr().String()))
go func() {
defer conn.Close()

m, err := mux.Accept(conn, ed25519.PrivateKey(s.HostKey()))
if err != nil {
log.Debug("failed to accept mux connection", zap.Error(err))
} else if err := s.Serve(&muxTransport{m}, log); err != nil {
log.Debug("failed to serve connection", zap.Error(err))
}
}()
}
}
4 changes: 2 additions & 2 deletions rhp/v2/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type (
Sectors interface {
// Write writes a sector to persistent storage
Write(root types.Hash256, data *[rhp2.SectorSize]byte) error
// Read reads the sector with the given root from the manager.
Read(root types.Hash256) (*[rhp2.SectorSize]byte, error)
// ReadSector reads the sector with the given root from the manager.
ReadSector(root types.Hash256) (*[rhp2.SectorSize]byte, error)
// Sync syncs the data files of changed volumes.
Sync() error
}
Expand Down
4 changes: 2 additions & 2 deletions rhp/v2/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage
return contracts.Usage{}, err
}

sector, err := sh.sectors.Read(root)
sector, err := sh.sectors.ReadSector(root)
if err != nil {
s.t.WriteResponseErr(ErrHostInternalError)
return contracts.Usage{}, fmt.Errorf("failed to read sector %v: %w", root, err)
Expand Down Expand Up @@ -849,7 +849,7 @@ func (sh *SessionHandler) rpcRead(s *session, log *zap.Logger) (contracts.Usage,

// enter response loop
for i, sec := range req.Sections {
sector, err := sh.sectors.Read(sec.MerkleRoot)
sector, err := sh.sectors.ReadSector(sec.MerkleRoot)
if err != nil {
err := fmt.Errorf("failed to get sector: %w", err)
s.t.WriteResponseErr(err)
Expand Down
Loading

0 comments on commit 820302a

Please sign in to comment.