Skip to content

Commit

Permalink
Migrate to new types fromcoreutils (#1233)
Browse files Browse the repository at this point in the history
This PR migrates `renterd` to use the new types in `coreutils`, e.g. the
wallet, syncer and chain manager.

Keeping it as a `DRAFT` for the time being, now that we're play testing
it on our nodes I figured it's a good time to look at the diff and keep
it up-to-date with `dev` more frequently.
  • Loading branch information
ChrisSchinnerl authored Jul 2, 2024
2 parents 57893a7 + 15495a6 commit b052e0c
Show file tree
Hide file tree
Showing 80 changed files with 4,405 additions and 3,262 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
host port: 3800
mysql version: '8'
mysql root password: test
- name: Test
- name: Test Stores
uses: n8maninger/action-golang-test@v1
with:
args: "-race;-short"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ jobs:
with:
moduleName: 'renterd'
goVersion: '1.21'
token: ${{ secrets.GITHUB_TOKEN }}
token: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion alerts/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ webhooks.WebhookStore = (*testWebhookStore)(nil)

func TestWebhooks(t *testing.T) {
store := &testWebhookStore{}
mgr, err := webhooks.NewManager(zap.NewNop().Sugar(), store)
mgr, err := webhooks.NewManager(store, zap.NewNop())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions api/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

"go.sia.tech/core/types"
"go.sia.tech/siad/build"
"go.sia.tech/renterd/internal/utils"
)

const (
Expand Down Expand Up @@ -131,7 +131,7 @@ type (
func (c AutopilotConfig) Validate() error {
if c.Hosts.MaxDowntimeHours > 99*365*24 {
return ErrMaxDowntimeHoursTooHigh
} else if c.Hosts.MinProtocolVersion != "" && !build.IsVersion(c.Hosts.MinProtocolVersion) {
} else if c.Hosts.MinProtocolVersion != "" && !utils.IsVersion(c.Hosts.MinProtocolVersion) {
return fmt.Errorf("invalid min protocol version '%s'", c.Hosts.MinProtocolVersion)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
ErrContractSetNotFound = errors.New("couldn't find contract set")
)

type ContractState string

type (
// A Contract wraps the contract metadata with the latest contract revision.
Contract struct {
Expand Down
21 changes: 21 additions & 0 deletions api/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ import (
"go.sia.tech/core/types"
)

type (
// A SiacoinElement is a SiacoinOutput along with its ID.
SiacoinElement struct {
types.SiacoinOutput
ID types.Hash256 `json:"id"`
MaturityHeight uint64 `json:"maturityHeight"`
}

// A Transaction is an on-chain transaction relevant to a particular wallet,
// paired with useful metadata.
Transaction struct {
Raw types.Transaction `json:"raw,omitempty"`
Index types.ChainIndex `json:"index"`
ID types.TransactionID `json:"id"`
Inflow types.Currency `json:"inflow"`
Outflow types.Currency `json:"outflow"`
Timestamp time.Time `json:"timestamp"`
}
)

type (
// WalletFundRequest is the request type for the /wallet/fund endpoint.
WalletFundRequest struct {
Expand Down Expand Up @@ -75,6 +95,7 @@ type (
Spendable types.Currency `json:"spendable"`
Confirmed types.Currency `json:"confirmed"`
Unconfirmed types.Currency `json:"unconfirmed"`
Immature types.Currency `json:"immature"`
}

// WalletSignRequest is the request type for the /wallet/sign endpoint.
Expand Down
4 changes: 0 additions & 4 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ var (
// be scanned since it is on a private network.
ErrHostOnPrivateNetwork = errors.New("host is on a private network")

// ErrHostTooManyAddresses is returned by the worker API when a host has
// more than two addresses of the same type.
ErrHostTooManyAddresses = errors.New("host has more than two addresses, or two of the same type")

// ErrMultiRangeNotSupported is returned by the worker API when a request
// tries to download multiple ranges at once.
ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported")
Expand Down
7 changes: 2 additions & 5 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -86,7 +85,7 @@ type Bus interface {
// wallet
Wallet(ctx context.Context) (api.WalletResponse, error)
WalletDiscard(ctx context.Context, txn types.Transaction) error
WalletOutputs(ctx context.Context) (resp []wallet.SiacoinElement, err error)
WalletOutputs(ctx context.Context) (resp []api.SiacoinElement, err error)
WalletPending(ctx context.Context) (resp []types.Transaction, err error)
WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (ids []types.TransactionID, err error)
}
Expand Down Expand Up @@ -256,9 +255,7 @@ func (ap *Autopilot) Run() error {
// initiate a host scan - no need to be synced or configured for scanning
ap.s.tryUpdateTimeout()
ap.s.tryPerformHostScan(ap.shutdownCtx, w, forceScan)

// reset forceScan
forceScan = false
forceScan = false // reset forceScan

// block until consensus is synced
if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
Expand Down
3 changes: 1 addition & 2 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/build"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -238,7 +237,7 @@ func humanReadableSize(b int) string {
}

func shouldSendPruneAlert(err error, version, release string) bool {
oldHost := (build.VersionCmp(version, "1.6.0") < 0 || version == "1.6.0" && release == "")
oldHost := (utils.VersionCmp(version, "1.6.0") < 0 || version == "1.6.0" && release == "")
sectorRootsIssue := utils.IsErr(err, errInvalidSectorRootsRange) && oldHost
merkleRootIssue := utils.IsErr(err, errInvalidMerkleProof) && oldHost
return err != nil && !(sectorRootsIssue || merkleRootIssue ||
Expand Down
15 changes: 8 additions & 7 deletions autopilot/contractor/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
cwallet "go.sia.tech/coreutils/wallet"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -89,6 +88,7 @@ type Bus interface {
AncestorContracts(ctx context.Context, id types.FileContractID, minStartHeight uint64) ([]api.ArchivedContract, error)
ArchiveContracts(ctx context.Context, toArchive map[types.FileContractID]string) error
ConsensusState(ctx context.Context) (api.ConsensusState, error)
Contract(ctx context.Context, id types.FileContractID) (api.ContractMetadata, error)
Contracts(ctx context.Context, opts api.ContractsOpts) (contracts []api.ContractMetadata, err error)
FileContractTax(ctx context.Context, payout types.Currency) (types.Currency, error)
Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error)
Expand Down Expand Up @@ -625,6 +625,7 @@ func (c *Contractor) runContractChecks(ctx *mCtx, hostChecks map[types.PublicKey
"toArchive", len(toArchive),
"toRefresh", len(toRefresh),
"toRenew", len(toRenew),
"bh", bh,
)
}()

Expand Down Expand Up @@ -660,7 +661,7 @@ LOOP:
toArchive[fcid] = errContractMaxRevisionNumber.Error()
} else if contract.RevisionNumber == math.MaxUint64 {
toArchive[fcid] = errContractMaxRevisionNumber.Error()
} else if contract.State == api.ContractStatePending && bh-contract.StartHeight > contractConfirmationDeadline {
} else if contract.State == api.ContractStatePending && bh-contract.StartHeight > ContractConfirmationDeadline {
toArchive[fcid] = errContractNotConfirmed.Error()
}
if _, archived := toArchive[fcid]; archived {
Expand Down Expand Up @@ -996,7 +997,7 @@ func (c *Contractor) runContractRenewals(ctx *mCtx, w Worker, toRenew []contract
if err != nil {
// don't register an alert for hosts that are out of funds since the
// user can't do anything about it
if !(worker.IsErrHost(err) && utils.IsErr(err, cwallet.ErrNotEnoughFunds)) {
if !(worker.IsErrHost(err) && utils.IsErr(err, wallet.ErrNotEnoughFunds)) {
c.alerter.RegisterAlert(ctx, newContractRenewalFailedAlert(contract, !proceed, err))
}
c.logger.With(zap.Error(err)).
Expand Down Expand Up @@ -1280,7 +1281,7 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, ci contractInfo, budget
"renterFunds", renterFunds,
"expectedNewStorage", expectedNewStorage,
)
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
if utils.IsErr(err, wallet.ErrNotEnoughFunds) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1364,7 +1365,7 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, ci contractInfo, budge
return api.ContractMetadata{}, true, err
}
log.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid)
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
if utils.IsErr(err, wallet.ErrNotEnoughFunds) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1429,7 +1430,7 @@ func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitial
if err != nil {
// TODO: keep track of consecutive failures and break at some point
log.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if utils.IsErr(err, wallet.ErrNotEnoughFunds) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down
8 changes: 4 additions & 4 deletions autopilot/contractor/hostfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
)

const (
// ContractConfirmationDeadline is the number of blocks since its start
// height we wait for a contract to appear on chain.
ContractConfirmationDeadline = 18

// minContractFundUploadThreshold is the percentage of contract funds
// remaining at which the contract gets marked as not good for upload
minContractFundUploadThreshold = float64(0.05) // 5%
Expand All @@ -23,10 +27,6 @@ const (
// acquirable storage below which the contract is considered to be
// out-of-collateral.
minContractCollateralDenominator = 20 // 5%

// contractConfirmationDeadline is the number of blocks since its start
// height we wait for a contract to appear on chain.
contractConfirmationDeadline = 18
)

var (
Expand Down
4 changes: 2 additions & 2 deletions autopilot/contractor/hostscore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/siad/build"
"go.sia.tech/renterd/internal/utils"
)

const (
Expand Down Expand Up @@ -273,7 +273,7 @@ func versionScore(settings rhpv2.HostSettings, minVersion string) float64 {
}
weight := 1.0
for _, v := range versions {
if build.VersionCmp(settings.Version, v.version) < 0 {
if utils.VersionCmp(settings.Version, v.version) < 0 {
weight *= v.penalty
}
}
Expand Down
2 changes: 1 addition & 1 deletion autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (m *migrator) performMigrations(p *workerPool) {
// fetch worker id once
id, err := w.ID(ctx)
if err != nil {
m.logger.Errorf("failed to fetch worker id: %v", err)
m.logger.Errorf("failed to reach worker, err: %v", err)
return
}

Expand Down
6 changes: 3 additions & 3 deletions autopilot/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo
s.logger.Infof("%s started", scanType)

s.wg.Add(1)
s.ap.wg.Add(1)
go func(st string) {
defer s.wg.Done()
defer s.ap.wg.Done()

for resp := range s.launchScanWorkers(ctx, w, s.launchHostScans()) {
if s.isInterrupted() || s.ap.isStopped() {
Expand Down Expand Up @@ -247,10 +249,7 @@ func (s *scanner) tryUpdateTimeout() {

func (s *scanner) launchHostScans() chan scanReq {
reqChan := make(chan scanReq, s.scanBatchSize)

s.ap.wg.Add(1)
go func() {
defer s.ap.wg.Done()
defer close(reqChan)

var offset int
Expand All @@ -268,6 +267,7 @@ func (s *scanner) launchHostScans() chan scanReq {
break
}
if len(hosts) == 0 {
s.logger.Debug("no hosts to scan")
break
}
if len(hosts) < int(s.scanBatchSize) {
Expand Down
Loading

0 comments on commit b052e0c

Please sign in to comment.