diff --git a/api/api.go b/api/api.go index ff2d9b6b..ae4a3834 100644 --- a/api/api.go +++ b/api/api.go @@ -55,11 +55,11 @@ type ( VolumeManager interface { Usage() (usedSectors uint64, totalSectors uint64, err error) Volumes() ([]storage.VolumeMeta, error) - Volume(id int) (storage.VolumeMeta, error) + Volume(id int64) (storage.VolumeMeta, error) AddVolume(ctx context.Context, localPath string, maxSectors uint64, result chan<- error) (storage.Volume, error) - RemoveVolume(ctx context.Context, id int, force bool, result chan<- error) error - ResizeVolume(ctx context.Context, id int, maxSectors uint64, result chan<- error) error - SetReadOnly(id int, readOnly bool) error + RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error + ResizeVolume(ctx context.Context, id int64, maxSectors uint64, result chan<- error) error + SetReadOnly(id int64, readOnly bool) error RemoveSector(root types.Hash256) error ResizeCache(size uint32) } @@ -164,7 +164,7 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C }, volumeJobs: volumeJobs{ volumes: vm, - jobs: make(map[int]context.CancelFunc), + jobs: make(map[int64]context.CancelFunc), }, } return jape.Mux(map[string]jape.Handler{ diff --git a/api/endpoints.go b/api/endpoints.go index 16579f29..01cd5ec8 100644 --- a/api/endpoints.go +++ b/api/endpoints.go @@ -285,7 +285,7 @@ func (a *api) handleGETVolume(c jape.Context) { return } - volume, err := a.volumes.Volume(id) + volume, err := a.volumes.Volume(int64(id)) if errors.Is(err, storage.ErrVolumeNotFound) { c.Error(err, http.StatusNotFound) return @@ -309,7 +309,7 @@ func (a *api) handlePUTVolume(c jape.Context) { return } - err := a.volumes.SetReadOnly(id, req.ReadOnly) + err := a.volumes.SetReadOnly(int64(id), req.ReadOnly) if errors.Is(err, storage.ErrVolumeNotFound) { c.Error(err, http.StatusNotFound) return diff --git a/api/volumes.go b/api/volumes.go index e0b7347c..f7957bb5 100644 --- a/api/volumes.go +++ b/api/volumes.go @@ -16,7 +16,7 @@ type ( volumes VolumeManager mu sync.Mutex // protects jobs - jobs map[int]context.CancelFunc + jobs map[int64]context.CancelFunc } ) @@ -48,7 +48,7 @@ func (vj *volumeJobs) AddVolume(path string, maxSectors uint64) (storage.Volume, return volume, nil } -func (vj *volumeJobs) RemoveVolume(id int, force bool) error { +func (vj *volumeJobs) RemoveVolume(id int64, force bool) error { vj.mu.Lock() defer vj.mu.Unlock() if _, exists := vj.jobs[id]; exists { @@ -79,7 +79,7 @@ func (vj *volumeJobs) RemoveVolume(id int, force bool) error { return nil } -func (vj *volumeJobs) ResizeVolume(id int, newSize uint64) error { +func (vj *volumeJobs) ResizeVolume(id int64, newSize uint64) error { vj.mu.Lock() defer vj.mu.Unlock() if _, exists := vj.jobs[id]; exists { @@ -110,7 +110,7 @@ func (vj *volumeJobs) ResizeVolume(id int, newSize uint64) error { return nil } -func (vj *volumeJobs) Cancel(id int) error { +func (vj *volumeJobs) Cancel(id int64) error { vj.mu.Lock() defer vj.mu.Unlock() cancel, exists := vj.jobs[id] @@ -163,7 +163,7 @@ func (a *api) handleDeleteVolume(c jape.Context) { } else if err := c.DecodeForm("force", &force); err != nil { return } - err := a.volumeJobs.RemoveVolume(id, force) + err := a.volumeJobs.RemoveVolume(int64(id), force) a.checkServerError(c, "failed to remove volume", err) } @@ -181,7 +181,7 @@ func (a *api) handlePUTVolumeResize(c jape.Context) { return } - err := a.volumeJobs.ResizeVolume(id, req.MaxSectors) + err := a.volumeJobs.ResizeVolume(int64(id), req.MaxSectors) 
a.checkServerError(c, "failed to resize volume", err) } @@ -194,6 +194,6 @@ func (a *api) handleDELETEVolumeCancelOp(c jape.Context) { return } - err := a.volumeJobs.Cancel(id) + err := a.volumeJobs.Cancel(int64(id)) a.checkServerError(c, "failed to cancel operation", err) } diff --git a/host/contracts/contracts.go b/host/contracts/contracts.go index 4b2fb61d..209baca8 100644 --- a/host/contracts/contracts.go +++ b/host/contracts/contracts.go @@ -331,7 +331,7 @@ func (cu *ContractUpdater) Commit(revision SignedRevision, usage Usage) error { start := time.Now() // revise the contract - err := cu.store.ReviseContract(revision, usage, cu.sectors, cu.sectorActions) + err := cu.store.ReviseContract(revision, usage, cu.sectorActions) if err == nil { // clear the committed sector actions cu.sectorActions = cu.sectorActions[:0] diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go index e66820ad..890006b7 100644 --- a/host/contracts/manager_test.go +++ b/host/contracts/manager_test.go @@ -1013,7 +1013,7 @@ func TestSectorRoots(t *testing.T) { defer release() // use the database method directly to avoid the sector cache - err = db.ReviseContract(rev, contracts.Usage{}, uint64(i), []contracts.SectorChange{ + err = db.ReviseContract(rev, contracts.Usage{}, []contracts.SectorChange{ {Action: contracts.SectorActionAppend, Root: root}, }) if err != nil { diff --git a/host/contracts/persist.go b/host/contracts/persist.go index 9aba8cfc..13c2429a 100644 --- a/host/contracts/persist.go +++ b/host/contracts/persist.go @@ -6,24 +6,6 @@ import ( ) type ( - // UpdateContractTransaction atomically updates a single contract and its - // associated sector roots. - UpdateContractTransaction interface { - // AppendSector appends a sector root to the end of the contract - AppendSector(root types.Hash256) error - // SwapSectors swaps the sector roots at the given indices. - SwapSectors(i, j uint64) error - // TrimSectors removes the last n sector roots from the contract. - TrimSectors(n int) error - // UpdateSector updates the sector root at the given index. - UpdateSector(index uint64, newRoot types.Hash256) error - - // AddUsage adds the additional usage costs to the contract. - AddUsage(Usage) error - // ReviseContract updates the current revision associated with a contract. - ReviseContract(SignedRevision) error - } - // UpdateStateTransaction atomically updates the contract manager's state. UpdateStateTransaction interface { ContractRelevant(types.FileContractID) (bool, error) @@ -66,7 +48,7 @@ type ( ContractAction(height uint64, contractFn func(types.FileContractID, uint64, string)) error // ReviseContract atomically updates a contract and its associated // sector roots. - ReviseContract(revision SignedRevision, usage Usage, oldSectors uint64, sectorChanges []SectorChange) error + ReviseContract(revision SignedRevision, usage Usage, sectorChanges []SectorChange) error // UpdateContractState atomically updates the contract manager's state. UpdateContractState(modules.ConsensusChangeID, uint64, func(UpdateStateTransaction) error) error // ExpireContractSectors removes sector roots for any contracts that are diff --git a/host/storage/persist.go b/host/storage/persist.go index 1328a4ff..eaef0540 100644 --- a/host/storage/persist.go +++ b/host/storage/persist.go @@ -15,33 +15,33 @@ type ( // Volumes returns a list of all volumes in the volume store. 
Volumes() ([]Volume, error) // Volume returns a volume in the store by its id - Volume(id int) (Volume, error) + Volume(id int64) (Volume, error) // AddVolume initializes a new storage volume and adds it to the volume // store. GrowVolume must be called afterwards to initialize the volume // to its desired size. - AddVolume(localPath string, readOnly bool) (int, error) + AddVolume(localPath string, readOnly bool) (int64, error) // RemoveVolume removes a storage volume from the volume store. If there - // are used sectors in the volume, ErrVolumeNotEmpty is returned. If - // force is true, the volume is removed even if it is not empty. - RemoveVolume(volumeID int, force bool) error + // are used sectors in the volume, ErrVolumeNotEmpty is returned. + RemoveVolume(volumeID int64) error // GrowVolume grows a storage volume's metadata to maxSectors. If the // number of sectors in the volume is already greater than maxSectors, // nil is returned. - GrowVolume(volumeID int, maxSectors uint64) error + GrowVolume(volumeID int64, maxSectors uint64) error // ShrinkVolume shrinks a storage volume's metadata to maxSectors. If // there are used sectors in the shrink range, an error is returned. - ShrinkVolume(volumeID int, maxSectors uint64) error + ShrinkVolume(volumeID int64, maxSectors uint64) error // SetReadOnly sets the read-only flag on a volume. - SetReadOnly(volumeID int, readOnly bool) error + SetReadOnly(volumeID int64, readOnly bool) error // SetAvailable sets the available flag on a volume. - SetAvailable(volumeID int, available bool) error + SetAvailable(volumeID int64, available bool) error - // MigrateSectors returns a new location for each occupied sector of a volume - // starting at min. The sector data should be copied to the new volume and - // synced to disk during migrateFn. Iteration is stopped if migrateFn returns an - // error. - MigrateSectors(volumeID int, min uint64, migrateFn func(newLocations []SectorLocation) error) error + // MigrateSectors returns a new location for each occupied sector of a + // volume starting at min. The sector data should be copied to the new + // location and synced to disk during migrateFn. Iteration is stopped if + // migrateFn returns an error. + MigrateSectors(volumeID int64, min uint64, migrateFn func(SectorLocation) error) error // StoreSector calls fn with an empty location in a writable volume. If // the sector root already exists, fn is called with the existing // location and exists is true. Unless exists is true, The sector must diff --git a/host/storage/storage.go b/host/storage/storage.go index 205a9593..fff56117 100644 --- a/host/storage/storage.go +++ b/host/storage/storage.go @@ -53,7 +53,7 @@ type ( // A SectorLocation is a location of a sector within a volume. SectorLocation struct { ID int64 - Volume int + Volume int64 Index uint64 Root types.Hash256 } @@ -81,16 +81,16 @@ type ( mu sync.Mutex // protects the following fields lastCleanup time.Time - volumes map[int]*volume + volumes map[int64]*volume // changedVolumes tracks volumes that need to be fsynced - changedVolumes map[int]bool + changedVolumes map[int64]bool cache *lru.Cache[types.Hash256, *[rhp2.SectorSize]byte] // Added cache } ) // getVolume returns the volume with the given ID, or an error if the volume does // not exist or is currently busy.
-func (vm *VolumeManager) getVolume(v int) (*volume, error) { +func (vm *VolumeManager) getVolume(v int64) (*volume, error) { vm.mu.Lock() defer vm.mu.Unlock() vol, ok := vm.volumes[v] @@ -103,7 +103,7 @@ func (vm *VolumeManager) getVolume(v int) (*volume, error) { // lockVolume locks a volume for operations until release is called. A locked // volume cannot have its size or status changed and no new sectors can be // written to it. -func (vm *VolumeManager) lockVolume(id int) (func(), error) { +func (vm *VolumeManager) lockVolume(id int64) (func(), error) { vm.mu.Lock() defer vm.mu.Unlock() v, ok := vm.volumes[id] @@ -126,15 +126,20 @@ func (vm *VolumeManager) lockVolume(id int) (func(), error) { }, nil } -// writeSector writes a sector to a volume. The volume is not synced after the -// sector is written. The location is assumed to be empty and locked. -func (vm *VolumeManager) writeSector(data *[rhp2.SectorSize]byte, loc SectorLocation) error { +// writeSector writes a sector to a volume. The location is assumed to be empty +// and locked. +func (vm *VolumeManager) writeSector(data *[rhp2.SectorSize]byte, loc SectorLocation, sync bool) error { vol, err := vm.getVolume(loc.Volume) if err != nil { return fmt.Errorf("failed to get volume: %w", err) } else if err := vol.WriteSector(data, loc.Index); err != nil { return fmt.Errorf("failed to write sector data: %w", err) } + + if sync { + return vm.Sync() + } + vm.mu.Lock() vm.changedVolumes[loc.Volume] = true vm.mu.Unlock() @@ -170,7 +175,7 @@ func (vm *VolumeManager) loadVolumes() error { if err := v.OpenVolume(vol.LocalPath, false); err != nil { v.appendError(fmt.Errorf("failed to open volume: %w", err)) - vm.log.Error("unable to open volume", zap.Error(err), zap.Int("id", vol.ID), zap.String("path", vol.LocalPath)) + vm.log.Error("unable to open volume", zap.Error(err), zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath)) // mark the volume as unavailable if err := vm.vs.SetAvailable(vol.ID, false); err != nil { return fmt.Errorf("failed to mark volume '%v' as unavailable: %w", vol.LocalPath, err) @@ -195,46 +200,33 @@ func (vm *VolumeManager) loadVolumes() error { return fmt.Errorf("failed to mark volume '%v' as available: %w", vol.LocalPath, err) } v.SetStatus(VolumeStatusReady) - vm.log.Debug("loaded volume", zap.Int("id", vol.ID), zap.String("path", vol.LocalPath)) + vm.log.Debug("loaded volume", zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath)) } return nil } -// migrateSector migrates sectors to new locations. The sectors are read from -// their current locations and written to their new locations. Changed volumes -// are synced after all sectors have been written. 
-func (vm *VolumeManager) migrateSectors(locations []SectorLocation, force bool, log *zap.Logger) (migrated int, _ error) { - for _, loc := range locations { - err := func() error { - // read the sector from the old location - sector, err := vm.Read(loc.Root) - if err != nil { - return fmt.Errorf("failed to read sector: %w", err) - } - // calculate the returned root - root := rhp2.SectorRoot(sector) - if root != loc.Root { - return fmt.Errorf("sector corrupt: %v != %v", loc.Root, root) - } - if err := vm.writeSector(sector, loc); err != nil { // write the sector to the new location - return fmt.Errorf("failed to write sector: %w", err) - } - return nil - }() - if err != nil { - log.Error("failed to migrate sector", zap.Error(err), zap.Stringer("root", loc.Root), zap.Int("newVolumeID", loc.Volume), zap.Uint64("newIndex", loc.Index)) - if force { - continue - } - return migrated, fmt.Errorf("failed to migrate sector %v: %w", loc.Root, err) - } - migrated++ +// migrateSector migrates a sector to a new location. The sector is read from +// its current location and written to its new location. The volume is +// immediately synced after the sector is written. +func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) error { + // read the sector from the old location + sector, err := vm.Read(loc.Root) + if err != nil { + return fmt.Errorf("failed to read sector: %w", err) + } + // calculate the returned root + root := rhp2.SectorRoot(sector) + // verify the sector is not corrupt + if root != loc.Root { + return fmt.Errorf("sector corrupt: %v != %v", loc.Root, root) + } - return migrated, vm.Sync() + + // write the sector to the new location + return vm.writeSector(sector, loc, true) } // growVolume grows a volume by adding sectors to the end of the volume. -func (vm *VolumeManager) growVolume(ctx context.Context, id int, volume *volume, oldMaxSectors, newMaxSectors uint64) error { +func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error { if oldMaxSectors > newMaxSectors { return errors.New("old sectors must be less than new sectors") } @@ -289,8 +281,8 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int, volume *volume, } // shrinkVolume shrinks a volume by removing sectors from the end of the volume. -func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int, volume *volume, oldMaxSectors, newMaxSectors uint64) error { - log := vm.log.Named("shrinkVolume").With(zap.Int("volumeID", id), zap.Uint64("oldMaxSectors", oldMaxSectors), zap.Uint64("newMaxSectors", newMaxSectors)) +func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error { + log := vm.log.Named("shrinkVolume").With(zap.Int64("volumeID", id), zap.Uint64("oldMaxSectors", oldMaxSectors), zap.Uint64("newMaxSectors", newMaxSectors)) if oldMaxSectors <= newMaxSectors { return errors.New("old sectors must be greater than new sectors") } @@ -314,22 +306,23 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int, volume *volum // responsibility to register a completion alert defer vm.a.Dismiss(a.ID) - // migrate any sectors outside of the target range. migrateSectors will be - // called on chunks of 64 sectors + // migrate any sectors outside of the target range.
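+ // sectors are migrated one at a time; each sector's data is synced to + // disk before its database location is updated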
var migrated int - err := vm.vs.MigrateSectors(id, newMaxSectors, func(newLocations []SectorLocation) error { + err := vm.vs.MigrateSectors(id, newMaxSectors, func(newLoc SectorLocation) error { select { case <-ctx.Done(): return ctx.Err() default: } - n, err := vm.migrateSectors(newLocations, false, log.Named("migrateSectors")) - migrated += n + if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil { + return err + } + migrated++ // update the alert a.Data["migratedSectors"] = migrated vm.a.Register(a) - return err + return nil }) log.Info("migrated sectors", zap.Int("count", migrated)) if err != nil { @@ -369,7 +362,7 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int, volume *volum return nil } -func (vm *VolumeManager) volumeStats(id int) (vs VolumeStats) { +func (vm *VolumeManager) volumeStats(id int64) (vs VolumeStats) { v, ok := vm.volumes[id] if !ok { vs.Status = "unavailable" @@ -379,7 +372,7 @@ func (vm *VolumeManager) volumeStats(id int) (vs VolumeStats) { return } -func (vm *VolumeManager) setVolumeStatus(id int, status string) { +func (vm *VolumeManager) setVolumeStatus(id int64, status string) { vm.mu.Lock() defer vm.mu.Unlock() @@ -390,7 +383,7 @@ func (vm *VolumeManager) setVolumeStatus(id int, status string) { v.stats.Status = status } -func (vm *VolumeManager) doResize(ctx context.Context, volumeID int, vol *volume, current, target uint64) error { +func (vm *VolumeManager) doResize(ctx context.Context, volumeID int64, vol *volume, current, target uint64) error { ctx, cancel, err := vm.tg.AddContext(ctx) if err != nil { return err @@ -408,7 +401,7 @@ func (vm *VolumeManager) doResize(ctx context.Context, volumeID int, vol *volume return nil } -func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int, localPath string, force bool, log *zap.Logger) (int, error) { +func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int64, localPath string, force bool, log *zap.Logger) (int, error) { ctx, cancel, err := vm.tg.AddContext(ctx) if err != nil { return 0, err @@ -433,23 +426,32 @@ func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int, localPat defer vm.a.Dismiss(a.ID) // migrate sectors to other volumes - var migrated int - err = vm.vs.MigrateSectors(id, 0, func(locations []SectorLocation) error { + var migrated, failed int + err = vm.vs.MigrateSectors(id, 0, func(newLoc SectorLocation) error { select { case <-ctx.Done(): return ctx.Err() default: } - n, err := vm.migrateSectors(locations, force, log.Named("migrateSectors")) - migrated += n + + if err := vm.migrateSector(newLoc, log.Named("migrate")); err != nil { + log.Error("failed to migrate sector", zap.Stringer("sectorRoot", newLoc.Root), zap.Error(err)) + if force { + failed++ + a.Data["failed"] = failed + return nil + } + return err + } + migrated++ // update the alert a.Data["migrated"] = migrated vm.a.Register(a) - return err + return nil }) - if err != nil && !force { + if err != nil { return migrated, fmt.Errorf("failed to migrate sector data: %w", err) - } else if err := vm.vs.RemoveVolume(id, force); err != nil { + } else if err := vm.vs.RemoveVolume(id); err != nil { return migrated, fmt.Errorf("failed to remove volume: %w", err) } @@ -478,9 +480,9 @@ func (vm *VolumeManager) Close() error { // sync and close all open volumes for id, vol := range vm.volumes { if err := vol.Sync(); err != nil { - vm.log.Error("failed to sync volume", zap.Int("id", id), zap.Error(err)) + vm.log.Error("failed to sync volume", zap.Int64("id", id), 
zap.Error(err)) } else if err := vol.Close(); err != nil { - vm.log.Error("failed to close volume", zap.Int("id", id), zap.Error(err)) + vm.log.Error("failed to close volume", zap.Int64("id", id), zap.Error(err)) } delete(vm.volumes, id) } @@ -525,7 +527,7 @@ func (vm *VolumeManager) Volumes() ([]VolumeMeta, error) { } // Volume returns a volume by its ID. -func (vm *VolumeManager) Volume(id int) (VolumeMeta, error) { +func (vm *VolumeManager) Volume(id int64) (VolumeMeta, error) { done, err := vm.tg.Add() if err != nil { return VolumeMeta{}, err @@ -593,7 +595,7 @@ func (vm *VolumeManager) AddVolume(ctx context.Context, localPath string, maxSec } go func() { - log := vm.log.Named("initialize").With(zap.Int("volumeID", volumeID), zap.Uint64("maxSectors", maxSectors)) + log := vm.log.Named("initialize").With(zap.Int64("volumeID", volumeID), zap.Uint64("maxSectors", maxSectors)) start := time.Now() err := func() error { defer vm.vs.SetAvailable(volumeID, true) @@ -630,7 +632,7 @@ func (vm *VolumeManager) AddVolume(ctx context.Context, localPath string, maxSec } // SetReadOnly sets the read-only status of a volume. -func (vm *VolumeManager) SetReadOnly(id int, readOnly bool) error { +func (vm *VolumeManager) SetReadOnly(id int64, readOnly bool) error { done, err := vm.tg.Add() if err != nil { return err @@ -650,8 +652,8 @@ func (vm *VolumeManager) SetReadOnly(id int, readOnly bool) error { } // RemoveVolume removes a volume from the manager. -func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int, force bool, result chan<- error) error { - log := vm.log.Named("remove").With(zap.Int("volumeID", id)) +func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error { + log := vm.log.Named("remove").With(zap.Int64("volumeID", id)) done, err := vm.tg.Add() if err != nil { return err @@ -714,7 +716,7 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int, force bool, r } // ResizeVolume resizes a volume to the specified size. 
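+// The resize is performed asynchronously; the outcome of the operation is +// sent on the result channel.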
-func (vm *VolumeManager) ResizeVolume(ctx context.Context, id int, maxSectors uint64, result chan<- error) error { +func (vm *VolumeManager) ResizeVolume(ctx context.Context, id int64, maxSectors uint64, result chan<- error) error { done, err := vm.tg.Add() if err != nil { return err @@ -746,7 +748,7 @@ func (vm *VolumeManager) ResizeVolume(ctx context.Context, id int, maxSectors ui } go func() { - log := vm.log.Named("resize").With(zap.Int("volumeID", id)) + log := vm.log.Named("resize").With(zap.Int64("volumeID", id)) start := time.Now() err := func() error { defer func() { @@ -894,7 +896,7 @@ func (vm *VolumeManager) Sync() error { defer done() vm.mu.Lock() - var toSync []int + var toSync []int64 for id := range vm.changedVolumes { toSync = append(toSync, id) } @@ -926,13 +928,10 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhp2.SectorSize]byte) return nil } start := time.Now() - vol, err := vm.getVolume(loc.Volume) - if err != nil { - return fmt.Errorf("failed to get volume %v: %w", loc.Volume, err) - } else if err := vol.WriteSector(data, loc.Index); err != nil { - return fmt.Errorf("failed to write sector %v: %w", root, err) + if err := vm.writeSector(data, loc, false); err != nil { + return err } - vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start))) + vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start))) // Add newly written sector to cache vm.cache.Add(root, data) return nil @@ -1004,8 +1003,8 @@ func NewVolumeManager(vs VolumeStore, a Alerts, cm ChainManager, log *zap.Logger log: log.Named("recorder"), }, - volumes: make(map[int]*volume), - changedVolumes: make(map[int]bool), + volumes: make(map[int64]*volume), + changedVolumes: make(map[int64]bool), cache: cache, tg: threadgroup.New(), } diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go index 0e7777e7..3bed53a2 100644 --- a/host/storage/storage_test.go +++ b/host/storage/storage_test.go @@ -567,7 +567,7 @@ func TestVolumeDistribution(t *testing.T) { } defer vm.Close() - volumeIDs := make([]int, 5) + volumeIDs := make([]int64, 5) volumeDir := t.TempDir() for i := range volumeIDs { result := make(chan error, 1) diff --git a/host/storage/volume.go b/host/storage/volume.go index 2f9d1f4c..135b8a70 100644 --- a/host/storage/volume.go +++ b/host/storage/volume.go @@ -49,7 +49,7 @@ type ( // A Volume stores and retrieves sector data Volume struct { - ID int `json:"ID"` + ID int64 `json:"ID"` LocalPath string `json:"localPath"` UsedSectors uint64 `json:"usedSectors"` TotalSectors uint64 `json:"totalSectors"` diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 0e1b8a45..0cdbdeb5 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -137,44 +137,24 @@ func (u *updateContractsTxn) ContractRelevant(id types.FileContractID) (bool, er } func (s *Store) batchExpireContractSectors(height uint64) (removed []contractSectorRef, pruned int, err error) { - err = s.transaction(func(tx txn) error { - sectors, err := expiredContractSectors(tx, height, sqlSectorBatchSize) + err = s.transaction(func(tx txn) (err error) { + removed, err = expiredContractSectors(tx, height, sqlSectorBatchSize) if err != nil { return fmt.Errorf("failed to select sectors: %w", err) - } else if len(sectors) == 0 { - return nil - } - - contractSectorIDs := 
make([]int64, 0, len(sectors)) - for _, sector := range sectors { - contractSectorIDs = append(contractSectorIDs, sector.ID) - } - // delete the sector roots - query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(contractSectorIDs)) + `);` - res, err := tx.Exec(query, queryArgs(contractSectorIDs)...) - if err != nil { - return fmt.Errorf("failed to delete sectors: %w", err) - } else if rows, err := res.RowsAffected(); err != nil { - return fmt.Errorf("failed to get rows affected: %w", err) - } else if rows != int64(len(contractSectorIDs)) { - return fmt.Errorf("failed to delete all sectors: %w", err) } - // decrement the number of contract sectors - if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorIDs), time.Now()); err != nil { - return fmt.Errorf("failed to track contract sectors: %w", err) + refs := make([]contractSectorRootRef, 0, len(removed)) + for _, sector := range removed { + refs = append(refs, contractSectorRootRef{ + dbID: sector.ID, + sectorID: sector.SectorID, + }) } - for _, ref := range sectors { - err := pruneSectorRef(tx, ref.SectorID) - if errors.Is(err, errSectorHasRefs) { - continue - } else if err != nil { - return fmt.Errorf("failed to prune sector ref: %w", err) - } - pruned++ + pruned, err = deleteContractSectors(tx, refs) + if err != nil { + return fmt.Errorf("failed to prune sectors: %w", err) } - removed = sectors return nil }) return @@ -270,46 +250,63 @@ func (s *Store) RenewContract(renewal contracts.SignedRevision, clearing contrac return fmt.Errorf("failed to update renewed contract: %w", err) } - // get the count of sector roots for the old contract - var count int - if _, err = tx.Exec(`SELECT COUNT(*) FROM contract_sector_roots WHERE contract_id=$1;`, clearedDBID); err != nil { - return fmt.Errorf("failed to get sector root count: %w", err) - } - - // copy the sector roots from the old contract to the new contract - _, err = tx.Exec(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, sector_id, root_index FROM contract_sector_roots WHERE contract_id=$2;`, renewedDBID, clearedDBID) + // move the sector roots from the old contract to the new contract + _, err = tx.Exec(`UPDATE contract_sector_roots SET contract_id=$1 WHERE contract_id=$2`, renewedDBID, clearedDBID) if err != nil { return fmt.Errorf("failed to copy sector roots: %w", err) } return nil }) } +// contractSectorRoots returns the number of sector roots stored for a contract. +func contractSectorRoots(tx txn, contractID int64) (uint64, error) { + var count uint64 + err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots WHERE contract_id=$1`, contractID).Scan(&count) + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return count, err +} + // ReviseContract atomically updates a contract's revision and sectors -func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contracts.Usage, oldSectors uint64, sectorChanges []contracts.SectorChange) error { +func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contracts.Usage, sectorChanges []contracts.SectorChange) error { return s.transaction(func(tx txn) error { + // revise the contract contractID, err := reviseContract(tx, revision) if err != nil { return fmt.Errorf("failed to revise contract: %w", err) - } else if err := incrementContractUsage(tx, contractID, usage); err != nil { + } + //
update the contract usage and metrics + if err := incrementContractUsage(tx, contractID, usage); err != nil { return fmt.Errorf("failed to update contract usage: %w", err) + } else if err := incrementCurrencyStat(tx, metricRiskedCollateral, usage.RiskedCollateral, false, time.Now()); err != nil { + return fmt.Errorf("failed to track risked collateral: %w", err) + } else if err := incrementPotentialRevenueMetrics(tx, usage, false); err != nil { + return fmt.Errorf("failed to track potential revenue: %w", err) + } + + // update the sector roots + sectors, err := contractSectorRoots(tx, contractID) + if err != nil { + return fmt.Errorf("failed to get sector count: %w", err) } - var delta int - sectorCount := oldSectors for _, change := range sectorChanges { switch change.Action { case contracts.SectorActionAppend: - if err := appendSector(tx, contractID, change.Root, sectorCount); err != nil { + if err := appendSector(tx, contractID, change.Root, sectors); err != nil { return fmt.Errorf("failed to append sector: %w", err) } - sectorCount++ - delta++ + sectors++ + case contracts.SectorActionTrim: + if sectors < change.A { + return fmt.Errorf("cannot trim %v sectors from contract with %v sectors", change.A, sectors) + } + + if err := trimSectors(tx, contractID, change.A, s.log); err != nil { + return fmt.Errorf("failed to trim sectors: %w", err) + } + sectors -= change.A case contracts.SectorActionUpdate: if err := updateSector(tx, contractID, change.Root, change.A); err != nil { return fmt.Errorf("failed to update sector: %w", err) @@ -318,29 +315,13 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, usage contract if err := swapSectors(tx, contractID, change.A, change.B); err != nil { return fmt.Errorf("failed to swap sectors: %w", err) } - case contracts.SectorActionTrim: - if err := trimSectors(tx, contractID, change.A); err != nil { - return fmt.Errorf("failed to trim sectors: %w", err) - } - sectorCount-- - delta -= int(change.A) } } - - // update global stats - if err := incrementNumericStat(tx, metricContractSectors, delta, time.Now()); err != nil { - return fmt.Errorf("failed to track contract sectors: %w", err) - } else if err := incrementCurrencyStat(tx, metricRiskedCollateral, usage.RiskedCollateral, false, time.Now()); err != nil { - return fmt.Errorf("failed to track risked collateral: %w", err) - } else if err := incrementPotentialRevenueMetrics(tx, usage, false); err != nil { - return fmt.Errorf("failed to track potential revenue: %w", err) - } return nil }) } -// SectorRoots returns the sector roots for a contract. If limit is 0, all roots -// are returned. +// SectorRoots returns the sector roots for a contract.
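+// Roots are returned in ascending root_index order.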
func (s *Store) SectorRoots(contractID types.FileContractID) (roots []types.Hash256, err error) { err = s.transaction(func(tx txn) error { var dbID int64 @@ -551,10 +532,20 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) { func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error { var sectorID int64 err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(§orID) - return err + if err != nil { + return err + } else if err := incrementNumericStat(tx, metricContractSectors, 1, time.Now()); err != nil { + return fmt.Errorf("failed to track contract sectors: %w", err) + } + return nil } func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) error { + var oldSectorID int64 + if err := tx.QueryRow(`SELECT sector_id FROM contract_sector_roots WHERE contract_id=$1 AND root_index=$2`, contractID, index).Scan(&oldSectorID); err != nil { + return fmt.Errorf("failed to get old sector id: %w", err) + } + const query = `WITH sector AS ( SELECT id FROM stored_sectors WHERE sector_root=$1 ) @@ -563,9 +554,14 @@ SET sector_id=sector.id FROM sector WHERE contract_id=$2 AND root_index=$3 RETURNING sector_id;` - var sectorID int64 - err := tx.QueryRow(query, sqlHash256(root), contractID, index).Scan(§orID) - return err + var newSectorID int64 + err := tx.QueryRow(query, sqlHash256(root), contractID, index).Scan(&newSectorID) + if err != nil { + return err + } else if err := pruneSectorRef(tx, oldSectorID); err != nil { + return fmt.Errorf("failed to prune sector ref: %w", err) + } + return nil } func swapSectors(tx txn, contractID int64, i, j uint64) error { @@ -574,7 +570,7 @@ func swapSectors(tx txn, contractID int64, i, j uint64) error { } var records []contractSectorRootRef - rows, err := tx.Query(`SELECT id, sector_id FROM contract_sector_roots WHERE contract_id=$1 AND root_index IN ($2, $3) ORDER BY root_index ASC;`, contractID, i, j) + rows, err := tx.Query(`SELECT id, sector_id FROM contract_sector_roots WHERE contract_id=$1 AND root_index IN ($2, $3);`, contractID, i, j) if err != nil { return fmt.Errorf("failed to query sector IDs: %w", err) } @@ -591,7 +587,13 @@ func swapSectors(tx txn, contractID int64, i, j uint64) error { return errors.New("failed to find both sectors") } - res, err := tx.Exec(`UPDATE contract_sector_roots SET sector_id=$1 WHERE id=$2`, records[1].sectorID, records[0].dbID) + stmt, err := tx.Prepare(`UPDATE contract_sector_roots SET sector_id=$1 WHERE id=$2`) + if err != nil { + return fmt.Errorf("failed to prepare update statement: %w", err) + } + defer stmt.Close() + + res, err := stmt.Exec(records[1].sectorID, records[0].dbID) if err != nil { return fmt.Errorf("failed to update sector ID: %w", err) } else if rows, err := res.RowsAffected(); err != nil { @@ -600,7 +602,7 @@ func swapSectors(tx txn, contractID int64, i, j uint64) error { return fmt.Errorf("expected 1 row affected, got %v", rows) } - res, err = tx.Exec(`UPDATE contract_sector_roots SET sector_id=$1 WHERE id=$2`, records[0].sectorID, records[1].dbID) + res, err = stmt.Exec(records[0].sectorID, records[1].dbID) if err != nil { return fmt.Errorf("failed to update sector ID: %w", err) } else if rows, err := res.RowsAffected(); err != nil { @@ -609,48 +611,77 @@ func swapSectors(tx txn, contractID int64, i, j uint64) error { return fmt.Errorf("expected 1 row affected, got %v", rows) } - 
func() { - rows, err := tx.Query(`SELECT sector_id, root_index FROM contract_sector_roots WHERE contract_id=$1 AND root_index IN ($2, $3) ORDER BY root_index ASC;`, contractID, i, j) - if err != nil { - panic(fmt.Errorf("failed to query sector IDs: %w", err)) - } - defer rows.Close() - for rows.Next() { - var id int64 - var index int64 - if err := rows.Scan(&id, &index); err != nil { - panic(fmt.Errorf("failed to scan sector ID: %w", err)) - } - } - }() - return nil } -func trimSectors(tx txn, contractID int64, n uint64) error { - const query = `DELETE FROM contract_sector_roots -WHERE contract_id = $1 - AND root_index IN ( - SELECT root_index - FROM contract_sector_roots - WHERE contract_id = $1 - ORDER BY root_index DESC - LIMIT $2 - );` - - result, err := tx.Exec(query, contractID, n) +// lastContractSectors returns refs to the last n sector roots of a contract. +func lastContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) { + const query = `SELECT id, sector_id FROM contract_sector_roots WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;` + rows, err := tx.Query(query, contractID, n) if err != nil { - return err + return nil, err + } + defer rows.Close() + + for rows.Next() { + var ref contractSectorRootRef + if err := rows.Scan(&ref.dbID, &ref.sectorID); err != nil { + return nil, err + } + roots = append(roots, ref) + } + return +} + +// deleteContractSectors removes the given sector roots from their contracts and +// attempts to prune the underlying stored sectors. Stored sectors that are +// still referenced will not be pruned. Returns the number of sectors pruned. +func deleteContractSectors(tx txn, refs []contractSectorRootRef) (int, error) { + var rootIDs []int64 + for _, ref := range refs { + rootIDs = append(rootIDs, ref.dbID) } - rowsAffected, err := result.RowsAffected() + + // delete the sector roots + query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(rootIDs)) + `);` + res, err := tx.Exec(query, queryArgs(rootIDs)...) if err != nil { - return fmt.Errorf("failed to get rows affected: %w", err) - } else if uint64(rowsAffected) != n { - return fmt.Errorf("expected %v sectors removed, got %v", n, rowsAffected) + return 0, fmt.Errorf("failed to delete sectors: %w", err) + } else if rows, err := res.RowsAffected(); err != nil { + return 0, fmt.Errorf("failed to get rows affected: %w", err) + } else if rows != int64(len(refs)) { + return 0, fmt.Errorf("expected %v sectors deleted, got %v", len(refs), rows) } - return nil + + // decrement the contract metrics + if err := incrementNumericStat(tx, metricContractSectors, -len(refs), time.Now()); err != nil { + return 0, fmt.Errorf("failed to decrement contract sectors: %w", err) + } + + // attempt to prune the deleted sectors + var pruned int + for _, ref := range refs { + if err := pruneSectorRef(tx, ref.sectorID); errors.Is(err, errSectorHasRefs) { + continue + } else if err != nil { + return 0, fmt.Errorf("failed to prune sector ref: %w", err) + } + pruned++ + } + return pruned, nil +} + +// trimSectors deletes the last n sector roots for a contract.
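+// Underlying stored sectors that are no longer referenced are pruned.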
+func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) error { + refs, err := lastContractSectors(tx, contractID, n) + if err != nil { + return fmt.Errorf("failed to get sector IDs: %w", err) + } + + _, err = deleteContractSectors(tx, refs) + return err } +// clearContract clears a contract and returns its ID func clearContract(tx txn, revision contracts.SignedRevision, renewedDBID int64, usage contracts.Usage) (dbID int64, err error) { // get the existing contract's current usage var total contracts.Usage @@ -686,6 +717,7 @@ func clearContract(tx txn, revision contracts.SignedRevision, renewedDBID int64, return } +// reviseContract revises a contract and returns its ID func reviseContract(tx txn, revision contracts.SignedRevision) (dbID int64, err error) { err = tx.QueryRow(`UPDATE contracts SET (revision_number, window_start, window_end, raw_revision, host_sig, renter_sig) = ($1, $2, $3, $4, $5, $6) WHERE contract_id=$7 RETURNING id;`, sqlUint64(revision.Revision.RevisionNumber), diff --git a/persist/sqlite/contracts_test.go b/persist/sqlite/contracts_test.go index 9f7f03fa..dee660ab 100644 --- a/persist/sqlite/contracts_test.go +++ b/persist/sqlite/contracts_test.go @@ -14,6 +14,17 @@ import ( "lukechampine.com/frand" ) +func (s *Store) rootAtIndex(contractID types.FileContractID, rootIndex int64) (root types.Hash256, err error) { + err = s.transaction(func(tx txn) error { + const query = `SELECT s.sector_root FROM contract_sector_roots csr +INNER JOIN stored_sectors s ON (csr.sector_id = s.id) +INNER JOIN contracts c ON (csr.contract_id = c.id) +WHERE c.contract_id=$1 AND csr.root_index=$2;` + return tx.QueryRow(query, sqlHash256(contractID), rootIndex).Scan((*sqlHash256)(&root)) + }) + return +} + func rootsEqual(a, b []types.Hash256) error { if len(a) != len(b) { return errors.New("length mismatch") @@ -26,6 +37,24 @@ func rootsEqual(a, b []types.Hash256) error { return nil } +func runRevision(db *Store, revision contracts.SignedRevision, changes []contracts.SectorChange) error { + for _, change := range changes { + switch change.Action { + // store a sector in the database for the append or update actions + case contracts.SectorActionAppend, contracts.SectorActionUpdate: + root := frand.Entropy256() + release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + return fmt.Errorf("failed to store sector: %w", err) + } + defer release() + change.Root = root + } + } + + return db.ReviseContract(revision, contracts.Usage{}, changes) +} + func TestReviseContract(t *testing.T) { log := zaptest.NewLogger(t) db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) @@ -72,170 +101,218 @@ func TestReviseContract(t *testing.T) { t.Fatal(err) } - // add some sector roots - var changes []contracts.SectorChange - var roots []types.Hash256 - for i := 0; i < 10; i++ { - root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) - if err != nil { - t.Fatal(err) - } - defer release() - changes = append(changes, contracts.SectorChange{ - Root: root, - Action: contracts.SectorActionAppend, - }) - roots = append(roots, root) - } - - err = db.ReviseContract(contract, contracts.Usage{}, 0, changes) - if err != nil { - t.Fatal(err) - } - // checkConsistency is a helper function that verifies the expected sector // roots are consistent with the database - checkConsistency := func() error { - // verify the roots were added in the correct order - 
dbRoots, err := db.SectorRoots(contract.Revision.ParentID) + checkConsistency := func(roots []types.Hash256, expected int) error { + dbRoot, err := db.SectorRoots(contract.Revision.ParentID) if err != nil { return fmt.Errorf("failed to get sector roots: %w", err) - } else if err = rootsEqual(roots, dbRoots); err != nil { + } else if len(dbRoot) != expected { + return fmt.Errorf("expected %v sector roots, got %v", expected, len(dbRoot)) + } else if len(roots) != expected { + return fmt.Errorf("expected %v sector roots, got %v", expected, len(roots)) + } else if err = rootsEqual(roots, dbRoot); err != nil { return fmt.Errorf("sector roots mismatch: %w", err) } + // verify the roots were added in the correct order + for i := range roots { + root, err := db.rootAtIndex(contract.Revision.ParentID, int64(i)) + if err != nil { + return fmt.Errorf("failed to get sector root %d: %w", i, err) + } else if root != roots[i] { + return fmt.Errorf("sector root mismatch: expected %v, got %v", roots[i], root) + } + } + m, err := db.Metrics(time.Now()) if err != nil { return fmt.Errorf("failed to get metrics: %w", err) } else if m.Storage.ContractSectors != uint64(len(roots)) { return fmt.Errorf("expected %v contract sectors, got %v", len(roots), m.Storage.ContractSectors) + } else if m.Storage.PhysicalSectors != uint64(len(roots)) { + return fmt.Errorf("expected %v physical sectors, got %v", len(roots), m.Storage.PhysicalSectors) } return nil } - // verify the roots were added - if err = checkConsistency(); err != nil { - t.Fatal(err) - } - - // swap two roots - i, j := 5, 8 - changes = []contracts.SectorChange{ - {Action: contracts.SectorActionSwap, A: uint64(i), B: uint64(j)}, - } - err = db.ReviseContract(contract, contracts.Usage{}, uint64(len(roots)), changes) - if err != nil { - t.Fatal(err) - } - roots[i], roots[j] = roots[j], roots[i] - - // verify the roots were swapped - if err = checkConsistency(); err != nil { - t.Fatal(err) - } - - // trim the last 3 roots - toRemove := 3 - changes = []contracts.SectorChange{ - {Action: contracts.SectorActionTrim, A: uint64(toRemove)}, - } - err = db.ReviseContract(contract, contracts.Usage{}, uint64(len(roots)), changes) - if err != nil { - t.Fatal(err) - } - roots = roots[:len(roots)-toRemove] - - // verify the roots were removed - if err = checkConsistency(); err != nil { - t.Fatal(err) - } - - // swap a root outside of the range, should fail - changes = []contracts.SectorChange{ - {Action: contracts.SectorActionSwap, A: 0, B: 15}, - } - err = db.ReviseContract(contract, contracts.Usage{}, uint64(len(roots)), changes) - if err == nil { - t.Fatal("expected error") - } - - // verify the roots stayed the same - if err = checkConsistency(); err != nil { - t.Fatal(err) - } - - // trim everything - toTrim := len(roots) - // swap a root outside of the range, should fail - changes = []contracts.SectorChange{ - {Action: contracts.SectorActionTrim, A: uint64(toTrim)}, - } - err = db.ReviseContract(contract, contracts.Usage{}, uint64(len(roots)), changes) - if err != nil { - t.Fatal(err) - } - roots = roots[:0] - - // verify the roots are gone - if err = checkConsistency(); err != nil { - t.Fatal(err) + var roots []types.Hash256 + tests := []struct { + name string + changes []contracts.SectorChange + sectors int + errors bool + }{ + { + name: "append 10 roots", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + 
{Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + }, + sectors: 10, + }, + { + name: "swap 4 roots", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionSwap, A: 0, B: 1}, + {Action: contracts.SectorActionSwap, A: 6, B: 4}, + }, + sectors: 10, + }, + { + name: "update root", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionUpdate, A: 3}, + }, + sectors: 10, + }, + { + name: "trim 5 roots", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: 5}, + }, + sectors: 5, + }, + { + name: "swap outside range", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionSwap, A: 0, B: 10}, + }, + errors: true, + sectors: 5, + }, + { + name: "append swap trim", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionSwap, A: 5, B: 2}, + {Action: contracts.SectorActionTrim, A: 1}, + }, + sectors: 5, + }, + { + name: "trim append swap", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: 1}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionSwap, A: 4, B: 1}, + }, + sectors: 5, + }, + { + name: "swap 2 roots", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionSwap, A: 3, B: 1}, + }, + sectors: 5, + }, + { + name: "trim append", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: 5}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + }, + sectors: 3, + }, + { + name: "trim more", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: 5}, + }, + sectors: 3, + errors: true, + }, + { + name: "update outside range", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionUpdate, A: 6}, + }, + sectors: 3, + errors: true, + }, + { + name: "trim all", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: 3}, + }, + sectors: 0, + }, + { + name: "append 5", + changes: []contracts.SectorChange{ + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + {Action: contracts.SectorActionAppend}, + }, + sectors: 5, + }, } - - // test multiple operations in the same transaction - // add some sector roots - changes = changes[:0] - for i := 0; i < 10; i++ { - root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) - if err != nil { - t.Fatal(err) - } - defer release() - changes = append(changes, contracts.SectorChange{ - Root: root, - Action: contracts.SectorActionAppend, + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // update the expected roots + for i, change := range test.changes { + switch change.Action { + case contracts.SectorActionAppend: + // add a random sector root + root := frand.Entropy256() + release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + t.Fatal(err) + } + defer release() + test.changes[i].Root = root + roots = append(roots, root) + case contracts.SectorActionUpdate: + // replace with a random sector root + root := frand.Entropy256() + release, err := 
db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + if err != nil { + t.Fatal(err) + } + defer release() + test.changes[i].Root = root + + if test.errors && change.A >= uint64(len(roots)) { // test failure + continue + } + roots[change.A] = root + case contracts.SectorActionSwap: + if test.errors && (change.A >= uint64(len(roots)) || change.B >= uint64(len(roots))) { // test failure + continue + } + roots[change.A], roots[change.B] = roots[change.B], roots[change.A] + case contracts.SectorActionTrim: + if test.errors && change.A >= uint64(len(roots)) { // test failure + continue + } + roots = roots[:len(roots)-int(change.A)] + } + } + + if err := runRevision(db, contract, test.changes); err != nil { + if test.errors { + t.Log("received error:", err) + return + } + t.Fatal(err) + } else if err == nil && test.errors { + t.Fatal("expected error") + } else if err := checkConsistency(roots, test.sectors); err != nil { + t.Fatal(err) + } }) - roots = append(roots, root) - } - - // store a sector root to update the contract - updateRoot := frand.Entropy256() - release, err := db.StoreSector(updateRoot, func(loc storage.SectorLocation, exists bool) error { return nil }) - if err != nil { - t.Fatal(err) - } - defer release() - - i, j = 3, 6 - toUpdate := 8 - toTrim = 3 - changes = append(changes, contracts.SectorChange{ - Action: contracts.SectorActionSwap, - A: uint64(i), - B: uint64(j), - }, contracts.SectorChange{ - Action: contracts.SectorActionUpdate, - A: uint64(toUpdate), - Root: updateRoot, - }, contracts.SectorChange{ - Action: contracts.SectorActionTrim, - A: uint64(toTrim), - }) - - err = db.ReviseContract(contract, contracts.Usage{}, 0, changes) - if err != nil { - t.Fatal(err) - } - - // update the roots - roots[i], roots[j] = roots[j], roots[i] - roots[toUpdate] = updateRoot - roots = roots[:len(roots)-toTrim] - - // verify the roots match - if err = checkConsistency(); err != nil { - t.Fatal(err) } } diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index 6145c670..0d55dd6c 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -57,8 +57,7 @@ CREATE TABLE volume_sectors ( sector_id INTEGER UNIQUE REFERENCES stored_sectors (id), UNIQUE (volume_id, volume_index) ); --- careful with these indices, the empty sector query is fragile and relies on --- the volume_index indice for performance. +CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id); CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id); CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC); CREATE INDEX volume_sectors_sector_id ON volume_sectors(sector_id); diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 63fcc7d1..62587e09 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -9,6 +9,77 @@ import ( "go.sia.tech/hostd/host/contracts" ) +// migrateVersion18 adds an index to the volume_sectors table to speed up +// empty sector selection. +func migrateVersion18(tx txn) error { + const query = `CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);` + _, err := tx.Exec(query) + return err +} + +// migrateVersion17 recalculates the indices of all contract sector roots. +// Fixes a bug where the indices were not being properly updated if more than +// one root was trimmed. 
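+// For example, surviving root indices [0, 2, 5] are renumbered to [0, 1, 2].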
+func migrateVersion17(tx txn) error { + const query = ` +-- create a temp table that contains the new indices +CREATE TEMP TABLE temp_contract_sector_roots AS +SELECT * FROM (SELECT id, contract_id, root_index, ROW_NUMBER() OVER (PARTITION BY contract_id ORDER BY root_index ASC)-1 AS expected_root_index +FROM contract_sector_roots) a WHERE root_index <> expected_root_index ORDER BY contract_id, root_index ASC; +-- update the contract_sector_roots table with the new indices +UPDATE contract_sector_roots +SET root_index = (SELECT expected_root_index FROM temp_contract_sector_roots WHERE temp_contract_sector_roots.id = contract_sector_roots.id) +WHERE id IN (SELECT id FROM temp_contract_sector_roots); +-- drop the temp table +DROP TABLE temp_contract_sector_roots;` + + _, err := tx.Exec(query) + return err +} + +// migrateVersion16 recalculates the contract and physical sector metrics. +func migrateVersion16(tx txn) error { + // recalculate the contract sectors metric + var contractSectorCount int64 + if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil { + return fmt.Errorf("failed to query contract sector count: %w", err) + } else if err := setNumericStat(tx, metricContractSectors, uint64(contractSectorCount), time.Now()); err != nil { + return fmt.Errorf("failed to set contract sectors metric: %w", err) + } + + // recalculate the physical sectors metric + var physicalSectorsCount int64 + volumePhysicalSectorCount := make(map[int64]int64) + rows, err := tx.Query(`SELECT volume_id, COUNT(*) FROM volume_sectors WHERE sector_id IS NOT NULL GROUP BY volume_id`) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to query volume sector count: %w", err) + } + defer rows.Close() + + for rows.Next() { + var volumeID, count int64 + if err := rows.Scan(&volumeID, &count); err != nil { + return fmt.Errorf("failed to scan volume sector count: %w", err) + } + volumePhysicalSectorCount[volumeID] = count + physicalSectorsCount += count + } + + // update the physical sectors metric + if err := setNumericStat(tx, metricPhysicalSectors, uint64(physicalSectorsCount), time.Now()); err != nil { + return fmt.Errorf("failed to set physical sectors metric: %w", err) + } + + // update the volume stats + for volumeID, count := range volumePhysicalSectorCount { + err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors = $1 WHERE id = $2 RETURNING id`, count, volumeID).Scan(&volumeID) + if err != nil { + return fmt.Errorf("failed to update volume stats: %w", err) + } + } + return nil +} + // migrateVersion15 adds the registry usage fields to the contracts table, // removes the usage fields from the accounts table, and refactors the // contract_account_funding table.
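The renumbering that migrateVersion17 performs is easiest to see in isolation. The sketch below mirrors the SQL's ROW_NUMBER() OVER (PARTITION BY contract_id ORDER BY root_index ASC)-1 expression in plain Go for a single contract; compactIndices is a hypothetical helper for illustration only, and it assumes the surviving indices are already sorted ascending, as the query's ORDER BY guarantees:

package main

import "fmt"

// compactIndices renumbers a contract's sorted root indices so they are
// gapless, matching the expected_root_index computed by the migration.
func compactIndices(indices []uint64) []uint64 {
	out := make([]uint64, len(indices))
	for i := range indices {
		out[i] = uint64(i) // ROW_NUMBER() OVER (...) - 1
	}
	return out
}

func main() {
	// gaps left by trimmed roots are closed; relative order is preserved
	fmt.Println(compactIndices([]uint64{0, 2, 5})) // [0 1 2]
}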
@@ -419,4 +490,7 @@ var migrations = []func(tx txn) error{ migrateVersion13, migrateVersion14, migrateVersion15, + migrateVersion16, + migrateVersion17, + migrateVersion18, } diff --git a/persist/sqlite/sectors.go b/persist/sqlite/sectors.go index 0c2f68b7..c2c069ca 100644 --- a/persist/sqlite/sectors.go +++ b/persist/sqlite/sectors.go @@ -79,11 +79,8 @@ func (s *Store) RemoveSector(root types.Hash256) (err error) { } // decrement volume usage and metrics - _, err = tx.Exec(`UPDATE storage_volumes SET used_sectors=used_sectors-1 WHERE id=$1;`, volumeID) - if err != nil { - return fmt.Errorf("failed to update volume: %w", err) - } else if err = incrementNumericStat(tx, metricPhysicalSectors, -1, time.Now()); err != nil { - return fmt.Errorf("failed to update metric: %w", err) + if err = incrementVolumeUsage(tx, volumeID, -1); err != nil { + return fmt.Errorf("failed to update volume usage: %w", err) } return nil }) @@ -169,6 +166,35 @@ func (s *Store) ExpireTempSectors(height uint64) error { } } +func incrementVolumeUsage(tx txn, volumeID int64, delta int) error { + var used int64 + err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors=used_sectors+$1 WHERE id=$2 RETURNING used_sectors;`, delta, volumeID).Scan(&used) + if err != nil { + return fmt.Errorf("failed to update volume: %w", err) + } else if used < 0 { + panic("volume usage is negative") // developer error + } else if err = incrementNumericStat(tx, metricPhysicalSectors, delta, time.Now()); err != nil { + return fmt.Errorf("failed to update metric: %w", err) + } + return nil +} + +func clearVolumeSector(tx txn, id int64) error { + var volumeDBID int64 + err := tx.QueryRow(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`, id).Scan(&volumeDBID) + if errors.Is(err, sql.ErrNoRows) { + return nil + } else if err != nil { + return err + } + + // decrement the volume usage + if err = incrementVolumeUsage(tx, volumeDBID, -1); err != nil { + return fmt.Errorf("failed to update volume usage: %w", err) + } + return nil +} + func pruneSectorRef(tx txn, id int64) error { var hasReference bool // check if the sector is referenced by a contract @@ -194,22 +220,8 @@ func pruneSectorRef(tx txn, id int64) error { } // clear the volume sector reference - var volumeDBID int64 - err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`, id).Scan(&volumeDBID) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - return fmt.Errorf("failed to update volume sectors: %w", err) - } - // decrement the volume usage if the sector was in a volume. 
@@ -194,22 +220,8 @@ func pruneSectorRef(tx txn, id int64) error {
 	}
 
 	// clear the volume sector reference
-	var volumeDBID int64
-	err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`, id).Scan(&volumeDBID)
-	if err != nil && !errors.Is(err, sql.ErrNoRows) {
-		return fmt.Errorf("failed to update volume sectors: %w", err)
-	}
-	// decrement the volume usage if the sector was in a volume. This should
-	// only happen if a sector was forcibly removed
-	if err == nil {
-		// update the volume usage
-		if _, err = tx.Exec(`UPDATE storage_volumes SET used_sectors=used_sectors-1 WHERE id=$1`, volumeDBID); err != nil {
-			return fmt.Errorf("failed to update volume: %w", err)
-		}
-		// decrement the physical sectors metric
-		if err = incrementNumericStat(tx, metricPhysicalSectors, -1, time.Now()); err != nil {
-			return fmt.Errorf("failed to update metric: %w", err)
-		}
+	if err = clearVolumeSector(tx, id); err != nil {
+		return fmt.Errorf("failed to clear volume sector: %w", err)
 	}
 
 	// delete the sector
diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go
index b14b9e31..fecfc533 100644
--- a/persist/sqlite/volumes.go
+++ b/persist/sqlite/volumes.go
@@ -8,6 +8,7 @@ import (
 
 	"go.sia.tech/core/types"
 	"go.sia.tech/hostd/host/storage"
+	"go.uber.org/zap"
 )
 
 type volumeSectorRef struct {
@@ -15,115 +16,96 @@ type volumeSectorRef struct {
 	Empty bool
 }
 
-func (s *Store) batchMigrateSectors(volumeID int, startIndex uint64, migrateFn func(locations []storage.SectorLocation) error) (bool, error) {
-	// get a batch of sectors to migrate
-	var done bool
-	var oldLocations, newLocations []storage.SectorLocation
+var errNoSectorsToMigrate = errors.New("no sectors to migrate")
+
+func (s *Store) migrateSector(volumeID int64, startIndex uint64, migrateFn func(location storage.SectorLocation) error, log *zap.Logger) error {
+	start := time.Now()
+	var locks []int64
+	var oldLoc, newLoc storage.SectorLocation
 	err := s.transaction(func(tx txn) (err error) {
-		oldLocations, err = sectorsForMigration(tx, volumeID, startIndex, sqlSectorBatchSize)
+		oldLoc, err = sectorForMigration(tx, volumeID, startIndex)
 		if err != nil {
-			return fmt.Errorf("failed to get sectors for migration: %w", err)
-		} else if len(oldLocations) == 0 {
-			done = true
-			return nil // no more sectors to migrate
+			return fmt.Errorf("failed to get sector for migration: %w", err)
 		}
 
-		// get new locations for each sector
-		newLocations, err = locationsForMigration(tx, volumeID, startIndex, len(oldLocations))
-		if err != nil {
-			return fmt.Errorf("failed to get new locations: %w", err)
-		} else if len(newLocations) == 0 {
-			// if no new locations were returned, there's no more space
-			return storage.ErrNotEnoughStorage
-		} else if len(newLocations) < len(oldLocations) {
-			// only enough space to partially migrate the batch. truncate
-			// the old locations to avoid unnecessary locks
-			oldLocations = oldLocations[:len(newLocations)]
+		newLoc, err = emptyLocationForMigration(tx, volumeID)
+		if errors.Is(err, storage.ErrNotEnoughStorage) && startIndex > 0 {
+			// if there is no space in other volumes, try to migrate within the
+			// same volume
+			newLoc, err = locationWithinVolume(tx, volumeID, startIndex)
+			if err != nil {
+				return fmt.Errorf("failed to get empty location in volume: %w", err)
+			}
+		} else if err != nil {
+			return fmt.Errorf("failed to get empty location: %w", err)
 		}
 
-		// add the sector root to the new locations
-		for i := range newLocations {
-			newLocations[i].Root = oldLocations[i].Root
-		}
+		newLoc.Root = oldLoc.Root
 
 		// lock the old and new locations
-		locks, err = lockLocations(tx, append(oldLocations, newLocations...))
+		locks, err = lockLocations(tx, []storage.SectorLocation{oldLoc, newLoc})
 		if err != nil {
 			return fmt.Errorf("failed to lock sectors: %w", err)
 		}
 		return nil
 	})
 	if err != nil {
-		return false, fmt.Errorf("failed to move sectors: %w", err)
-	} else if len(oldLocations) == 0 {
-		return true, nil // no more sectors to migrate
+		return fmt.Errorf("failed to migrate sector: %w", err)
 	}
+
 	// unlock the locations
 	defer unlockLocations(&dbTxn{s}, locks)
 
-	// call migrateFn with the new locations, data should be copied to the
-	// new locations and synced to disk
-	if err := migrateFn(newLocations); err != nil {
-		return false, fmt.Errorf("failed to migrate data: %w", err)
+	// call the migrateFn with the new location, data should be copied to the
+	// new location and synced to disk
+	if err := migrateFn(newLoc); err != nil {
+		return fmt.Errorf("failed to migrate data: %w", err)
 	}
 
-	// update the sector locations in a separate transaction
+	// update the sector location in a separate transaction
	err = s.transaction(func(tx txn) error {
-		selectStmt, err := tx.Prepare(`SELECT id FROM stored_sectors WHERE sector_root=$1`)
+		// get the sector ID
+		var sectorID int64
+		err := tx.QueryRow(`SELECT sector_id FROM volume_sectors WHERE id=$1`, oldLoc.ID).Scan(&sectorID)
 		if err != nil {
-			return fmt.Errorf("failed to prepare sector select statement: %w", err)
+			return fmt.Errorf("failed to get sector id: %w", err)
 		}
-		defer selectStmt.Close()
 
-		clearStmt, err := tx.Prepare(`UPDATE volume_sectors SET sector_id=null WHERE id=$1 RETURNING volume_id`)
+		// clear the old sector
+		var oldVolumeID int64
+		err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=null WHERE id=$1 AND sector_id=$2 RETURNING volume_id`, oldLoc.ID, sectorID).Scan(&oldVolumeID)
 		if err != nil {
-			return fmt.Errorf("failed to prepare sector clear statement: %w", err)
+			return fmt.Errorf("failed to clear sector location: %w", err)
 		}
-		defer clearStmt.Close()
 
-		updateSectorStmt, err := tx.Prepare(`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2 RETURNING volume_id;`)
-		if err != nil {
-			return fmt.Errorf("failed to prepare sector update statement: %w", err)
+		// update the old volume metadata
+		if err := incrementVolumeUsage(tx, oldVolumeID, -1); err != nil {
+			return fmt.Errorf("failed to update old volume metadata: %w", err)
 		}
-		defer updateSectorStmt.Close()
 
-		updateMetaStmt, err := tx.Prepare(`UPDATE storage_volumes SET used_sectors=used_sectors+$1 WHERE id=$2`)
+		// add the sector to the new location
+		var newVolumeID int64
+		err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2 RETURNING volume_id`, sectorID, newLoc.ID).Scan(&newVolumeID)
 		if err != nil {
-			return fmt.Errorf("failed to prepare sector metadata update statement: %w", err)
+			return fmt.Errorf("failed to update sector location: %w", err)
 		}
-		defer updateMetaStmt.Close()
-
-		for i, newLoc := range newLocations {
-			oldLoc := oldLocations[i]
-			var sectorDBID int64
-			if err = selectStmt.QueryRow(sqlHash256(oldLoc.Root)).Scan(&sectorDBID); err != nil {
-				return fmt.Errorf("failed to select sector: %w", err)
-			}
-			var oldVolumeID int64
-			if err = clearStmt.QueryRow(oldLoc.ID).Scan(&oldVolumeID); err != nil {
-				return fmt.Errorf("failed to clear sector location: %w", err)
-			} else if _, err = updateMetaStmt.Exec(-1, oldVolumeID); err != nil {
-				return fmt.Errorf("failed to update sector metadata: %w", err)
-			}
-			var newVolumeID int64
-			if err = updateSectorStmt.QueryRow(sectorDBID, newLoc.ID).Scan(&newVolumeID); err != nil {
-				return fmt.Errorf("failed to update sector location: %w", err)
-			} else if _, err = updateMetaStmt.Exec(1, newVolumeID); err != nil {
-				return fmt.Errorf("failed to update sector metadata: %w", err)
-			}
+		// update the new volume metadata
+		if err := incrementVolumeUsage(tx, newVolumeID, 1); err != nil {
+			return fmt.Errorf("failed to update new volume metadata: %w", err)
 		}
 		return nil
 	})
-	return done, err
+	log.Debug("migrated sector", zap.Uint64("oldIndex", oldLoc.Index), zap.Stringer("root", newLoc.Root), zap.Int64("newVolume", newLoc.Volume), zap.Uint64("newIndex", newLoc.Index), zap.Duration("elapsed", time.Since(start)))
+	return err
 }
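Because the data copy in migrateFn happens between the locking transaction and the commit transaction, the commit guards its clear with `AND sector_id=$2`: if the slot no longer holds the sector that was copied, the UPDATE matches nothing and the swap aborts instead of silently dropping a different sector. A sketch of that commit step under assumptions (plain *sql.Tx, schema as in the diff, illustrative names):

```go
package sketch

import (
	"database/sql"
	"errors"
	"fmt"
)

// moveSectorRef re-points a stored sector from one volume_sectors row to
// another, mirroring the commit step of migrateSector. The guard on
// sector_id makes the clear fail with sql.ErrNoRows if another writer
// changed the slot between the copy and this transaction.
func moveSectorRef(tx *sql.Tx, sectorID, oldSlot, newSlot int64) error {
	var oldVolumeID int64
	err := tx.QueryRow(
		`UPDATE volume_sectors SET sector_id=NULL WHERE id=$1 AND sector_id=$2 RETURNING volume_id`,
		oldSlot, sectorID,
	).Scan(&oldVolumeID)
	if errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf("sector %d is no longer stored at slot %d", sectorID, oldSlot)
	} else if err != nil {
		return fmt.Errorf("failed to clear old location: %w", err)
	}

	var newVolumeID int64
	err = tx.QueryRow(
		`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2 RETURNING volume_id`,
		sectorID, newSlot,
	).Scan(&newVolumeID)
	if err != nil {
		return fmt.Errorf("failed to set new location: %w", err)
	}
	// a real caller would now adjust both volumes' used_sectors, as the
	// diff does with incrementVolumeUsage(tx, oldVolumeID, -1) and (+1)
	return nil
}
```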
 
-func (s *Store) batchRemoveVolume(id int, force bool) (bool, error) {
+func (s *Store) batchRemoveVolume(id int64) (bool, error) {
 	var done bool
 	err := s.transaction(func(tx txn) error {
 		var dbID int64
 		err := tx.QueryRow(`SELECT id FROM volume_sectors WHERE volume_id=$1 AND sector_id IS NOT NULL LIMIT 1;`, id).Scan(&dbID)
-		if err == nil && !force {
+		if err == nil {
 			return storage.ErrVolumeNotEmpty
 		} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
 			return fmt.Errorf("failed to check if volume is empty: %w", err)
@@ -137,23 +119,9 @@ func (s *Store) batchRemoveVolume(id int, force bool) (bool, error) {
 			return nil // no more sectors to remove
 		}
 
-		var forceRemoved int
 		locIDs := make([]int64, 0, len(locations))
 		for _, loc := range locations {
 			locIDs = append(locIDs, loc.ID)
-			// if the root is not empty, the sector was not migrated and
-			// will be forcefully removed
-			if !loc.Empty {
-				forceRemoved++
-			}
-		}
-
-		// reduce the physical sectors metric if there are sectors that
-		// failed to migrate.
-		if forceRemoved > 0 {
-			if err := incrementNumericStat(tx, metricPhysicalSectors, -forceRemoved, time.Now()); err != nil {
-				return fmt.Errorf("failed to update force removed sector metric: %w", err)
-			}
 		}
 
 		// remove the sectors
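batchRemoveVolume now fails fast with ErrVolumeNotEmpty whenever any occupied location remains — force-removal bookkeeping has moved out of the store — and the exported RemoveVolume simply drives it to completion in small per-transaction batches so the write lock is never held for long. The loop shape, sketched with an illustrative batchFn and a plain sleep standing in for the store's jitterSleep helper:

```go
package sketch

import (
	"math/rand"
	"time"
)

// driveBatches runs batchFn until it reports done, pausing briefly between
// batches so each deletion holds the database write lock only for its own
// small transaction. This is the shape of RemoveVolume's loop in the diff;
// batchFn and the sleep bounds are stand-ins.
func driveBatches(batchFn func() (done bool, err error)) error {
	for {
		done, err := batchFn()
		if err != nil {
			return err
		} else if done {
			return nil
		}
		// jittered pause, analogous to the diff's jitterSleep helper
		time.Sleep(time.Millisecond + time.Duration(rand.Intn(500))*time.Microsecond)
	}
}
```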
@@ -208,7 +176,7 @@ ORDER BY v.id ASC`
 }
 
 // Volume returns a volume by its ID.
-func (s *Store) Volume(id int) (storage.Volume, error) {
+func (s *Store) Volume(id int64) (storage.Volume, error) {
 	const query = `SELECT v.id, v.disk_path, v.read_only, v.available, v.total_sectors, v.used_sectors
 FROM storage_volumes v
 WHERE v.id=$1`
@@ -281,11 +249,8 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
 		}
 
 		// increment the volume usage
-		_, err = tx.Exec(`UPDATE storage_volumes SET used_sectors=used_sectors+1 WHERE id=$1`, location.Volume)
-		if err != nil {
-			return fmt.Errorf("failed to update volume usage: %w", err)
-		} else if err := incrementNumericStat(tx, metricPhysicalSectors, 1, time.Now()); err != nil {
-			return fmt.Errorf("failed to update metric: %w", err)
+		if err := incrementVolumeUsage(tx, location.Volume, 1); err != nil {
+			return fmt.Errorf("failed to update volume metadata: %w", err)
 		}
 		return nil
 	})
@@ -314,23 +279,26 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
 
 // MigrateSectors migrates each occupied sector of a volume starting at
 // startIndex. The sector data should be copied to the new location and synced
-// to disk during migrateFn. Sectors are migrated in batches of 256.
-func (s *Store) MigrateSectors(volumeID int, startIndex uint64, migrateFn func(locations []storage.SectorLocation) error) error {
-	for {
-		done, err := s.batchMigrateSectors(volumeID, startIndex, migrateFn)
-		if err != nil {
-			return fmt.Errorf("failed to migrate sectors: %w", err)
-		} else if done {
-			return nil
+// to disk during migrateFn.
+func (s *Store) MigrateSectors(volumeID int64, startIndex uint64, migrateFn func(location storage.SectorLocation) error) error {
+	log := s.log.Named("migrate").With(zap.Int64("oldVolume", volumeID), zap.Uint64("startIndex", startIndex))
+	for i := 0; ; i++ {
+		if err := s.migrateSector(volumeID, startIndex, migrateFn, log); err != nil {
+			if errors.Is(err, errNoSectorsToMigrate) {
+				return nil
+			}
+			return fmt.Errorf("failed to migrate sector: %w", err)
+		}
+		if i%64 == 0 {
+			jitterSleep(time.Millisecond) // allow other transactions to run
 		}
-		jitterSleep(time.Millisecond) // allow other transactions to run
 	}
 }
 
 // AddVolume initializes a new storage volume and adds it to the volume
 // store. GrowVolume must be called afterwards to initialize the volume
 // to its desired size.
-func (s *Store) AddVolume(localPath string, readOnly bool) (volumeID int, err error) {
+func (s *Store) AddVolume(localPath string, readOnly bool) (volumeID int64, err error) {
 	const query = `INSERT INTO storage_volumes (disk_path, read_only, used_sectors, total_sectors) VALUES (?, ?, 0, 0) RETURNING id;`
 	err = s.queryRow(query, localPath, readOnly).Scan(&volumeID)
 	return
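The MigrateSectors callback contract changes here from a batch of locations to one location per call, and the store commits a move only after migrateFn returns nil, so the callback must copy and sync the sector data before returning. A hypothetical caller under that contract — the Store interface, readSector, and writeSector below are stand-ins, not APIs from this diff:

```go
package sketch

import (
	"go.sia.tech/core/types"
	"go.sia.tech/hostd/host/storage"
)

// Store is a stand-in for the *sqlite.Store in this diff; only the
// MigrateSectors signature matters here.
type Store interface {
	MigrateSectors(volumeID int64, startIndex uint64, migrateFn func(storage.SectorLocation) error) error
}

// drainAbove empties every occupied slot at or above newMaxSectors, e.g. in
// preparation for shrinking a volume. readSector and writeSector are
// hypothetical disk-I/O helpers.
func drainAbove(s Store, readSector func(types.Hash256) ([]byte, error), writeSector func(storage.SectorLocation, []byte) error, volumeID int64, newMaxSectors uint64) error {
	return s.MigrateSectors(volumeID, newMaxSectors, func(loc storage.SectorLocation) error {
		data, err := readSector(loc.Root)
		if err != nil {
			return err
		}
		// write and sync before returning: the store commits the new
		// location only after this callback succeeds
		return writeSector(loc, data)
	})
}
```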
@@ -339,11 +307,10 @@ func (s *Store) AddVolume(localPath string, readOnly bool) (volumeID int64, err
 
 // RemoveVolume removes a storage volume from the volume store. If there
-// are used sectors in the volume, ErrVolumeNotEmpty is returned. If force is
-// true, the volume is removed regardless of whether it is empty.
-func (s *Store) RemoveVolume(id int, force bool) error {
+// are used sectors in the volume, ErrVolumeNotEmpty is returned.
+func (s *Store) RemoveVolume(id int64) error {
 	// remove the volume sectors in batches to avoid holding a transaction lock
 	// for too long
 	for {
-		done, err := s.batchRemoveVolume(id, force)
+		done, err := s.batchRemoveVolume(id)
 		if err != nil {
 			return err
 		} else if done {
@@ -358,7 +326,7 @@ func (s *Store) RemoveVolume(id int64) error {
 }
 
 // GrowVolume grows a storage volume's metadata by n sectors.
-func (s *Store) GrowVolume(id int, maxSectors uint64) error {
+func (s *Store) GrowVolume(id int64, maxSectors uint64) error {
 	if maxSectors == 0 {
 		panic("maxSectors must be greater than 0") // dev error
 	}
@@ -397,7 +365,7 @@ func (s *Store) GrowVolume(id int64, maxSectors uint64) error {
 
 // ShrinkVolume shrinks a storage volume's metadata to maxSectors. If there are
 // used sectors outside of the new maximum, ErrVolumeNotEmpty is returned.
-func (s *Store) ShrinkVolume(id int, maxSectors uint64) error {
+func (s *Store) ShrinkVolume(id int64, maxSectors uint64) error {
 	if maxSectors == 0 {
 		panic("maxSectors must be greater than 0") // dev error
 	}
@@ -437,47 +405,20 @@ func (s *Store) ShrinkVolume(id int64, maxSectors uint64) error {
 }
 
 // SetReadOnly sets the read-only flag on a volume.
-func (s *Store) SetReadOnly(volumeID int, readOnly bool) error {
+func (s *Store) SetReadOnly(volumeID int64, readOnly bool) error {
 	const query = `UPDATE storage_volumes SET read_only=$1 WHERE id=$2;`
 	_, err := s.exec(query, readOnly, volumeID)
 	return err
 }
 
 // SetAvailable sets the available flag on a volume.
-func (s *Store) SetAvailable(volumeID int, available bool) error {
+func (s *Store) SetAvailable(volumeID int64, available bool) error {
 	const query = `UPDATE storage_volumes SET available=$1 WHERE id=$2;`
 	_, err := s.exec(query, available, volumeID)
 	return err
 }
 
-// sectorsForMigration returns a list of sectors that should be migrated from
-// the given volume. The sectors are ordered by volume index, and the returned
-// list will contain at most batchSize sectors. The startIndex is the volume
-// index of the first sector to return.
-func sectorsForMigration(tx txn, volumeID int, startIndex uint64, batchSize int64) (sectors []storage.SectorLocation, _ error) {
-	const query = `SELECT vs.id, vs.volume_id, vs.volume_index, s.sector_root
-	FROM volume_sectors vs
-	INNER JOIN stored_sectors s ON (s.id=vs.sector_id)
-	WHERE vs.volume_id=$1 AND vs.volume_index>=$2
-	ORDER BY vs.volume_index ASC LIMIT $3`
-
-	rows, err := tx.Query(query, volumeID, startIndex, batchSize)
-	if err != nil {
-		return nil, fmt.Errorf("failed to query sectors: %w", err)
-	}
-	defer rows.Close()
-
-	// scan the old locations
-	for rows.Next() {
-		var loc storage.SectorLocation
-		if err := rows.Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root)); err != nil {
-			return nil, fmt.Errorf("failed to scan volume sector: %w", err)
-		}
-		sectors = append(sectors, loc)
-	}
-	return sectors, nil
-}
-
+// sectorDBID returns the database ID of a sector root, inserting it if it does not already exist.
 func sectorDBID(tx txn, root types.Hash256) (id int64, err error) {
 	err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) ON CONFLICT (sector_root) DO UPDATE SET last_access_timestamp=EXCLUDED.last_access_timestamp RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id)
 	if errors.Is(err, sql.ErrNoRows) {
@@ -486,6 +427,7 @@ func sectorDBID(tx txn, root types.Hash256) (id int64, err error) {
 	return
 }
 
+// sectorLocation returns the location of a sector.
 func sectorLocation(tx txn, sectorID int64) (loc storage.SectorLocation, err error) {
 	const query = `SELECT v.id, v.volume_id, v.volume_index, s.sector_root
 FROM volume_sectors v
@@ -498,6 +440,21 @@ WHERE v.sector_id=$1`
 	return
 }
 
+// emptyLocationInVolume returns an empty location in the given volume. If
+// there is no space available, ErrNotEnoughStorage is returned.
+func emptyLocationInVolume(tx txn, volumeID int64) (loc storage.SectorLocation, err error) { + const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs +LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id) +WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 LIMIT 1;` + err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index) + if errors.Is(err, sql.ErrNoRows) { + err = storage.ErrNotEnoughStorage + } + return +} + +// emptyLocation returns an empty location in a writable volume. If there is no +// space available, ErrNotEnoughStorage is returned. func emptyLocation(tx txn) (storage.SectorLocation, error) { var volumeID int64 err := tx.QueryRow(`SELECT id FROM storage_volumes WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 ORDER BY used_sectors ASC LIMIT 1;`).Scan(&volumeID) @@ -511,62 +468,79 @@ func emptyLocation(tx txn) (storage.SectorLocation, error) { // locked, but not committed. This is unlikely to happen in practice, and // the worst case is that the host fails to store a sector. The performance // benefits of choosing a volume first far outweigh the downsides. - var loc storage.SectorLocation - const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs -LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id) -WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 LIMIT 1;` - err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index) + return emptyLocationInVolume(tx, volumeID) +} + +// emptyLocationForMigration returns an empty location in a writable volume +// other than the given volumeID. If there is no space available, +// ErrNotEnoughStorage is returned. +func emptyLocationForMigration(tx txn, oldVolumeID int64) (loc storage.SectorLocation, err error) { + const query = `SELECT id FROM storage_volumes +WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 AND id<>$1 +ORDER BY used_sectors ASC LIMIT 1;` + var newVolumeID int64 + err = tx.QueryRow(query, oldVolumeID).Scan(&newVolumeID) if errors.Is(err, sql.ErrNoRows) { - err = storage.ErrNotEnoughStorage + return storage.SectorLocation{}, storage.ErrNotEnoughStorage + } else if err != nil { + return storage.SectorLocation{}, fmt.Errorf("failed to get empty location: %w", err) } - return loc, err + + // note: there is a slight race here where all sectors in a volume could be + // locked, but not committed. This is unlikely to happen in practice, and + // the worst case is that the host fails to store a sector. The performance + // benefits of choosing a volume first far outweigh the downsides. + return emptyLocationInVolume(tx, newVolumeID) } -func volumeSectorsForDeletion(tx txn, volumeID, batchSize int) (locs []volumeSectorRef, err error) { - const query = `SELECT id, sector_id IS NULL AS empty FROM volume_sectors WHERE volume_id=$1 LIMIT $2` - rows, err := tx.Query(query, volumeID, batchSize) - if err != nil { - return nil, fmt.Errorf("failed to query volume sectors: %w", err) - } - defer rows.Close() - for rows.Next() { - var ref volumeSectorRef - if err := rows.Scan(&ref.ID, &ref.Empty); err != nil { - return nil, fmt.Errorf("failed to scan volume sector: %w", err) - } - locs = append(locs, ref) +// sectorForMigration returns the location of the first occupied sector in the +// volume starting at minIndex. If there are no sectors to migrate, +// errNoSectorsToMigrate is returned. 
+func sectorForMigration(tx txn, volumeID int64, minIndex uint64) (loc storage.SectorLocation, err error) { + const query = `SELECT vs.id, vs.volume_id, vs.volume_index, s.sector_root + FROM volume_sectors vs + INNER JOIN stored_sectors s ON (s.id=vs.sector_id) + WHERE vs.sector_id IS NOT NULL AND vs.volume_id=$1 AND vs.volume_index >= $2` + + err = tx.QueryRow(query, volumeID, minIndex).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root)) + if errors.Is(err, sql.ErrNoRows) { + return storage.SectorLocation{}, errNoSectorsToMigrate } return } -// locationsForMigration returns a list of locations to migrate to. Locations -// are guaranteed to be empty and in a writable volume. As a special case, -// sectors may be migrated to empty indices less than startIndex within the -// given volume even if the volue is read-only. The locations are ordered by -// volume index and the returned list will contain at most batchSize locations. -func locationsForMigration(tx txn, volumeID int, startIndex uint64, batchSize int) (locations []storage.SectorLocation, _ error) { +// locationWithinVolume returns an empty location within the same volume as +// the given volumeID. If there is no space in the volume, ErrNotEnoughStorage +// is returned. +func locationWithinVolume(tx txn, volumeID int64, maxIndex uint64) (loc storage.SectorLocation, err error) { const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs - INNER JOIN storage_volumes v ON (vs.volume_id=v.id) WHERE vs.sector_id IS NULL AND vs.id NOT IN (SELECT volume_sector_id FROM locked_volume_sectors) - AND v.available=true AND ((v.read_only=false AND vs.volume_id <> $1) OR (vs.volume_id=$1 AND vs.volume_index<$2)) - LIMIT $3;` + AND vs.volume_id=$1 AND vs.volume_index<$2 + LIMIT 1;` + + err = tx.QueryRow(query, volumeID, maxIndex).Scan(&loc.ID, &loc.Volume, &loc.Index) + if errors.Is(err, sql.ErrNoRows) { + return storage.SectorLocation{}, storage.ErrNotEnoughStorage + } + return +} - rows, err := tx.Query(query, volumeID, startIndex, batchSize) +func volumeSectorsForDeletion(tx txn, volumeID int64, batchSize int) (locs []volumeSectorRef, err error) { + const query = `SELECT id, sector_id IS NULL AS empty FROM volume_sectors WHERE volume_id=$1 LIMIT $2` + rows, err := tx.Query(query, volumeID, batchSize) if err != nil { - return nil, fmt.Errorf("failed to query locations: %w", err) + return nil, fmt.Errorf("failed to query volume sectors: %w", err) } defer rows.Close() - - // scan the new locations for rows.Next() { - var loc storage.SectorLocation - if err := rows.Scan(&loc.ID, &loc.Volume, &loc.Index); err != nil { + var ref volumeSectorRef + if err := rows.Scan(&ref.ID, &ref.Empty); err != nil { return nil, fmt.Errorf("failed to scan volume sector: %w", err) } - locations = append(locations, loc) + locs = append(locs, ref) } - return locations, nil + return } func scanVolume(s scanner) (volume storage.Volume, err error) { diff --git a/persist/sqlite/volumes_test.go b/persist/sqlite/volumes_test.go index c6bf73da..005231e5 100644 --- a/persist/sqlite/volumes_test.go +++ b/persist/sqlite/volumes_test.go @@ -188,7 +188,7 @@ func TestVolumeAdd(t *testing.T) { volume, err := db.Volume(volumeID) if err != nil { t.Fatal(err) - } else if volume.ID != i { + } else if volume.ID != int64(i) { t.Fatalf("expected volume ID to be %v, got %v", i, volume.ID) } else if volume.LocalPath != localPath { t.Fatalf("expected local path to be %v, got %v", localPath, volume.LocalPath) @@ -390,7 +390,7 @@ func TestRemoveVolume(t 
*testing.T) { } // check that the empty volume can be removed - if err := db.RemoveVolume(volume.ID, false); err != nil { + if err := db.RemoveVolume(int64(volume.ID)); err != nil { t.Fatal(err) } @@ -435,12 +435,17 @@ func TestRemoveVolume(t *testing.T) { } // check that the volume cannot be removed - if err := db.RemoveVolume(volume.ID, false); !errors.Is(err, storage.ErrVolumeNotEmpty) { + if err := db.RemoveVolume(volume.ID); !errors.Is(err, storage.ErrVolumeNotEmpty) { t.Fatalf("expected ErrVolumeNotEmpty, got %v", err) } - // check that the volume can be force removed - if err := db.RemoveVolume(volume.ID, true); err != nil { + // expire all of the temporary sectors + if err := db.ExpireTempSectors(5); err != nil { + t.Fatal(err) + } + + // check that the volume can be removed + if err := db.RemoveVolume(volume.ID); err != nil { t.Fatal(err) } @@ -505,17 +510,15 @@ func TestMigrateSectors(t *testing.T) { var i int // migrate the remaining sectors to the first half of the volume - err = db.MigrateSectors(volume.ID, initialSectors/2, func(locations []storage.SectorLocation) error { - for _, loc := range locations { - if loc.Volume != volume.ID { - t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) - } else if loc.Index != uint64(i) { - t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if loc.Root != roots[i] { - t.Fatalf("expected sector root %v, got %v", roots[i], loc.Root) - } - i++ + err = db.MigrateSectors(volume.ID, initialSectors/2, func(loc storage.SectorLocation) error { + if loc.Volume != volume.ID { + t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) + } else if loc.Index != uint64(i) { + t.Fatalf("expected sector index %v, got %v", i, loc.Index) + } else if loc.Root != roots[i] { + t.Fatalf("expected sector root %v, got %v", roots[i], loc.Root) } + i++ // note: sync to disk return nil }) @@ -545,10 +548,12 @@ func TestMigrateSectors(t *testing.T) { } // migrate the remaining sectors from the first volume; should partially complete - err = db.MigrateSectors(volume.ID, 0, func(locations []storage.SectorLocation) error { - if len(locations) > initialSectors/4 { - t.Fatalf("expected only %v migrations, got %v", initialSectors/4, len(locations)) + var n int + err = db.MigrateSectors(volume.ID, 0, func(loc storage.SectorLocation) error { + if n > initialSectors/4 { + t.Fatalf("expected only %v migrations, got %v", initialSectors/4, n) } + n++ return nil }) if !errors.Is(err, storage.ErrNotEnoughStorage) { @@ -647,7 +652,7 @@ func TestPrune(t *testing.T) { Action: contracts.SectorActionAppend, }) } - err = db.ReviseContract(c, contracts.Usage{}, 0, changes) + err = db.ReviseContract(c, contracts.Usage{}, changes) if err != nil { t.Fatal(err) } @@ -716,9 +721,9 @@ func TestPrune(t *testing.T) { } } - for _, root := range deleted { + for i, root := range deleted { if _, _, err := db.SectorLocation(root); !errors.Is(err, storage.ErrSectorNotFound) { - return fmt.Errorf("expected ErrSectorNotFound, got %v", err) + return fmt.Errorf("expected ErrSectorNotFound for sector %d %q, got %v", i, root, err) } } @@ -767,7 +772,19 @@ func TestPrune(t *testing.T) { t.Fatal(err) } - // expire the contract sectors + // trim half of the contract sectors + changes = []contracts.SectorChange{ + {Action: contracts.SectorActionTrim, A: uint64(len(contractSectors) / 2)}, + } + if err := db.ReviseContract(c, contracts.Usage{}, changes); err != nil { + t.Fatal(err) + } + + if err := checkConsistency(contractSectors[:len(contractSectors)/2], nil, nil, 
roots[50:]); err != nil { + t.Fatal(err) + } + + // expire the rest of the contract sectors if err := db.ExpireContractSectors(c.Revision.WindowEnd + 1); err != nil { t.Fatal(err) } @@ -862,7 +879,7 @@ func BenchmarkVolumeMigrate(b *testing.B) { b.ReportMetric(float64(b.N), "sectors") // migrate all sectors from the first volume to the second - if err := db.MigrateSectors(volume1.ID, 0, func(locations []storage.SectorLocation) error { + if err := db.MigrateSectors(volume1.ID, 0, func(loc storage.SectorLocation) error { return nil }); err != nil { b.Fatal(err)
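The tests above lean on a checkConsistency helper that this diff does not show. An illustrative version of the core invariant it would verify — after migrations, removals, and the version-16 recalculation, each volume's used_sectors must match its count of occupied slots — assuming direct access to the underlying *sql.DB:

```go
package sketch

import (
	"database/sql"
	"fmt"
)

// verifyVolumeUsage spot-checks the invariant the usage refactor maintains
// and migrateVersion16 repairs: used_sectors on each volume equals the number
// of volume_sectors rows with a non-NULL sector_id. This is an illustrative
// check, not the checkConsistency helper referenced by the tests.
func verifyVolumeUsage(db *sql.DB) error {
	rows, err := db.Query(`SELECT v.id, v.used_sectors, COUNT(vs.sector_id)
FROM storage_volumes v
LEFT JOIN volume_sectors vs ON (vs.volume_id=v.id AND vs.sector_id IS NOT NULL)
GROUP BY v.id`)
	if err != nil {
		return fmt.Errorf("failed to query volume usage: %w", err)
	}
	defer rows.Close()
	for rows.Next() {
		var id, used, actual int64
		if err := rows.Scan(&id, &used, &actual); err != nil {
			return fmt.Errorf("failed to scan volume usage: %w", err)
		} else if used != actual {
			return fmt.Errorf("volume %d: used_sectors=%d, occupied=%d", id, used, actual)
		}
	}
	return rows.Err()
}
```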