Batch pruning of slabs after deleting objects from the database (#1150)
This speeds up deletions and avoids redundant deletions in scenarios
where files get deleted rapidly, e.g. when running `rclone sync`, which
might replace lots of files one-by-one.

It also puts the pruning of slabs into a background goroutine. There is
no reason for an object deletion to fail when pruning slabs fails. This
might lead to brief inconsistencies on the object stats endpoint but
will greatly improve the speed at which we can delete objects and also
reduce the database locking necessary when pruning slabs.

Closes #1149
ChrisSchinnerl authored Apr 15, 2024
2 parents e3c1233 + 56adf02 commit cdad414
Showing 4 changed files with 100 additions and 31 deletions.
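
The change boils down to a small concurrency pattern: deletions fire a non-blocking signal on a channel, and a single background goroutine consumes those signals and prunes dangling slabs in batches. Below is a minimal, self-contained sketch of that pattern, not the renterd code itself; the names (store, triggerPrune, pruneLoop) and the capacity-1 channel are assumptions made for the illustration, and the real loop additionally wraps the prune in a retried database transaction and registers/dismisses an alert on failure, as the diff further down shows.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// store stands in for SQLStore: a buffered signal channel, a shutdown
// context and a wait group for the background goroutine.
type store struct {
	pruneSig    chan struct{} // capacity 1, so pending triggers coalesce
	shutdownCtx context.Context
	shutdown    context.CancelFunc
	wg          sync.WaitGroup
}

func newStore() *store {
	ctx, cancel := context.WithCancel(context.Background())
	s := &store{
		pruneSig:    make(chan struct{}, 1),
		shutdownCtx: ctx,
		shutdown:    cancel,
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.pruneLoop()
	}()
	return s
}

// triggerPrune signals the loop without ever blocking the caller; if a
// signal is already pending, the new one is simply dropped.
func (s *store) triggerPrune() {
	select {
	case s.pruneSig <- struct{}{}:
	default:
	}
}

// pruneLoop waits for a trigger (or shutdown) and prunes once per wake-up,
// batching however many deletions happened since the last run.
func (s *store) pruneLoop() {
	for {
		select {
		case <-s.pruneSig:
		case <-s.shutdownCtx.Done():
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		s.prune(ctx) // placeholder for the real "DELETE FROM slabs ..." transaction
		cancel()
	}
}

func (s *store) prune(ctx context.Context) {
	_ = ctx
	fmt.Println("pruning dangling slabs")
}

func main() {
	s := newStore()
	for i := 0; i < 100; i++ {
		s.triggerPrune() // 100 rapid deletions collapse into very few prune runs
	}
	time.Sleep(100 * time.Millisecond)
	s.shutdown()
	s.wg.Wait()
}
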
48 changes: 44 additions & 4 deletions stores/metadata.go
@@ -13,12 +13,18 @@ import (

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/siad/modules"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"lukechampine.com/frand"
)

var (
pruneSlabsAlertID = frand.Entropy256()
)

const (
@@ -2713,8 +2719,36 @@ func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.F
return nil
}

func (s *SQLStore) pruneSlabsLoop() {
for {
select {
case <-s.slabPruneSigChan:
case <-s.shutdownCtx.Done():
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second+sumDurations(s.retryTransactionIntervals))
err := s.retryTransaction(ctx, pruneSlabs)
if err != nil {
s.logger.Errorw("failed to prune slabs", zap.Error(err))
s.alerts.RegisterAlert(s.shutdownCtx, alerts.Alert{
ID: pruneSlabsAlertID,
Severity: alerts.SeverityWarning,
Message: "Failed to prune slabs from database",
Timestamp: time.Now(),
Data: map[string]interface{}{
"error": err.Error(),
"hint": "This might happen when your database is under a lot of load due to deleting objects rapidly. This alert will disappear the next time slabs are pruned successfully.",
},
})
} else {
s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID)
}
cancel()
}
}

func pruneSlabs(tx *gorm.DB) error {
// delete slabs without any associated slices or buffers
return tx.Exec(`
DELETE
FROM slabs
@@ -2723,6 +2757,13 @@
AND slabs.db_buffered_slab_id IS NULL
`).Error
}
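
The hunk above only shows the tail of the pruning statement; the condition that selects slabs without any associated slices sits outside the displayed range. A plausible reconstruction of such a query, written here purely as an illustration and not as the actual renterd statement (the join column slices.db_slab_id is an assumption), would be:

package stores

import "gorm.io/gorm"

// pruneSlabsSketch is a hypothetical reconstruction for illustration only;
// the actual query in the commit is only partially visible in this diff.
func pruneSlabsSketch(tx *gorm.DB) error {
	return tx.Exec(`
DELETE
FROM slabs
WHERE NOT EXISTS (SELECT 1 FROM slices WHERE slices.db_slab_id = slabs.id)
AND slabs.db_buffered_slab_id IS NULL`).Error
}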

func (s *SQLStore) triggerSlabPruning() {
select {
case s.slabPruneSigChan <- struct{}{}:
default:
}
}

// deleteObject deletes an object from the store and prunes all slabs which are
// without an object after the deletion. That means in case of packed uploads,
// the slab is only deleted when no more objects point to it.
@@ -2749,9 +2790,8 @@ func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64,
numDeleted := tx.RowsAffected
if numDeleted == 0 {
return 0, nil // nothing to prune if no object was deleted
} else if err := pruneSlabs(tx); err != nil {
return numDeleted, err
}
s.triggerSlabPruning()
return numDeleted, nil
}

@@ -2784,7 +2824,7 @@ func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string
// prune slabs if we deleted an object
rowsAffected = res.RowsAffected
if rowsAffected > 0 {
return pruneSlabs(tx)
s.triggerSlabPruning()
}
duration = time.Since(start)
return nil
50 changes: 30 additions & 20 deletions stores/metadata_test.go
@@ -1058,9 +1058,9 @@ func TestSQLMetadataStore(t *testing.T) {
// incremented due to the object and slab being overwritten.
two := uint(2)
expectedObj.Slabs[0].DBObjectID = &two
expectedObj.Slabs[0].DBSlabID = 3
expectedObj.Slabs[0].DBSlabID = 1
expectedObj.Slabs[1].DBObjectID = &two
expectedObj.Slabs[1].DBSlabID = 4
expectedObj.Slabs[1].DBSlabID = 2
if !reflect.DeepEqual(obj, expectedObj) {
t.Fatal("object mismatch", cmp.Diff(obj, expectedObj))
}
@@ -1082,7 +1082,7 @@ func TestSQLMetadataStore(t *testing.T) {
TotalShards: 1,
Shards: []dbSector{
{
DBSlabID: 3,
DBSlabID: 1,
SlabIndex: 1,
Root: obj1.Slabs[0].Shards[0].Root[:],
LatestHost: publicKey(obj1.Slabs[0].Shards[0].LatestHost),
@@ -1122,7 +1122,7 @@ func TestSQLMetadataStore(t *testing.T) {
TotalShards: 1,
Shards: []dbSector{
{
DBSlabID: 4,
DBSlabID: 2,
SlabIndex: 1,
Root: obj1.Slabs[1].Shards[0].Root[:],
LatestHost: publicKey(obj1.Slabs[1].Shards[0].LatestHost),
@@ -4028,7 +4028,7 @@ func TestRefreshHealth(t *testing.T) {
}
}

func TestSlabCleanupTrigger(t *testing.T) {
func TestSlabCleanup(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

@@ -4100,23 +4100,29 @@

// check slice count
var slabCntr int64
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 1 {
t.Fatalf("expected 1 slabs, got %v", slabCntr)
}
ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 1 {
return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
}
return nil
})

// delete second object
err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj2.ObjectID)
if err != nil {
t.Fatal(err)
}

if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 0 {
t.Fatalf("expected 0 slabs, got %v", slabCntr)
}
ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 0 {
return fmt.Errorf("expected 0 slabs, got %v", slabCntr)
}
return nil
})

// create another object that references a slab with buffer
ek, _ = object.GenerateEncryptionKey().MarshalBinary()
Expand Down Expand Up @@ -4156,11 +4162,15 @@ func TestSlabCleanupTrigger(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 1 {
t.Fatalf("expected 1 slabs, got %v", slabCntr)
}

ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 1 {
return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
}
return nil
})
}
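
Since pruning is now asynchronous, the test can no longer assert the slab count immediately after a deletion and polls instead. The ss.Retry helper comes from the test scaffolding and is not part of this diff; a minimal equivalent, assuming a signature matching its usage above, could look like this:

package stores

import (
	"testing"
	"time"
)

// retry calls fn up to tries times, sleeping between attempts, and fails the
// test if fn still errors after the last attempt. The real helper may differ.
func retry(t *testing.T, tries int, durationBetween time.Duration, fn func() error) {
	t.Helper()
	var err error
	for i := 0; i < tries; i++ {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(durationBetween)
	}
	t.Fatal(err)
}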

func TestUpsertSectors(t *testing.T) {
10 changes: 3 additions & 7 deletions stores/multipart.go
@@ -305,10 +305,8 @@ func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string
}
return errors.New("failed to delete multipart upload for unknown reason")
}
// Prune the slabs.
if err := pruneSlabs(tx); err != nil {
return fmt.Errorf("failed to prune slabs: %w", err)
}
// Prune the dangling slabs.
s.triggerSlabPruning()
return nil
})
}
@@ -459,9 +457,7 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str
}

// Prune the slabs.
if err := pruneSlabs(tx); err != nil {
return fmt.Errorf("failed to prune slabs: %w", err)
}
s.triggerSlabPruning()
return nil
})
if err != nil {
23 changes: 23 additions & 0 deletions stores/sql.go
@@ -285,6 +285,9 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
if err != nil {
return nil, modules.ConsensusChangeID{}, err
}
if err := ss.initSlabPruning(); err != nil {
return nil, modules.ConsensusChangeID{}, err
}
return ss, ccid, nil
}

@@ -305,6 +308,18 @@ func (ss *SQLStore) hasAllowlist() bool {
return ss.allowListCnt > 0
}

func (s *SQLStore) initSlabPruning() error {
// start pruning loop
s.wg.Add(1)
go func() {
s.pruneSlabsLoop()
s.wg.Done()
}()

// prune once to guarantee consistency on startup
return s.retryTransaction(s.shutdownCtx, pruneSlabs)
}

func (ss *SQLStore) updateHasAllowlist(err *error) {
if *err != nil {
return
@@ -605,3 +620,11 @@ func (s *SQLStore) ResetConsensusSubscription(ctx context.Context) error {
s.persistMu.Unlock()
return nil
}

func sumDurations(durations []time.Duration) time.Duration {
var sum time.Duration
for _, d := range durations {
sum += d
}
return sum
}
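
sumDurations exists to size the timeout of the prune transaction in pruneSlabsLoop: the prune gets ten seconds plus however long the full retry schedule may take, so the context cannot expire while retryTransaction is still backing off. A quick worked example with made-up retry intervals (renterd's actual retryTransactionIntervals are not shown in this diff):

package main

import (
	"fmt"
	"time"
)

func sumDurations(durations []time.Duration) time.Duration {
	var sum time.Duration
	for _, d := range durations {
		sum += d
	}
	return sum
}

func main() {
	// hypothetical retry schedule
	intervals := []time.Duration{200 * time.Millisecond, time.Second, 3 * time.Second}
	timeout := 10*time.Second + sumDurations(intervals)
	fmt.Println(timeout) // prints 14.2s
}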
