Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix volume migration #180

Merged
merged 15 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type (
VolumeManager interface {
Usage() (usedSectors uint64, totalSectors uint64, err error)
Volumes() ([]storage.VolumeMeta, error)
Volume(id int) (storage.VolumeMeta, error)
Volume(id int64) (storage.VolumeMeta, error)
AddVolume(ctx context.Context, localPath string, maxSectors uint64, result chan<- error) (storage.Volume, error)
RemoveVolume(ctx context.Context, id int, force bool, result chan<- error) error
ResizeVolume(ctx context.Context, id int, maxSectors uint64, result chan<- error) error
SetReadOnly(id int, readOnly bool) error
RemoveVolume(ctx context.Context, id int64, force bool, result chan<- error) error
ResizeVolume(ctx context.Context, id int64, maxSectors uint64, result chan<- error) error
SetReadOnly(id int64, readOnly bool) error
RemoveSector(root types.Hash256) error
ResizeCache(size uint32)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func NewServer(name string, hostKey types.PublicKey, a Alerts, g Syncer, chain C
},
volumeJobs: volumeJobs{
volumes: vm,
jobs: make(map[int]context.CancelFunc),
jobs: make(map[int64]context.CancelFunc),
},
}
return jape.Mux(map[string]jape.Handler{
Expand Down
4 changes: 2 additions & 2 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (a *api) handleGETVolume(c jape.Context) {
return
}

volume, err := a.volumes.Volume(id)
volume, err := a.volumes.Volume(int64(id))
if errors.Is(err, storage.ErrVolumeNotFound) {
c.Error(err, http.StatusNotFound)
return
Expand All @@ -309,7 +309,7 @@ func (a *api) handlePUTVolume(c jape.Context) {
return
}

err := a.volumes.SetReadOnly(id, req.ReadOnly)
err := a.volumes.SetReadOnly(int64(id), req.ReadOnly)
if errors.Is(err, storage.ErrVolumeNotFound) {
c.Error(err, http.StatusNotFound)
return
Expand Down
14 changes: 7 additions & 7 deletions api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type (
volumes VolumeManager

mu sync.Mutex // protects jobs
jobs map[int]context.CancelFunc
jobs map[int64]context.CancelFunc
}
)

Expand Down Expand Up @@ -48,7 +48,7 @@ func (vj *volumeJobs) AddVolume(path string, maxSectors uint64) (storage.Volume,
return volume, nil
}

func (vj *volumeJobs) RemoveVolume(id int, force bool) error {
func (vj *volumeJobs) RemoveVolume(id int64, force bool) error {
vj.mu.Lock()
defer vj.mu.Unlock()
if _, exists := vj.jobs[id]; exists {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (vj *volumeJobs) RemoveVolume(id int, force bool) error {
return nil
}

func (vj *volumeJobs) ResizeVolume(id int, newSize uint64) error {
func (vj *volumeJobs) ResizeVolume(id int64, newSize uint64) error {
vj.mu.Lock()
defer vj.mu.Unlock()
if _, exists := vj.jobs[id]; exists {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (vj *volumeJobs) ResizeVolume(id int, newSize uint64) error {
return nil
}

func (vj *volumeJobs) Cancel(id int) error {
func (vj *volumeJobs) Cancel(id int64) error {
vj.mu.Lock()
defer vj.mu.Unlock()
cancel, exists := vj.jobs[id]
Expand Down Expand Up @@ -163,7 +163,7 @@ func (a *api) handleDeleteVolume(c jape.Context) {
} else if err := c.DecodeForm("force", &force); err != nil {
return
}
err := a.volumeJobs.RemoveVolume(id, force)
err := a.volumeJobs.RemoveVolume(int64(id), force)
a.checkServerError(c, "failed to remove volume", err)
}

Expand All @@ -181,7 +181,7 @@ func (a *api) handlePUTVolumeResize(c jape.Context) {
return
}

err := a.volumeJobs.ResizeVolume(id, req.MaxSectors)
err := a.volumeJobs.ResizeVolume(int64(id), req.MaxSectors)
a.checkServerError(c, "failed to resize volume", err)
}

Expand All @@ -194,6 +194,6 @@ func (a *api) handleDELETEVolumeCancelOp(c jape.Context) {
return
}

err := a.volumeJobs.Cancel(id)
err := a.volumeJobs.Cancel(int64(id))
a.checkServerError(c, "failed to cancel operation", err)
}
2 changes: 1 addition & 1 deletion host/contracts/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (cu *ContractUpdater) Commit(revision SignedRevision, usage Usage) error {

start := time.Now()
// revise the contract
err := cu.store.ReviseContract(revision, usage, cu.sectors, cu.sectorActions)
err := cu.store.ReviseContract(revision, usage, cu.sectorActions)
if err == nil {
// clear the committed sector actions
cu.sectorActions = cu.sectorActions[:0]
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ func TestSectorRoots(t *testing.T) {
defer release()

// use the database method directly to avoid the sector cache
err = db.ReviseContract(rev, contracts.Usage{}, uint64(i), []contracts.SectorChange{
err = db.ReviseContract(rev, contracts.Usage{}, []contracts.SectorChange{
{Action: contracts.SectorActionAppend, Root: root},
})
if err != nil {
Expand Down
20 changes: 1 addition & 19 deletions host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,6 @@ import (
)

type (
// UpdateContractTransaction atomically updates a single contract and its
// associated sector roots.
UpdateContractTransaction interface {
// AppendSector appends a sector root to the end of the contract
AppendSector(root types.Hash256) error
// SwapSectors swaps the sector roots at the given indices.
SwapSectors(i, j uint64) error
// TrimSectors removes the last n sector roots from the contract.
TrimSectors(n int) error
// UpdateSector updates the sector root at the given index.
UpdateSector(index uint64, newRoot types.Hash256) error

// AddUsage adds the additional usage costs to the contract.
AddUsage(Usage) error
// ReviseContract updates the current revision associated with a contract.
ReviseContract(SignedRevision) error
}

// UpdateStateTransaction atomically updates the contract manager's state.
UpdateStateTransaction interface {
ContractRelevant(types.FileContractID) (bool, error)
Expand Down Expand Up @@ -66,7 +48,7 @@ type (
ContractAction(height uint64, contractFn func(types.FileContractID, uint64, string)) error
// ReviseContract atomically updates a contract and its associated
// sector roots.
ReviseContract(revision SignedRevision, usage Usage, oldSectors uint64, sectorChanges []SectorChange) error
ReviseContract(revision SignedRevision, usage Usage, sectorChanges []SectorChange) error
// UpdateContractState atomically updates the contract manager's state.
UpdateContractState(modules.ConsensusChangeID, uint64, func(UpdateStateTransaction) error) error
// ExpireContractSectors removes sector roots for any contracts that are
Expand Down
24 changes: 12 additions & 12 deletions host/storage/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,33 @@ type (
// Volumes returns a list of all volumes in the volume store.
Volumes() ([]Volume, error)
// Volume returns a volume in the store by its id
Volume(id int) (Volume, error)
Volume(id int64) (Volume, error)
// AddVolume initializes a new storage volume and adds it to the volume
// store. GrowVolume must be called afterwards to initialize the volume
// to its desired size.
AddVolume(localPath string, readOnly bool) (int, error)
AddVolume(localPath string, readOnly bool) (int64, error)
// RemoveVolume removes a storage volume from the volume store. If there
// are used sectors in the volume, ErrVolumeNotEmpty is returned. If
// force is true, the volume is removed even if it is not empty.
RemoveVolume(volumeID int, force bool) error
RemoveVolume(volumeID int64) error
// GrowVolume grows a storage volume's metadata to maxSectors. If the
// number of sectors in the volume is already greater than maxSectors,
// nil is returned.
GrowVolume(volumeID int, maxSectors uint64) error
GrowVolume(volumeID int64, maxSectors uint64) error
// ShrinkVolume shrinks a storage volume's metadata to maxSectors. If
// there are used sectors in the shrink range, an error is returned.
ShrinkVolume(volumeID int, maxSectors uint64) error
ShrinkVolume(volumeID int64, maxSectors uint64) error

// SetReadOnly sets the read-only flag on a volume.
SetReadOnly(volumeID int, readOnly bool) error
SetReadOnly(volumeID int64, readOnly bool) error
// SetAvailable sets the available flag on a volume.
SetAvailable(volumeID int, available bool) error
SetAvailable(volumeID int64, available bool) error

// MigrateSectors returns a new location for each occupied sector of a volume
// starting at min. The sector data should be copied to the new volume and
// synced to disk during migrateFn. Iteration is stopped if migrateFn returns an
// error.
MigrateSectors(volumeID int, min uint64, migrateFn func(newLocations []SectorLocation) error) error
// MigrateSectors returns a new location for each occupied sector of a
// volume starting at min. The sector data should be copied to the new
// location and synced to disk during migrateFn. Iteration is stopped if
// migrateFn returns an error.
MigrateSectors(volumeID int64, min uint64, migrateFn func(SectorLocation) error) error
// StoreSector calls fn with an empty location in a writable volume. If
// the sector root already exists, fn is called with the existing
// location and exists is true. Unless exists is true, The sector must
Expand Down
Loading