Skip to content

Commit

Permalink
Isolate contractor into its own package (#1083)
Browse files Browse the repository at this point in the history
This PR prepares the contractor for a slight refactor that makes it more
unit-testable by first disentangling it from the `Autopilot`.
  • Loading branch information
ChrisSchinnerl authored Apr 4, 2024
2 parents a3192d8 + c3a2fe6 commit c1492e8
Show file tree
Hide file tree
Showing 28 changed files with 1,142 additions and 943 deletions.
23 changes: 23 additions & 0 deletions alerts/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"sync"
"time"

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"lukechampine.com/frand"
)

const (
Expand Down Expand Up @@ -83,6 +86,26 @@ type (
}
)

// IDForAccount returns the hash (via types.HashBytes) of the given alert ID
// followed by the bytes of the account id.
func IDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(id))
	buf = append(buf, alertID[:]...)
	buf = append(buf, id[:]...)
	return types.HashBytes(buf)
}

// IDForContract returns the hash (via types.HashBytes) of the given alert ID
// followed by the bytes of the file contract ID.
func IDForContract(alertID [32]byte, fcid types.FileContractID) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(fcid))
	buf = append(buf, alertID[:]...)
	buf = append(buf, fcid[:]...)
	return types.HashBytes(buf)
}

// IDForHost returns the hash (via types.HashBytes) of the given alert ID
// followed by the bytes of the host's public key.
func IDForHost(alertID [32]byte, hk types.PublicKey) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(hk))
	buf = append(buf, alertID[:]...)
	buf = append(buf, hk[:]...)
	return types.HashBytes(buf)
}

// IDForSlab returns the hash (via types.HashBytes) of the given alert ID
// followed by the string form of the slab's encryption key.
func IDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 {
	key := slabKey.String()
	buf := make([]byte, 0, len(alertID)+len(key))
	buf = append(buf, alertID[:]...)
	buf = append(buf, key...)
	return types.HashBytes(buf)
}

// RandomAlertID returns a new alert ID taken from frand.Entropy256.
func RandomAlertID() types.Hash256 {
	return types.Hash256(frand.Entropy256())
}

// Total returns the sum of the info, warning, error and critical counts
// stored in the response's Totals.
func (ar AlertsResponse) Total() int {
	t := ar.Totals
	return t.Info + t.Warning + t.Error + t.Critical
}
Expand Down
7 changes: 6 additions & 1 deletion api/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type (
}
)

// EndHeight returns the end height of a contract formed with the autopilot's
// config: the current period plus the configured contract period and renew
// window.
func (ap *Autopilot) EndHeight() uint64 {
	contracts := ap.Config.Contracts
	return ap.CurrentPeriod + contracts.Period + contracts.RenewWindow
}

type (
// AutopilotTriggerRequest is the request object used by the /trigger
// endpoint
Expand Down Expand Up @@ -114,7 +120,6 @@ type (
} `json:"gouging"`
NotAcceptingContracts uint64 `json:"notAcceptingContracts"`
NotScanned uint64 `json:"notScanned"`
Unknown uint64 `json:"unknown"`
}
Recommendation *ConfigRecommendation `json:"recommendation,omitempty"`
}
Expand Down
11 changes: 8 additions & 3 deletions autopilot/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,7 +104,11 @@ func (a *accounts) refillWorkersAccountsLoop(ctx context.Context) {
// until the previously launched goroutine returns.
func (a *accounts) refillWorkerAccounts(ctx context.Context, w Worker) {
// fetch config
state := a.ap.State()
cfg, err := a.ap.Config(ctx)
if err != nil {
a.l.Errorw(fmt.Sprintf("failed to fetch config for refill: %v", err))
return
}

// fetch worker id
workerID, err := w.ID(ctx)
Expand All @@ -122,7 +127,7 @@ func (a *accounts) refillWorkerAccounts(ctx context.Context, w Worker) {
}

// fetch all contract set contracts
contractSetContracts, err := a.c.Contracts(ctx, api.ContractsOpts{ContractSet: state.cfg.Contracts.Set})
contractSetContracts, err := a.c.Contracts(ctx, api.ContractsOpts{ContractSet: cfg.Config.Contracts.Set})
if err != nil {
a.l.Errorw(fmt.Sprintf("failed to fetch contract set contracts: %v", err))
return
Expand Down Expand Up @@ -154,7 +159,7 @@ func (a *accounts) refillWorkerAccounts(ctx context.Context, w Worker) {
}
} else {
// dismiss alerts on success
a.ap.DismissAlert(ctx, alertIDForAccount(alertAccountRefillID, accountID))
a.ap.DismissAlert(ctx, alerts.IDForAccount(alertAccountRefillID, accountID))

// log success
if refilled {
Expand Down
92 changes: 10 additions & 82 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,15 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)

var (
alertAccountRefillID = randomAlertID() // constant until restarted
alertChurnID = randomAlertID() // constant until restarted
alertLostSectorsID = randomAlertID() // constant until restarted
alertLowBalanceID = randomAlertID() // constant until restarted
alertMigrationID = randomAlertID() // constant until restarted
alertPruningID = randomAlertID() // constant until restarted
alertRenewalFailedID = randomAlertID() // constant until restarted
alertAccountRefillID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertMigrationID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

// alertIDForAccount returns the hash (via types.HashBytes) of the given alert
// ID followed by the bytes of the account id.
func alertIDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(id))
	buf = append(buf, alertID[:]...)
	buf = append(buf, id[:]...)
	return types.HashBytes(buf)
}

// alertIDForContract returns the hash (via types.HashBytes) of the given
// alert ID followed by the bytes of the file contract ID.
func alertIDForContract(alertID [32]byte, fcid types.FileContractID) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(fcid))
	buf = append(buf, alertID[:]...)
	buf = append(buf, fcid[:]...)
	return types.HashBytes(buf)
}

