Skip to content

Commit

Permalink
Add account funding endpoint to bus (#1485)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Aug 30, 2024
2 parents 2ab36bb + 67594c2 commit ce2b3ca
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 103 deletions.
11 changes: 11 additions & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"errors"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
)

Expand Down Expand Up @@ -45,6 +46,16 @@ type (
)

type (
AccountsFundRequest struct {
AccountID rhpv3.Account `json:"accountID"`
Amount types.Currency `json:"amount"`
ContractID types.FileContractID `json:"contractID"`
}

AccountsFundResponse struct {
Deposit types.Currency `json:"deposit"`
}

AccountsSaveRequest struct {
Accounts []Account `json:"accounts"`
}
Expand Down
9 changes: 6 additions & 3 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.sia.tech/renterd/internal/rhp"
rhp2 "go.sia.tech/renterd/internal/rhp/v2"
rhp3 "go.sia.tech/renterd/internal/rhp/v3"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
Expand All @@ -42,6 +43,7 @@ const (
defaultWalletRecordMetricInterval = 5 * time.Minute
defaultPinUpdateInterval = 5 * time.Minute
defaultPinRateWindow = 6 * time.Hour
lockingPriorityFunding = 40
lockingPriorityRenew = 80
stdTxnSize = 1200 // bytes
)
Expand Down Expand Up @@ -297,7 +299,7 @@ type (

type Bus struct {
startTime time.Time
masterKey [32]byte
masterKey utils.MasterKey

alerts alerts.Alerter
alertMgr AlertManager
Expand Down Expand Up @@ -378,8 +380,9 @@ func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksMa
// Handler returns an HTTP handler that serves the bus API.
func (b *Bus) Handler() http.Handler {
return jape.Mux(map[string]jape.Handler{
"GET /accounts": b.accountsHandlerGET,
"POST /accounts": b.accountsHandlerPOST,
"GET /accounts": b.accountsHandlerGET,
"POST /accounts": b.accountsHandlerPOST,
"POST /accounts/fund": b.accountsFundHandler,

"GET /alerts": b.handleGETAlerts,
"POST /alerts/dismiss": b.handlePOSTAlertsDismiss,
Expand Down
12 changes: 12 additions & 0 deletions bus/client/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"net/url"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)

Expand All @@ -15,6 +17,16 @@ func (c *Client) Accounts(ctx context.Context, owner string) (accounts []api.Acc
return
}

func (c *Client) FundAccount(ctx context.Context, account rhpv3.Account, fcid types.FileContractID, amount types.Currency) (types.Currency, error) {
var resp api.AccountsFundResponse
err := c.c.WithContext(ctx).POST("/accounts/fund", api.AccountsFundRequest{
AccountID: account,
Amount: amount,
ContractID: fcid,
}, &resp)
return resp.Deposit, err
}

// UpdateAccounts saves all accounts.
func (c *Client) UpdateAccounts(ctx context.Context, accounts []api.Account) (err error) {
err = c.c.WithContext(ctx).POST("/accounts", api.AccountsSaveRequest{
Expand Down
83 changes: 83 additions & 0 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

rhpv2 "go.sia.tech/core/rhp/v2"

rhp3 "go.sia.tech/renterd/internal/rhp/v3"

ibus "go.sia.tech/renterd/internal/bus"
"go.sia.tech/renterd/internal/gouging"

Expand All @@ -41,6 +43,87 @@ func (b *Bus) fetchSetting(ctx context.Context, key string, value interface{}) e
return nil
}

func (b *Bus) accountsFundHandler(jc jape.Context) {
var req api.AccountsFundRequest
if jc.Decode(&req) != nil {
return
}

// contract metadata
cm, err := b.ms.Contract(jc.Request.Context(), req.ContractID)
if jc.Check("failed to fetch contract metadata", err) != nil {
return
}

rk := b.masterKey.DeriveContractKey(cm.HostKey)

// acquire contract
lockID, err := b.contractLocker.Acquire(jc.Request.Context(), lockingPriorityFunding, req.ContractID, math.MaxInt64)
if jc.Check("failed to acquire lock", err) != nil {
return
}
defer b.contractLocker.Release(req.ContractID, lockID)

// latest revision
rev, err := b.rhp3.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, cm.SiamuxAddr)
if jc.Check("failed to fetch contract revision", err) != nil {
return
}

// ensure we have at least 2H in the contract to cover the costs
if types.NewCurrency64(2).Cmp(rev.ValidRenterPayout()) >= 0 {
jc.Error(fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), types.NewCurrency64(2)), http.StatusBadRequest)
return
}

// price table
pt, err := b.rhp3.PriceTable(jc.Request.Context(), cm.HostKey, cm.SiamuxAddr, rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk))
if jc.Check("failed to fetch price table", err) != nil {
return
}

// check only the FundAccountCost
if types.NewCurrency64(1).Cmp(pt.FundAccountCost) < 0 {
jc.Error(fmt.Errorf("%w: host is gouging on FundAccountCost", gouging.ErrPriceTableGouging), http.StatusServiceUnavailable)
return
}

// cap the deposit by what's left in the contract
deposit := req.Amount
cost := pt.FundAccountCost
availableFunds := rev.ValidRenterPayout().Sub(cost)
if deposit.Cmp(availableFunds) > 0 {
deposit = availableFunds
}

// fund the account
err = b.rhp3.FundAccount(jc.Request.Context(), &rev, cm.HostKey, cm.SiamuxAddr, deposit, req.AccountID, pt.HostPriceTable, rk)
if jc.Check("failed to fund account", err) != nil {
return
}

// record spending
err = b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{
{
ContractSpending: api.ContractSpending{
FundAccount: deposit.Add(cost),
},
ContractID: rev.ParentID,
RevisionNumber: rev.RevisionNumber,
Size: rev.Filesize,

MissedHostPayout: rev.MissedHostPayout(),
ValidRenterPayout: rev.ValidRenterPayout(),
},
})
if err != nil {
b.logger.Error("failed to record contract spending", zap.Error(err))
}
jc.Encode(api.AccountsFundResponse{
Deposit: deposit,
})
}

func (b *Bus) consensusAcceptBlock(jc jape.Context) {
var block types.Block
if jc.Decode(&block) != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/rhp/v3/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ func (c *Client) Renew(ctx context.Context, gc gouging.Checker, rev types.FileCo
return
}

func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.SettingsID, rk types.PrivateKey) (balance types.Currency, _ error) {
func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.HostPriceTable, rk types.PrivateKey) (balance types.Currency, _ error) {
return balance, c.tpool.withTransport(ctx, hk, siamuxAddr, func(ctx context.Context, t *transportV3) error {
payment, err := payByContract(rev, types.NewCurrency64(1), accID, rk)
payment, err := payByContract(rev, pt.AccountBalanceCost, accID, rk)
if err != nil {
return err
}
balance, err = rpcAccountBalance(ctx, t, &payment, accID, pt)
balance, err = rpcAccountBalance(ctx, t, &payment, accID, pt.UID)
return err
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ func TestEphemeralAccountSync(t *testing.T) {
if len(accounts) != 1 || accounts[0].ID != acc.ID {
t.Fatal("account should exist")
} else if accounts[0].CleanShutdown || !accounts[0].RequiresSync {
t.Fatalf("account shouldn't be marked as clean shutdown or not require a sync, got %v", accounts[0])
t.Fatal("account shouldn't be marked as clean shutdown or not require a sync, got", accounts[0].CleanShutdown, accounts[0].RequiresSync)
}

// assert account was funded
Expand Down
66 changes: 66 additions & 0 deletions internal/utils/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package utils

import (
"fmt"

"go.sia.tech/core/types"
"golang.org/x/crypto/blake2b"
)

type (
MasterKey [32]byte
AccountsKey types.PrivateKey
)

// DeriveAccountsKey derives an accounts key from a masterkey which is used
// to derive individual account keys from.
func (key *MasterKey) DeriveAccountsKey(workerID string) AccountsKey {
keyPath := fmt.Sprintf("accounts/%s", workerID)
return AccountsKey(key.deriveSubKey(keyPath))
}

// DeriveContractKey derives a contract key from a masterkey which is used to
// form, renew and revise contracts.
func (key *MasterKey) DeriveContractKey(hostKey types.PublicKey) types.PrivateKey {
seed := blake2b.Sum256(append(key.deriveSubKey("renterkey"), hostKey[:]...))
pk := types.NewPrivateKeyFromSeed(seed[:])
for i := range seed {
seed[i] = 0
}
return pk
}

// deriveSubKey can be used to derive a sub-masterkey from the worker's
// masterkey to use for a specific purpose. Such as deriving more keys for
// ephemeral accounts.
func (key *MasterKey) deriveSubKey(purpose string) types.PrivateKey {
seed := blake2b.Sum256(append(key[:], []byte(purpose)...))
pk := types.NewPrivateKeyFromSeed(seed[:])
for i := range seed {
seed[i] = 0
}
return pk
}

// DeriveAccountKey derives an account plus key for a given host and worker.
// Each worker has its own account for a given host. That makes concurrency
// around keeping track of an accounts balance and refilling it a lot easier in
// a multi-worker setup.
func (key *AccountsKey) DeriveAccountKey(hk types.PublicKey) types.PrivateKey {
index := byte(0) // not used yet but can be used to derive more than 1 account per host

// Append the host for which to create it and the index to the
// corresponding sub-key.
subKey := *key
data := make([]byte, 0, len(subKey)+len(hk)+1)
data = append(data, subKey[:]...)
data = append(data, hk[:]...)
data = append(data, index)

seed := types.HashBytes(data)
pk := types.NewPrivateKeyFromSeed(seed[:])
for i := range seed {
seed[i] = 0
}
return pk
}
49 changes: 16 additions & 33 deletions internal/worker/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
rhp3 "go.sia.tech/renterd/internal/rhp/v3"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand All @@ -31,8 +32,11 @@ var (
)

type (
AccountMgrWorker interface {
FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, balance types.Currency) error
AccountFunder interface {
FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error
}

AccountSyncer interface {
SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error
}

Expand All @@ -53,11 +57,12 @@ type (
type (
AccountMgr struct {
alerts alerts.Alerter
w AccountMgrWorker
funder AccountFunder
syncer AccountSyncer
dc DownloadContracts
cs ConsensusState
s AccountStore
key types.PrivateKey
key utils.AccountsKey
logger *zap.SugaredLogger
owner string
refillInterval time.Duration
Expand Down Expand Up @@ -87,13 +92,14 @@ type (
// NewAccountManager creates a new account manager. It will load all accounts
// from the given store and mark the shutdown as unclean. When Shutdown is
// called it will save all accounts.
func NewAccountManager(key types.PrivateKey, owner string, alerter alerts.Alerter, w AccountMgrWorker, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
logger := l.Named("accounts").Sugar()

shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
a := &AccountMgr{
alerts: alerter,
w: w,
funder: funder,
syncer: syncer,
cs: cs,
dc: dc,
s: s,
Expand Down Expand Up @@ -177,7 +183,7 @@ func (a *AccountMgr) account(hk types.PublicKey) *Account {
defer a.mu.Unlock()

// Derive account key.
accKey := deriveAccountKey(a.key, hk)
accKey := a.key.DeriveAccountKey(hk)
accID := rhpv3.Account(accKey.PublicKey())

// Create account if it doesn't exist.
Expand Down Expand Up @@ -239,7 +245,7 @@ func (a *AccountMgr) run() {
a.mu.Lock()
accounts := make(map[rhpv3.Account]*Account, len(saved))
for _, acc := range saved {
accKey := deriveAccountKey(a.key, acc.HostKey)
accKey := a.key.DeriveAccountKey(acc.HostKey)
if rhpv3.Account(accKey.PublicKey()) != acc.ID {
a.logger.Errorf("account key derivation mismatch %v != %v", accKey.PublicKey(), acc.ID)
continue
Expand Down Expand Up @@ -384,7 +390,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
// check if a resync is needed
if account.RequiresSync {
// sync the account
err := a.w.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr)
err := a.syncer.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr)
if err != nil {
return fmt.Errorf("failed to sync account's balance: %w", err)
}
Expand All @@ -399,7 +405,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
}

// fund the account
err := a.w.FundAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr, maxBalance)
err := a.funder.FundAccount(ctx, contract.ID, contract.HostKey, maxBalance)
if err != nil {
return fmt.Errorf("failed to fund account: %w", err)
}
Expand Down Expand Up @@ -587,29 +593,6 @@ func (a *Account) setBalance(balance *big.Int) {
zap.Stringer("drift", drift))
}

// deriveAccountKey derives an account plus key for a given host and worker.
// Each worker has its own account for a given host. That makes concurrency
// around keeping track of an accounts balance and refilling it a lot easier in
// a multi-worker setup.
func deriveAccountKey(mgrKey types.PrivateKey, hostKey types.PublicKey) types.PrivateKey {
index := byte(0) // not used yet but can be used to derive more than 1 account per host

// Append the host for which to create it and the index to the
// corresponding sub-key.
subKey := mgrKey
data := make([]byte, 0, len(subKey)+len(hostKey)+1)
data = append(data, subKey[:]...)
data = append(data, hostKey[:]...)
data = append(data, index)

seed := types.HashBytes(data)
pk := types.NewPrivateKeyFromSeed(seed[:])
for i := range seed {
seed[i] = 0
}
return pk
}

func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err error, keysAndValues ...string) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
Expand Down
Loading

0 comments on commit ce2b3ca

Please sign in to comment.