Skip to content

Commit

Permalink
migrator: abort if consensus is not synced
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored and ChrisSchinnerl committed Jul 29, 2024
1 parent 6345823 commit 0d7ca7f
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type (
logger *zap.SugaredLogger
healthCutoff float64
parallelSlabsPerWorker uint64
signalConsensusNotSynced chan struct{}
signalMaintenanceFinished chan struct{}
statsSlabMigrationSpeedMS *stats.DataPoints

Expand Down Expand Up @@ -67,6 +68,7 @@ func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uin
logger: ap.logger.Named("migrator"),
healthCutoff: healthCutoff,
parallelSlabsPerWorker: parallelSlabsPerWorker,
signalConsensusNotSynced: make(chan struct{}, 1),
signalMaintenanceFinished: make(chan struct{}, 1),
statsSlabMigrationSpeedMS: stats.New(time.Hour),
}
Expand Down Expand Up @@ -157,8 +159,14 @@ func (m *migrator) performMigrations(p *workerPool) {
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
skipAlert := utils.IsErr(err, api.ErrSlabNotFound)
if !skipAlert {
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if !utils.IsErr(err, api.ErrSlabNotFound) {
// fetch all object IDs for the slab we failed to migrate
var objectIds map[string][]string
if res, err := m.objectIDsForSlabKey(ctx, j.Key); err != nil {
Expand Down Expand Up @@ -264,28 +272,27 @@ OUTER:
// log the updated list of slabs to migrate
m.logger.Infof("%d slabs to migrate", len(toMigrate))

// register an alert to notify users about ongoing migrations.
if len(toMigrate) > 0 {
m.ap.RegisterAlert(m.ap.shutdownCtx, newOngoingMigrationsAlert(len(toMigrate), m.slabMigrationEstimate(len(toMigrate))))
}

// return if there are no slabs to migrate
if len(toMigrate) == 0 {
return
}

// register an alert to notify users about ongoing migrations
m.ap.RegisterAlert(m.ap.shutdownCtx, newOngoingMigrationsAlert(len(toMigrate), m.slabMigrationEstimate(len(toMigrate))))

for i, slab := range toMigrate {
select {
case <-m.ap.shutdownCtx.Done():
return
case <-m.signalConsensusNotSynced:
m.logger.Info("migrations interrupted - consensus is not synced")
return
case <-m.signalMaintenanceFinished:
m.logger.Info("migrations interrupted - updating slabs for migration")
continue OUTER
case jobs <- job{slab, i, len(toMigrate), set, b}:
}
}

return
}
}

Expand Down

0 comments on commit 0d7ca7f

Please sign in to comment.