// alertIDForHost returns the hash (via types.HashBytes) of the given alert ID
// followed by the bytes of the host's public key.
func alertIDForHost(alertID [32]byte, hk types.PublicKey) types.Hash256 {
	buf := make([]byte, 0, len(alertID)+len(hk))
	buf = append(buf, alertID[:]...)
	buf = append(buf, hk[:]...)
	return types.HashBytes(buf)
}

// alertIDForSlab returns the hash (via types.HashBytes) of the given alert ID
// followed by the string form of the slab's encryption key.
func alertIDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 {
	key := slabKey.String()
	buf := make([]byte, 0, len(alertID)+len(key))
	buf = append(buf, alertID[:]...)
	buf = append(buf, key...)
	return types.HashBytes(buf)
}

// randomAlertID returns a new alert ID taken from frand.Entropy256.
func randomAlertID() types.Hash256 {
	return types.Hash256(frand.Entropy256())
}

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
if err := ap.alerts.RegisterAlert(ctx, a); err != nil {
ap.logger.Errorf("failed to register alert: %v", err)
Expand All @@ -55,20 +31,6 @@ func (ap *Autopilot) DismissAlert(ctx context.Context, ids ...types.Hash256) {
}
}

// HasAlert reports whether an alert with the given id is among the alerts
// returned by the alerts backend. All alerts are requested in one call
// (offset 0, limit -1). If fetching fails, the error is logged and false is
// returned.
func (ap *Autopilot) HasAlert(ctx context.Context, id types.Hash256) bool {
	resp, err := ap.alerts.Alerts(ctx, alerts.AlertsOpts{Offset: 0, Limit: -1})
	if err != nil {
		ap.logger.Errorf("failed to fetch alerts: %v", err)
		return false
	}

	// scan the fetched alerts for a matching ID
	for i := range resp.Alerts {
		if resp.Alerts[i].ID == id {
			return true
		}
	}
	return false
}

func newAccountLowBalanceAlert(address types.Address, balance, allowance types.Currency, bh, renewWindow, endHeight uint64) alerts.Alert {
severity := alerts.SeverityInfo
if bh+renewWindow/2 >= endHeight {
Expand Down Expand Up @@ -103,34 +65,14 @@ func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err
}

return alerts.Alert{
ID: alertIDForAccount(alertAccountRefillID, id),
ID: alerts.IDForAccount(alertAccountRefillID, id),
Severity: alerts.SeverityError,
Message: "Ephemeral account refill failed",
Data: data,
Timestamp: time.Now(),
}
}

// newContractRenewalFailedAlert builds the alert registered when renewing the
// given contract failed. The severity is a warning by default and is raised
// to critical when renewals were interrupted. The alert data carries the
// error message, the interrupted flag, the contract ID and the host key.
func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bool, err error) alerts.Alert {
	alert := alerts.Alert{
		ID:       alertIDForContract(alertRenewalFailedID, contract.ID),
		Severity: alerts.SeverityWarning,
		Message:  "Contract renewal failed",
		Data: map[string]interface{}{
			"error":               err.Error(),
			"renewalsInterrupted": interrupted,
			"contractID":          contract.ID.String(),
			"hostKey":             contract.HostKey.String(),
		},
		Timestamp: time.Now(),
	}

	// an interrupted renewal run is escalated
	if interrupted {
		alert.Severity = alerts.SeverityCritical
	}
	return alert
}

func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid types.FileContractID, err error) *alerts.Alert {
data := map[string]interface{}{"error": err.Error()}
if hk != (types.PublicKey{}) {
Expand All @@ -144,28 +86,14 @@ func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid type
}

return &alerts.Alert{
ID: alertIDForContract(alertPruningID, fcid),
ID: alerts.IDForContract(alertPruningID, fcid),
Severity: alerts.SeverityWarning,
Message: "Contract pruning failed",
Data: data,
Timestamp: time.Now(),
}
}

// newLostSectorsAlert builds a warning alert for a host that reported lost
// sectors. The alert data carries the lost sector count, the host key and a
// hint for the operator.
func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert {
	id := alertIDForHost(alertLostSectorsID, hk)
	details := map[string]interface{}{
		"lostSectors": lostSectors,
		"hostKey":     hk.String(),
		"hint":        "The host has reported that it can't serve at least one sector. Consider blocking this host through the blocklist feature. If you think this was a mistake and you want to ignore this warning for now you can reset the lost sector count",
	}

	return alerts.Alert{
		ID:        id,
		Severity:  alerts.SeverityWarning,
		Message:   "Host has lost sectors",
		Data:      details,
		Timestamp: time.Now(),
	}
}

func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
data := make(map[string]interface{})
if rounded := estimate.Round(time.Minute); rounded > 0 {
Expand All @@ -183,7 +111,7 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {

func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityInfo,
Message: "Critical migration succeeded",
Data: map[string]interface{}{
Expand All @@ -206,7 +134,7 @@ func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float6
}

return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityCritical,
Message: "Critical migration failed",
Data: data,
Expand All @@ -233,7 +161,7 @@ func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objec
}

return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: severity,
Message: "Slab migration failed",
Data: data,
Expand All @@ -243,7 +171,7 @@ func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objec

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: randomAlertID(),
ID: alerts.RandomAlertID(),
Severity: alerts.SeverityCritical,
Message: "Health refresh failed",
Data: map[string]interface{}{
Expand Down
Loading

0 comments on commit c1492e8

Please sign in to comment.