Skip to content

Commit

Permalink
bus: take renewals into account in the sectors cache
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Mar 26, 2024
1 parent 19a25f9 commit f03530f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 24 deletions.
1 change: 1 addition & 0 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) {
if jc.Check("couldn't store contract", err) == nil {
jc.Encode(r)
}
b.uploadingSectors.addRenewal(req.Contract.ID(), req.RenewedFrom)
}

func (b *bus) contractIDRootsHandlerGET(jc jape.Context) {
Expand Down
58 changes: 44 additions & 14 deletions bus/uploadingsectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const (

type (
uploadingSectorsCache struct {
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
renewedFrom map[types.FileContractID]types.FileContractID
renewedTo map[types.FileContractID]types.FileContractID
}

ongoingUpload struct {
Expand All @@ -32,7 +34,9 @@ type (

func newUploadingSectorsCache() *uploadingSectorsCache {
return &uploadingSectorsCache{
uploads: make(map[api.UploadID]*ongoingUpload),
uploads: make(map[api.UploadID]*ongoingUpload),
renewedFrom: make(map[types.FileContractID]types.FileContractID),
renewedTo: make(map[types.FileContractID]types.FileContractID),
}
}

Expand All @@ -51,6 +55,32 @@ func (ou *ongoingUpload) sectors(fcid types.FileContractID) (roots []types.Hash2
return
}

func (usc *uploadingSectorsCache) fcids(fcid types.FileContractID) (types.FileContractID, types.FileContractID) {
usc.mu.Lock()
defer usc.mu.Unlock()

if renewed, ok := usc.renewedTo[fcid]; ok {
return renewed, fcid
} else {
return fcid, usc.renewedFrom[fcid] // might be the default but that is fine
}
}

func (usc *uploadingSectorsCache) addRenewal(fcid, renewedFrom types.FileContractID) {
usc.mu.Lock()
defer usc.mu.Unlock()

// to prevent leaking memory we delete the grand parent, this is fine as
// long as a contract doesn't renew twice within the course of one upload
if prev, ok := usc.renewedFrom[renewedFrom]; ok {
delete(usc.renewedTo, prev)
delete(usc.renewedFrom, renewedFrom)
}

usc.renewedFrom[fcid] = renewedFrom
usc.renewedTo[renewedFrom] = fcid
}

func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error {
// fetch ongoing upload
usc.mu.Lock()
Expand All @@ -66,30 +96,30 @@ func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid type
return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
}

func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
func (usc *uploadingSectorsCache) ongoingUploads() []*ongoingUpload {
usc.mu.Lock()
defer usc.mu.Unlock()
var uploads []*ongoingUpload
for _, ongoing := range usc.uploads {
uploads = append(uploads, ongoing)
}
usc.mu.Unlock()
return uploads
}

for _, ongoing := range uploads {
func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
fcid, renewedFrom := usc.fcids(fcid)
for _, ongoing := range usc.ongoingUploads() {
size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
size += uint64(len(ongoing.sectors(renewedFrom))) * rhp.SectorSize
}
return
}

func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) {
usc.mu.Lock()
var uploads []*ongoingUpload
for _, ongoing := range usc.uploads {
uploads = append(uploads, ongoing)
}
usc.mu.Unlock()

for _, ongoing := range uploads {
fcid, renewedFrom := usc.fcids(fcid)
for _, ongoing := range usc.ongoingUploads() {
roots = append(roots, ongoing.sectors(fcid)...)
roots = append(roots, ongoing.sectors(renewedFrom)...)
}
return
}
Expand Down
88 changes: 78 additions & 10 deletions bus/uploadingsectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"testing"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
Expand All @@ -15,20 +16,24 @@ func TestUploadingSectorsCache(t *testing.T) {
uID1 := newTestUploadID()
uID2 := newTestUploadID()

fcid1 := types.FileContractID{1}
fcid2 := types.FileContractID{2}
fcid3 := types.FileContractID{3}

c.trackUpload(uID1)
c.trackUpload(uID2)

_ = c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1})
_ = c.addUploadingSector(uID1, types.FileContractID{2}, types.Hash256{2})
_ = c.addUploadingSector(uID2, types.FileContractID{2}, types.Hash256{3})
_ = c.addUploadingSector(uID1, fcid1, types.Hash256{1})
_ = c.addUploadingSector(uID1, fcid2, types.Hash256{2})
_ = c.addUploadingSector(uID2, fcid2, types.Hash256{3})

if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) {
if roots1 := c.sectors(fcid1); len(roots1) != 1 || roots1[0] != (types.Hash256{1}) {
t.Fatal("unexpected cached sectors")
}
if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 2 {
if roots2 := c.sectors(fcid2); len(roots2) != 2 {
t.Fatal("unexpected cached sectors", roots2)
}
if roots3 := c.sectors(types.FileContractID{3}); len(roots3) != 0 {
if roots3 := c.sectors(fcid3); len(roots3) != 0 {
t.Fatal("unexpected cached sectors")
}

Expand All @@ -40,19 +45,19 @@ func TestUploadingSectorsCache(t *testing.T) {
}

c.finishUpload(uID1)
if roots1 := c.sectors(types.FileContractID{1}); len(roots1) != 0 {
if roots1 := c.sectors(fcid1); len(roots1) != 0 {
t.Fatal("unexpected cached sectors")
}
if roots2 := c.sectors(types.FileContractID{2}); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) {
if roots2 := c.sectors(fcid2); len(roots2) != 1 || roots2[0] != (types.Hash256{3}) {
t.Fatal("unexpected cached sectors")
}

c.finishUpload(uID2)
if roots2 := c.sectors(types.FileContractID{1}); len(roots2) != 0 {
if roots2 := c.sectors(fcid1); len(roots2) != 0 {
t.Fatal("unexpected cached sectors")
}

if err := c.addUploadingSector(uID1, types.FileContractID{1}, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) {
if err := c.addUploadingSector(uID1, fcid1, types.Hash256{1}); !errors.Is(err, api.ErrUnknownUpload) {
t.Fatal("unexpected error", err)
}
if err := c.trackUpload(uID1); err != nil {
Expand All @@ -61,6 +66,69 @@ func TestUploadingSectorsCache(t *testing.T) {
if err := c.trackUpload(uID1); !errors.Is(err, api.ErrUploadAlreadyExists) {
t.Fatal("unexpected error", err)
}

// reset cache
c = newUploadingSectorsCache()

// track upload that uploads across two contracts
c.trackUpload(uID1)
c.addUploadingSector(uID1, fcid1, types.Hash256{1})
c.addUploadingSector(uID1, fcid1, types.Hash256{2})
c.addRenewal(fcid2, fcid1)
c.addUploadingSector(uID1, fcid2, types.Hash256{3})
c.addUploadingSector(uID1, fcid2, types.Hash256{4})

// assert pending sizes for both contracts should be 4 sectors
p1 := c.pending(fcid1)
p2 := c.pending(fcid2)
if p1 != p2 || p1 != 4*rhpv2.SectorSize {
t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize)
}

// assert sectors for both contracts contain 4 sectors
s1 := c.sectors(fcid1)
s2 := c.sectors(fcid2)
if len(s1) != 4 || len(s2) != 4 {
t.Fatal("unexpected sectors", len(s1), len(s2))
}

// renew contract
c.addRenewal(fcid3, fcid2)

// assert renewal maps get pruned
if len(c.renewedFrom) != 1 || len(c.renewedTo) != 1 {
t.Fatal("unexpected", len(c.renewedFrom), len(c.renewedTo))
}

// repeat a similar upload
c.trackUpload(uID2)
c.addUploadingSector(uID2, fcid2, types.Hash256{1})
c.addUploadingSector(uID2, fcid2, types.Hash256{2})
c.addUploadingSector(uID2, fcid3, types.Hash256{3})
c.addUploadingSector(uID2, fcid3, types.Hash256{4})

// pending sizes should be 6 sectors because the 1st upload is still ongoing
p1 = c.pending(fcid2)
p2 = c.pending(fcid3)
if p1 != p2 || p1 != 6*rhpv2.SectorSize {
t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize)
}

// finishing upload 1 brings it back to 4
c.finishUpload(uID1)
p1 = c.pending(fcid2)
p2 = c.pending(fcid3)
if p1 != p2 || p1 != 4*rhpv2.SectorSize {
t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize)
}

// finishing upload 2 brings it back to 0
c.finishUpload(uID2)
p1 = c.pending(fcid2)
p2 = c.pending(fcid3)
if p1 != p2 || p1 != 0 {
t.Fatal("unexpected pending size", p1/rhpv2.SectorSize, p2/rhpv2.SectorSize)
}
}

func newTestUploadID() api.UploadID {
Expand Down

0 comments on commit f03530f

Please sign in to comment.