Move migrations to the autopilot #1735

Merged
merged 22 commits into from
Dec 16, 2024
Changes from 14 commits
dd121fb
downloader: move downloadManager to its own package
ChrisSchinnerl Dec 6, 2024
d9fec9f
internal: move upload manager
peterjan Dec 9, 2024
66dddf1
docs: add changelog file
peterjan Dec 9, 2024
119609d
testing: fix upload tests
peterjan Dec 9, 2024
88a4455
testing: fix parameter order
peterjan Dec 9, 2024
77d86b3
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Dec 9, 2024
00e6bc3
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Dec 10, 2024
066a6c4
cluster: move migrations from worker to autopilot
peterjan Dec 10, 2024
df9976d
testing: fix TestDownloadAllHosts
peterjan Dec 10, 2024
99c6b09
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Dec 10, 2024
a933e31
docs: add changelog
peterjan Dec 10, 2024
8aa2f27
worker: remove migrate slab from worker API
peterjan Dec 10, 2024
b9cffad
internal: deduplicate spending and host manager
peterjan Dec 10, 2024
738cd3f
worker: set host manager
peterjan Dec 10, 2024
33f7996
migrator: cleanup
peterjan Dec 10, 2024
cde0fb1
host: remove FundAccount
peterjan Dec 10, 2024
8da3f8d
all: implement review remarks
peterjan Dec 11, 2024
75dc1f6
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Dec 11, 2024
a3c4316
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Dec 12, 2024
9c08718
Merge branch 'dev' into pj/move-migrations
ChrisSchinnerl Dec 12, 2024
4e08d66
Merge branch 'dev' of github.com:SiaFoundation/renterd into pj/move-m…
peterjan Dec 16, 2024
030c351
Merge branch 'pj/move-migrations' of github.com:SiaFoundation/renterd…
peterjan Dec 16, 2024
5 changes: 5 additions & 0 deletions .changeset/move_migrations_to_the_autopilot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
default: major
---

# Move migrations to the autopilot
18 changes: 7 additions & 11 deletions README.md
@@ -79,17 +79,19 @@ overview of all settings configurable through the CLI.
| `Worker.UploadOverdriveTimeout` | Timeout for overdriving slab uploads | `3s` | `--worker.uploadOverdriveTimeout` | - | `worker.uploadOverdriveTimeout` |
| `Worker.Enabled` | Enables/disables worker | `true` | `--worker.enabled` | `RENTERD_WORKER_ENABLED` | `worker.enabled` |
| `Worker.AllowUnauthenticatedDownloads` | Allows unauthenticated downloads | - | `--worker.unauthenticatedDownloads` | `RENTERD_WORKER_UNAUTHENTICATED_DOWNLOADS` | `worker.allowUnauthenticatedDownloads` |
| `Worker.RemoteAddrs` | List of remote worker addresses (semicolon delimited) | - | - | `RENTERD_WORKER_REMOTE_ADDRS` | `worker.remotes` |
| `Worker.RemotePassword` | API password for the remote workers | - | - | `RENTERD_WORKER_API_PASSWORD` | `worker.remotes` |
| `Autopilot.Enabled` | Enables/disables autopilot | `true` | `--autopilot.enabled` | `RENTERD_AUTOPILOT_ENABLED` | `autopilot.enabled` |
| `Autopilot.AccountsRefillInterval` | Interval for refilling workers' account balances | `24h` | `--autopilot.accountRefillInterval` | - | `autopilot.accountsRefillInterval` |
| `Autopilot.Heartbeat` | Interval for autopilot loop execution | `30m` | `--autopilot.heartbeat` | - | `autopilot.heartbeat` |
| `Autopilot.MigrationHealthCutoff` | Threshold for migrating slabs based on health | `0.75` | `--autopilot.migrationHealthCutoff` | - | `autopilot.migrationHealthCutoff` |
| `Autopilot.RevisionBroadcastInterval`| Interval for broadcasting contract revisions | `168h` (7 days) | `--autopilot.revisionBroadcastInterval` | `RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL` | `autopilot.revisionBroadcastInterval` |
| `Autopilot.ScannerBatchSize` | Batch size for host scanning | `1000` | `--autopilot.scannerBatchSize` | - | `autopilot.scannerBatchSize` |
| `Autopilot.ScannerInterval` | Interval for scanning hosts | `24h` | `--autopilot.scannerInterval` | - | `autopilot.scannerInterval` |
| `Autopilot.ScannerNumThreads` | Number of threads for scanning hosts | `100` | - | - | `autopilot.scannerNumThreads` |
| `Autopilot.MigratorParallelSlabsPerWorker` | Parallel slab migrations per worker | `1` | `--autopilot.migratorParallelSlabsPerWorker` | `RENTERD_MIGRATOR_PARALLEL_SLABS_PER_WORKER` | `autopilot.migratorParallelSlabsPerWorker` |
| `Migrator.HealthCutoff` | Threshold for migrating slabs based on health | `0.75` | `--migrator.healthCutoff` | - | `migrator.healthCutoff` |
| `Migrator.ParallelSlabsPerWorker` | Parallel slab migrations | `1` | `--migrator.parallelSlabsPerWorker` | `RENTERD_MIGRATOR_PARALLEL_SLABS_PER_WORKER` | `migrator.parallelSlabsPerWorker` |
| `Migrator.DownloadMaxOverdrive` | Max overdrive workers for migration downloads | `5` | `--migrator.downloadMaxOverdrive` | - | `migrator.downloadMaxOverdrive` |
| `Migrator.DownloadOverdriveTimeout` | Timeout for overdriving migration downloads | `3s` | `--migrator.downloadOverdriveTimeout` | - | `migrator.downloadOverdriveTimeout` |
| `Migrator.UploadMaxOverdrive` | Max overdrive workers for migration uploads | `5` | `--migrator.uploadMaxOverdrive` | - | `migrator.uploadMaxOverdrive` |
| `Migrator.UploadOverdriveTimeout` | Timeout for overdriving migration uploads | `3s` | `--migrator.uploadOverdriveTimeout` | - | `migrator.uploadOverdriveTimeout` |
| `S3.Address` | Address for serving S3 API | `:9982` | `--s3.address` | `RENTERD_S3_ADDRESS` | `s3.address` |
| `S3.DisableAuth` | Disables authentication for S3 API | `false` | `--s3.disableAuth` | `RENTERD_S3_DISABLE_AUTH` | `s3.disableAuth` |
| `S3.Enabled` | Enables/disables S3 API | `true` | `--s3.enabled` | `RENTERD_S3_ENABLED` | `s3.enabled` |
@@ -135,11 +137,7 @@ occur. Therefor it is important to start the worker after the bus is reachable.

To run the autopilot separately, the worker has to be disabled using the
`--worker.enabled` flag. Similar to the worker, the autopilot has to be
configured with a remote bus for the node not to start a bus itself. Alongside
with knowing where the bus is located, the autopilot also has to be aware of the
workers. These remote workers can be configured through yaml under the option
`worker.remotes`, or through environment variables
(`RENTERD_WORKER_REMOTE_ADDRS` and `RENTERD_WORKER_API_PASSWORD`).
configured with a remote bus for the node not to start a bus itself.

#### Example docker-compose with minimal configuration

@@ -193,8 +191,6 @@ services:
- RENTERD_API_PASSWORD=autopilot-pass
- RENTERD_BUS_API_PASSWORD=bus-pass
- RENTERD_BUS_REMOTE_ADDR=http://bus:9980/api/bus
- RENTERD_WORKER_API_PASSWORD=<worker-password>
- RENTERD_WORKER_REMOTE_ADDRS=http://worker-1:9980/api/worker;http://worker-2:9980/api/worker
ports:
- "9984:9980"
depends_on:
Expand Down
33 changes: 2 additions & 31 deletions autopilot/alerts.go
@@ -10,10 +8,8 @@ import (
)

var (
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
@@ -58,30 +56,3 @@ func newContractPruningFailedAlert(hk types.PublicKey, version, release string,
Timestamp: time.Now(),
}
}

func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
data := make(map[string]interface{})
if rounded := estimate.Round(time.Minute); rounded > 0 {
data["estimate"] = fmt.Sprintf("~%v remaining", rounded)
}

return alerts.Alert{
ID: alertOngoingMigrationsID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertHealthRefreshID,
Severity: alerts.SeverityCritical,
Message: "Health refresh failed",
Data: map[string]interface{}{
"error": err.Error(),
},
Timestamp: time.Now(),
}
}
215 changes: 122 additions & 93 deletions autopilot/autopilot.go
@@ -9,13 +9,16 @@ import (
"sync"
"time"

rhpv3 "go.sia.tech/core/rhp/v3"

"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/autopilot/contractor"
"go.sia.tech/renterd/autopilot/migrator"
"go.sia.tech/renterd/autopilot/scanner"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/config"
@@ -31,6 +34,8 @@ type Bus interface {

// accounts
Accounts(ctx context.Context, owner string) (accounts []api.Account, err error)
FundAccount(ctx context.Context, account rhpv3.Account, fcid types.FileContractID, amount types.Currency) (types.Currency, error)
UpdateAccounts(context.Context, []api.Account) error

// autopilot
AutopilotConfig(ctx context.Context) (api.AutopilotConfig, error)
@@ -49,6 +54,8 @@ type Bus interface {
FormContract(ctx context.Context, renterAddress types.Address, renterFunds types.Currency, hostKey types.PublicKey, hostCollateral types.Currency, endHeight uint64) (api.ContractMetadata, error)
ContractRevision(ctx context.Context, fcid types.FileContractID) (api.Revision, error)
RenewContract(ctx context.Context, fcid types.FileContractID, endHeight uint64, renterFunds, minNewCollateral types.Currency, expectedNewStorage uint64) (api.ContractMetadata, error)
RecordContractSpending(ctx context.Context, records []api.ContractSpendingRecord) error
RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error)
UpdateContractUsability(ctx context.Context, contractID types.FileContractID, usability string) (err error)
PrunableData(ctx context.Context) (prunableData api.ContractsPrunableDataResponse, err error)
PruneContract(ctx context.Context, id types.FileContractID, timeout time.Duration) (api.ContractPruneResponse, error)
@@ -65,16 +72,36 @@ type Bus interface {
// buckets
ListBuckets(ctx context.Context) ([]api.Bucket, error)

// migrations
UploadParams(ctx context.Context) (api.UploadParams, error)
UsableHosts(ctx context.Context) (hosts []api.HostInfo, err error)
AddMultipartPart(ctx context.Context, bucket, key, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error)
AddObject(ctx context.Context, bucket, key string, o object.Object, opts api.AddObjectOptions) error
AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error)
AddUploadingSectors(ctx context.Context, uID api.UploadID, root []types.Hash256) error
FinishUpload(ctx context.Context, uID api.UploadID) error
MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error
TrackUpload(ctx context.Context, uID api.UploadID) error
UpdateSlab(ctx context.Context, key object.EncryptionKey, sectors []api.UploadedSector) error

// locker
AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error)
KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error)
ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error)

// objects
Objects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsResponse, err error)
RefreshHealth(ctx context.Context) error
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
SlabsForMigration(ctx context.Context, healthCutoff float64, limit int) ([]api.UnhealthySlab, error)
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error
FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error)

// scanner
ScanHost(ctx context.Context, hostKey types.PublicKey, timeout time.Duration) (resp api.HostScanResponse, err error)

// settings
GougingParams(ctx context.Context) (api.GougingParams, error)
GougingSettings(ctx context.Context) (gs api.GougingSettings, err error)
UploadSettings(ctx context.Context) (us api.UploadSettings, err error)

@@ -92,13 +119,12 @@ type Bus interface {
}

type Autopilot struct {
alerts alerts.Alerter
bus Bus
logger *zap.SugaredLogger
workers *workerPool
alerts alerts.Alerter
bus Bus
logger *zap.SugaredLogger

c *contractor.Contractor
m *migrator
m migrator.Migrator
s scanner.Scanner

tickerDuration time.Duration
@@ -120,14 +146,13 @@ type Autopilot struct {
}

// New initializes an Autopilot.
func New(cfg config.Autopilot, bus Bus, workers []Worker, logger *zap.Logger) (_ *Autopilot, err error) {
func New(cfg config.Autopilot, mCfg config.Migrator, masterKey utils.MasterKey, bus Bus, logger *zap.Logger) (_ *Autopilot, err error) {
logger = logger.Named("autopilot")
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, "autopilot"),
bus: bus,
logger: logger.Sugar(),
workers: newWorkerPool(workers),
alerts: alerts.WithOrigin(bus, "autopilot"),
bus: bus,
logger: logger.Sugar(),

shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
@@ -143,7 +168,10 @@ func New(cfg config.Autopilot, bus Bus, workers []Worker, logger *zap.Logger) (_
}

ap.c = contractor.New(bus, bus, cfg.RevisionSubmissionBuffer, cfg.RevisionBroadcastInterval, cfg.AllowRedundantHostIPs, ap.logger)
ap.m = newMigrator(ap, cfg.MigrationHealthCutoff, cfg.MigratorParallelSlabsPerWorker)
ap.m, err = migrator.New(ap.shutdownCtx, mCfg, masterKey, ap.alerts, ap.bus, ap.bus, logger)
if err != nil {
return
}

return ap, nil
}
@@ -225,88 +253,7 @@ func (ap *Autopilot) Run() {
for !ap.isStopped() {
ap.logger.Info("autopilot iteration starting")
tickerFired := make(chan struct{})
ap.workers.withWorker(func(w Worker) {
defer ap.logger.Info("autopilot iteration ended")

// initiate a host scan - no need to be synced or configured for scanning
ap.s.Scan(ap.shutdownCtx, ap.bus, forceScan)

// reset forceScans
forceScan = false

// block until consensus is synced
if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
if interrupted {
close(tickerFired)
return
}
ap.logger.Info("autopilot stopped before consensus was synced")
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
ap.s.Scan(ap.shutdownCtx, ap.bus, true)
}
}

// block until the autopilot is enabled
if enabled, interrupted := ap.blockUntilEnabled(ap.ticker.C); !enabled {
if interrupted {
close(tickerFired)
return
}
ap.logger.Info("autopilot stopped before it was able to confirm it was enabled in the bus")
return
}

// fetch autopilot config
apCfg, err := ap.bus.AutopilotConfig(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("aborting maintenance, failed to fetch autopilot", zap.Error(err))
return
}

// update the scanner with the hosts config
ap.s.UpdateHostsConfig(apCfg.Hosts)

// perform wallet maintenance
err = ap.performWalletMaintenance(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("wallet maintenance failed, err: %v", err)
}

// build maintenance state
buildState, err := ap.buildState(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("aborting maintenance, failed to build state, err: %v", err)
return
}

// perform maintenance
setChanged, err := ap.c.PerformContractMaintenance(ap.shutdownCtx, buildState)
if err != nil && utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
}
maintenanceSuccess := err == nil

// upon success, notify the migrator. The health of slabs might have
// changed.
if maintenanceSuccess && setChanged {
ap.m.SignalMaintenanceFinished()
}

// migration
ap.m.tryPerformMigrations(ap.workers)

// pruning
if apCfg.Contracts.Prune {
ap.tryPerformPruning()
} else {
ap.logger.Info("pruning disabled")
}
})

ap.tick(forceScan, tickerFired)
select {
case <-ap.shutdownCtx.Done():
return
@@ -459,6 +406,88 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block
}
}

func (ap *Autopilot) tick(forceScan bool, tickerFired chan struct{}) {
defer ap.logger.Info("autopilot iteration ended")

// initiate a host scan - no need to be synced or configured for scanning
ap.s.Scan(ap.shutdownCtx, ap.bus, forceScan)

// reset forceScans
forceScan = false

// block until consensus is synced
if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
if interrupted {
close(tickerFired)
return
}
ap.logger.Info("autopilot stopped before consensus was synced")
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
ap.s.Scan(ap.shutdownCtx, ap.bus, true)
}
}

// block until the autopilot is enabled
if enabled, interrupted := ap.blockUntilEnabled(ap.ticker.C); !enabled {
if interrupted {
close(tickerFired)
return
}
ap.logger.Info("autopilot stopped before it was able to confirm it was enabled in the bus")
return
}

// fetch autopilot config
apCfg, err := ap.bus.AutopilotConfig(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("aborting maintenance, failed to fetch autopilot config, err: %v", err)
return
}

// update the scanner with the hosts config
ap.s.UpdateHostsConfig(apCfg.Hosts)

// perform wallet maintenance
err = ap.performWalletMaintenance(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("wallet maintenance failed, err: %v", err)
}

// build maintenance state
buildState, err := ap.buildState(ap.shutdownCtx)
if err != nil {
ap.logger.Errorf("aborting maintenance, failed to build state, err: %v", err)
return
}

// perform maintenance
setChanged, err := ap.c.PerformContractMaintenance(ap.shutdownCtx, buildState)
if err != nil && utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
}
maintenanceSuccess := err == nil

// upon success, notify the migrator. The health of slabs might have
// changed.
if maintenanceSuccess && setChanged {
ap.m.SignalMaintenanceFinished()
}

// migration
ap.m.Migrate(ap.shutdownCtx)

// pruning
if apCfg.Contracts.Prune {
ap.tryPerformPruning()
} else {
ap.logger.Info("pruning disabled")
}
}
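One detail of the extraction into `tick`: `forceScan` is now a plain `bool` parameter, so the `forceScan = false` assignment inside the function changes only its local copy. Whether `Run` depends on that reset is not visible in this excerpt. The sketch below only illustrates the Go semantics, using hypothetical function names, and shows one way to propagate the reset by returning the value.

```go
package main

import "fmt"

// tickByValue mimics a function that resets its bool parameter. The caller's
// variable is unaffected because Go passes arguments by value.
func tickByValue(forceScan bool) {
	forceScan = false
	_ = forceScan
}

// tickReturning reports the updated flag so the caller can store it.
func tickReturning(forceScan bool) bool {
	// ... a scan would use forceScan here ...
	return false
}

func main() {
	forceScan := true

	tickByValue(forceScan)
	fmt.Println(forceScan) // prints "true": the reset did not reach the caller

	forceScan = tickReturning(forceScan)
	fmt.Println(forceScan) // prints "false": the caller stored the new value
}
```

Passing a `*bool` or resetting the flag in the caller right after the call would work equally well; returning the value just keeps the data flow explicit.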

func (ap *Autopilot) tryScheduleTriggerWhenFunded() error {
// apply sane timeout
ctx, cancel := context.WithTimeout(ap.shutdownCtx, time.Minute)
Expand Down