Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulate churn information into single alert instead of registering multiple #989

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions alerts/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (

type (
Alerter interface {
Alerts(_ context.Context, opts AlertsOpts) (resp AlertsResponse, err error)
RegisterAlert(_ context.Context, a Alert) error
DismissAlerts(_ context.Context, ids ...types.Hash256) error
}
Expand Down Expand Up @@ -169,17 +170,18 @@ func (m *Manager) DismissAlerts(ctx context.Context, ids ...types.Hash256) error
})
}

// Active returns the host's active alerts.
func (m *Manager) Active(offset, limit int) AlertsResponse {
// Alerts returns the host's active alerts.
func (m *Manager) Alerts(_ context.Context, opts AlertsOpts) (AlertsResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()

offset, limit := opts.Offset, opts.Limit
resp := AlertsResponse{
Total: len(m.alerts),
}

if offset >= len(m.alerts) {
return resp
return resp, nil
} else if limit == -1 {
limit = len(m.alerts)
}
Expand All @@ -197,7 +199,7 @@ func (m *Manager) Active(offset, limit int) AlertsResponse {
resp.HasMore = true
}
resp.Alerts = alerts
return resp
return resp, nil
}

func (m *Manager) RegisterWebhookBroadcaster(b webhooks.Broadcaster) {
Expand Down Expand Up @@ -231,6 +233,11 @@ func WithOrigin(alerter Alerter, origin string) Alerter {
}
}

// Alerts implements the Alerter interface. The call is forwarded to the
// wrapped alerter with the context and options passed through unchanged.
func (a *originAlerter) Alerts(ctx context.Context, opts AlertsOpts) (resp AlertsResponse, err error) {
	resp, err = a.alerter.Alerts(ctx, opts)
	return resp, err
}

// RegisterAlert implements the Alerter interface.
func (a *originAlerter) RegisterAlert(ctx context.Context, alert Alert) error {
if alert.Data == nil {
Expand Down
58 changes: 21 additions & 37 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

var (
alertAccountRefillID = frand.Entropy256() // constant until restarted
alertLostSectorsID = frand.Entropy256() // constant until restarted
alertLowBalanceID = frand.Entropy256() // constant until restarted
alertMigrationID = frand.Entropy256() // constant until restarted
alertPruningID = frand.Entropy256() // constant until restarted
alertRenewalFailedID = frand.Entropy256() // constant until restarted
alertAccountRefillID = randomAlertID() // constant until restarted
alertChurnID = randomAlertID() // constant until restarted
alertLostSectorsID = randomAlertID() // constant until restarted
alertLowBalanceID = randomAlertID() // constant until restarted
alertMigrationID = randomAlertID() // constant until restarted
alertPruningID = randomAlertID() // constant until restarted
alertRenewalFailedID = randomAlertID() // constant until restarted
)

func alertIDForAccount(alertID [32]byte, id rhpv3.Account) types.Hash256 {
Expand Down Expand Up @@ -54,6 +55,20 @@ func (ap *Autopilot) DismissAlert(ctx context.Context, ids ...types.Hash256) {
}
}

// HasAlert reports whether an alert with the given id is among the alerts
// returned by the autopilot's alerter. If fetching the alerts fails, the error
// is logged and false is returned.
func (ap *Autopilot) HasAlert(ctx context.Context, id types.Hash256) bool {
	// offset 0 with limit -1: the alerts Manager treats a limit of -1 as
	// "return everything"
	opts := alerts.AlertsOpts{Offset: 0, Limit: -1}
	resp, err := ap.alerts.Alerts(ctx, opts)
	if err != nil {
		ap.logger.Errorf("failed to fetch alerts: %v", err)
		return false
	}

	// linear scan, stop at the first match
	found := false
	for i := range resp.Alerts {
		if resp.Alerts[i].ID == id {
			found = true
			break
		}
	}
	return found
}

func newAccountLowBalanceAlert(address types.Address, balance, allowance types.Currency, bh, renewWindow, endHeight uint64) alerts.Alert {
severity := alerts.SeverityInfo
if bh+renewWindow/2 >= endHeight {
Expand Down Expand Up @@ -137,37 +152,6 @@ func newContractPruningFailedAlert(hk types.PublicKey, version string, fcid type
}
}

// newContractSetChangeAlert builds an info-level "Contract set changed" alert
// for the contract set with the given name. The alert gets a fresh random ID
// and carries the additions and removals in its data. A hint about high churn
// is only included when at least one contract was removed.
func newContractSetChangeAlert(name string, additions map[types.FileContractID]contractSetAddition, removals map[types.FileContractID]contractSetRemoval) alerts.Alert {
	// the hint stays empty unless contracts were removed
	hint := ""
	if len(removals) != 0 {
		hint = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
	}

	// flatten the removals into a contract id -> reason lookup
	reasons := make(map[string]string, len(removals))
	for fcid, removal := range removals {
		reasons[fcid.String()] = removal.Reason
	}

	data := map[string]any{
		"name":          name,
		"set_additions": additions,
		"set_removals":  removals,
		"hint":          hint,

		// TODO: these fields can be removed on the next major release, they
		// contain redundant information
		"added":    len(additions),
		"removed":  len(removals),
		"removals": reasons,
	}

	return alerts.Alert{
		ID:        randomAlertID(),
		Severity:  alerts.SeverityInfo,
		Message:   "Contract set changed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert {
return alerts.Alert{
ID: alertIDForHost(alertLostSectorsID, hk),
Expand Down
68 changes: 68 additions & 0 deletions autopilot/churn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package autopilot

import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
)

// accumulatedChurn collects contract set additions and removals, keyed by
// file contract ID, so they can be reported together in a single alert.
type accumulatedChurn struct {
	// additions holds the recorded additions per contract
	additions map[types.FileContractID]contractSetAdditions
	// removals holds the recorded removals per contract
	removals map[types.FileContractID]contractSetRemovals
}

// newAccumulatedChurn returns an accumulatedChurn whose additions and removals
// maps are allocated and empty, ready to be written to.
func newAccumulatedChurn() *accumulatedChurn {
	churn := new(accumulatedChurn)
	churn.additions = make(map[types.FileContractID]contractSetAdditions)
	churn.removals = make(map[types.FileContractID]contractSetRemovals)
	return churn
}

// Alert builds the info-level "Contract set changed" alert for the contract
// set with the given name from the churn accumulated so far. The alert always
// uses alertChurnID as its ID and is timestamped with the current time. The
// hint is empty unless at least one removal was recorded.
//
// Note that the additions and removals maps are placed into the alert data
// directly, they are not copied.
func (c *accumulatedChurn) Alert(name string) alerts.Alert {
	data := map[string]any{
		"name":          name,
		"set_additions": c.additions,
		"set_removals":  c.removals,
		"hint":          "",
	}
	// only hint at the churn rate when contracts were actually removed
	if len(c.removals) != 0 {
		data["hint"] = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
	}

	return alerts.Alert{
		ID:        alertChurnID,
		Severity:  alerts.SeverityInfo,
		Message:   "Contract set changed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

// Apply merges the given additions and removals into the accumulated churn.
//
// For a contract that has not been seen before, the incoming entry is stored
// as a whole. For a contract that is already known, only the incoming
// additions/removals are appended to the existing entry, everything else on
// the existing entry (e.g. the host key) is kept.
//
// The slices stored in the accumulator are detached from the slices passed in
// by the caller, so later appends never write into a backing array that is
// shared with the caller's maps.
func (c *accumulatedChurn) Apply(additions map[types.FileContractID]contractSetAdditions, removals map[types.FileContractID]contractSetRemovals) {
	for fcid, incoming := range additions {
		if acc, ok := c.additions[fcid]; ok {
			acc.Additions = append(acc.Additions, incoming.Additions...)
			c.additions[fcid] = acc
			continue
		}
		// first occurrence: copy the slice into its own backing array, the
		// zero-capacity reslice keeps a nil slice nil and an empty slice empty
		incoming.Additions = append(incoming.Additions[:0:0], incoming.Additions...)
		c.additions[fcid] = incoming
	}

	for fcid, incoming := range removals {
		if acc, ok := c.removals[fcid]; ok {
			acc.Removals = append(acc.Removals, incoming.Removals...)
			c.removals[fcid] = acc
			continue
		}
		// first occurrence: detach from the caller's backing array, see above
		incoming.Removals = append(incoming.Removals[:0:0], incoming.Removals...)
		c.removals[fcid] = incoming
	}
}

// Reset discards all accumulated churn by swapping in two freshly allocated,
// empty maps. The previous maps are not cleared in place, so they stay
// untouched wherever they are still referenced (Alert places them directly
// into the alert data).
func (c *accumulatedChurn) Reset() {
	c.additions, c.removals =
		make(map[types.FileContractID]contractSetAdditions),
		make(map[types.FileContractID]contractSetRemovals)
}
64 changes: 47 additions & 17 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
type (
contractor struct {
ap *Autopilot
churn *accumulatedChurn
resolver *ipResolver
logger *zap.SugaredLogger

Expand Down Expand Up @@ -122,15 +123,25 @@ type (
recoverable bool
}

contractSetAdditions struct {
HostKey types.PublicKey `json:"hostKey"`
Additions []contractSetAddition `json:"additions"`
}

contractSetAddition struct {
Size uint64 `json:"size"`
HostKey types.PublicKey `json:"hostKey"`
Size uint64 `json:"size"`
Time api.TimeRFC3339 `json:"time"`
}

contractSetRemovals struct {
HostKey types.PublicKey `json:"hostKey"`
Removals []contractSetRemoval `json:"removals"`
}

contractSetRemoval struct {
Size uint64 `json:"size"`
HostKey types.PublicKey `json:"hostKey"`
Reason string `json:"reason"`
Size uint64 `json:"size"`
Reason string `json:"reasons"`
Time api.TimeRFC3339 `json:"time"`
}

renewal struct {
Expand All @@ -143,6 +154,7 @@ type (
func newContractor(ap *Autopilot, revisionSubmissionBuffer uint64, revisionBroadcastInterval time.Duration) *contractor {
return &contractor{
ap: ap,
churn: newAccumulatedChurn(),
logger: ap.logger.Named("contractor"),

revisionBroadcastInterval: revisionBroadcastInterval,
Expand Down Expand Up @@ -453,8 +465,9 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
}

// log added and removed contracts
setAdditions := make(map[types.FileContractID]contractSetAddition)
setRemovals := make(map[types.FileContractID]contractSetRemoval)
setAdditions := make(map[types.FileContractID]contractSetAdditions)
setRemovals := make(map[types.FileContractID]contractSetRemovals)
now := api.TimeNow()
for _, contract := range oldSet {
_, exists := inNewSet[contract.ID]
_, renewed := inNewSet[renewalsFromTo[contract.ID]]
Expand All @@ -464,22 +477,36 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
reason = "unknown"
}

setRemovals[contract.ID] = contractSetRemoval{
Size: contractData[contract.ID],
HostKey: contract.HostKey,
Reason: reason,
if _, exists := setRemovals[contract.ID]; !exists {
setRemovals[contract.ID] = contractSetRemovals{
HostKey: contract.HostKey,
}
}
removals := setRemovals[contract.ID]
removals.Removals = append(removals.Removals, contractSetRemoval{
Size: contractData[contract.ID],
Reason: reason,
Time: now,
})
setRemovals[contract.ID] = removals
c.logger.Debugf("contract %v was removed from the contract set, size: %v, reason: %v", contract.ID, contractData[contract.ID], reason)
}
}
for _, contract := range newSet {
_, existed := inOldSet[contract.ID]
_, renewed := renewalsToFrom[contract.ID]
if !existed && !renewed {
setAdditions[contract.ID] = contractSetAddition{
Size: contractData[contract.ID],
HostKey: contract.HostKey,
if _, exists := setAdditions[contract.ID]; !exists {
setAdditions[contract.ID] = contractSetAdditions{
HostKey: contract.HostKey,
}
}
additions := setAdditions[contract.ID]
additions.Additions = append(additions.Additions, contractSetAddition{
Size: contractData[contract.ID],
Time: now,
})
setAdditions[contract.ID] = additions
c.logger.Debugf("contract %v was added to the contract set, size: %v", contract.ID, contractData[contract.ID])
}
}
Expand All @@ -499,7 +526,6 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
}

// record churn metrics
now := api.TimeNow()
var metrics []api.ContractSetChurnMetric
for fcid := range setAdditions {
metrics = append(metrics, api.ContractSetChurnMetric{
Expand All @@ -514,7 +540,7 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
Name: c.ap.state.cfg.Contracts.Set,
ContractID: fcid,
Direction: api.ChurnDirRemoved,
Reason: removal.Reason,
Reason: removal.Removals[0].Reason,
Timestamp: now,
})
}
Expand All @@ -536,7 +562,11 @@ func (c *contractor) computeContractSetChanged(ctx context.Context, name string,
)
hasChanged := len(setAdditions)+len(setRemovals) > 0
if hasChanged {
c.ap.RegisterAlert(ctx, newContractSetChangeAlert(name, setAdditions, setRemovals))
if !c.ap.HasAlert(ctx, alertChurnID) {
c.churn.Reset()
}
c.churn.Apply(setAdditions, setRemovals)
c.ap.RegisterAlert(ctx, c.churn.Alert(name))
}
return hasChanged
}
Expand Down
11 changes: 9 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,10 @@ func (b *bus) gougingParams(ctx context.Context) (api.GougingParams, error) {
}

func (b *bus) handleGETAlertsDeprecated(jc jape.Context) {
ar := b.alertMgr.Active(0, -1)
ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{Offset: 0, Limit: -1})
if jc.Check("failed to fetch alerts", err) != nil {
return
}
jc.Encode(ar.Alerts)
}

Expand All @@ -1744,7 +1747,11 @@ func (b *bus) handleGETAlerts(jc jape.Context) {
jc.Error(errors.New("offset must be non-negative"), http.StatusBadRequest)
return
}
jc.Encode(b.alertMgr.Active(offset, limit))
ar, err := b.alertMgr.Alerts(jc.Request.Context(), alerts.AlertsOpts{Offset: offset, Limit: limit})
if jc.Check("failed to fetch alerts", err) != nil {
return
}
jc.Encode(ar)
}

func (b *bus) handlePOSTAlertsDismiss(jc jape.Context) {
Expand Down
4 changes: 2 additions & 2 deletions bus/client/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
)

// Alerts fetches the active alerts from the bus.
func (c *Client) Alerts(opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) {
func (c *Client) Alerts(ctx context.Context, opts alerts.AlertsOpts) (resp alerts.AlertsResponse, err error) {
values := url.Values{}
values.Set("offset", fmt.Sprint(opts.Offset))
if opts.Limit != 0 {
values.Set("limit", fmt.Sprint(opts.Limit))
}
err = c.c.GET("/alerts?"+values.Encode(), &resp)
err = c.c.WithContext(ctx).GET("/alerts?"+values.Encode(), &resp)
return
}

Expand Down
6 changes: 3 additions & 3 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,7 @@ func TestAlerts(t *testing.T) {
tt.OK(b.RegisterAlert(context.Background(), alert))
findAlert := func(id types.Hash256) *alerts.Alert {
t.Helper()
ar, err := b.Alerts(alerts.AlertsOpts{})
ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{})
tt.OK(err)
for _, alert := range ar.Alerts {
if alert.ID == id {
Expand Down Expand Up @@ -1960,15 +1960,15 @@ func TestAlerts(t *testing.T) {
}

// try to find with offset = 1
ar, err := b.Alerts(alerts.AlertsOpts{Offset: 1})
ar, err := b.Alerts(context.Background(), alerts.AlertsOpts{Offset: 1})
foundAlerts := ar.Alerts
tt.OK(err)
if len(foundAlerts) != 1 || foundAlerts[0].ID != alert.ID {
t.Fatal("wrong alert")
}

// try to find with limit = 1
ar, err = b.Alerts(alerts.AlertsOpts{Limit: 1})
ar, err = b.Alerts(context.Background(), alerts.AlertsOpts{Limit: 1})
foundAlerts = ar.Alerts
tt.OK(err)
if len(foundAlerts) != 1 || foundAlerts[0].ID != alert2.ID {
Expand Down
Loading