diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 234589c81..1fc848ace 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -27,6 +27,7 @@ type ( logger *zap.SugaredLogger healthCutoff float64 parallelSlabsPerWorker uint64 + signalConsensusNotSynced chan struct{} signalMaintenanceFinished chan struct{} statsSlabMigrationSpeedMS *stats.DataPoints @@ -67,6 +68,7 @@ func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uin logger: ap.logger.Named("migrator"), healthCutoff: healthCutoff, parallelSlabsPerWorker: parallelSlabsPerWorker, + signalConsensusNotSynced: make(chan struct{}, 1), signalMaintenanceFinished: make(chan struct{}, 1), statsSlabMigrationSpeedMS: stats.New(time.Hour), } @@ -157,8 +159,14 @@ func (m *migrator) performMigrations(p *workerPool) { m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds())) if err != nil { m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err) - skipAlert := utils.IsErr(err, api.ErrSlabNotFound) - if !skipAlert { + if utils.IsErr(err, api.ErrConsensusNotSynced) { + // interrupt migrations if consensus is not synced + select { + case m.signalConsensusNotSynced <- struct{}{}: + default: + } + return + } else if !utils.IsErr(err, api.ErrSlabNotFound) { // fetch all object IDs for the slab we failed to migrate var objectIds map[string][]string if res, err := m.objectIDsForSlabKey(ctx, j.Key); err != nil { @@ -264,28 +272,27 @@ OUTER: // log the updated list of slabs to migrate m.logger.Infof("%d slabs to migrate", len(toMigrate)) - // register an alert to notify users about ongoing migrations. - if len(toMigrate) > 0 { - m.ap.RegisterAlert(m.ap.shutdownCtx, newOngoingMigrationsAlert(len(toMigrate), m.slabMigrationEstimate(len(toMigrate)))) - } - // return if there are no slabs to migrate if len(toMigrate) == 0 { return } + // register an alert to notify users about ongoing migrations + m.ap.RegisterAlert(m.ap.shutdownCtx, newOngoingMigrationsAlert(len(toMigrate), m.slabMigrationEstimate(len(toMigrate)))) + for i, slab := range toMigrate { select { case <-m.ap.shutdownCtx.Done(): return + case <-m.signalConsensusNotSynced: + m.logger.Info("migrations interrupted - consensus is not synced") + return case <-m.signalMaintenanceFinished: m.logger.Info("migrations interrupted - updating slabs for migration") continue OUTER case jobs <- job{slab, i, len(toMigrate), set, b}: } } - - return } }