Skip to content

Commit

Permalink
Merge pull request #526 from SiaFoundation/pj/prune-contracts
Browse files Browse the repository at this point in the history
Contract Pruning
  • Loading branch information
ChrisSchinnerl authored Sep 6, 2023
2 parents 2c16c88 + 0dde5ec commit 4b6cfb8
Show file tree
Hide file tree
Showing 13 changed files with 554 additions and 116 deletions.
4 changes: 4 additions & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ var (
// from the database.
ErrContractSetNotFound = errors.New("couldn't find contract set")

// ErrHostNotFound is returned when a host can't be retrieved from the
// database.
ErrHostNotFound = errors.New("host doesn't exist in hostdb")

// ErrSettingNotFound is returned if a requested setting is not present in the
// database.
ErrSettingNotFound = errors.New("setting not found")
Expand Down
13 changes: 13 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ type RHPScanRequest struct {
Timeout time.Duration `json:"timeout"`
}

// RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune
// endpoint.
type RHPPruneContractRequest struct {
Timeout time.Duration `json:"timeout"`
}

// RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune
// endpoint.
type RHPPruneContractResponse struct {
Pruned uint64 `json:"pruned"`
Remaining uint64 `json:"remaining"`
}

// RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint.
type RHPPriceTableRequest struct {
HostKey types.PublicKey `json:"hostKey"`
Expand Down
34 changes: 29 additions & 5 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,16 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// build the response
for fcid, size := range sizes {
// adjust the amount of prunable data with the pending uploads, due to
// how we record contract spending a contract's size might already
// include pending sectors
pending := b.uploadingSectors.pending(fcid)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

contracts = append(contracts, api.ContractPrunableData{
ID: fcid,
ContractSize: size,
Expand All @@ -781,6 +791,9 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// sort contracts by the amount of prunable data
sort.Slice(contracts, func(i, j int) bool {
if contracts[i].Prunable == contracts[j].Prunable {
return contracts[i].Size > contracts[j].Size
}
return contracts[i].Prunable > contracts[j].Prunable
})

Expand All @@ -801,9 +814,21 @@ func (b *bus) contractSizeHandlerGET(jc jape.Context) {
if errors.Is(err, api.ErrContractNotFound) {
jc.Error(err, http.StatusNotFound)
return
} else if jc.Check("failed to fetch contract size", err) == nil {
jc.Encode(size)
} else if jc.Check("failed to fetch contract size", err) != nil {
return
}

// adjust the amount of prunable data with the pending uploads, due to how
// we record contract spending a contract's size might already include
// pending sectors
pending := b.uploadingSectors.pending(id)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

jc.Encode(size)
}

func (b *bus) contractReleaseHandlerPOST(jc jape.Context) {
Expand Down Expand Up @@ -1858,14 +1883,13 @@ func (b *bus) Handler() http.Handler {
"POST /search/hosts": b.searchHostsHandlerPOST,
"GET /search/objects": b.searchObjectsHandlerGET,

"GET /stats/objects": b.objectsStatshandlerGET,

"GET /settings": b.settingsHandlerGET,
"GET /setting/:key": b.settingKeyHandlerGET,
"PUT /setting/:key": b.settingKeyHandlerPUT,
"DELETE /setting/:key": b.settingKeyHandlerDELETE,

"GET /state": b.stateHandlerGET,
"GET /state": b.stateHandlerGET,
"GET /stats/objects": b.objectsStatshandlerGET,

"POST /upload/:id": b.uploadTrackHandlerPOST,
"POST /upload/:id/sector": b.uploadAddSectorHandlerPOST,
Expand Down
15 changes: 15 additions & 0 deletions bus/uploadingsectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)
Expand Down Expand Up @@ -65,6 +66,20 @@ func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid type
return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
}

func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
usc.mu.Lock()
var uploads []*ongoingUpload
for _, ongoing := range usc.uploads {
uploads = append(uploads, ongoing)
}
usc.mu.Unlock()

for _, ongoing := range uploads {
size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
}
return
}

func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) {
usc.mu.Lock()
var uploads []*ongoingUpload
Expand Down
6 changes: 5 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,11 @@ func TestWallet(t *testing.T) {
// to the amount of money sent as well as the miner fees used.
spendableDiff := wallet.Spendable.Sub(updated.Spendable)
if updated.Unconfirmed.Cmp(spendableDiff) > 0 {
t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here, confirmed %v->%v unconfirmed %v->%v spendable %v->%v fee %v", wallet.Confirmed, updated.Confirmed, wallet.Unconfirmed, updated.Unconfirmed, wallet.Spendable, updated.Spendable, minerFee)
t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here: \nconfirmed %v (%v) - >%v (%v) \nunconfirmed %v (%v) -> %v (%v) \nspendable %v (%v) -> %v (%v) \nfee %v (%v)",
wallet.Confirmed, wallet.Confirmed.ExactString(), updated.Confirmed, updated.Confirmed.ExactString(),
wallet.Unconfirmed, wallet.Unconfirmed.ExactString(), updated.Unconfirmed, updated.Unconfirmed.ExactString(),
wallet.Spendable, wallet.Spendable.ExactString(), updated.Spendable, updated.Spendable.ExactString(),
minerFee, minerFee.ExactString())
}
withdrawnAmt := spendableDiff.Sub(updated.Unconfirmed)
expectedWithdrawnAmt := sendAmt.Add(minerFee)
Expand Down
47 changes: 42 additions & 5 deletions internal/testing/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -192,6 +193,8 @@ func TestSectorPruning(t *testing.T) {
w := cluster.Worker
b := cluster.Bus

numObjects := 10

// add hosts
_, err = cluster.AddHostsBlocking(int(cfg.Contracts.Amount))
if err != nil {
Expand All @@ -210,7 +213,7 @@ func TestSectorPruning(t *testing.T) {
}

// add several objects
for i := 0; i < 10; i++ {
for i := 0; i < numObjects; i++ {
filename := fmt.Sprintf("obj_%d", i)
if err := w.UploadObject(context.Background(), bytes.NewReader([]byte(filename)), filename); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -238,7 +241,7 @@ func TestSectorPruning(t *testing.T) {
}
n += len(cRoots)
}
if n != rs.TotalShards*10 {
if n != rs.TotalShards*numObjects {
t.Fatal("unexpected number of roots", n)
}

Expand All @@ -254,6 +257,7 @@ func TestSectorPruning(t *testing.T) {
}
}()
b = cluster2.Bus
w = cluster2.Worker

// assert prunable data is 0
if res, err := b.PrunableData(context.Background()); err != nil {
Expand All @@ -262,8 +266,8 @@ func TestSectorPruning(t *testing.T) {
t.Fatal("expected 0 prunable data", n)
}

// delete a random number of objects
for i := 0; i < 10; i += 2 {
// delete every other object
for i := 0; i < numObjects; i += 2 {
filename := fmt.Sprintf("obj_%d", i)
if err := b.DeleteObject(context.Background(), api.DefaultBucketName, filename, false); err != nil {
t.Fatal(err)
Expand All @@ -273,7 +277,40 @@ func TestSectorPruning(t *testing.T) {
// assert amount of prunable data
if res, err := b.PrunableData(context.Background()); err != nil {
t.Fatal(err)
} else if res.TotalPrunable != 5*uint64(rs.TotalShards)*rhpv2.SectorSize {
} else if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*uint64(rs.TotalShards)*rhpv2.SectorSize {
t.Fatal("unexpected prunable data", n)
}

// prune all contracts
for _, c := range contracts {
if _, _, err := w.RHPPruneContract(context.Background(), c.ID, 0); err != nil {
t.Fatal(err)
}
}

// assert spending records were updated and prunable data is 0
if err = Retry(10, testBusFlushInterval, func() error {
if res, err := b.PrunableData(context.Background()); err != nil {
t.Fatal(err)
} else if res.TotalPrunable != 0 {
return fmt.Errorf("unexpected prunable data: %d", n)
}
return nil
}); err != nil {
t.Fatal(err)
}

// assert spending was updated
for _, c := range contracts {
c, err := b.Contract(context.Background(), c.ID)
if err != nil {
t.Fatal(err)
}
if c.Spending.SectorRoots.IsZero() {
t.Fatal("spending record not updated")
}
if c.Spending.Deletions.IsZero() {
t.Fatal("spending record not updated")
}
}
}
3 changes: 1 addition & 2 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
)

var (
ErrHostNotFound = errors.New("host doesn't exist in hostdb")
ErrNegativeOffset = errors.New("offset can not be negative")
ErrNegativeMaxDowntime = errors.New("max downtime can not be negative")
)
Expand Down Expand Up @@ -434,7 +433,7 @@ func (ss *SQLStore) Host(ctx context.Context, hostKey types.PublicKey) (hostdb.H
Preload("Blocklist").
Take(&h)
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return hostdb.HostInfo{}, ErrHostNotFound
return hostdb.HostInfo{}, api.ErrHostNotFound
} else if tx.Error != nil {
return hostdb.HostInfo{}, tx.Error
}
Expand Down
2 changes: 1 addition & 1 deletion stores/hostdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSQLHostDB(t *testing.T) {
ctx := context.Background()
hk := types.GeneratePrivateKey().PublicKey()
_, err = hdb.Host(ctx, hk)
if !errors.Is(err, ErrHostNotFound) {
if !errors.Is(err, api.ErrHostNotFound) {
t.Fatal(err)
}

Expand Down
3 changes: 1 addition & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,6 @@ WHERE c.fcid = ?
roots = append(roots, *(*types.Hash256)(&r))
}
}

return
}

Expand Down Expand Up @@ -787,7 +786,7 @@ func (s *SQLStore) ContractSize(ctx context.Context, id types.FileContractID) (a

if err := s.db.
Raw(`
SELECT c.size, CASE WHEN c.size>(COUNT(cs.db_sector_id) * ?) THEN c.size-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
SELECT MAX(c.size) as size, CASE WHEN MAX(c.size)>(COUNT(cs.db_sector_id) * ?) THEN MAX(c.size)-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
FROM contracts c
LEFT JOIN contract_sectors cs ON cs.db_contract_id = c.id
WHERE c.fcid = ?
Expand Down
11 changes: 11 additions & 0 deletions worker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ func (c *Client) RHPBroadcast(ctx context.Context, fcid types.FileContractID) (e
return
}

// RHPPruneContract prunes deleted sectors from the contract with given id.
func (c *Client) RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) {
var res api.RHPPruneContractResponse
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", fcid), api.RHPPruneContractRequest{
Timeout: timeout,
}, &res)
pruned = res.Pruned
remaining = res.Remaining
return
}

// RHPContractRoots fetches the roots of the contract with given id.
func (c *Client) RHPContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) {
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", fcid), &roots)
Expand Down
Loading

0 comments on commit 4b6cfb8

Please sign in to comment.