Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contract Pruning #526

Merged
merged 93 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
b820a8c
bus: add methods that return the amount of prunable data
peterjan Aug 2, 2023
657a6dc
bus: remove Debug()
peterjan Aug 2, 2023
becce0f
bus: extend TestPrunableData
peterjan Aug 2, 2023
c85fb40
bus: return 404 when passing unknown fcid to /contract/:id/prunable
peterjan Aug 2, 2023
e45d156
api: update docstrings
peterjan Aug 2, 2023
5c01cfa
bus: fix error check
peterjan Aug 2, 2023
de672a8
internal: fix TestSectorPruning NDF
peterjan Aug 2, 2023
e6791c2
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 2, 2023
2b5d8ac
worker: add contracts prune and roots routes
peterjan Aug 3, 2023
041e767
stores: TestContractRoots
peterjan Aug 4, 2023
7828afc
all: implement review remarks
peterjan Aug 4, 2023
9686682
worker,bus: add contract roots routes
peterjan Aug 3, 2023
75e05e4
stores: fix TestAncestorsContracts
peterjan Aug 4, 2023
78a6d46
Merge branch 'pj/prunable-data' into pj/contract-roots
peterjan Aug 4, 2023
b163f97
worker: cleanup PR
peterjan Aug 4, 2023
6d1da04
stores: fix TestSQLMetadataStore,TestSQLContractStore
peterjan Aug 4, 2023
f32b5ac
Merge branch 'pj/prunable-data' into pj/contract-roots
peterjan Aug 4, 2023
dc7f55b
Merge branch 'pj/contract-roots' into pj/prune-contracts
peterjan Aug 4, 2023
da17485
bus: add contract spending fields
peterjan Aug 7, 2023
8a8e199
Merge branch 'pj/contract-spending' into pj/prune-contracts
peterjan Aug 7, 2023
996316b
worker: record contract spending
peterjan Aug 7, 2023
3de58d5
stores: fix MySQL query so it passes only_full_group_by check
peterjan Aug 7, 2023
7500a91
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 8, 2023
522cd32
stores: update query
peterjan Aug 8, 2023
6999f64
stores: use MAX instead of SUM
peterjan Aug 8, 2023
3458cd1
stores: remove Debug()
peterjan Aug 8, 2023
7d66350
Merge branch 'pj/contract-roots' of https://github.com/SiaFoundation/…
peterjan Aug 8, 2023
c8b4fb5
worker: add sanity checks to ContractRoots, extend testing
peterjan Aug 9, 2023
8be995c
worker: add convenience variable
peterjan Aug 9, 2023
cc6ce49
Merge branch 'pj/contract-roots' into pj/contract-spending
peterjan Aug 9, 2023
3d7e133
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 9, 2023
808e226
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 9, 2023
670b6d3
Merge branch 'pj/contract-spending' into pj/prune-contracts
peterjan Aug 9, 2023
114e6f7
testing: fix TestSectorPruning
peterjan Aug 9, 2023
318e406
worker: return int64
peterjan Aug 9, 2023
038438b
all: update hostd
peterjan Aug 9, 2023
cdc7060
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 10, 2023
85d7e1c
worker: make sure we can handle zero cost deletions
peterjan Aug 10, 2023
da7590b
bus: add uploaded sectors cache
peterjan Aug 10, 2023
5043932
Merge branch 'pj/sectors-cache' into pj/prune-contracts
peterjan Aug 10, 2023
92ae050
bus: rework uploading sectors cache
peterjan Aug 11, 2023
f849b92
api: revert uploadID
peterjan Aug 11, 2023
f09df88
testing: fix TestUploadingSectorsCache
peterjan Aug 11, 2023
3cc64ff
Merge branch 'pj/sectors-cache' of https://github.com/SiaFoundation/r…
peterjan Aug 14, 2023
2d8e7af
worker: fix bad merge
peterjan Aug 14, 2023
7d3a780
testing: add TestUploadingSectorsCache
peterjan Aug 14, 2023
668bf18
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 14, 2023
751427a
testing: extend test
peterjan Aug 16, 2023
ef1c71c
testing: fix TestUploadingSectorsCache
peterjan Aug 16, 2023
7e865ef
Merge branch 'pj/sectors-cache' of https://github.com/SiaFoundation/r…
peterjan Aug 16, 2023
4518762
worker: fix proof size
peterjan Aug 18, 2023
edffc89
Merge branch 'pj/bus-contract-sizes' of https://github.com/SiaFoundat…
peterjan Aug 18, 2023
ac17a9f
Merge branch 'pj/bus-contract-sizes' of https://github.com/SiaFoundat…
peterjan Aug 18, 2023
3823284
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 21, 2023
118ccfb
bus: update size query
peterjan Aug 21, 2023
b97ecd2
Merge branch 'pj/prunable-data-query' into pj/prune-contracts
peterjan Aug 21, 2023
cb64aea
worker: fix payment
peterjan Aug 21, 2023
97829a0
Merge branch 'chris/revision-broadcast-fix' of https://github.com/Sia…
peterjan Aug 24, 2023
f4d106c
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 24, 2023
8a30175
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 25, 2023
aebc443
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 25, 2023
4237703
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Aug 29, 2023
c0a47bf
worker: add logging
peterjan Aug 29, 2023
bc26865
db: fix only_full_group_by compat
peterjan Aug 29, 2023
484e4bb
worker: add logging and lower batchsize for older hosts
peterjan Aug 30, 2023
3855694
worker: update logging
peterjan Aug 30, 2023
ed2bfa0
worker: get rid of the waitgroup
peterjan Aug 31, 2023
3fb80da
Merge branch 'pj/fix-health' of https://github.com/SiaFoundation/rent…
peterjan Aug 31, 2023
0b69242
Merge branch 'pj/fix-res-chan' of https://github.com/SiaFoundation/re…
peterjan Aug 31, 2023
776ce3d
worker: fix race
peterjan Aug 31, 2023
c4f549a
Merge branch 'pj/fix-res-chan' into pj/prune-contracts
peterjan Aug 31, 2023
b1cc9cb
worker: use a mutex
peterjan Aug 31, 2023
6c10280
Merge branch 'pj/fix-res-chan' of https://github.com/SiaFoundation/re…
peterjan Aug 31, 2023
7a5ee94
worker: use a mutex
peterjan Aug 31, 2023
f7c6990
db: move sectors cache into sql store
peterjan Sep 1, 2023
399dd7d
Merge branch 'pj/fix-res-chan' of https://github.com/SiaFoundation/re…
peterjan Sep 1, 2023
6ad0bba
db: undo changes to migrations
peterjan Sep 1, 2023
4cf6aea
worker: add timeout to prune request
peterjan Sep 1, 2023
14cffa1
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Sep 1, 2023
24955c3
db: cleanup PR
peterjan Sep 1, 2023
918df84
worker: fix deadlock
peterjan Sep 1, 2023
550a6e9
testing: add logging
peterjan Sep 1, 2023
ee2308c
testing: add logging
peterjan Sep 1, 2023
4411c13
Merge branch 'pj/fix-res-chan' into pj/prune-contracts
peterjan Sep 1, 2023
5178378
bus: move sectors cache
peterjan Sep 4, 2023
4b5e454
worker: refactor fetch and delete roots
peterjan Sep 4, 2023
7b00c5a
worker: cleanup PR
peterjan Sep 5, 2023
cefff67
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
peterjan Sep 5, 2023
e012697
worker: undo change
peterjan Sep 5, 2023
7ac8e92
api: add prune response
peterjan Sep 5, 2023
5712d96
worker: fix remaining
peterjan Sep 5, 2023
3ced2b9
worker: fix file size edge
peterjan Sep 5, 2023
0dde5ec
worker: implement review remarks
peterjan Sep 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ var (
// from the database.
ErrContractSetNotFound = errors.New("couldn't find contract set")

// ErrHostNotFound is returned when a host can't be retrieved from the
// database.
ErrHostNotFound = errors.New("host doesn't exist in hostdb")

// ErrSettingNotFound is returned if a requested setting is not present in the
// database.
ErrSettingNotFound = errors.New("setting not found")
Expand Down
13 changes: 13 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ type RHPScanRequest struct {
Timeout time.Duration `json:"timeout"`
}

// RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune
// endpoint.
type RHPPruneContractRequest struct {
Timeout time.Duration `json:"timeout"`
}

// RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune
// endpoint.
type RHPPruneContractResponse struct {
Pruned uint64 `json:"pruned"`
Remaining uint64 `json:"remaining"`
}

// RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint.
type RHPPriceTableRequest struct {
HostKey types.PublicKey `json:"hostKey"`
Expand Down
34 changes: 29 additions & 5 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,16 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// build the response
for fcid, size := range sizes {
// adjust the amount of prunable data with the pending uploads, due to
// how we record contract spending a contract's size might already
// include pending sectors
pending := b.uploadingSectors.pending(fcid)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

contracts = append(contracts, api.ContractPrunableData{
ID: fcid,
ContractSize: size,
Expand All @@ -726,6 +736,9 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// sort contracts by the amount of prunable data
sort.Slice(contracts, func(i, j int) bool {
if contracts[i].Prunable == contracts[j].Prunable {
return contracts[i].Size > contracts[j].Size
}
return contracts[i].Prunable > contracts[j].Prunable
})

Expand All @@ -746,9 +759,21 @@ func (b *bus) contractSizeHandlerGET(jc jape.Context) {
if errors.Is(err, api.ErrContractNotFound) {
jc.Error(err, http.StatusNotFound)
return
} else if jc.Check("failed to fetch contract size", err) == nil {
jc.Encode(size)
} else if jc.Check("failed to fetch contract size", err) != nil {
return
}

// adjust the amount of prunable data with the pending uploads, due to how
// we record contract spending a contract's size might already include
// pending sectors
pending := b.uploadingSectors.pending(id)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

jc.Encode(size)
}

func (b *bus) contractReleaseHandlerPOST(jc jape.Context) {
Expand Down Expand Up @@ -1773,14 +1798,13 @@ func (b *bus) Handler() http.Handler {
"POST /search/hosts": b.searchHostsHandlerPOST,
"GET /search/objects": b.searchObjectsHandlerGET,

"GET /stats/objects": b.objectsStatshandlerGET,

"GET /settings": b.settingsHandlerGET,
"GET /setting/:key": b.settingKeyHandlerGET,
"PUT /setting/:key": b.settingKeyHandlerPUT,
"DELETE /setting/:key": b.settingKeyHandlerDELETE,

"GET /state": b.stateHandlerGET,
"GET /state": b.stateHandlerGET,
"GET /stats/objects": b.objectsStatshandlerGET,

"POST /upload/:id": b.uploadTrackHandlerPOST,
"POST /upload/:id/sector": b.uploadAddSectorHandlerPOST,
Expand Down
15 changes: 15 additions & 0 deletions bus/uploadingsectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)
Expand Down Expand Up @@ -65,6 +66,20 @@ func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid type
return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
}

func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
usc.mu.Lock()
var uploads []*ongoingUpload
for _, ongoing := range usc.uploads {
uploads = append(uploads, ongoing)
}
usc.mu.Unlock()

for _, ongoing := range uploads {
size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
}
return
}

func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) {
usc.mu.Lock()
var uploads []*ongoingUpload
Expand Down
6 changes: 5 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,11 @@ func TestWallet(t *testing.T) {
// to the amount of money sent as well as the miner fees used.
spendableDiff := wallet.Spendable.Sub(updated.Spendable)
if updated.Unconfirmed.Cmp(spendableDiff) > 0 {
t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here, confirmed %v->%v unconfirmed %v->%v spendable %v->%v fee %v", wallet.Confirmed, updated.Confirmed, wallet.Unconfirmed, updated.Unconfirmed, wallet.Spendable, updated.Spendable, minerFee)
t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here: \nconfirmed %v (%v) - >%v (%v) \nunconfirmed %v (%v) -> %v (%v) \nspendable %v (%v) -> %v (%v) \nfee %v (%v)",
wallet.Confirmed, wallet.Confirmed.ExactString(), updated.Confirmed, updated.Confirmed.ExactString(),
wallet.Unconfirmed, wallet.Unconfirmed.ExactString(), updated.Unconfirmed, updated.Unconfirmed.ExactString(),
wallet.Spendable, wallet.Spendable.ExactString(), updated.Spendable, updated.Spendable.ExactString(),
minerFee, minerFee.ExactString())
}
withdrawnAmt := spendableDiff.Sub(updated.Unconfirmed)
expectedWithdrawnAmt := sendAmt.Add(minerFee)
Expand Down
47 changes: 42 additions & 5 deletions internal/testing/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -192,6 +193,8 @@ func TestSectorPruning(t *testing.T) {
w := cluster.Worker
b := cluster.Bus

numObjects := 10

// add hosts
_, err = cluster.AddHostsBlocking(int(cfg.Contracts.Amount))
if err != nil {
Expand All @@ -210,7 +213,7 @@ func TestSectorPruning(t *testing.T) {
}

// add several objects
for i := 0; i < 10; i++ {
for i := 0; i < numObjects; i++ {
filename := fmt.Sprintf("obj_%d", i)
if err := w.UploadObject(context.Background(), bytes.NewReader([]byte(filename)), filename); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -238,7 +241,7 @@ func TestSectorPruning(t *testing.T) {
}
n += len(cRoots)
}
if n != rs.TotalShards*10 {
if n != rs.TotalShards*numObjects {
t.Fatal("unexpected number of roots", n)
}

Expand All @@ -254,6 +257,7 @@ func TestSectorPruning(t *testing.T) {
}
}()
b = cluster2.Bus
w = cluster2.Worker

// assert prunable data is 0
if res, err := b.PrunableData(context.Background()); err != nil {
Expand All @@ -262,8 +266,8 @@ func TestSectorPruning(t *testing.T) {
t.Fatal("expected 0 prunable data", n)
}

// delete a random number of objects
for i := 0; i < 10; i += 2 {
// delete every other object
for i := 0; i < numObjects; i += 2 {
filename := fmt.Sprintf("obj_%d", i)
if err := b.DeleteObject(context.Background(), filename, false); err != nil {
t.Fatal(err)
Expand All @@ -273,7 +277,40 @@ func TestSectorPruning(t *testing.T) {
// assert amount of prunable data
if res, err := b.PrunableData(context.Background()); err != nil {
t.Fatal(err)
} else if res.TotalPrunable != 5*uint64(rs.TotalShards)*rhpv2.SectorSize {
} else if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*uint64(rs.TotalShards)*rhpv2.SectorSize {
t.Fatal("unexpected prunable data", n)
}

// prune all contracts
for _, c := range contracts {
if _, _, err := w.RHPPruneContract(context.Background(), c.ID, 0); err != nil {
t.Fatal(err)
}
}

// assert spending records were updated and prunable data is 0
if err = Retry(10, testBusFlushInterval, func() error {
if res, err := b.PrunableData(context.Background()); err != nil {
t.Fatal(err)
} else if res.TotalPrunable != 0 {
return fmt.Errorf("unexpected prunable data: %d", n)
}
return nil
}); err != nil {
t.Fatal(err)
}

// assert spending was updated
for _, c := range contracts {
c, err := b.Contract(context.Background(), c.ID)
if err != nil {
t.Fatal(err)
}
if c.Spending.SectorRoots.IsZero() {
t.Fatal("spending record not updated")
}
if c.Spending.Deletions.IsZero() {
t.Fatal("spending record not updated")
}
}
peterjan marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 1 addition & 2 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const (
)

var (
ErrHostNotFound = errors.New("host doesn't exist in hostdb")
ErrNegativeOffset = errors.New("offset can not be negative")
ErrNegativeMaxDowntime = errors.New("max downtime can not be negative")
)
Expand Down Expand Up @@ -435,7 +434,7 @@ func (ss *SQLStore) Host(ctx context.Context, hostKey types.PublicKey) (hostdb.H
Preload("Blocklist").
Take(&h)
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return hostdb.HostInfo{}, ErrHostNotFound
return hostdb.HostInfo{}, api.ErrHostNotFound
} else if tx.Error != nil {
return hostdb.HostInfo{}, tx.Error
}
Expand Down
2 changes: 1 addition & 1 deletion stores/hostdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSQLHostDB(t *testing.T) {
ctx := context.Background()
hk := types.GeneratePrivateKey().PublicKey()
_, err = hdb.Host(ctx, hk)
if !errors.Is(err, ErrHostNotFound) {
if !errors.Is(err, api.ErrHostNotFound) {
t.Fatal(err)
}

Expand Down
3 changes: 1 addition & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ WHERE c.fcid = ?
roots = append(roots, *(*types.Hash256)(&r))
}
}

return
}

Expand Down Expand Up @@ -694,7 +693,7 @@ func (s *SQLStore) ContractSize(ctx context.Context, id types.FileContractID) (a

if err := s.db.
Raw(`
SELECT c.size, CASE WHEN c.size>(COUNT(cs.db_sector_id) * ?) THEN c.size-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
SELECT MAX(c.size) as size, CASE WHEN MAX(c.size)>(COUNT(cs.db_sector_id) * ?) THEN MAX(c.size)-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
FROM contracts c
LEFT JOIN contract_sectors cs ON cs.db_contract_id = c.id
WHERE c.fcid = ?
Expand Down
11 changes: 11 additions & 0 deletions worker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ func (c *Client) RHPBroadcast(ctx context.Context, fcid types.FileContractID) (e
return
}

// RHPPruneContract prunes deleted sectors from the contract with given id.
func (c *Client) RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) {
var res api.RHPPruneContractResponse
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", fcid), api.RHPPruneContractRequest{
Timeout: timeout,
}, &res)
pruned = res.Pruned
remaining = res.Remaining
return
}

// RHPContractRoots fetches the roots of the contract with given id.
func (c *Client) RHPContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) {
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", fcid), &roots)
Expand Down
Loading