Skip to content

Commit

Permalink
Merge pull request #989 from SiaFoundation/chris/churn-alert-accumulator
Browse files Browse the repository at this point in the history
Accumulate churn information into a single alert instead of registering multiple alerts
  • Loading branch information
ChrisSchinnerl authored Feb 23, 2024
2 parents 42338c7 + c17252a commit 3f09191
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 65 deletions.
15 changes: 11 additions & 4 deletions alerts/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (

type (
// Alerter groups the alert operations used by callers: listing alerts,
// registering an alert and dismissing alerts by ID.
Alerter interface {
// Alerts returns an AlertsResponse for the given options.
Alerts(_ context.Context, opts AlertsOpts) (resp AlertsResponse, err error)
// RegisterAlert registers the alert a.
RegisterAlert(_ context.Context, a Alert) error
// DismissAlerts dismisses the alerts with the given IDs.
DismissAlerts(_ context.Context, ids ...types.Hash256) error
}
Expand Down Expand Up @@ -169,17 +170,18 @@ func (m *Manager) DismissAlerts(ctx context.Context, ids ...types.Hash256) error
})
}

// Active returns the host's active alerts.
func (m *Manager) Active(offset, limit int) AlertsResponse {
// Alerts returns the host's active alerts.
func (m *Manager) Alerts(_ context.Context, opts AlertsOpts) (AlertsResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()

offset, limit := opts.Offset, opts.Limit
resp := AlertsResponse{
Total: len(m.alerts),
}

if offset >= len(m.alerts) {
return resp
return resp, nil
} else if limit == -1 {
limit = len(m.alerts)
}
Expand All @@ -197,7 +199,7 @@ func (m *Manager) Active(offset, limit int) AlertsResponse {
resp.HasMore = true
}
resp.Alerts = alerts
return resp
return resp, nil
}

func (m *Manager) RegisterWebhookBroadcaster(b webhooks.Broadcaster) {
Expand Down Expand Up @@ -231,6 +233,11 @@ func WithOrigin(alerter Alerter, origin string) Alerter {
}
}

// Alerts implements the Alerter interface. The call is forwarded unchanged to
// the wrapped alerter and its response and error are returned as-is.
func (a *originAlerter) Alerts(ctx context.Context, opts AlertsOpts) (resp AlertsResponse, err error) {
	resp, err = a.alerter.Alerts(ctx, opts)
	return resp, err
}

// RegisterAlert implements the Alerter interface.
func (a *originAlerter) RegisterAlert(ctx context.Context, alert Alert) error {
if alert.Data == nil {
Expand Down
58 changes: 21 additions & 37 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

var (
alertAccountRefillID = frand.Entropy256() // constant until restarted
alertLostSectorsID = frand.Entropy256() // constant until restarted
alertLowBalanceID = frand.Entropy256() // constant until restarted
alertMigrationID = frand.Entropy256() // constant until restarted
alertPruningID = frand.Entropy256() // constant until restarted
alertRenewalFailedID = frand.Entropy256() // constant until restarted
alertAccountRefillID = randomAlertID() // constant until restarted
alertChurnID = randomAlertID() // constant until restarted
alertLostSectorsID = randomAlertID() // constant until restarted
alertLowBalanceID = randomAlertID() // constant until restarted
alertMigrationID = randomAlertID() // constant until restarted
alertPruningID = randomAlertID() // constant until restarted
alertRenewalFailedID = randomAlertID() // constant until restarted
)

func alertIDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 {
Expand Down Expand Up @@ -54,6 +55,20 @@ func (ap *Autopilot) DismissAlert(ctx context.Context, ids ...types.Hash256) {
}
}

// HasAlert reports whether the alerts returned by the autopilot's alerter
// contain an alert with the given id. If the alerts cannot be fetched, the
// error is logged and false is returned.
func (ap *Autopilot) HasAlert(ctx context.Context, id types.Hash256) bool {
	// NOTE(review): a limit of -1 is presumably understood as "no limit" by
	// the alerter, confirm against the Alerter implementation in use
	opts := alerts.AlertsOpts{Offset: 0, Limit: -1}

	resp, err := ap.alerts.Alerts(ctx, opts)
	if err != nil {
		ap.logger.Errorf("failed to fetch alerts: %v", err)
		return false
	}

	found := false
	for i := range resp.Alerts {
		if resp.Alerts[i].ID == id {
			found = true
			break
		}
	}
	return found
}

func newAccountLowBalanceAlert(address types.Address, balance, allowance types.Currency, bh, renewWindow, endHeight uint64) alerts.Alert {
severity := alerts.SeverityInfo
if bh+renewWindow/2 >= endHeight {
Expand Down Expand Up @@ -137,37 +152,6 @@ func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid type
}
}

// newContractSetChangeAlert builds an info-level "Contract set changed" alert
// with a fresh random ID and the current time as timestamp. The alert data
// carries the given name, additions and removals; the "hint" entry is empty
// unless there is at least one removal.
func newContractSetChangeAlert(name string, additions map[types.FileContractID]contractSetAddition, removals map[types.FileContractID]contractSetRemoval) alerts.Alert {
	// stringified contract ID -> removal reason
	reasons := make(map[string]string, len(removals))
	for fcid, removal := range removals {
		reasons[fcid.String()] = removal.Reason
	}

	data := map[string]any{
		"name":          name,
		"set_additions": additions,
		"set_removals":  removals,
		"hint":          "",

		// TODO: these fields can be removed on the next major release, they
		// contain redundant information
		"added":    len(additions),
		"removed":  len(removals),
		"removals": reasons,
	}

	// the churn hint is only relevant when contracts were removed
	if len(removals) > 0 {
		data["hint"] = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
	}

	return alerts.Alert{
		ID:        randomAlertID(),
		Severity:  alerts.SeverityInfo,
		Message:   "Contract set changed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert {
return alerts.Alert{
ID: alertIDForHost(alertLostSectorsID, hk),
Expand Down
68 changes: 68 additions & 0 deletions autopilot/churn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package autopilot

import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
)

type (
// accumulatedChurn collects contract set additions and removals, keyed by
// file contract ID, so they can be reported together in a single alert.
accumulatedChurn struct {
// additions holds the accumulated additions per contract.
additions map[types.FileContractID]contractSetAdditions
// removals holds the accumulated removals per contract.
removals map[types.FileContractID]contractSetRemovals
}
)

// newAccumulatedChurn returns an empty accumulatedChurn whose additions and
// removals maps are initialized and ready to be written to.
func newAccumulatedChurn() *accumulatedChurn {
	var churn accumulatedChurn
	churn.additions = map[types.FileContractID]contractSetAdditions{}
	churn.removals = map[types.FileContractID]contractSetRemovals{}
	return &churn
}

// Alert builds the info-level "Contract set changed" alert from the churn
// accumulated so far. The alert always uses alertChurnID as its ID and the
// current time as its timestamp; name is stored under the "name" key. The
// "hint" entry is empty unless at least one removal has been recorded.
//
// The accumulated maps are placed in the alert data directly, not copied.
func (c *accumulatedChurn) Alert(name string) alerts.Alert {
	data := map[string]any{
		"name":          name,
		"set_additions": c.additions,
		"set_removals":  c.removals,
		"hint":          "",
	}

	// the churn hint is only relevant when contracts were removed
	if len(c.removals) > 0 {
		data["hint"] = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
	}

	return alerts.Alert{
		ID:        alertChurnID,
		Severity:  alerts.SeverityInfo,
		Message:   "Contract set changed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

// Apply merges the given additions and removals into the accumulated churn.
// A contract that is not tracked yet has its entry stored as given. For a
// contract that is already tracked, only the individual additions/removals are
// appended; the rest of the stored entry (e.g. its HostKey) is left untouched.
func (c *accumulatedChurn) Apply(additions map[types.FileContractID]contractSetAdditions, removals map[types.FileContractID]contractSetRemovals) {
	for fcid, incoming := range additions {
		tracked, ok := c.additions[fcid]
		if !ok {
			c.additions[fcid] = incoming
			continue
		}
		// map values are copies, so the updated entry has to be written back
		tracked.Additions = append(tracked.Additions, incoming.Additions...)
		c.additions[fcid] = tracked
	}

	for fcid, incoming := range removals {
		tracked, ok := c.removals[fcid]
		if !ok {
			c.removals[fcid] = incoming
			continue
		}
		// map values are copies, so the updated entry has to be written back
		tracked.Removals = append(tracked.Removals, incoming.Removals...)
		c.removals[fcid] = tracked
	}
}

// Reset discards all accumulated churn by replacing both maps with new, empty
// ones. The previous maps are not modified, so anything still holding a
// reference to them keeps seeing the old contents.
func (c *accumulatedChurn) Reset() {
	c.additions, c.removals = map[types.FileContractID]contractSetAdditions{}, map[types.FileContractID]contractSetRemovals{}
}
64 changes: 47 additions & 17 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
type (
contractor struct {
ap *Autopilot
churn *accumulatedChurn
resolver *ipResolver
logger *zap.SugaredLogger

Expand Down Expand Up @@ -122,15 +123,25 @@ type (
recoverable bool
}

contractSetAdditions struct {
HostKey types.PublicKey `json:"hostKey"`
Additions []contractSetAddition `json:"additions"`
}

contractSetAddition struct {
Size uint64 `json:"size"`
HostKey types.PublicKey `json:"hostKey"`
Size uint64 `json:"size"`
Time api.TimeRFC3339 `json:"time"`
}

contractSetRemovals struct {
HostKey types.PublicKey `json:"hostKey"`
Removals []contractSetRemoval `json:"removals"`
}

contractSetRemoval struct {
Size uint64 `json:"size"`
HostKey types.PublicKey `json:"hostKey"`
Reason string `json:"reason"`
Size uint64 `json:"size"`
Reason string `json:"reasons"`
Time api.TimeRFC3339 `json:"time"`
}

renewal struct {
Expand All @@ -143,6 +154,7 @@ type (
func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *contractor {
return &contractor{
ap: ap,
churn: newAccumulatedChurn(),
logger: ap.logger.Named("contractor"),

revisionBroadcastInterval: revisionBroadcastInterval,
Expand Down Expand Up @@ -453,8 +465,9 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
}

// log added and removed contracts
setAdditions := make(map[types.FileContractID]contractSetAddition)
setRemovals := make(map[types.FileContractID]contractSetRemoval)
setAdditions := make(map[types.FileContractID]contractSetAdditions)
setRemovals := make(map[types.FileContractID]contractSetRemovals)
now := api.TimeNow()
for _, contract := range oldSet {
_, exists := inNewSet[contract.ID]
_, renewed := inNewSet[renewalsFromTo[contract.ID]]
Expand All @@ -464,22 +477,36 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
reason = "unknown"
}

setRemovals[contract.ID] = contractSetRemoval{
Size: contractData[contract.ID],
HostKey: contract.HostKey,
Reason: reason,
if _, exists := setRemovals[contract.ID]; !exists {
setRemovals[contract.ID] = contractSetRemovals{
HostKey: contract.HostKey,
}
}
removals := setRemovals[contract.ID]
removals.Removals = append(removals.Removals, contractSetRemoval{
Size: contractData[contract.ID],
Reason: reason,
Time: now,
})
setRemovals[contract.ID] = removals
c.logger.Debugf("contract %v was removed from the contract set, size: %v, reason: %v", contract.ID, contractData[contract.ID], reason)
}
}
for _, contract := range newSet {
_, existed := inOldSet[contract.ID]
_, renewed := renewalsToFrom[contract.ID]
if !existed && !renewed {
setAdditions[contract.ID] = contractSetAddition{
Size: contractData[contract.ID],
HostKey: contract.HostKey,
if _, exists := setAdditions[contract.ID]; !exists {
setAdditions[contract.ID] = contractSetAdditions{
HostKey: contract.HostKey,
}
}
additions := setAdditions[contract.ID]
additions.Additions = append(additions.Additions, contractSetAddition{
Size: contractData[contract.ID],
Time: now,
})
setAdditions[contract.ID] = additions
c.logger.Debugf("contract %v was added to the contract set, size: %v", contract.ID, contractData[contract.ID])
}
}
Expand All @@ -499,7 +526,6 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
}

// record churn metrics
now := api.TimeNow()
var metrics []api.ContractSetChurnMetric
for fcid := range setAdditions {
metrics = append(metrics, api.ContractSetChurnMetric{
Expand All @@ -514,7 +540,7 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
Name: c.ap.state.cfg.Contracts.Set,
ContractID: fcid,
Direction: api.ChurnDirRemoved,
Reason: removal.Reason,
Reason: removal.Removals[0].Reason,
Timestamp: now,
})
}
Expand All @@ -536,7 +562,11 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
)
hasChanged := len(setAdditions)+len(setRemovals) > 0
if hasChanged {
c.ap.RegisterAlert(ctx, newContractSetChangeAlert(name, setAdditions, setRemovals))
if !c.ap.HasAlert(ctx, alertChurnID) {
c.churn.Reset()
}
c.churn.Apply(setAdditions, setRemovals)
c.ap.RegisterAlert(ctx, c.churn.Alert(name))
}
return hasChanged
}
Expand Down
11 changes: 9 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,10 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) {
}

func (b *bus) handleGETAlertsDeprecated(jc jape.Context) {
ar := b.alertMgr.Active(0, -1)
ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{Offset: 0, Limit: -1})
if jc.Check("failed to fetch alerts", err) != nil {
return
}
jc.Encode(ar.Alerts)
}

Expand All @@ -1744,7 +1747,11 @@ func (b *bus) handleGETAlerts(jc jape.Context) {
jc.Error(errors.New("offset must be non-negative"), http.StatusBadRequest)
return
}
jc.Encode(b.alertMgr.Active(offset, limit))
ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{Offset: offset, Limit: limit})
if jc.Check("failed to fetch alerts", err) != nil {
return
}
jc.Encode(ar)
}

func (b *bus) handlePOSTAlertsDismiss(jc jape.Context) {
Expand Down
4 changes: 2 additions & 2 deletions bus/client/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
)

// Alerts fetches the active alerts from the bus.
func (c *Client) Alerts(opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) {
func (c *Client) Alerts(ctx context.Context, opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) {
values := url.Values{}
values.Set("offset", fmt.Sprint(opts.Offset))
if opts.Limit != 0 {
values.Set("limit", fmt.Sprint(opts.Limit))
}
err = c.c.GET("/alerts?"+values.Encode(), &resp)
err = c.c.WithContext(ctx).GET("/alerts?"+values.Encode(), &resp)
return
}

Expand Down
6 changes: 3 additions & 3 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,7 @@ func TestAlerts(t *testing.T) {
tt.OK(b.RegisterAlert(context.Background(), alert))
findAlert := func(id types.Hash256) *alerts.Alert {
t.Helper()
ar, err := b.Alerts(alerts.AlertsOpts{})
ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{})
tt.OK(err)
for _, alert := range ar.Alerts {
if alert.ID == id {
Expand Down Expand Up @@ -1960,15 +1960,15 @@ func TestAlerts(t *testing.T) {
}

// try to find with offset = 1
ar, err := b.Alerts(alerts.AlertsOpts{Offset: 1})
ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{Offset: 1})
foundAlerts := ar.Alerts
tt.OK(err)
if len(foundAlerts) != 1 || foundAlerts[0].ID != alert.ID {
t.Fatal("wrong alert")
}

// try to find with limit = 1
ar, err = b.Alerts(alerts.AlertsOpts{Limit: 1})
ar, err = b.Alerts(context.Background(), alerts.AlertsOpts{Limit: 1})
foundAlerts = ar.Alerts
tt.OK(err)
if len(foundAlerts) != 1 || foundAlerts[0].ID != alert2.ID {
Expand Down

0 comments on commit 3f09191

Please sign in to comment.