Skip to content

Commit

Permalink
worker: fix NDF
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Dec 6, 2023
1 parent 2ba2184 commit cdc9060
Showing 1 changed file with 25 additions and 27 deletions.
52 changes: 25 additions & 27 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ type (

sectorUploadResp struct {
req *sectorUploadReq
fcid types.FileContractID
hk types.PublicKey
root types.Hash256
err error
}
Expand Down Expand Up @@ -1148,7 +1146,7 @@ outer:
if err != nil {
req.fail(err)
} else {
req.succeed(root, u.hk, u.fcid)
req.succeed(root)
}

// track the error, ignore gracefully closed streams and canceled overdrives
Expand Down Expand Up @@ -1297,12 +1295,10 @@ func (u *uploader) pop() *sectorUploadReq {
return nil
}

func (req *sectorUploadReq) succeed(root types.Hash256, hk types.PublicKey, fcid types.FileContractID) {
func (req *sectorUploadReq) succeed(root types.Hash256) {
select {
case <-req.sector.ctx.Done():
case req.responseChan <- sectorUploadResp{
fcid: fcid,
hk: hk,
req: req,
root: root,
}:
Expand Down Expand Up @@ -1345,8 +1341,8 @@ func (s *slabUpload) finish() (sectors []object.Sector, _ error) {
return nil, fmt.Errorf("failed to upload slab: remaining=%d, inflight=%d, launched=%d uploaders=%d errors=%d %w", remaining, s.numInflight, s.numLaunched, s.mgr.numUploaders(), len(s.errs), s.errs)
}

for _, sector := range s.sectors {
sectors = append(sectors, sector.uploaded)
for i := 0; i < len(s.shards); i++ {
sectors = append(sectors, s.sectors[i].uploaded)
}
return
}
Expand Down Expand Up @@ -1496,48 +1492,50 @@ func (s *slabUpload) receive(resp sectorUploadResp) bool {
s.mu.Lock()
defer s.mu.Unlock()

// convenience variable
req := resp.req
sector := req.sector

// update the state
if resp.req.overdrive {
resp.req.sector.mu.Lock()
delete(resp.req.sector.overdriving, resp.req.fcid)
resp.req.sector.mu.Unlock()
if req.overdrive {
sector.mu.Lock()
delete(sector.overdriving, req.fcid)
sector.mu.Unlock()
}

// failed reqs can't complete the upload
s.numInflight--
if resp.err != nil {
s.errs[resp.req.hk] = resp.err
s.errs[req.hk] = resp.err
return false
}

// mark uploaders we used for overdrives as unused
for fcid := range sector.overdriving {
s.upload.markUnused(req.sID, fcid)
}

// redundant sectors can't complete the upload
if resp.req.sector.uploaded.Root != (types.Hash256{}) {
if sector.uploaded.Root != (types.Hash256{}) {
return false
}

// store the sector
resp.req.sector.uploaded = object.Sector{
Contracts: map[types.PublicKey][]types.FileContractID{resp.hk: {resp.fcid}},
LatestHost: resp.req.hk,
sector.uploaded = object.Sector{
Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}},
LatestHost: req.hk,
Root: resp.root,
}

// cancel the sector context
resp.req.sector.cancel()

// mark uploaders we used for overdrives as unused
for fcid := range resp.req.sector.overdriving {
if fcid != resp.req.fcid {
s.upload.markUnused(resp.req.sID, fcid)
}
}
sector.cancel()

// update uploaded sectors
s.numUploaded++

// release memory
resp.req.sector.data = nil
s.shards[resp.req.sector.index] = nil
sector.data = nil
s.shards[sector.index] = nil
s.mem.ReleaseSome(rhpv2.SectorSize)

return s.numUploaded == uint64(len(s.shards))
Expand Down

0 comments on commit cdc9060

Please sign in to comment.