Extend migration failure alerts with affected objects #1081

Merged · 4 commits · Mar 20, 2024
50 changes: 30 additions & 20 deletions autopilot/alerts.go
@@ -194,22 +194,37 @@ func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
 	}
 }
 
-func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
+func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
+	data := map[string]interface{}{
+		"error":   err.Error(),
+		"health":  health,
+		"slabKey": slabKey.String(),
+		"hint":    "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
+	}
+	if objectIds != nil {
+		data["objectIDs"] = objectIds
+	}
+
 	return alerts.Alert{
-		ID:       alertIDForSlab(alertMigrationID, slabKey),
-		Severity: alerts.SeverityCritical,
-		Message:  "Critical migration failed",
-		Data: map[string]interface{}{
-			"error":   err.Error(),
-			"health":  health,
-			"slabKey": slabKey.String(),
-			"hint":    "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
-		},
+		ID:        alertIDForSlab(alertMigrationID, slabKey),
+		Severity:  alerts.SeverityCritical,
+		Message:   "Critical migration failed",
+		Data:      data,
 		Timestamp: time.Now(),
 	}
 }
 
-func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
+func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
+	data := map[string]interface{}{
+		"error":   err.Error(),
+		"health":  health,
+		"slabKey": slabKey.String(),
+		"hint":    "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefore be taken very seriously.",
+	}
+	if objectIds != nil {
+		data["objectIDs"] = objectIds
+	}
+
 	severity := alerts.SeverityError
 	if health < 0.25 {
 		severity = alerts.SeverityCritical
@@ -218,15 +233,10 @@ func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
 	}
 
 	return alerts.Alert{
-		ID:       alertIDForSlab(alertMigrationID, slabKey),
-		Severity: severity,
-		Message:  "Slab migration failed",
-		Data: map[string]interface{}{
-			"error":   err.Error(),
-			"health":  health,
-			"slabKey": slabKey.String(),
-			"hint":    "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefore be taken very seriously.",
-		},
+		ID:        alertIDForSlab(alertMigrationID, slabKey),
+		Severity:  severity,
+		Message:   "Slab migration failed",
+		Data:      data,
 		Timestamp: time.Now(),
 	}
 }
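For illustration — a minimal sketch, not part of the diff, assuming a slabKey and an error value are in scope — of how the extended helper behaves:

// Health below 0.25 escalates the severity to critical, and the
// objectIDs entry only appears in Data when the map is non-nil.
alert := newMigrationFailedAlert(slabKey, 0.2, map[string][]string{
	"default": {"/backups/foo"},
}, errors.New("not enough hosts"))
// alert.Severity == alerts.SeverityCritical
// alert.Data["objectIDs"].(map[string][]string)["default"][0] == "/backups/foo"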
3 changes: 3 additions & 0 deletions autopilot/autopilot.go
@@ -63,6 +63,9 @@ type Bus interface {
 	RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error
 	RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error
 
+	// buckets
+	ListBuckets(ctx context.Context) ([]api.Bucket, error)
+
 	// objects
 	ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error)
 	RefreshHealth(ctx context.Context) error
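One knock-on effect of extending this interface: any test double or wrapper implementing Bus now has to provide ListBuckets as well. A minimal sketch of a partial stub (busStub is hypothetical, not part of the codebase):

// busStub embeds the Bus interface so only the methods under test need
// explicit implementations; any other method panics with a nil dereference.
type busStub struct{ Bus }

func (b *busStub) ListBuckets(ctx context.Context) ([]api.Bucket, error) {
	// a single default bucket is enough to exercise the migrator's lookup
	return []api.Bucket{{Name: api.DefaultBucketName}}, nil
}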
45 changes: 42 additions & 3 deletions autopilot/migrator.go
@@ -154,15 +154,23 @@ func (m *migrator) performMigrations(p *workerPool) {
 			start := time.Now()
 			res, err := j.execute(ctx, w)
 			m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
 
 			if err != nil {
 				m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
 				skipAlert := utils.IsErr(err, api.ErrSlabNotFound)
 				if !skipAlert {
+					// fetch all object IDs for the slab we failed to migrate
+					var objectIds map[string][]string
+					if res, err := m.objectIDsForSlabKey(ctx, j.Key); err != nil {
+						m.logger.Errorf("failed to fetch object ids for slab key; %v", err)
+					} else {
+						objectIds = res
+					}
+
+					// register the alert
 					if res.SurchargeApplied {
-						m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err))
+						m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, objectIds, err))
 					} else {
-						m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err))
+						m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, objectIds, err))
 					}
 				}
 			} else {
@@ -274,3 +282,34 @@ OUTER:
 		return
 	}
 }
+
+func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.EncryptionKey) (map[string][]string, error) {
+	// fetch all buckets
+	//
+	// NOTE: at the time of writing the bus does not support fetching objects by
+	// slab key across all buckets at once, therefore we have to list all buckets
+	// and loop over them, revisit on the next major release
+	buckets, err := m.ap.bus.ListBuckets(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("%w; failed to list buckets", err)
+	}
+
+	// fetch all objects per bucket
+	idsPerBucket := make(map[string][]string)
+	for _, bucket := range buckets {
+		objects, err := m.ap.bus.ObjectsBySlabKey(ctx, bucket.Name, key)
+		if err != nil {
+			m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %v", bucket.Name, err)
+			continue
+		} else if len(objects) == 0 {
+			continue
+		}
+
+		idsPerBucket[bucket.Name] = make([]string, len(objects))
+		for i, object := range objects {
+			idsPerBucket[bucket.Name][i] = object.Name
+		}
+	}
+
+	return idsPerBucket, nil
+}
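To make the resulting payload concrete, a sketch with illustrative values: if two buckets each hold one object built from the failed slab, the helper returns one entry per bucket, keyed by bucket name, listing the objects' full paths.

// Hypothetical result for a slab referenced from two buckets:
ids, _ := m.objectIDsForSlabKey(ctx, key)
// ids == map[string][]string{
// 	"default":   {"/backups/foo"},
// 	"newbucket": {"/backups/foo"},
// }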
94 changes: 73 additions & 21 deletions internal/test/e2e/migrations_test.go
@@ -4,11 +4,15 @@ import (
 	"bytes"
 	"context"
 	"errors"
+	"fmt"
+	"reflect"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
+	"go.sia.tech/renterd/alerts"
 	"go.sia.tech/renterd/api"
 	"go.sia.tech/renterd/internal/test"
 	"lukechampine.com/frand"
@@ -19,27 +23,29 @@ func TestMigrations(t *testing.T) {
 		t.SkipNow()
 	}
 
-	// create a new test cluster
+	// configure the cluster to use one extra host
+	rs := test.RedundancySettings
 	cfg := test.AutopilotConfig
-	cfg.Contracts.Amount = uint64(test.RedundancySettings.TotalShards) + 1
+	cfg.Contracts.Amount = uint64(rs.TotalShards) + 1
+
+	// create a new test cluster
 	cluster := newTestCluster(t, testClusterOptions{
-		// configure the cluster to use 1 more host than the total shards in the
-		// redundancy settings.
 		autopilotSettings: &cfg,
-		hosts:             int(test.RedundancySettings.TotalShards) + 1,
+		hosts:             int(cfg.Contracts.Amount),
 	})
 	defer cluster.Shutdown()
 
+	// convenience variables
+	b := cluster.Bus
+	w := cluster.Worker
+	tt := cluster.tt
+
 	// create a helper to fetch used hosts
 	usedHosts := func(path string) map[types.PublicKey]struct{} {
-		// fetch used hosts
-		res, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, path, api.GetObjectOptions{})
-		if err != nil {
-			t.Fatal(err)
-		} else if res.Object == nil {
+		res, _ := b.Object(context.Background(), api.DefaultBucketName, path, api.GetObjectOptions{})
+		if res.Object == nil {
 			t.Fatal("object not found")
 		}
 
 		used := make(map[types.PublicKey]struct{})
 		for _, slab := range res.Object.Slabs {
 			for _, sector := range slab.Shards {
@@ -49,18 +55,13 @@ func TestMigrations(t *testing.T) {
 		return used
 	}
 
-	// convenience variables
-	w := cluster.Worker
-	tt := cluster.tt
-
 	// add an object
 	data := make([]byte, rhpv2.SectorSize)
 	frand.Read(data)
-	path := "foo"
-	tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{}))
+	tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, t.Name(), api.UploadObjectOptions{}))
 
 	// assert amount of hosts used
-	used := usedHosts(path)
+	used := usedHosts(t.Name())
 	if len(used) != test.RedundancySettings.TotalShards {
 		t.Fatal("unexpected amount of hosts used", len(used), test.RedundancySettings.TotalShards)
 	}
@@ -77,13 +78,12 @@ func TestMigrations(t *testing.T) {
 
 	// assert we migrated away from the bad host
 	tt.Retry(300, 100*time.Millisecond, func() error {
-		if _, used := usedHosts(path)[removed]; used {
+		if _, used := usedHosts(t.Name())[removed]; used {
 			return errors.New("host is still used")
 		}
 		return nil
 	})
-
-	res, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, path, api.GetObjectOptions{})
+	res, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, t.Name(), api.GetObjectOptions{})
 	tt.OK(err)
 
 	// check slabs
@@ -109,8 +109,60 @@ func TestMigrations(t *testing.T) {
 			shardHosts += len(shard.Contracts)
 		}
 	}
 
+	// all shards should have 1 host except for 1. So we end up with 4 in total.
 	if shardHosts != 4 {
 		t.Fatalf("expected 4 shard hosts, got %v", shardHosts)
 	}
+
+	// create another bucket and add an object
+	tt.OK(b.CreateBucket(context.Background(), "newbucket", api.CreateBucketOptions{}))
+	tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), "newbucket", t.Name(), api.UploadObjectOptions{}))
+
+	// assert we currently don't have any alerts
+	ress, _ := b.Alerts(context.Background(), alerts.AlertsOpts{})
+	if ress.Totals.Error+ress.Totals.Critical != 0 {
+		t.Fatal("unexpected", ress)
+	}
+
+	// remove all hosts to ensure migrations fail
+	for _, h := range cluster.hosts {
+		cluster.RemoveHost(h)
+	}
+
+	// fetch alerts and collect object ids until we find two
+	var got map[string][]string
+	tt.Retry(100, 100*time.Millisecond, func() error {
+		got = make(map[string][]string)
+		ress, err := b.Alerts(context.Background(), alerts.AlertsOpts{})
+		tt.OK(err)
+		for _, alert := range ress.Alerts {
+			// skip if not a migration alert
+			data, ok := alert.Data["objectIDs"].(map[string]interface{})
+			if !ok {
+				continue
+			}
+
+			// collect all object ids per bucket
+			for bucket, ids := range data {
+				if objectIDs, ok := ids.([]interface{}); ok {
+					for _, id := range objectIDs {
+						got[bucket] = append(got[bucket], id.(string))
+					}
+				}
+			}
+		}
+		if len(got) != 2 {
+			return errors.New("unexpected number of buckets")
+		}
+		return nil
+	})
+
+	// assert we found our two objects across two buckets
+	if want := map[string][]string{
+		api.DefaultBucketName: {fmt.Sprintf("/%s", t.Name())},
+		"newbucket":           {fmt.Sprintf("/%s", t.Name())},
+	}; !reflect.DeepEqual(want, got) {
+		t.Fatal("unexpected", cmp.Diff(want, got))
+	}
 }
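A detail worth noting about the assertions above: alert.Data travels through the bus API as JSON, so the map[string][]string the migrator stored comes back as a map[string]interface{} holding []interface{} values — hence the two-step type assertions before comparing against want. A standalone sketch of the same effect, using only encoding/json:

// JSON round-trips erase the concrete Go element types.
raw, _ := json.Marshal(map[string][]string{"default": {"/foo"}})
var data map[string]interface{}
_ = json.Unmarshal(raw, &data)
// data["default"] is now []interface{}{"/foo"}, not []string{"/foo"}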