Fix NDFs in stores unit tests. (#1185)
Making slab pruning asynchronous introduced some NDFs where we run into various `database is locked` errors. This PR wraps the three methods that trigger these NDFs in test helpers that block until the background loop has pruned the necessary slabs. While testing I also found that we weren't properly closing connections in some tests.
ChrisSchinnerl authored Apr 22, 2024
2 parents d87b0ac + d4b2801 commit 5847772
Showing 6 changed files with 101 additions and 73 deletions.
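At a high level, the fix is a timestamp handshake between the asynchronous prune loop and the tests: the loop records when it last pruned successfully (the `lastPrunedAt` field added in `stores/sql.go` below), and test-only `*Blocking` wrappers poll that timestamp before returning. A minimal sketch of the pattern, with simplified types rather than the exact code from the diff below:

```go
package sketch

import (
	"errors"
	"sync"
	"time"
)

// Sketch of a store with the mutex-guarded lastPrunedAt timestamp the
// diff adds to stores/sql.go.
type store struct {
	mu           sync.Mutex
	lastPrunedAt time.Time
}

// The prune loop stamps the time after every successful run
// (mirroring the change to pruneSlabsLoop below).
func (s *store) markPruned() {
	s.mu.Lock()
	s.lastPrunedAt = time.Now()
	s.mu.Unlock()
}

// Tests record a timestamp before triggering a prune and then poll
// until the loop has run at least once since that point.
func (s *store) waitForPrune(since time.Time) error {
	for i := 0; i < 100; i++ {
		s.mu.Lock()
		done := !s.lastPrunedAt.Before(since)
		s.mu.Unlock()
		if done {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return errors.New("slabs have not been pruned yet")
}
```

With this in place, a test calls the real method and then waits for the loop to catch up, which removes the race against asynchronous pruning without changing production behavior.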
1 change: 1 addition & 0 deletions stores/hostdb_test.go
@@ -34,6 +34,7 @@ func (s *SQLStore) insertTestAnnouncement(hk types.PublicKey, a hostdb.Announcem
// SQLite DB.
func TestSQLHostDB(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()
if ss.ccid != modules.ConsensusChangeBeginning {
t.Fatal("wrong ccid", ss.ccid, modules.ConsensusChangeBeginning)
}
10 changes: 7 additions & 3 deletions stores/metadata.go
@@ -1813,11 +1813,11 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
})
}

func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error {
func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
var rowsAffected int64
var err error
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, key)
rowsAffected, err = s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
@@ -1827,7 +1827,7 @@ func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, key)
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path)
}
return nil
}
@@ -2748,6 +2748,10 @@ func (s *SQLStore) pruneSlabsLoop() {
})
} else {
s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID)

s.mu.Lock()
s.lastPrunedAt = time.Now()
s.mu.Unlock()
}
cancel()
}
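The blocking wrappers added to `stores/metadata_test.go` below poll via `test.Retry` from `go.sia.tech/renterd/internal/test`. Only its call sites are visible in this diff, so the following is a plausible sketch, stated purely as an assumption about its behavior: retry the function up to `tries` times and return the last error.

```go
package test

import "time"

// Hypothetical sketch of the Retry helper used as
// test.Retry(100, 100*time.Millisecond, fn) in the wrappers below;
// the real implementation in internal/test may differ.
func Retry(tries int, durationBetweenAttempts time.Duration, fn func() error) (err error) {
	for i := 0; i < tries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(durationBetweenAttempts)
	}
	return err
}
```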
156 changes: 87 additions & 69 deletions stores/metadata_test.go
@@ -18,12 +18,52 @@ import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/object"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"lukechampine.com/frand"
)

func (s *SQLStore) RemoveObjectBlocking(ctx context.Context, bucket, path string) error {
ts := time.Now()
if err := s.RemoveObject(ctx, bucket, path); err != nil {
return err
}
return s.waitForPruneLoop(ts)
}

func (s *SQLStore) RemoveObjectsBlocking(ctx context.Context, bucket, prefix string) error {
ts := time.Now()
if err := s.RemoveObjects(ctx, bucket, prefix); err != nil {
return err
}
return s.waitForPruneLoop(ts)
}

func (s *SQLStore) UpdateObjectBlocking(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error {
var ts time.Time
_, err := s.Object(ctx, bucket, path)
if err == nil {
ts = time.Now()
}
if err := s.UpdateObject(ctx, bucket, path, contractSet, eTag, mimeType, metadata, o); err != nil {
return err
}
return s.waitForPruneLoop(ts)
}

func (s *SQLStore) waitForPruneLoop(ts time.Time) error {
return test.Retry(100, 100*time.Millisecond, func() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.lastPrunedAt.Before(ts) {
return errors.New("slabs have not been pruned yet")
}
return nil
})
}

func randomMultisigUC() types.UnlockConditions {
uc := types.UnlockConditions{
PublicKeys: make([]types.UnlockKey, 2),
@@ -194,7 +234,7 @@ func TestObjectMetadata(t *testing.T) {
}

// remove the object
if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
t.Fatal(err)
}

@@ -1183,8 +1223,7 @@ func TestSQLMetadataStore(t *testing.T) {
fullObj, err = ss.addTestObject(objID, obj1)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(*fullObj.Object, obj1) {
} else if !reflect.DeepEqual(*fullObj.Object, obj1) {
t.Fatal("object mismatch")
}

@@ -1219,18 +1258,18 @@
}
return nil
}
ss.Retry(100, 100*time.Millisecond, func() error {
return countCheck(1, 1, 1, 1)
})
if err := countCheck(1, 1, 1, 1); err != nil {
t.Fatal(err)
}

// Delete the object. Due to the cascade this should delete everything
// but the sectors.
if err := ss.RemoveObject(ctx, api.DefaultBucketName, objID); err != nil {
if err := ss.RemoveObjectBlocking(ctx, api.DefaultBucketName, objID); err != nil {
t.Fatal(err)
}
if err := countCheck(0, 0, 0, 0); err != nil {
t.Fatal(err)
}
ss.Retry(100, 100*time.Millisecond, func() error {
return countCheck(0, 0, 0, 0)
})
}

// TestObjectHealth verifies the object's health is returned correctly by all
@@ -2030,7 +2069,7 @@ func TestContractSectors(t *testing.T) {
}

// Delete the object.
if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
t.Fatal(err)
}

@@ -2942,37 +2981,30 @@ func TestContractSizes(t *testing.T) {
}

// remove the first object
if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_1"); err != nil {
if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_1"); err != nil {
t.Fatal(err)
}

// assert there's one sector that can be pruned and assert it's from fcid 1
ss.Retry(100, 100*time.Millisecond, func() error {
if n := prunableData(nil); n != rhpv2.SectorSize {
return fmt.Errorf("unexpected amount of prunable data %v", n)
}
if n := prunableData(&fcids[1]); n != 0 {
return fmt.Errorf("expected no prunable data %v", n)
}
return nil
})
if n := prunableData(nil); n != rhpv2.SectorSize {
t.Fatalf("unexpected amount of prunable data %v", n)
} else if n := prunableData(&fcids[1]); n != 0 {
t.Fatalf("expected no prunable data %v", n)
}

// remove the second object
if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_2"); err != nil {
if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_2"); err != nil {
t.Fatal(err)
}

// assert there's now two sectors that can be pruned
ss.Retry(100, 100*time.Millisecond, func() error {
if n := prunableData(nil); n != rhpv2.SectorSize*2 {
return fmt.Errorf("unexpected amount of prunable data %v", n)
} else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize {
return fmt.Errorf("unexpected amount of prunable data %v", n)
} else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize {
return fmt.Errorf("unexpected amount of prunable data %v", n)
}
return nil
})
if n := prunableData(nil); n != rhpv2.SectorSize*2 {
t.Fatalf("unexpected amount of prunable data %v", n)
} else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize {
t.Fatalf("unexpected amount of prunable data %v", n)
} else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize {
t.Fatalf("unexpected amount of prunable data %v", n)
}

if size, err := ss.ContractSize(context.Background(), fcids[0]); err != nil {
t.Fatal("unexpected err", err)
@@ -3244,9 +3276,9 @@ func TestBucketObjects(t *testing.T) {
}

// Delete foo/baz in bucket 1 but first try bucket 2 since that should fail.
if err := ss.RemoveObject(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) {
if err := ss.RemoveObjectBlocking(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) {
t.Fatal(err)
} else if err := ss.RemoveObject(context.Background(), b1, "/foo/baz"); err != nil {
} else if err := ss.RemoveObjectBlocking(context.Background(), b1, "/foo/baz"); err != nil {
t.Fatal(err)
} else if entries, _, err := ss.ObjectEntries(context.Background(), b1, "/foo/", "", "", "", "", 0, -1); err != nil {
t.Fatal(err)
@@ -3263,7 +3295,7 @@
t.Fatal(err)
} else if len(entries) != 2 {
t.Fatal("expected 2 entries", len(entries))
} else if err := ss.RemoveObjects(context.Background(), b2, "/"); err != nil {
} else if err := ss.RemoveObjectsBlocking(context.Background(), b2, "/"); err != nil {
t.Fatal(err)
} else if entries, _, err := ss.ObjectEntries(context.Background(), b2, "/", "", "", "", "", 0, -1); err != nil {
t.Fatal(err)
@@ -3349,6 +3381,7 @@ func TestCopyObject(t *testing.T) {

func TestMarkSlabUploadedAfterRenew(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

// create host.
hks, err := ss.addTestHosts(1)
@@ -3662,6 +3695,7 @@ func newTestShard(hk types.PublicKey, fcid types.FileContractID, root types.Hash

func TestUpdateSlabSanityChecks(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

// create hosts and contracts.
hks, err := ss.addTestHosts(5)
@@ -3726,8 +3760,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {

func TestSlabHealthInvalidation(t *testing.T) {
// create db
cfg := defaultTestSQLStoreConfig
ss := newTestSQLStore(t, cfg)
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

// define a helper to assert the health validity of a given slab
@@ -4093,37 +4126,29 @@ func TestSlabCleanup(t *testing.T) {
}

// delete the object
err := ss.RemoveObject(context.Background(), api.DefaultBucketName, obj1.ObjectID)
err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj1.ObjectID)
if err != nil {
t.Fatal(err)
}

// check slice count
var slabCntr int64
ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 1 {
return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
}
return nil
})
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 1 {
t.Fatalf("expected 1 slabs, got %v", slabCntr)
}

// delete second object
err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj2.ObjectID)
err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj2.ObjectID)
if err != nil {
t.Fatal(err)
} else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 0 {
t.Fatalf("expected 0 slabs, got %v", slabCntr)
}

ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 0 {
return fmt.Errorf("expected 0 slabs, got %v", slabCntr)
}
return nil
})

// create another object that references a slab with buffer
ek, _ = object.GenerateEncryptionKey().MarshalBinary()
bufferedSlab := dbSlab{
Expand Down Expand Up @@ -4158,19 +4183,14 @@ func TestSlabCleanup(t *testing.T) {
}

// delete third object
err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj3.ObjectID)
err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj3.ObjectID)
if err != nil {
t.Fatal(err)
} else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
t.Fatal(err)
} else if slabCntr != 1 {
t.Fatalf("expected 1 slabs, got %v", slabCntr)
}

ss.Retry(100, 100*time.Millisecond, func() error {
if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
return err
} else if slabCntr != 1 {
return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
}
return nil
})
}

func TestUpsertSectors(t *testing.T) {
@@ -4573,8 +4593,6 @@ func TestTypeCurrency(t *testing.T) {
// same transaction, deadlocks become more likely due to the gap locks MySQL
// uses.
func TestUpdateObjectParallel(t *testing.T) {
cfg := defaultTestSQLStoreConfig

dbURI, _, _, _ := DBConfigFromEnv()
if dbURI == "" {
// it's pretty much impossile to optimise for both sqlite and mysql at
Expand All @@ -4584,7 +4602,7 @@ func TestUpdateObjectParallel(t *testing.T) {
// can revisit this
t.SkipNow()
}
ss := newTestSQLStore(t, cfg)
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
ss.retryTransactionIntervals = []time.Duration{0} // don't retry
defer ss.Close()

3 changes: 3 additions & 0 deletions stores/multipart_test.go
@@ -85,6 +85,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) {
}

// Complete the upload. Check that the number of slices stays the same.
ts := time.Now()
var nSlicesBefore int64
var nSlicesAfter int64
if err := ss.db.Model(&dbSlice{}).Count(&nSlicesBefore).Error; err != nil {
@@ -97,6 +98,8 @@
t.Fatal(err)
} else if nSlicesBefore != nSlicesAfter {
t.Fatalf("expected number of slices to stay the same, but got %v before and %v after", nSlicesBefore, nSlicesAfter)
} else if err := ss.waitForPruneLoop(ts); err != nil {
t.Fatal(err)
}

// Fetch the object.
2 changes: 2 additions & 0 deletions stores/sql.go
@@ -106,6 +106,7 @@ type (
mu sync.Mutex
allowListCnt uint64
blockListCnt uint64
lastPrunedAt time.Time
closed bool

knownContracts map[types.FileContractID]struct{}
@@ -275,6 +276,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
ID: types.BlockID(ci.BlockID),
},

lastPrunedAt: time.Now(),
retryTransactionIntervals: cfg.RetryTransactionIntervals,

shutdownCtx: shutdownCtx,
2 changes: 1 addition & 1 deletion stores/sql_test.go
@@ -215,7 +215,7 @@ func newTestLogger() logger.Interface {
}

func (s *testSQLStore) addTestObject(path string, o object.Object) (api.Object, error) {
if err := s.UpdateObject(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil {
if err := s.UpdateObjectBlocking(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil {
return api.Object{}, err
} else if obj, err := s.Object(context.Background(), api.DefaultBucketName, path); err != nil {
return api.Object{}, err
