Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of objects stats #970

Merged
10 commits were merged on Feb 20, 2024
49 changes: 26 additions & 23 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,30 +697,33 @@ func TestUploadDownloadExtended(t *testing.T) {
}

// check objects stats.
for _, opts := range []api.ObjectsStatsOpts{
{}, // any bucket
{Bucket: api.DefaultBucketName}, // specific bucket
} {
info, err := cluster.Bus.ObjectsStats(context.Background(), opts)
tt.OK(err)
objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large))
if info.TotalObjectsSize != objectsSize {
t.Error("wrong size", info.TotalObjectsSize, objectsSize)
}
sectorsSize := 15 * rhpv2.SectorSize
if info.TotalSectorsSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalSectorsSize, sectorsSize)
}
if info.TotalUploadedSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalUploadedSize, sectorsSize)
}
if info.NumObjects != 4 {
t.Error("wrong number of objects", info.NumObjects, 4)
}
if info.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", info.MinHealth)
tt.Retry(100, 100*time.Millisecond, func() error {
for _, opts := range []api.ObjectsStatsOpts{
{}, // any bucket
{Bucket: api.DefaultBucketName}, // specific bucket
} {
info, err := cluster.Bus.ObjectsStats(context.Background(), opts)
tt.OK(err)
objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large))
if info.TotalObjectsSize != objectsSize {
return fmt.Errorf("wrong size %v %v", info.TotalObjectsSize, objectsSize)
}
sectorsSize := 15 * rhpv2.SectorSize
if info.TotalSectorsSize != uint64(sectorsSize) {
return fmt.Errorf("wrong size %v %v", info.TotalSectorsSize, sectorsSize)
}
if info.TotalUploadedSize != uint64(sectorsSize) {
return fmt.Errorf("wrong size %v %v", info.TotalUploadedSize, sectorsSize)
}
if info.NumObjects != 4 {
return fmt.Errorf("wrong number of objects %v %v", info.NumObjects, 4)
}
if info.MinHealth != 1 {
return fmt.Errorf("expected minHealth of 1, got %v", info.MinHealth)
}
}
}
return nil
})

// download the data
for _, data := range [][]byte{small, large} {
Expand Down
100 changes: 46 additions & 54 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,13 @@ func (s *SQLStore) ListBuckets(ctx context.Context) ([]api.Bucket, error) {
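// reduce locking and make sure all results are consistent, everything is meant
// to be done within a single transaction.
// NOTE(review): in the hunks visible here every query is issued directly on
// s.db and ctx is never attached to a query; confirm whether the
// single-transaction guarantee still holds.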
func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) {
// if no bucket is specified, we consider all objects
whereBucket := func(table string) clause.Expr {
if opts.Bucket == "" {
return exprTRUE
// fetch bucket id if a bucket was specified
var bucketID uint
if opts.Bucket != "" {
err := s.db.Model(&dbBucket{}).Select("id").Where("name = ?", opts.Bucket).Take(&bucketID).Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}
return sqlWhereBucket(table, opts.Bucket)
}

// number of objects
Expand All @@ -597,78 +598,69 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts)
MinHealth float64
TotalObjectsSize uint64
}
err := s.db.
objInfoQuery := s.db.
Model(&dbObject{}).
Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize").
Where(whereBucket(dbObject{}.TableName())).
Scan(&objInfo).
Error
Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize")
if opts.Bucket != "" {
objInfoQuery = objInfoQuery.Where("db_bucket_id", bucketID)
}
err := objInfoQuery.Scan(&objInfo).Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}

// number of unfinished objects
var unfinishedObjects uint64
err = s.db.
unfinishedObjectsQuery := s.db.
Model(&dbMultipartUpload{}).
Select("COUNT(*)").
Where(whereBucket(dbMultipartUpload{}.TableName())).
Scan(&unfinishedObjects).
Error
Select("COUNT(*)")
if opts.Bucket != "" {
unfinishedObjectsQuery = unfinishedObjectsQuery.Where("db_bucket_id", bucketID)
}
err = unfinishedObjectsQuery.Scan(&unfinishedObjects).Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}

// size of unfinished objects
var totalUnfinishedObjectsSize uint64
err = s.db.
totalUnfinishedObjectsSizeQuery := s.db.
Model(&dbMultipartPart{}).
Joins("INNER JOIN multipart_uploads mu ON multipart_parts.db_multipart_upload_id = mu.id").
Select("COALESCE(SUM(size), 0)").
Where(whereBucket("mu")).
Scan(&totalUnfinishedObjectsSize).
Error
Select("COALESCE(SUM(size), 0)")
if opts.Bucket != "" {
totalUnfinishedObjectsSizeQuery = totalUnfinishedObjectsSizeQuery.Where("db_bucket_id", bucketID)
}
err = totalUnfinishedObjectsSizeQuery.Scan(&totalUnfinishedObjectsSize).Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}

fromContractSectors := gorm.Expr("contract_sectors cs")
var totalSectors int64
totalSectorsQuery := s.db.
Table("slabs sla").
Select("COALESCE(SUM(total_shards), 0)").
Where("db_buffered_slab_id IS NULL")

if opts.Bucket != "" {
fromContractSectors = gorm.Expr(`
contract_sectors cs
INNER JOIN sectors s ON s.id = cs.db_sector_id
INNER JOIN slabs sla ON sla.id = s.db_slab_id
INNER JOIN slices sli ON sli.db_slab_id = sla.id
INNER JOIN objects o ON o.id = sli.db_object_id AND (?)
`, whereBucket("o"))
}

var totalSectors uint64

batchSize := 500000
marker := uint64(0)
for offset := 0; ; offset += batchSize {
var result struct {
Sectors uint64
Marker uint64
}
res := s.db.
Model(&dbSector{}).
Raw("SELECT COUNT(*) as Sectors, MAX(sectors.db_sector_id) as Marker FROM (SELECT cs.db_sector_id FROM ? WHERE cs.db_sector_id > ? GROUP BY cs.db_sector_id LIMIT ?) sectors", fromContractSectors, marker, batchSize).
Scan(&result)
if err := res.Error; err != nil {
return api.ObjectsStatsResponse{}, err
} else if result.Sectors == 0 {
break // done
}
totalSectors += result.Sectors
marker = result.Marker
totalSectorsQuery = totalSectorsQuery.Where(`
EXISTS (
SELECT 1 FROM slices sli
INNER JOIN objects o ON o.id = sli.db_object_id AND o.db_bucket_id = ?
WHERE sli.db_slab_id = sla.id
)
`, bucketID)
}
err = totalSectorsQuery.Scan(&totalSectors).Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}

var totalUploaded int64
err = s.db.
Table("?", fromContractSectors).
Count(&totalUploaded).
Model(&dbContract{}).
Select("COALESCE(SUM(size), 0)").
Scan(&totalUploaded).
Error
if err != nil {
return api.ObjectsStatsResponse{}, err
Expand All @@ -680,8 +672,8 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts)
NumUnfinishedObjects: unfinishedObjects,
TotalUnfinishedObjectsSize: totalUnfinishedObjectsSize,
TotalObjectsSize: objInfo.TotalObjectsSize,
TotalSectorsSize: totalSectors * rhpv2.SectorSize,
TotalUploadedSize: uint64(totalUploaded) * rhpv2.SectorSize,
TotalSectorsSize: uint64(totalSectors) * rhpv2.SectorSize,
TotalUploadedSize: uint64(totalUploaded),
}, nil
}

Expand Down
17 changes: 10 additions & 7 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,7 @@ func TestObjectsStats(t *testing.T) {
// Create a few objects of different size.
var objectsSize uint64
var sectorsSize uint64
var totalUploadedSize uint64
for i := 0; i < 2; i++ {
obj := newTestObject(1)
objectsSize += uint64(obj.TotalSize())
Expand All @@ -2458,10 +2459,11 @@ func TestObjectsStats(t *testing.T) {
t.Fatal(err)
}
for _, fcid := range fcids {
_, err := ss.addTestContract(fcid, hpk)
c, err := ss.addTestContract(fcid, hpk)
if err != nil {
t.Fatal(err)
}
totalUploadedSize += c.Size
}
}
}
Expand All @@ -2482,10 +2484,11 @@ func TestObjectsStats(t *testing.T) {
}
var newContractID types.FileContractID
frand.Read(newContractID[:])
_, err = ss.addTestContract(newContractID, types.PublicKey{})
c, err := ss.addTestContract(newContractID, types.PublicKey{})
if err != nil {
t.Fatal(err)
}
totalUploadedSize += c.Size
newContract, err := ss.contract(context.Background(), fileContractID(newContractID))
if err != nil {
t.Fatal(err)
Expand All @@ -2510,8 +2513,8 @@ func TestObjectsStats(t *testing.T) {
t.Fatal("wrong size", info.TotalObjectsSize, objectsSize)
} else if info.TotalSectorsSize != sectorsSize {
t.Fatal("wrong size", info.TotalSectorsSize, sectorsSize)
} else if info.TotalUploadedSize != sectorsSize*2 {
t.Fatal("wrong size", info.TotalUploadedSize, sectorsSize*2)
} else if info.TotalUploadedSize != totalUploadedSize {
t.Fatal("wrong size", info.TotalUploadedSize, totalUploadedSize)
} else if info.NumObjects != 2 {
t.Fatal("wrong number of objects", info.NumObjects, 2)
}
Expand All @@ -2525,9 +2528,9 @@ func TestObjectsStats(t *testing.T) {
} else if info.TotalObjectsSize != 0 {
t.Fatal("wrong size", info.TotalObjectsSize)
} else if info.TotalSectorsSize != 0 {
t.Fatal("wrong size", info.TotalSectorsSize)
} else if info.TotalUploadedSize != 0 {
t.Fatal("wrong size", info.TotalUploadedSize)
t.Fatal("wrong size", info.TotalSectorsSize, 0)
} else if info.TotalUploadedSize != totalUploadedSize {
t.Fatal("wrong size", info.TotalUploadedSize, totalUploadedSize)
} else if info.NumObjects != 0 {
t.Fatal("wrong number of objects", info.NumObjects)
}
Expand Down
Loading