Skip to content

Commit

Permalink
Merge pull request #961 from SiaFoundation/chris/objects-stats-bucket
Browse files Browse the repository at this point in the history
Allow for filtering object stats by bucket
  • Loading branch information
ChrisSchinnerl authored Feb 15, 2024
2 parents 72d514b + dff33cd commit 73fd775
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 49 deletions.
4 changes: 4 additions & 0 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ type (
Mode string `json:"mode"`
}

// ObjectsStatsOpts holds the options for fetching object stats from the
// /bus/stats/objects endpoint.
ObjectsStatsOpts struct {
Bucket string // if set, restrict the stats to objects in this bucket
}

// ObjectsStatsResponse is the response type for the /bus/stats/objects endpoint.
ObjectsStatsResponse struct {
NumObjects uint64 `json:"numObjects"` // number of objects
Expand Down
8 changes: 6 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ type (
Object(ctx context.Context, bucketName, path string) (api.Object, error)
ObjectEntries(ctx context.Context, bucketName, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error)
ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error)
ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error)
ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error)
RemoveObject(ctx context.Context, bucketName, path string) error
RemoveObjects(ctx context.Context, bucketName, prefix string) error
RenameObject(ctx context.Context, bucketName, from, to string, force bool) error
Expand Down Expand Up @@ -1348,7 +1348,11 @@ func (b *bus) slabbuffersHandlerGET(jc jape.Context) {
}

func (b *bus) objectsStatshandlerGET(jc jape.Context) {
info, err := b.ms.ObjectsStats(jc.Request.Context())
opts := api.ObjectsStatsOpts{}
if jc.DecodeForm("bucket", &opts.Bucket) != nil {
return
}
info, err := b.ms.ObjectsStats(jc.Request.Context(), opts)
if jc.Check("couldn't get objects stats", err) != nil {
return
}
Expand Down
8 changes: 6 additions & 2 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ func (c *Client) ObjectsBySlabKey(ctx context.Context, bucket string, key object
}

// ObjectsStats returns information about the number of objects and their size.
func (c *Client) ObjectsStats() (osr api.ObjectsStatsResponse, err error) {
err = c.c.GET("/stats/objects", &osr)
func (c *Client) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (osr api.ObjectsStatsResponse, err error) {
values := url.Values{}
if opts.Bucket != "" {
values.Set("bucket", opts.Bucket)
}
err = c.c.WithContext(ctx).GET("/stats/objects?"+values.Encode(), &osr)
return
}

Expand Down
57 changes: 31 additions & 26 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,24 +697,29 @@ func TestUploadDownloadExtended(t *testing.T) {
}

// check objects stats.
info, err := cluster.Bus.ObjectsStats()
tt.OK(err)
objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large))
if info.TotalObjectsSize != objectsSize {
t.Error("wrong size", info.TotalObjectsSize, objectsSize)
}
sectorsSize := 15 * rhpv2.SectorSize
if info.TotalSectorsSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalSectorsSize, sectorsSize)
}
if info.TotalUploadedSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalUploadedSize, sectorsSize)
}
if info.NumObjects != 4 {
t.Error("wrong number of objects", info.NumObjects, 4)
}
if info.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", info.MinHealth)
for _, opts := range []api.ObjectsStatsOpts{
{}, // any bucket
{Bucket: api.DefaultBucketName}, // specific bucket
} {
info, err := cluster.Bus.ObjectsStats(context.Background(), opts)
tt.OK(err)
objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large))
if info.TotalObjectsSize != objectsSize {
t.Error("wrong size", info.TotalObjectsSize, objectsSize)
}
sectorsSize := 15 * rhpv2.SectorSize
if info.TotalSectorsSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalSectorsSize, sectorsSize)
}
if info.TotalUploadedSize != uint64(sectorsSize) {
t.Error("wrong size", info.TotalUploadedSize, sectorsSize)
}
if info.NumObjects != 4 {
t.Error("wrong number of objects", info.NumObjects, 4)
}
if info.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", info.MinHealth)
}
}

