diff --git a/go.mod b/go.mod index b714b70..0fbb1ab 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( go.etcd.io/bbolt v1.3.8 - go.sia.tech/core v0.2.1-0.20240130145801-8067f34b2ecc + go.sia.tech/core v0.2.2-0.20240201221153-1e1b69d8f4a7 go.uber.org/zap v1.26.0 golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 lukechampine.com/frand v1.4.2 diff --git a/go.sum b/go.sum index 3d17b69..8c37ced 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= -go.sia.tech/core v0.2.1-0.20240130145801-8067f34b2ecc h1:oUCCTOatQIwYkJ2FUWRvJtgU+i/BwlzmzCxoSvmmJVQ= -go.sia.tech/core v0.2.1-0.20240130145801-8067f34b2ecc/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= +go.sia.tech/core v0.2.2-0.20240130162142-3abb350aac79 h1:R/fl6Xw8eZuMI//2cICMOlNcH+wD2RngYdpAQu9mG5Y= +go.sia.tech/core v0.2.2-0.20240130162142-3abb350aac79/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= +go.sia.tech/core v0.2.2-0.20240201221153-1e1b69d8f4a7 h1:hYWrmXmzddqL/zc3I1jHtd0/XHw+B7ufXH4nolQY5ms= +go.sia.tech/core v0.2.2-0.20240201221153-1e1b69d8f4a7/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU= go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= diff --git a/rhp/v4/client.go b/rhp/v4/client.go new file mode 100644 index 0000000..5ded8d9 --- /dev/null +++ b/rhp/v4/client.go @@ -0,0 +1,466 @@ +package rhp + +import ( + "context" + "fmt" + "net" + "slices" + "sync" + "time" + + rhpv2 "go.sia.tech/core/rhp/v2" + rhpv4 "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" +) + +var ( + defaultOptions = options{ + DialTimeout: time.Minute, + IdleTimeout: 30 * time.Second, + RPCTimeout: 5 * time.Minute, + } +) + +type ( + // An Option is used to configure the client during creation. + Option func(o *options) + + // options are used to configure the client during creation. + options struct { + DialTimeout time.Duration // timeout for dialing a new connection + IdleTimeout time.Duration // timeout for idle connections before recreating them + RPCTimeout time.Duration // timeout for RPCs + } + + // Client is a client for the v4 renter-host protocol. After successful + // creation, a client will try to maintain a connection to the host and + // recreate it if necessary. In addition to various timeouts (see Options), + // every RPC can be interrupted using a context. + Client struct { + addr string + hostKey types.PublicKey + + dialTimeout time.Duration + idleTimeout time.Duration + rpcTimeout time.Duration + + mu sync.Mutex + openStreams int + lastSuccess time.Time + t *rhpv4.Transport + } +) + +// WithDialTimeout overwrites the default dial timeout of 1 minute. +func WithDialTimeout(d time.Duration) Option { + return func(opts *options) { + opts.DialTimeout = d + } +} + +// WithIdleTimeout overwrites the default idle timeout of 30 seconds. +func WithIdleTimeout(d time.Duration) Option { + return func(opts *options) { + opts.IdleTimeout = d + } +} + +// WithRPCTimeout overwrites the default RPC timeout of 5 minutes. +func WithRPCTimeout(d time.Duration) Option { + return func(opts *options) { + opts.RPCTimeout = d + } +} + +// NewClient creates a new client for the v4 renter-host protocol. +func NewClient(ctx context.Context, addr string, hostKey types.PublicKey, opts ...Option) (*Client, error) { + o := defaultOptions + for _, opt := range opts { + opt(&o) + } + c := &Client{ + addr: addr, + hostKey: hostKey, + openStreams: 0, + lastSuccess: time.Now(), + dialTimeout: o.DialTimeout, + idleTimeout: o.IdleTimeout, + rpcTimeout: o.RPCTimeout, + } + return c, c.resetTransport(ctx) +} + +// do performs an RPC with a host in a way that allows the caller to interrupt +// it +func (c *Client) do(ctx context.Context, rpc rhpv4.RPC) error { + done := make(chan struct{}) + var doErr error + var s *rhpv4.Stream + go func() { + defer close(done) + + // defer recover + defer func() { + if r := recover(); r != nil { + doErr = fmt.Errorf("a panic occurred while executing the rpc: %v", r) + } + }() + + // reset the transport if it hasn't been used in a while + c.mu.Lock() + if c.t == nil || (c.openStreams == 0 && time.Since(c.lastSuccess) > c.idleTimeout) { + if err := c.resetTransport(ctx); err != nil { + c.mu.Unlock() + doErr = err + return + } + } + c.openStreams++ + c.mu.Unlock() + + defer func() { + c.mu.Lock() + c.openStreams-- + c.mu.Unlock() + }() + + // dial a stream with a sane deadline + var err error + s, err = c.t.DialStream() + if err != nil { + doErr = fmt.Errorf("failed to dial stream: %w", err) + return + } else if err = s.SetDeadline(time.Now().Add(c.rpcTimeout)); err != nil { + doErr = fmt.Errorf("failed to set deadline: %w", err) + return + } + defer s.Close() + + // write rpc id + if err := s.WriteID(rpc); err != nil { + doErr = fmt.Errorf("failed to write rpc id: %w", err) + return + } + + // the write succeeded, the connection is still intact + defer func() { + c.mu.Lock() + c.lastSuccess = time.Now() + c.mu.Unlock() + }() + + // perform remaining rpc + if err := s.WriteRequest(rpc); err != nil { + doErr = fmt.Errorf("failed to write rpc request: %w", err) + return + } else if err := s.ReadResponse(rpc); err != nil { + doErr = fmt.Errorf("failed to read rpc response: %w", err) + return + } + }() + select { + case <-ctx.Done(): + // Caller interrupted the RPC - optimistically set deadline to abort + // goroutine as soon as possible + s.SetDeadline(time.Now()) + return ctx.Err() + case <-done: + return doErr + } +} + +func (c *Client) resetTransport(ctx context.Context) error { + conn, err := net.DialTimeout("tcp", c.addr, c.dialTimeout) + if err != nil { + return fmt.Errorf("failed to dial tcp connection: %w", err) + } else if conn.(*net.TCPConn).SetKeepAlive(true); err != nil { + return fmt.Errorf("failed to set keepalive: %w", err) + } else if conn.SetDeadline(time.Now().Add(c.dialTimeout)); err != nil { + return fmt.Errorf("failed to set dial deadline on tcp connection: %w", err) + } else if t, err := rhpv4.Dial(conn, c.hostKey); err != nil { + return fmt.Errorf("failed to dial mux: %w", err) + } else if err := conn.SetDeadline(time.Time{}); err != nil { + return fmt.Errorf("failed to revoke deadline on tcp connection") + } else { + c.t = t + } + return nil +} + +// AuditContract probabilistically audits a contract, checking whether the host +// has missing sectors. The input specifies the number of sectors we randomly +// pick so the higher the nummer, the less likely it is that the host is missing +// sectors. Any missing sectors found are returned. +func (c *Client) AuditContract(ctx context.Context, n int) ([]interface{}, error) { + panic("implement me") +} + +// Settings returns the host's current settings, including its prices. +func (c *Client) Settings(ctx context.Context) (rhpv4.HostSettings, error) { + rpc := rhpv4.RPCSettings{} + if err := c.do(ctx, &rpc); err != nil { + return rhpv4.HostSettings{}, fmt.Errorf("RPCSettings failed: %w", err) + } + return rpc.Settings, nil +} + +// FormContract forms a new contract with the host. +func (c *Client) FormContract(ctx context.Context, hp rhpv4.HostPrices, contract types.V2FileContract, inputs []types.V2SiacoinInput) (types.V2FileContract, error) { + rpc := rhpv4.RPCFormContract{ + Prices: hp, + Contract: contract, + RenterInputs: inputs, + } + if err := c.do(ctx, &rpc); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCFormContract failed: %w", err) + } + panic("incomplete rpc - missing outputs") + // TODO: verify host signatures + // return rpc.Contract, nil +} + +// RenewContract renews a contract with the host, immediately unlocking +func (c *Client) RenewContract(ctx context.Context, hp rhpv4.HostPrices, finalRevision, initialRevision types.V2FileContract) (types.V2FileContractRenewal, error) { + panic("incomplete rpc -- missing inputs and outputs") +} + +// PinSectors pins sectors to a contract. Commonly used to pin sectors uploaded +// with 'UploadSector'. PinSectors will first overwrite the provided gaps and +// then start appending roots to the end of the contract. So if more roots than +// gaps were provided and the method returns an error, it is safe to assume all +// gaps were filled. PinSectors fails if more roots than gaps are provided since +// it sorts the gaps to find duplicates which makes it hard for the caller to +// know which gaps got filled. +func (c *Client) PinSectors(ctx context.Context, contract types.V2FileContract, hp rhpv4.HostPrices, roots []types.Hash256, gaps []uint64) (types.V2FileContract, error) { + // sanity check input - no duplicate gaps, at most one gap per root + if len(gaps) > len(roots) { + return types.V2FileContract{}, fmt.Errorf("more gaps than roots provided") + } + slices.Sort(gaps) + for i := 1; i < len(gaps); i++ { + if gaps[i] == gaps[i-1] { + return types.V2FileContract{}, fmt.Errorf("gap %v is duplicated", gaps[i]) + } + } + + actions := make([]rhpv4.WriteAction, len(roots)) + for i := range roots { + if len(gaps) > 0 { + actions[i] = rhpv4.WriteAction{} + panic("incomplete type") + // gaps = gaps[1:] + } else { + actions[i] = rhpv4.WriteAction{ + Type: rhpv4.ActionAppend, + Root: roots[i], + } + } + } + rpcModify := rhpv4.RPCModifySectors{ + Actions: actions, + } + if err := c.do(ctx, &rpcModify); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCModifySectors failed: %w", err) + } + + // TODO: verify proof & build new revision + var rev types.V2FileContract + + rpcRevise := rhpv4.RPCReviseContract{ + Prices: hp, + Revision: rev, + } + if err := c.do(ctx, &rpcRevise); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCReviseSectors failed: %w", err) + } + + // TODO: verify host signatures + return rpcRevise.Revision, nil +} + +// PruneContract prunes the sectors with the given indices from a contract. +func (c *Client) PruneContract(ctx context.Context, contract types.V2FileContract, hp rhpv4.HostPrices, nSectors uint64, sectorIndices []uint64) (types.V2FileContract, error) { + if len(sectorIndices) == 0 { + return types.V2FileContract{}, nil // nothing to do + } else if nSectors == 0 { + return types.V2FileContract{}, fmt.Errorf("trying to prune empty contract") + } + + // sanity check input - no out-of-bounds indices, no duplicates + lastIndex := nSectors - 1 + slices.Sort(sectorIndices) + if sectorIndices[len(sectorIndices)-1] > lastIndex { + return types.V2FileContract{}, fmt.Errorf("sector index %v is out of bounds for contract with %v sectors", sectorIndices[len(sectorIndices)-1], nSectors) + } + for i := 1; i < len(sectorIndices); i++ { + if sectorIndices[i] == sectorIndices[i-1] { + return types.V2FileContract{}, fmt.Errorf("sector index %v is duplicated", sectorIndices[i]) + } + } + + // swap out sectors to delete + actions := make([]rhpv4.WriteAction, len(sectorIndices)) + for i := range sectorIndices { + actions[i] = rhpv4.WriteAction{ + Type: rhpv4.ActionSwap, + A: uint64(sectorIndices[i]), + B: lastIndex, + } + lastIndex-- + } + + // trim the swapped sectors + actions = append(actions, rhpv4.WriteAction{ + N: uint64(len(actions)), + }) + + // modify sector + rpcModify := rhpv4.RPCModifySectors{ + Actions: actions, + } + if err := c.do(ctx, &rpcModify); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCModifySectors failed: %w", err) + } + + // TODO: check proof & build new revision + var rev types.V2FileContract + + // revise contract + rpcRevise := rhpv4.RPCReviseContract{ + Prices: hp, + Revision: rev, + } + if err := c.do(ctx, &rpcRevise); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCReviseSectors failed: %w", err) + } + + // TODO: verify host signatures + return rpcRevise.Revision, nil +} + +// LatestRevision returns the latest revision for a given contract. +func (c *Client) LatestRevision(ctx context.Context, contractID types.FileContractID) (types.V2FileContract, error) { + rpc := rhpv4.RPCLatestRevision{ + ContractID: contractID, + } + if err := c.do(ctx, &rpc); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCLatestRevision failed: %w", err) + } + return rpc.Contract, nil +} + +// ReadSector reads a sector from the host. +func (c *Client) ReadSector(ctx context.Context, hp rhpv4.HostPrices, root types.Hash256, offset, length uint64) ([]byte, error) { + // sanity check input - offset must be segment-aligned + if offset%64 != 0 { + return nil, fmt.Errorf("offset %v is not segment-aligned", offset) + } + rpc := rhpv4.RPCReadSector{ + Prices: hp, + Root: root, + Offset: offset, + Length: length, + } + if err := c.do(ctx, &rpc); err != nil { + return nil, fmt.Errorf("RPCReadSector failed: %w", err) + } + // TODO: validate proof + return rpc.Sector, nil +} + +// WriteSector stores a sector in the host's temporary storage. To make it +// permanent, use 'PinSectors'. The provided data will be extended to +// rhpv4.SectorSize by the host so the client still pays for the full sector. +func (c *Client) WriteSector(ctx context.Context, hp rhpv4.HostPrices, data []byte) (types.Hash256, error) { + if len(data) == 0 { + return types.Hash256{}, fmt.Errorf("empty sector") + } else if len(data) > rhpv4.SectorSize { + return types.Hash256{}, fmt.Errorf("sector too large") + } + rpc := rhpv4.RPCWriteSector{ + Prices: hp, + Sector: data, + } + if err := c.do(ctx, &rpc); err != nil { + return types.Hash256{}, fmt.Errorf("RPCWriteSector failed: %w", err) + } + var sectorData [rhpv4.SectorSize]byte + copy(sectorData[:], data) + if rhpv2.SectorRoot((§orData)) != rpc.Root { + return types.Hash256{}, fmt.Errorf("root mismatch") + } + panic("unfinished rpc - missing payment") + // return rpc.Root, nil +} + +// SectorRoots returns 'length' roots of a contract starting at the given +// 'offset'. +func (c *Client) SectorRoots(ctx context.Context, hp rhpv4.HostPrices, offset, length uint64) ([]types.Hash256, error) { + rpc := rhpv4.RPCSectorRoots{ + Prices: hp, + Offset: offset, + Length: length, + } + if err := c.do(ctx, &rpc); err != nil { + return nil, fmt.Errorf("RPCSectorRoots failed: %w", err) + } + // TODO: verify proof + panic("unfinished rpc - missing payment") + // return rpc.Roots, nil +} + +// AccountBalance returns the balance of a given account. +func (c *Client) AccountBalance(ctx context.Context, account types.PublicKey) (types.Currency, error) { + rpc := rhpv4.RPCAccountBalance{ + Account: types.PublicKey{}, + } + if err := c.do(ctx, &rpc); err != nil { + return types.Currency{}, fmt.Errorf("RPCAccountBalance failed: %w", err) + } + return rpc.Balance, nil +} + +// FundAccount adds to the balance to an account and returns the new balance. +func (c *Client) FundAccount(ctx context.Context, contract types.V2FileContract, account types.PrivateKey) (types.V2FileContract, types.Currency, error) { + // TODO: build new revision and signature + var rev types.V2FileContract + var sig types.Signature + + rpc := rhpv4.RPCFundAccount{ + Account: account.PublicKey(), + Revision: rev, + RenterSignature: sig, + } + if err := c.do(ctx, &rpc); err != nil { + return types.V2FileContract{}, types.Currency{}, fmt.Errorf("RPCFundAccount failed: %w", err) + } + + // TODO: verify host signature + return rpc.Revision, rpc.NewBalance, nil +} + +// ReviseContract is a more generic version of 'PinSectors' and 'PruneContract'. +// It's allows for arbitrary actions to be performed. Most users should use +// 'PinSectors' and 'PruneContract' instead. +func (c *Client) ReviseContract(ctx context.Context, hp rhpv4.HostPrices, actions []rhpv4.WriteAction) (types.V2FileContract, error) { + modifyRPC := rhpv4.RPCModifySectors{ + Actions: actions, + } + if err := c.do(ctx, &modifyRPC); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCModifySectors failed: %w", err) + } + + // TODO: verify proof & build new revision + var rev types.V2FileContract + + reviseRPC := rhpv4.RPCReviseContract{ + Prices: hp, + Revision: rev, + } + if err := c.do(ctx, &reviseRPC); err != nil { + return types.V2FileContract{}, fmt.Errorf("RPCReviseContract failed: %w", err) + } + + // TODO: verify host signatures + return reviseRPC.Revision, nil +} diff --git a/rhp/v4/client_test.go b/rhp/v4/client_test.go new file mode 100644 index 0000000..97ca635 --- /dev/null +++ b/rhp/v4/client_test.go @@ -0,0 +1,81 @@ +//go:build ignore + +package rhp_test + +import ( + "context" + "net" + "testing" + "time" + + crhp4 "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" + rhp4 "go.sia.tech/coreutils/rhp/v4" + "go.sia.tech/coreutils/rhp/v4/host" + "go.sia.tech/coreutils/testutil" + "go.uber.org/zap/zaptest" +) + +type muxTransport struct { + t *crhp4.Transport +} + +func (mt *muxTransport) AcceptStream() (host.Stream, error) { + return mt.t.AcceptStream() +} + +func (mt *muxTransport) Close() error { return mt.t.Close() } + +func TestRPCSettings(t *testing.T) { + log := zaptest.NewLogger(t) + hostKey := types.GeneratePrivateKey() + //renterKey := types.GeneratePrivateKey() + + network, genesisBlock := testutil.Network() + chainStore, state, err := chain.NewDBStore(chain.NewMemDB(), network, genesisBlock) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(chainStore, state) + + h := host.NewServer(hostKey, cm) + + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + go func() { + for { + conn, err := l.Accept() + if err != nil { + return + } + t, err := crhp4.Accept(conn, hostKey) + if err != nil { + panic(err) + } + mt := &muxTransport{t} + go func() { + defer t.Close() + + h.Serve(mt, log.Named("host")) + }() + } + }() + + client, err := rhp4.NewClient(context.Background(), l.Addr().String(), hostKey.PublicKey()) + if err != nil { + t.Fatal(err) + } + //defer client.Close() + + settings, err := client.Settings(context.Background()) + if err != nil { + t.Fatal(err) + } else if time.Until(settings.Prices.ValidUntil) <= 0 { + t.Fatal("price table is expired") + } +} diff --git a/rhp/v4/host/contracts.go b/rhp/v4/host/contracts.go new file mode 100644 index 0000000..88b3c32 --- /dev/null +++ b/rhp/v4/host/contracts.go @@ -0,0 +1,90 @@ +package host + +import ( + "sync" + + "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" + "go.uber.org/zap" +) + +type ( + contract struct { + mu sync.Mutex + + // the element proof must be updated every block + revision types.V2FileContractRevision + roots []types.Hash256 + + revisionIndex *types.ChainIndex + proofIndex *types.ChainIndex + + confirmedRevisionNumber uint64 + } + + // A MemContractStore manages the state of file contracts in memory. + MemContractStore struct { + log *zap.Logger + + updates []*chain.ApplyUpdate + + mu sync.Mutex + contracts map[types.Hash256]*contract + } +) + +// ProcessChainApplyUpdate implements the chain.Subscriber interface +func (ms *MemContractStore) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool) error { + ms.updates = append(ms.updates, cau) + + if mayCommit { + log := ms.log.Named("consensus.apply") + // commit the updates + ms.mu.Lock() + defer ms.mu.Unlock() + for _, update := range ms.updates { + update.ForEachFileContractElement(func(fce types.FileContractElement, rev *types.FileContractElement, resolved, valid bool) { + if resolved { + log.Debug("resolved contract", zap.Stringer("id", fce.ID), zap.Bool("valid", valid)) + delete(ms.contracts, fce.ID) + return + } + }) + } + } + + return nil +} + +// ProcessChainRevertUpdate implements the chain.Subscriber interface +func (ms *MemContractStore) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { + if len(ms.updates) != 0 && ms.updates[len(ms.updates)-1].State.Index == cru.State.Index { + ms.updates = ms.updates[:len(ms.updates)-1] + return nil + } + + panic("implement me") +} + +// Revision returns the current revision of the contract. +func (ms *MemContractStore) Revision() types.V2FileContractRevision { + panic("implement me") +} + +// AddContract adds a contract to the store. +func (ms *MemContractStore) AddContract(fe types.V2FileContractElement) error { + ms.mu.Lock() + defer ms.mu.Unlock() + + if _, ok := ms.contracts[fe.ID]; ok { + return ErrContractExists + } + + ms.contracts[fe.ID] = &contract{ + revision: types.V2FileContractRevision{ + Parent: fe, + Revision: fe.V2FileContract, + }, + } + return nil +} diff --git a/rhp/v4/host/errors.go b/rhp/v4/host/errors.go new file mode 100644 index 0000000..eea403a --- /dev/null +++ b/rhp/v4/host/errors.go @@ -0,0 +1,30 @@ +package host + +import ( + "errors" + + "go.sia.tech/coreutils/wallet" +) + +var ( + // ErrNotEnoughFunds is returned when a transaction cannot be funded. + ErrNotEnoughFunds = wallet.ErrNotEnoughFunds + // ErrNotEnoughStorage is returned when a sector cannot be stored because + // the host has run out of space + ErrNotEnoughStorage = errors.New("not enough storage") + // ErrSectorNotFound is returned when a sector cannot be deleted because it + // does not exist. + ErrSectorNotFound = errors.New("sector not found") + // ErrPriceTableExpired is returned when the renter sent an expired price + // table. + ErrPriceTableExpired = errors.New("price table expired") + // ErrOffsetOutOfBounds is returned when a renter requests a sector with an + // offset that is out of bounds. + ErrOffsetOutOfBounds = errors.New("offset out of bounds") + // ErrContractExists is returned when a renter tries to form a contract with + // a host that already has a contract with the renter. + ErrContractExists = errors.New("contract already exists") + // ErrSectorTooLarge is returned when a renter tries to store a sector that + // is larger than the maximum sector size. + ErrSectorTooLarge = errors.New("sector too large") +) diff --git a/rhp/v4/host/options.go b/rhp/v4/host/options.go new file mode 100644 index 0000000..60fbaeb --- /dev/null +++ b/rhp/v4/host/options.go @@ -0,0 +1,13 @@ +package host + +type ( + config struct { + Settings Settings + } + + // An Option sets options such as the host's default settings for the server. + Option func(*config) +) + +// WithSettings sets the logger for the server. +func WithSettings(s Settings) Option { return func(c *config) { c.Settings = s } } diff --git a/rhp/v4/host/rhp.go b/rhp/v4/host/rhp.go new file mode 100644 index 0000000..0d225ca --- /dev/null +++ b/rhp/v4/host/rhp.go @@ -0,0 +1,182 @@ +package host + +import ( + "encoding/hex" + "errors" + "fmt" + "net" + "time" + + "go.sia.tech/core/consensus" + "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" + "go.uber.org/zap" + "lukechampine.com/frand" +) + +var ( + protocolVersion = [3]byte{0, 0, 1} +) + +type ( + // A Transport is a generic multiplexer interface used to handle incoming + // streams. + Transport interface { + AcceptStream() (net.Conn, error) + Close() error + } + + // A Server provides a minimal reference implementation an RHP4 host. It + // should be used only for testing or as a reference for implementing a + // production host. + Server struct { + privKey types.PrivateKey + config config + chain ChainManager + sectors *MemSectorStore + } + + // A ChainManager provides access to the current state of the blockchain. + ChainManager interface { + TipState() consensus.State + } + + // A Wallet funds and signs transactions. + Wallet interface { + FundTransaction(txn *types.Transaction, amount types.Currency, useUnconfirmed bool) ([]types.Hash256, error) + SignTransaction(txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) + ReleaseInputs(txns ...types.Transaction) + } + + // Settings contains the host's internal settings. + Settings struct { + NetAddresses []rhp.NetAddress `json:"protocols"` + AcceptingContracts bool `json:"acceptingContracts"` + MaxDuration uint64 `json:"maxDuration"` + CollateralMultiplier float64 `json:"collateralMultiplier"` + ContractPrice types.Currency `json:"contractPrice"` + StoragePrice types.Currency `json:"storagePrice"` + IngressPrice types.Currency `json:"ingressPrice"` + EgressPrice types.Currency `json:"egressPrice"` + MaxCollateral types.Currency `json:"maxCollateral"` + PriceTableValidity time.Duration `json:"priceTableValidity"` + } +) + +func (s *Server) handleHostStream(stream net.Conn, log *zap.Logger) { + defer stream.Close() + + stream.SetDeadline(time.Now().Add(30 * time.Second)) // set an initial timeout + rpcStart := time.Now() + id, err := rhp.ReadID(stream) + if err != nil { + log.Debug("failed to read RPC ID", zap.Error(err)) + return + } + + req := rhp.RequestforID(id) + if err := rhp.ReadRequest(stream, req); err != nil { + log.Debug("failed to read RPC request", zap.Error(err)) + return + } + + switch req := req.(type) { + case *rhp.RPCSettingsRequest: + err = s.handleRPCSettings(stream, req, log.Named(id.String())) + case *rhp.RPCReadSectorRequest: + panic("not implemented") + case *rhp.RPCWriteSectorRequest: + panic("not implemented") + case *rhp.RPCModifySectorsRequest: + panic("not implemented") + case *rhp.RPCFundAccountRequest: + panic("not implemented") + case *rhp.RPCFormContractRequest: + panic("not implemented") + case *rhp.RPCRenewContractRequest: + panic("not implemented") + case *rhp.RPCLatestRevisionRequest: + panic("not implemented") + case *rhp.RPCSectorRootsRequest: + panic("not implemented") + case *rhp.RPCAccountBalanceRequest: + panic("not implemented") + default: + log.Debug("unrecognized RPC", zap.Stringer("rpc", id)) + rhp.WriteResponse(stream, &rhp.RPCError{Code: 0, Description: "unrecognized RPC"}) + return + } + if err != nil { + log.Warn("RPC failed", zap.Error(err), zap.Duration("elapsed", time.Since(rpcStart))) + return + } + log.Info("RPC success", zap.Duration("elapsed", time.Since(rpcStart))) +} + +// Serve accepts incoming streams on the provided multiplexer and handles them +func (s *Server) Serve(t Transport, log *zap.Logger) error { + defer t.Close() + + for { + stream, err := t.AcceptStream() + if errors.Is(err, net.ErrClosed) { + return nil + } else if err != nil { + return fmt.Errorf("failed to accept connection: %w", err) + } + log := log.With(zap.String("streamID", hex.EncodeToString(frand.Bytes(4)))) + log.Debug("accepted stream") + go func() { + defer func() { + if err := stream.Close(); err != nil { + log.Debug("failed to close stream", zap.Error(err)) + } else { + log.Debug("closed stream") + } + }() + s.handleHostStream(stream, log) + }() + } +} + +// UpdateSettings updates the host's internal settings. +func (s *Server) UpdateSettings(settings Settings) error { + s.config.Settings = settings + return nil +} + +// SetMaxSectors sets the maximum number of sectors the host can store. +func (s *Server) SetMaxSectors(n uint64) { + s.sectors.maxSectors = n +} + +// NewServer creates a new reference host server with the given private key and chain +// manager. The server will use the provided options to configure its internal +// settings. +// +// A transport must be set up and then called with the Serve method to start +// an RHP session. +func NewServer(privKey types.PrivateKey, cm ChainManager, opts ...Option) *Server { + cfg := config{ + Settings: Settings{ + AcceptingContracts: true, + MaxDuration: 1000, + CollateralMultiplier: 2.0, + ContractPrice: types.Siacoins(1).Div64(4), + StoragePrice: types.Siacoins(1).Div64(4320).Div64(1e12), + IngressPrice: types.Siacoins(1).Div64(1e12), + EgressPrice: types.Siacoins(1).Div64(1e12), + MaxCollateral: types.Siacoins(1000), + PriceTableValidity: 10 * time.Minute, + }, + } + for _, opt := range opts { + opt(&cfg) + } + return &Server{ + privKey: privKey, + config: cfg, + chain: cm, + sectors: NewMemSectorStore(100), + } +} diff --git a/rhp/v4/host/rpc.go b/rhp/v4/host/rpc.go new file mode 100644 index 0000000..4aeebc6 --- /dev/null +++ b/rhp/v4/host/rpc.go @@ -0,0 +1,111 @@ +package host + +import ( + "net" + "time" + + "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" + "go.uber.org/zap" +) + +func validatePriceTable(pk types.PrivateKey, pt rhp.HostPrices) error { + if pt.ValidUntil.Before(time.Now()) { + return ErrPriceTableExpired + } + // sigHash := pt.SigHash() + // if !pk.VerifyHash(sigHash, pt.Signature) { + // return ErrInvalidSignature + // } + return nil +} + +func (s *Server) handleRPCSettings(stream net.Conn, req *rhp.RPCSettingsRequest, _ *zap.Logger) error { + pt := rhp.HostPrices{ + ContractPrice: s.config.Settings.ContractPrice, + Collateral: s.config.Settings.StoragePrice.Mul64(uint64(s.config.Settings.CollateralMultiplier * 1000)).Div64(1000), + StoragePrice: s.config.Settings.StoragePrice, + IngressPrice: s.config.Settings.IngressPrice, + EgressPrice: s.config.Settings.EgressPrice, + TipHeight: s.chain.TipState().Index.Height, + ValidUntil: time.Now().Add(s.config.Settings.PriceTableValidity), + } + + // sigHash := pt.SigHash() + // pt.Signature = s.privKey.SignHash(sigHash) + + err := rhp.WriteResponse(stream, &rhp.RPCSettingsResponse{ + Settings: rhp.HostSettings{ + Version: protocolVersion, + NetAddresses: s.config.Settings.NetAddresses, + AcceptingContracts: s.config.Settings.AcceptingContracts, + MaxDuration: s.config.Settings.MaxDuration, + Prices: pt, + }, + }) + return err +} + +/* +func (s *Server) handleRPCReadSector(stream net.Conn, rpc *rhp.RPCReadSector, _ *zap.Logger) error { + if err := validatePriceTable(s.privKey, rpc.Prices); err != nil { + stream.WriteResponseErr(err) + return err + } + if rpc.Length+rpc.Offset > rhp.SectorSize { + stream.WriteResponseErr(ErrOffsetOutOfBounds) + return ErrOffsetOutOfBounds + } + + sector, ok := s.sectors.Read(rpc.Root) + if !ok { + stream.WriteResponseErr(ErrSectorNotFound) + return fmt.Errorf("failed to read sector: %w", ErrSectorNotFound) + } + + // TODO: response is missing proof + rpc.Sector = sector[rpc.Offset : rpc.Offset+rpc.Length] + if err := stream.WriteResponse(rpc); err != nil { + return fmt.Errorf("failed to write RPCReadSector: %w", err) + } + return nil +} + +func (s *Server) handleRPCWriteSector(stream Stream, rpc *rhp.RPCWriteSector, _ *zap.Logger) error { + if err := validatePriceTable(s.privKey, rpc.Prices); err != nil { + stream.WriteResponseErr(err) + return err + } else if len(rpc.Sector) > rhp.SectorSize { + stream.WriteResponseErr(ErrSectorTooLarge) + return ErrSectorTooLarge + } + + sector := ([rhp.SectorSize]byte)(rpc.Sector) + // TODO: stream sector root calculation + rpc.Root = rhp2.SectorRoot(§or) + + if err := s.sectors.Write(rpc.Root, sector); err != nil { + stream.WriteResponseErr(err) + return fmt.Errorf("failed to write sector: %w", err) + } else if err := stream.WriteResponse(rpc); err != nil { + return fmt.Errorf("failed to write RPCWriteSector: %w", err) + } + return nil +} + +func (s *Server) handleRPCModifySectors(stream Stream, rpc *rhp.RPCModifySectors, _ *zap.Logger) error { + panic("implement me") +} + +func (s *Server) handleRPCAccountBalance(stream Stream, rpc *rhp.RPCAccountBalance, _ *zap.Logger) error { + panic("implement me") +} + +func (s *Server) handleRPCFundAccount(stream Stream, rpc *rhp.RPCFundAccount, _ *zap.Logger) error { + panic("implement me") +} + +func (s *Server) handleRPCSectorRoots(stream Stream, rpc *rhp.RPCSectorRoots, _ *zap.Logger) error { + panic("missing contract payment") +} +*/ diff --git a/rhp/v4/host/storage.go b/rhp/v4/host/storage.go new file mode 100644 index 0000000..f761f15 --- /dev/null +++ b/rhp/v4/host/storage.go @@ -0,0 +1,56 @@ +package host + +import ( + "errors" + "sync" + + "go.sia.tech/core/rhp/v4" + "go.sia.tech/core/types" +) + +// A MemSectorStore manages the state of sectors in memory. +type MemSectorStore struct { + maxSectors uint64 + + mu sync.Mutex + sectors map[types.Hash256][rhp.SectorSize]byte +} + +// NewMemSectorStore creates a new MemSectorStore with a maximum number of +// sectors. +func NewMemSectorStore(maxSectors uint64) *MemSectorStore { + return &MemSectorStore{ + maxSectors: maxSectors, + sectors: make(map[types.Hash256][rhp.SectorSize]byte), + } +} + +// Read retrieves a sector from the store. +func (m *MemSectorStore) Read(root types.Hash256) ([rhp.SectorSize]byte, bool) { + m.mu.Lock() + defer m.mu.Unlock() + sector, ok := m.sectors[root] + return sector, ok +} + +// Write stores a sector in the store. +func (m *MemSectorStore) Write(root types.Hash256, sector [rhp.SectorSize]byte) error { + m.mu.Lock() + defer m.mu.Unlock() + if uint64(len(m.sectors)) > m.maxSectors { + return errors.New("not enough storage") + } + m.sectors[root] = sector + return nil +} + +// Delete removes a sector from the store. +func (m *MemSectorStore) Delete(root types.Hash256) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.sectors[root]; !ok { + return errors.New("sector not found") + } + delete(m.sectors, root) + return nil +}