Skip to content

Commit

Permalink
worker: try renew contract when max revision is reached (#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored Jan 18, 2024
1 parent 3c7dd62 commit 3537c8d
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 55 deletions.
5 changes: 4 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,10 @@ func TestUploadDownloadSpending(t *testing.T) {
cluster.MineToRenewWindow()

// wait for the contract to be renewed
tt.Retry(100, 100*time.Millisecond, func() error {
tt.Retry(10, time.Second, func() error {
// mine a block
cluster.MineBlocks(1)

// fetch contracts
cms, err := cluster.Bus.Contracts(context.Background(), api.ContractsOpts{})
tt.OK(err)
Expand Down
43 changes: 23 additions & 20 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type (
sectors map[types.Hash256]*[rhpv2.SectorSize]byte
}

mockContractLocker struct {
mockContractStore struct {
contracts map[types.FileContractID]*mockContract
}

Expand Down Expand Up @@ -65,7 +65,7 @@ type (
}

mockWorker struct {
cl *mockContractLocker
cs *mockContractStore
hm *mockHostManager
mm *mockMemoryManager
os *mockObjectStore
Expand All @@ -78,12 +78,12 @@ type (
)

var (
_ ContractLocker = (*mockContractLocker)(nil)
_ Host = (*mockHost)(nil)
_ HostManager = (*mockHostManager)(nil)
_ Memory = (*mockMemory)(nil)
_ MemoryManager = (*mockMemoryManager)(nil)
_ ObjectStore = (*mockObjectStore)(nil)
_ ContractStore = (*mockContractStore)(nil)
_ Host = (*mockHost)(nil)
_ HostManager = (*mockHostManager)(nil)
_ Memory = (*mockMemory)(nil)
_ MemoryManager = (*mockMemoryManager)(nil)
_ ObjectStore = (*mockObjectStore)(nil)
)

var (
Expand Down Expand Up @@ -113,6 +113,10 @@ func (mm *mockMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Memo
return &mockMemory{}
}

func (os *mockContractStore) RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error) {
return api.ContractMetadata{}, api.ErrContractNotFound
}

func (os *mockObjectStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) {
return nil
}
Expand Down Expand Up @@ -350,8 +354,8 @@ func (hp *mockHostManager) Host(hk types.PublicKey, fcid types.FileContractID, s
return hp.hosts[hk]
}

func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) {
if lock, ok := cl.contracts[fcid]; !ok {
func (cs *mockContractStore) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) {
if lock, ok := cs.contracts[fcid]; !ok {
return 0, errContractNotFound
} else {
lock.mu.Lock()
Expand All @@ -360,16 +364,16 @@ func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.Fi
return 0, nil
}

func (cl *mockContractLocker) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) {
if lock, ok := cl.contracts[fcid]; !ok {
func (cs *mockContractStore) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) {
if lock, ok := cs.contracts[fcid]; !ok {
return errContractNotFound
} else {
lock.mu.Unlock()
}
return nil
}

func (cl *mockContractLocker) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) {
func (cs *mockContractStore) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) {
return nil
}

Expand Down Expand Up @@ -406,12 +410,12 @@ func newMockContract(fcid types.FileContractID) *mockContract {
}
}

func newMockContractLocker(contracts []*mockContract) *mockContractLocker {
cl := &mockContractLocker{contracts: make(map[types.FileContractID]*mockContract)}
func newMockContractStore(contracts []*mockContract) *mockContractStore {
cs := &mockContractStore{contracts: make(map[types.FileContractID]*mockContract)}
for _, c := range contracts {
cl.contracts[c.rev.ParentID] = c
cs.contracts[c.rev.ParentID] = c
}
return cl
return cs
}

func newMockHostManager(hosts []*mockHost) *mockHostManager {
Expand Down Expand Up @@ -440,13 +444,13 @@ func newMockWorker(numHosts int) *mockWorker {
contracts := newMockContracts(hosts)

// create dependencies
cl := newMockContractLocker(contracts)
cs := newMockContractStore(contracts)
hm := newMockHostManager(hosts)
os := newMockObjectStore()
mm := &mockMemoryManager{}

dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cl, 0, 0, time.Minute, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar())

// create contract metadata
metadatas := make(contractsMap)
Expand All @@ -458,7 +462,6 @@ func newMockWorker(numHosts int) *mockWorker {
}

return &mockWorker{
cl: cl,
hm: hm,
mm: mm,
os: os,
Expand Down
56 changes: 34 additions & 22 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type (
hm HostManager
mm MemoryManager
os ObjectStore
cl ContractLocker
cs ContractStore
logger *zap.SugaredLogger

contractLockDuration time.Duration
Expand Down Expand Up @@ -110,13 +110,15 @@ type (
}

sectorUpload struct {
index int
data *[rhpv2.SectorSize]byte
root types.Hash256
uploaded object.Sector
index int
root types.Hash256

ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
uploaded object.Sector
data *[rhpv2.SectorSize]byte
}

sectorUploadReq struct {
Expand Down Expand Up @@ -313,12 +315,12 @@ func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings
return nil
}

func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cl ContractLocker, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager {
func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cs ContractStore, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager {
return &uploadManager{
hm: hm,
mm: mm,
os: os,
cl: cl,
cs: cs,
logger: logger,

contractLockDuration: contractLockDuration,
Expand All @@ -335,10 +337,11 @@ func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os
}
}

func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host, c api.ContractMetadata, bh uint64) *uploader {
func (mgr *uploadManager) newUploader(os ObjectStore, cs ContractStore, hm HostManager, c api.ContractMetadata, bh uint64) *uploader {
return &uploader{
os: os,
cl: cl,
cs: cs,
hm: hm,
logger: mgr.logger,

// static
Expand All @@ -352,7 +355,7 @@ func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host,
statsSectorUploadSpeedBytesPerMS: stats.NoDecay(),

// covered by mutex
host: h,
host: hm.Host(c.HostKey, c.ID, c.SiamuxAddr),
bh: bh,
fcid: c.ID,
endHeight: c.WindowEnd,
Expand Down Expand Up @@ -559,6 +562,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
mem.Release()
}(up.rs, data, length, slabIndex)
}

slabIndex++
}
}()
Expand Down Expand Up @@ -727,10 +731,9 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
var refreshed []*uploader
existing := make(map[types.FileContractID]struct{})
for _, uploader := range mgr.uploaders {
// renew uploaders that got renewed
// refresh uploaders that got renewed
if renewal, renewed := renewals[uploader.ContractID()]; renewed {
host := mgr.hm.Host(renewal.HostKey, renewal.ID, renewal.SiamuxAddr)
uploader.Renew(host, renewal, bh)
uploader.Refresh(renewal, bh)
}

// stop uploaders that expired
Expand All @@ -751,8 +754,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
// add missing uploaders
for _, c := range contracts {
if _, exists := existing[c.ID]; !exists {
host := mgr.hm.Host(c.HostKey, c.ID, c.SiamuxAddr)
uploader := mgr.newUploader(mgr.os, mgr.cl, host, c, bh)
uploader := mgr.newUploader(mgr.os, mgr.cs, mgr.hm, c, bh)
refreshed = append(refreshed, uploader)
go uploader.Start()
}
Expand Down Expand Up @@ -1000,7 +1002,6 @@ func (s *slabUpload) launch(req *sectorUploadReq) error {
s.lastOverdrive = time.Now()
s.numOverdriving++
}

// update the state
s.numInflight++
s.numLaunched++
Expand Down Expand Up @@ -1071,18 +1072,15 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
}

// store the sector
sector.uploaded = object.Sector{
sector.finish(object.Sector{
Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}},
LatestHost: req.hk,
Root: resp.root,
}
})

// update uploaded sectors
s.numUploaded++

// cancel the sector's context
sector.cancel()

// release all other candidates for this sector
for _, candidate := range s.candidates {
if candidate.req != nil && candidate.req != req && candidate.req.sector.index == sector.index {
Expand All @@ -1091,16 +1089,30 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
}

// release memory
sector.data = nil
s.mem.ReleaseSome(rhpv2.SectorSize)

return true, s.numUploaded == s.numSectors
}

func (s *sectorUpload) finish(sector object.Sector) {
s.mu.Lock()
defer s.mu.Unlock()

s.cancel()
s.uploaded = sector
s.data = nil
}

func (s *sectorUpload) isUploaded() bool {
return s.uploaded.Root != (types.Hash256{})
}

func (s *sectorUpload) sectorData() *[rhpv2.SectorSize]byte {
s.mu.Lock()
defer s.mu.Unlock()
return s.data
}

func (req *sectorUploadReq) done() bool {
select {
case <-req.sector.ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions worker/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func TestUpload(t *testing.T) {
contracts := newMockContracts(hosts)

// mock dependencies
cl := newMockContractLocker(contracts)
cs := newMockContractStore(contracts)
hm := newMockHostManager(hosts)
os := newMockObjectStore()
mm := &mockMemoryManager{}

// create managers
dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cl, 0, 0, time.Minute, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar())

// create test data
data := make([]byte, 128)
Expand Down
34 changes: 26 additions & 8 deletions worker/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
type (
uploader struct {
os ObjectStore
cl ContractLocker
cs ContractStore
hm HostManager
logger *zap.SugaredLogger

hk types.PublicKey
Expand Down Expand Up @@ -70,12 +71,12 @@ func (u *uploader) Healthy() bool {
return u.consecutiveFailures == 0
}

func (u *uploader) Renew(h Host, c api.ContractMetadata, bh uint64) {
func (u *uploader) Refresh(c api.ContractMetadata, bh uint64) {
u.mu.Lock()
defer u.mu.Unlock()

u.bh = bh
u.host = h
u.host = u.hm.Host(c.HostKey, c.ID, c.SiamuxAddr)
u.fcid = c.ID
u.siamuxAddr = c.SiamuxAddr
u.endHeight = c.WindowEnd
Expand Down Expand Up @@ -120,8 +121,10 @@ outer:

// the uploader's contract got renewed, requeue the request
if errors.Is(err, errMaxRevisionReached) {
u.enqueue(req)
continue outer
if u.tryRefresh(req.sector.ctx) {
u.enqueue(req)
continue outer
}
}

// send the response
Expand Down Expand Up @@ -196,13 +199,13 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration,
u.mu.Unlock()

// acquire contract lock
lockID, err := u.cl.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration)
lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration)
if err != nil {
return types.Hash256{}, 0, err
}

// defer the release
lock := newContractLock(u.shutdownCtx, fcid, lockID, req.contractLockDuration, u.cl, u.logger)
lock := newContractLock(u.shutdownCtx, fcid, lockID, req.contractLockDuration, u.cs, u.logger)
defer func() {
ctx, cancel := context.WithTimeout(u.shutdownCtx, 10*time.Second)
lock.Release(ctx)
Expand All @@ -228,7 +231,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration,

// upload the sector
start := time.Now()
root, err := host.UploadSector(ctx, req.sector.data, rev)
root, err := host.UploadSector(ctx, req.sector.sectorData(), rev)
if err != nil {
return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err)
}
Expand Down Expand Up @@ -287,3 +290,18 @@ func (u *uploader) tryRecomputeStats() {
u.statsSectorUploadEstimateInMS.Recompute()
u.statsSectorUploadSpeedBytesPerMS.Recompute()
}

func (u *uploader) tryRefresh(ctx context.Context) bool {
// fetch the renewed contract
renewed, err := u.cs.RenewedContract(ctx, u.ContractID())
if isError(err, api.ErrContractNotFound) || isError(err, context.Canceled) {
return false
} else if err != nil {
u.logger.Errorf("failed to fetch renewed contract %v, err: %v", u.ContractID(), err)
return false
}

// renew the uploader with the renewed contract
u.Refresh(renewed, u.BlockHeight())
return true
}
Loading

0 comments on commit 3537c8d

Please sign in to comment.