// download the data
Expand Down Expand Up @@ -1633,7 +1638,7 @@ func TestUploadPacking(t *testing.T) {
download("file4", data4, 0, int64(len(data4)))

// assert number of objects
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.NumObjects != 5 {
t.Fatalf("expected 5 objects, got %v", os.NumObjects)
Expand All @@ -1642,7 +1647,7 @@ func TestUploadPacking(t *testing.T) {
// check the object size stats, we use a retry loop since packed slabs are
// uploaded in a separate goroutine, so the object stats might lag a bit
tt.Retry(60, time.Second, func() error {
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1796,7 +1801,7 @@ func TestSlabBufferStats(t *testing.T) {
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data1), api.DefaultBucketName, "1", api.UploadObjectOptions{}))

// assert number of objects
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.NumObjects != 1 {
t.Fatalf("expected 1 object, got %d", os.NumObjects)
Expand All @@ -1805,7 +1810,7 @@ func TestSlabBufferStats(t *testing.T) {
// check the object size stats, we use a retry loop since packed slabs are
// uploaded in a separate goroutine, so the object stats might lag a bit
tt.Retry(60, time.Second, func() error {
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1853,7 +1858,7 @@ func TestSlabBufferStats(t *testing.T) {
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data2), api.DefaultBucketName, "2", api.UploadObjectOptions{}))

// assert number of objects
os, err = b.ObjectsStats()
os, err = b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.NumObjects != 2 {
t.Fatalf("expected 1 object, got %d", os.NumObjects)
Expand All @@ -1862,7 +1867,7 @@ func TestSlabBufferStats(t *testing.T) {
// check the object size stats, we use a retry loop since packed slabs are
// uploaded in a separate goroutine, so the object stats might lag a bit
tt.Retry(60, time.Second, func() error {
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.TotalObjectsSize != uint64(len(data1)+len(data2)) {
return fmt.Errorf("expected totalObjectSize of %d, got %d", len(data1)+len(data2), os.TotalObjectsSize)
Expand Down Expand Up @@ -2006,7 +2011,7 @@ func TestMultipartUploads(t *testing.T) {
}

// Check objects stats.
os, err := b.ObjectsStats()
os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.NumObjects != 0 {
t.Fatalf("expected 0 object, got %v", os.NumObjects)
Expand Down Expand Up @@ -2065,7 +2070,7 @@ func TestMultipartUploads(t *testing.T) {
}

// Check objects stats.
os, err = b.ObjectsStats()
os, err = b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
tt.OK(err)
if os.NumObjects != 1 {
t.Fatalf("expected 1 object, got %v", os.NumObjects)
Expand Down
29 changes: 26 additions & 3 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,15 @@ func (s *SQLStore) ListBuckets(ctx context.Context) ([]api.Bucket, error) {
// ObjectsStats returns some info related to the objects stored in the store. To
// reduce locking and make sure all results are consistent, everything is done
// within a single transaction.
func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error) {
func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) {
// if no bucket is specified, we consider all objects
whereBucket := func(table string) clause.Expr {
if opts.Bucket == "" {
return exprTRUE
}
return sqlWhereBucket(table, opts.Bucket)
}

// number of objects
var objInfo struct {
NumObjects uint64
Expand All @@ -592,6 +600,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
err := s.db.
Model(&dbObject{}).
Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize").
Where(whereBucket(dbObject{}.TableName())).
Scan(&objInfo).
Error
if err != nil {
Expand All @@ -603,6 +612,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
err = s.db.
Model(&dbMultipartUpload{}).
Select("COUNT(*)").
Where(whereBucket(dbMultipartUpload{}.TableName())).
Scan(&unfinishedObjects).
Error
if err != nil {
Expand All @@ -613,13 +623,26 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
var totalUnfinishedObjectsSize uint64
err = s.db.
Model(&dbMultipartPart{}).
Joins("INNER JOIN multipart_uploads mu ON multipart_parts.db_multipart_upload_id = mu.id").
Select("COALESCE(SUM(size), 0)").
Where(whereBucket("mu")).
Scan(&totalUnfinishedObjectsSize).
Error
if err != nil {
return api.ObjectsStatsResponse{}, err
}

fromContractSectors := gorm.Expr("contract_sectors cs")
if opts.Bucket != "" {
fromContractSectors = gorm.Expr(`
contract_sectors cs
INNER JOIN sectors s ON s.id = cs.db_sector_id
INNER JOIN slabs sla ON sla.id = s.db_slab_id
INNER JOIN slices sli ON sli.db_slab_id = sla.id
INNER JOIN objects o ON o.id = sli.db_object_id AND (?)
`, whereBucket("o"))
}

var totalSectors uint64

batchSize := 500000
Expand All @@ -631,7 +654,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
}
res := s.db.
Model(&dbSector{}).
Raw("SELECT COUNT(*) as Sectors, MAX(sectors.db_sector_id) as Marker FROM (SELECT cs.db_sector_id FROM contract_sectors cs WHERE cs.db_sector_id > ? GROUP BY cs.db_sector_id LIMIT ?) sectors", marker, batchSize).
Raw("SELECT COUNT(*) as Sectors, MAX(sectors.db_sector_id) as Marker FROM (SELECT cs.db_sector_id FROM ? WHERE cs.db_sector_id > ? GROUP BY cs.db_sector_id LIMIT ?) sectors", fromContractSectors, marker, batchSize).
Scan(&result)
if err := res.Error; err != nil {
return api.ObjectsStatsResponse{}, err
Expand All @@ -644,7 +667,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,

var totalUploaded int64
err = s.db.
Model(&dbContractSector{}).
Table("?", fromContractSectors).
Count(&totalUploaded).
Error
if err != nil {
Expand Down
48 changes: 32 additions & 16 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2435,7 +2435,7 @@ func TestObjectsStats(t *testing.T) {
defer ss.Close()

// Fetch stats on clean database.
info, err := ss.ObjectsStats(context.Background())
info, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2499,21 +2499,37 @@ func TestObjectsStats(t *testing.T) {
}

// Check sizes.
info, err = ss.ObjectsStats(context.Background())
if err != nil {
t.Fatal(err)
}
if info.TotalObjectsSize != objectsSize {
t.Fatal("wrong size", info.TotalObjectsSize, objectsSize)
}
if info.TotalSectorsSize != sectorsSize {
t.Fatal("wrong size", info.TotalSectorsSize, sectorsSize)
}
if info.TotalUploadedSize != sectorsSize*2 {
t.Fatal("wrong size", info.TotalUploadedSize, sectorsSize*2)
for _, opts := range []api.ObjectsStatsOpts{
{}, // any bucket
{Bucket: api.DefaultBucketName}, // specific bucket
} {
info, err = ss.ObjectsStats(context.Background(), opts)
if err != nil {
t.Fatal(err)
} else if info.TotalObjectsSize != objectsSize {
t.Fatal("wrong size", info.TotalObjectsSize, objectsSize)
} else if info.TotalSectorsSize != sectorsSize {
t.Fatal("wrong size", info.TotalSectorsSize, sectorsSize)
} else if info.TotalUploadedSize != sectorsSize*2 {
t.Fatal("wrong size", info.TotalUploadedSize, sectorsSize*2)
} else if info.NumObjects != 2 {
t.Fatal("wrong number of objects", info.NumObjects, 2)
}
}
if info.NumObjects != 2 {
t.Fatal("wrong number of objects", info.NumObjects, 2)

// Check other bucket.
if err := ss.CreateBucket(context.Background(), "other", api.BucketPolicy{}); err != nil {
t.Fatal(err)
} else if info, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{Bucket: "other"}); err != nil {
t.Fatal(err)
} else if info.TotalObjectsSize != 0 {
t.Fatal("wrong size", info.TotalObjectsSize)
} else if info.TotalSectorsSize != 0 {
t.Fatal("wrong size", info.TotalSectorsSize)
} else if info.TotalUploadedSize != 0 {
t.Fatal("wrong size", info.TotalUploadedSize)
} else if info.NumObjects != 0 {
t.Fatal("wrong number of objects", info.NumObjects)
}
}

Expand Down Expand Up @@ -2908,7 +2924,7 @@ func TestContractSizes(t *testing.T) {
}

// assert there's two objects
s, err := ss.ObjectsStats(context.Background())
s, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 73fd775

Please sign in to comment.