From fb9e52b746e511b7da2c3e9a172c5b7e7a64d71e Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 22 Apr 2024 13:13:39 +0200 Subject: [PATCH 1/2] stores: block on slab pruning --- stores/hostdb_test.go | 1 + stores/metadata.go | 10 ++- stores/metadata_test.go | 159 ++++++++++++++++++++++----------------- stores/multipart_test.go | 3 + stores/sql.go | 2 + stores/sql_test.go | 2 +- 6 files changed, 104 insertions(+), 73 deletions(-) diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index 6adf19968..ca3c07130 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -34,6 +34,7 @@ func (s *SQLStore) insertTestAnnouncement(hk types.PublicKey, a hostdb.Announcem // SQLite DB. func TestSQLHostDB(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() if ss.ccid != modules.ConsensusChangeBeginning { t.Fatal("wrong ccid", ss.ccid, modules.ConsensusChangeBeginning) } diff --git a/stores/metadata.go b/stores/metadata.go index 87a63c7bc..744261150 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1813,11 +1813,11 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, }) } -func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error { +func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error { var rowsAffected int64 var err error err = s.retryTransaction(ctx, func(tx *gorm.DB) error { - rowsAffected, err = s.deleteObject(tx, bucket, key) + rowsAffected, err = s.deleteObject(tx, bucket, path) if err != nil { return fmt.Errorf("RemoveObject: failed to delete object: %w", err) } @@ -1827,7 +1827,7 @@ func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error { return err } if rowsAffected == 0 { - return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, key) + return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path) } return nil } @@ -2750,6 +2750,10 @@ func (s *SQLStore) pruneSlabsLoop() { s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID) } cancel() + + s.mu.Lock() + s.lastPrunedAt = time.Now() + s.mu.Unlock() } } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index abda57b95..9ecd56422 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -18,12 +18,52 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/object" "gorm.io/gorm" "gorm.io/gorm/schema" "lukechampine.com/frand" ) +func (s *SQLStore) RemoveObjectBlocking(ctx context.Context, bucket, path string) error { + ts := time.Now() + if err := s.RemoveObject(ctx, bucket, path); err != nil { + return err + } + return s.waitForPruneLoop(ts) +} + +func (s *SQLStore) RemoveObjectsBlocking(ctx context.Context, bucket, prefix string) error { + ts := time.Now() + if err := s.RemoveObjects(ctx, bucket, prefix); err != nil { + return err + } + return s.waitForPruneLoop(ts) +} + +func (s *SQLStore) UpdateObjectBlocking(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error { + var ts time.Time + _, err := s.Object(ctx, bucket, path) + if err == nil { + ts = time.Now() + } + if err := s.UpdateObject(ctx, bucket, path, contractSet, eTag, mimeType, metadata, o); err != nil { + return err + } + return s.waitForPruneLoop(ts) +} + +func (s *SQLStore) waitForPruneLoop(ts time.Time) error { + return test.Retry(100, 100*time.Millisecond, func() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.lastPrunedAt.Before(ts) 
{ + return errors.New("slabs have not been pruned yet") + } + return nil + }) +} + func randomMultisigUC() types.UnlockConditions { uc := types.UnlockConditions{ PublicKeys: make([]types.UnlockKey, 2), @@ -194,7 +234,7 @@ func TestObjectMetadata(t *testing.T) { } // remove the object - if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil { + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil { t.Fatal(err) } @@ -1179,12 +1219,14 @@ func TestSQLMetadataStore(t *testing.T) { } // Remove the first slab of the object. + ts := time.Now() obj1.Slabs = obj1.Slabs[1:] fullObj, err = ss.addTestObject(objID, obj1) if err != nil { t.Fatal(err) - } - if !reflect.DeepEqual(*fullObj.Object, obj1) { + } else if err := ss.waitForPruneLoop(ts); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(*fullObj.Object, obj1) { t.Fatal("object mismatch") } @@ -1219,18 +1261,18 @@ func TestSQLMetadataStore(t *testing.T) { } return nil } - ss.Retry(100, 100*time.Millisecond, func() error { - return countCheck(1, 1, 1, 1) - }) + if err := countCheck(1, 1, 1, 1); err != nil { + t.Fatal(err) + } // Delete the object. Due to the cascade this should delete everything // but the sectors. - if err := ss.RemoveObject(ctx, api.DefaultBucketName, objID); err != nil { + if err := ss.RemoveObjectBlocking(ctx, api.DefaultBucketName, objID); err != nil { + t.Fatal(err) + } + if err := countCheck(0, 0, 0, 0); err != nil { t.Fatal(err) } - ss.Retry(100, 100*time.Millisecond, func() error { - return countCheck(0, 0, 0, 0) - }) } // TestObjectHealth verifies the object's health is returned correctly by all @@ -2030,7 +2072,7 @@ func TestContractSectors(t *testing.T) { } // Delete the object. - if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil { + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil { t.Fatal(err) } @@ -2942,37 +2984,30 @@ func TestContractSizes(t *testing.T) { } // remove the first object - if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_1"); err != nil { + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_1"); err != nil { t.Fatal(err) } // assert there's one sector that can be pruned and assert it's from fcid 1 - ss.Retry(100, 100*time.Millisecond, func() error { - if n := prunableData(nil); n != rhpv2.SectorSize { - return fmt.Errorf("unexpected amount of prunable data %v", n) - } - if n := prunableData(&fcids[1]); n != 0 { - return fmt.Errorf("expected no prunable data %v", n) - } - return nil - }) + if n := prunableData(nil); n != rhpv2.SectorSize { + t.Fatalf("unexpected amount of prunable data %v", n) + } else if n := prunableData(&fcids[1]); n != 0 { + t.Fatalf("expected no prunable data %v", n) + } // remove the second object - if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_2"); err != nil { + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_2"); err != nil { t.Fatal(err) } // assert there's now two sectors that can be pruned - ss.Retry(100, 100*time.Millisecond, func() error { - if n := prunableData(nil); n != rhpv2.SectorSize*2 { - return fmt.Errorf("unexpected amount of prunable data %v", n) - } else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize { - return fmt.Errorf("unexpected amount of prunable data %v", n) - } else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize { - return 
fmt.Errorf("unexpected amount of prunable data %v", n) - } - return nil - }) + if n := prunableData(nil); n != rhpv2.SectorSize*2 { + t.Fatalf("unexpected amount of prunable data %v", n) + } else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize { + t.Fatalf("unexpected amount of prunable data %v", n) + } else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize { + t.Fatalf("unexpected amount of prunable data %v", n) + } if size, err := ss.ContractSize(context.Background(), fcids[0]); err != nil { t.Fatal("unexpected err", err) @@ -3244,9 +3279,9 @@ func TestBucketObjects(t *testing.T) { } // Delete foo/baz in bucket 1 but first try bucket 2 since that should fail. - if err := ss.RemoveObject(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) { + if err := ss.RemoveObjectBlocking(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) { t.Fatal(err) - } else if err := ss.RemoveObject(context.Background(), b1, "/foo/baz"); err != nil { + } else if err := ss.RemoveObjectBlocking(context.Background(), b1, "/foo/baz"); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(context.Background(), b1, "/foo/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3263,7 +3298,7 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } else if len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) - } else if err := ss.RemoveObjects(context.Background(), b2, "/"); err != nil { + } else if err := ss.RemoveObjectsBlocking(context.Background(), b2, "/"); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(context.Background(), b2, "/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3349,6 +3384,7 @@ func TestCopyObject(t *testing.T) { func TestMarkSlabUploadedAfterRenew(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() // create host. hks, err := ss.addTestHosts(1) @@ -3662,6 +3698,7 @@ func newTestShard(hk types.PublicKey, fcid types.FileContractID, root types.Hash func TestUpdateSlabSanityChecks(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() // create hosts and contracts. 
hks, err := ss.addTestHosts(5) @@ -3726,8 +3763,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) { func TestSlabHealthInvalidation(t *testing.T) { // create db - cfg := defaultTestSQLStoreConfig - ss := newTestSQLStore(t, cfg) + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() // define a helper to assert the health validity of a given slab @@ -4093,37 +4129,29 @@ func TestSlabCleanup(t *testing.T) { } // delete the object - err := ss.RemoveObject(context.Background(), api.DefaultBucketName, obj1.ObjectID) + err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj1.ObjectID) if err != nil { t.Fatal(err) } // check slice count var slabCntr int64 - ss.Retry(100, 100*time.Millisecond, func() error { - if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { - return err - } else if slabCntr != 1 { - return fmt.Errorf("expected 1 slabs, got %v", slabCntr) - } - return nil - }) + if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { + t.Fatal(err) + } else if slabCntr != 1 { + t.Fatalf("expected 1 slabs, got %v", slabCntr) + } // delete second object - err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj2.ObjectID) + err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj2.ObjectID) if err != nil { t.Fatal(err) + } else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { + t.Fatal(err) + } else if slabCntr != 0 { + t.Fatalf("expected 0 slabs, got %v", slabCntr) } - ss.Retry(100, 100*time.Millisecond, func() error { - if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { - return err - } else if slabCntr != 0 { - return fmt.Errorf("expected 0 slabs, got %v", slabCntr) - } - return nil - }) - // create another object that references a slab with buffer ek, _ = object.GenerateEncryptionKey().MarshalBinary() bufferedSlab := dbSlab{ @@ -4158,19 +4186,14 @@ func TestSlabCleanup(t *testing.T) { } // delete third object - err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj3.ObjectID) + err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj3.ObjectID) if err != nil { t.Fatal(err) + } else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { + t.Fatal(err) + } else if slabCntr != 1 { + t.Fatalf("expected 1 slabs, got %v", slabCntr) } - - ss.Retry(100, 100*time.Millisecond, func() error { - if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil { - return err - } else if slabCntr != 1 { - return fmt.Errorf("expected 1 slabs, got %v", slabCntr) - } - return nil - }) } func TestUpsertSectors(t *testing.T) { @@ -4573,8 +4596,6 @@ func TestTypeCurrency(t *testing.T) { // same transaction, deadlocks become more likely due to the gap locks MySQL // uses. func TestUpdateObjectParallel(t *testing.T) { - cfg := defaultTestSQLStoreConfig - dbURI, _, _, _ := DBConfigFromEnv() if dbURI == "" { // it's pretty much impossile to optimise for both sqlite and mysql at @@ -4584,7 +4605,7 @@ func TestUpdateObjectParallel(t *testing.T) { // can revisit this t.SkipNow() } - ss := newTestSQLStore(t, cfg) + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) ss.retryTransactionIntervals = []time.Duration{0} // don't retry defer ss.Close() diff --git a/stores/multipart_test.go b/stores/multipart_test.go index 50272fcda..4bd202a51 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -85,6 +85,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { } // Complete the upload. 
Check that the number of slices stays the same. + ts := time.Now() var nSlicesBefore int64 var nSlicesAfter int64 if err := ss.db.Model(&dbSlice{}).Count(&nSlicesBefore).Error; err != nil { @@ -97,6 +98,8 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatal(err) } else if nSlicesBefore != nSlicesAfter { t.Fatalf("expected number of slices to stay the same, but got %v before and %v after", nSlicesBefore, nSlicesAfter) + } else if err := ss.waitForPruneLoop(ts); err != nil { + t.Fatal(err) } // Fetch the object. diff --git a/stores/sql.go b/stores/sql.go index ce4a4f3ec..c46dc7fe3 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -106,6 +106,7 @@ type ( mu sync.Mutex allowListCnt uint64 blockListCnt uint64 + lastPrunedAt time.Time closed bool knownContracts map[types.FileContractID]struct{} @@ -275,6 +276,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) { ID: types.BlockID(ci.BlockID), }, + lastPrunedAt: time.Now(), retryTransactionIntervals: cfg.RetryTransactionIntervals, shutdownCtx: shutdownCtx, diff --git a/stores/sql_test.go b/stores/sql_test.go index 01ce8fe6b..18b7f5609 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -215,7 +215,7 @@ func newTestLogger() logger.Interface { } func (s *testSQLStore) addTestObject(path string, o object.Object) (api.Object, error) { - if err := s.UpdateObject(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil { + if err := s.UpdateObjectBlocking(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil { return api.Object{}, err } else if obj, err := s.Object(context.Background(), api.DefaultBucketName, path); err != nil { return api.Object{}, err From d4b2801ef001dc9ef4d2b45fa51709ad71400613 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 22 Apr 2024 13:17:15 +0200 Subject: [PATCH 2/2] stores: cleanup PR --- stores/metadata.go | 8 ++++---- stores/metadata_test.go | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 744261150..4d8b6e097 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -2748,12 +2748,12 @@ func (s *SQLStore) pruneSlabsLoop() { }) } else { s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID) + + s.mu.Lock() + s.lastPrunedAt = time.Now() + s.mu.Unlock() } cancel() - - s.mu.Lock() - s.lastPrunedAt = time.Now() - s.mu.Unlock() } } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 9ecd56422..aa12e3fb8 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1219,13 +1219,10 @@ func TestSQLMetadataStore(t *testing.T) { } // Remove the first slab of the object. - ts := time.Now() obj1.Slabs = obj1.Slabs[1:] fullObj, err = ss.addTestObject(objID, obj1) if err != nil { t.Fatal(err) - } else if err := ss.waitForPruneLoop(ts); err != nil { - t.Fatal(err) } else if !reflect.DeepEqual(*fullObj.Object, obj1) { t.Fatal("object mismatch") }
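
For context, the synchronization pattern these two commits introduce — the prune loop records lastPrunedAt under the store mutex once a prune succeeds, and the *Blocking test wrappers poll it via waitForPruneLoop until it passes the time of the mutating call — boils down to the minimal, self-contained Go sketch below. The store type, pruneCh channel, removeObjectBlocking driver and main function are hypothetical stand-ins invented for illustration, not renterd code; only the timestamp-under-mutex idea and the 100 x 100ms polling cadence mirror the patch.

// Condensed, hypothetical sketch of the "block on slab pruning" pattern.
// It is not the SQLStore implementation; it only shows how recording the
// time of the last successful prune lets a caller wait for an async prune
// that was triggered by an object removal.
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type store struct {
	mu           sync.Mutex
	lastPrunedAt time.Time
	pruneCh      chan struct{} // stands in for the trigger used by the real prune loop
}

// pruneLoop mimics pruneSlabsLoop: after a successful prune it updates
// lastPrunedAt so that waiters know the deletion has been fully processed.
func (s *store) pruneLoop() {
	for range s.pruneCh {
		// ... prune orphaned slabs here ...
		s.mu.Lock()
		s.lastPrunedAt = time.Now()
		s.mu.Unlock()
	}
}

// removeObjectBlocking mimics the RemoveObjectBlocking test helper: record
// the current time, trigger the async prune, then block until a prune that
// finished after that timestamp has been observed.
func (s *store) removeObjectBlocking() error {
	ts := time.Now()
	s.pruneCh <- struct{}{} // stands in for RemoveObject scheduling a prune
	return s.waitForPruneLoop(ts)
}

// waitForPruneLoop polls lastPrunedAt under the mutex, mirroring the
// 100 attempts x 100ms retry used by the helper in the patch.
func (s *store) waitForPruneLoop(ts time.Time) error {
	for i := 0; i < 100; i++ {
		s.mu.Lock()
		pruned := !s.lastPrunedAt.Before(ts)
		s.mu.Unlock()
		if pruned {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return errors.New("slabs have not been pruned yet")
}

func main() {
	s := &store{pruneCh: make(chan struct{}, 1)}
	go s.pruneLoop()
	if err := s.removeObjectBlocking(); err != nil {
		fmt.Println("prune did not complete:", err)
		return
	}
	fmt.Println("object removed and slabs pruned")
}

Because the wait is keyed on a timestamp taken before the mutating call, a prune that completed earlier does not satisfy the wait, which is why the test helpers in the patch capture ts up front rather than simply checking that lastPrunedAt is non-zero.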