From d90e42f0410ed6c3a1f4e09291e5f87e70c91723 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 14 Feb 2024 11:59:49 +0100 Subject: [PATCH 1/2] stores: allow for filtering object stats by bucket --- api/object.go | 4 +++ bus/bus.go | 8 +++-- bus/client/objects.go | 8 +++-- internal/testing/cluster_test.go | 57 +++++++++++++++++--------------- stores/metadata.go | 29 ++++++++++++++-- stores/metadata_test.go | 48 ++++++++++++++++++--------- 6 files changed, 105 insertions(+), 49 deletions(-) diff --git a/api/object.go b/api/object.go index 73bb9c45c..e4fd4b465 100644 --- a/api/object.go +++ b/api/object.go @@ -119,6 +119,10 @@ type ( Mode string `json:"mode"` } + ObjectsStatsOpts struct { + Bucket string + } + // ObjectsStatsResponse is the response type for the /bus/stats/objects endpoint. ObjectsStatsResponse struct { NumObjects uint64 `json:"numObjects"` // number of objects diff --git a/bus/bus.go b/bus/bus.go index d11550595..10c610296 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -139,7 +139,7 @@ type ( Object(ctx context.Context, bucketName, path string) (api.Object, error) ObjectEntries(ctx context.Context, bucketName, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error) ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error) - ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error) + ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) RemoveObject(ctx context.Context, bucketName, path string) error RemoveObjects(ctx context.Context, bucketName, prefix string) error RenameObject(ctx context.Context, bucketName, from, to string, force bool) error @@ -1348,7 +1348,11 @@ func (b *bus) slabbuffersHandlerGET(jc jape.Context) { } func (b *bus) objectsStatshandlerGET(jc jape.Context) { - info, err := b.ms.ObjectsStats(jc.Request.Context()) + opts := api.ObjectsStatsOpts{} + if jc.DecodeForm(("bucket"), &opts.Bucket) != nil { + return + } + info, err := b.ms.ObjectsStats(jc.Request.Context(), opts) if jc.Check("couldn't get objects stats", err) != nil { return } diff --git a/bus/client/objects.go b/bus/client/objects.go index 38a7b14cd..23011a9ba 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -82,8 +82,12 @@ func (c *Client) ObjectsBySlabKey(ctx context.Context, bucket string, key object } // ObjectsStats returns information about the number of objects and their size. -func (c *Client) ObjectsStats() (osr api.ObjectsStatsResponse, err error) { - err = c.c.GET("/stats/objects", &osr) +func (c *Client) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (osr api.ObjectsStatsResponse, err error) { + values := url.Values{} + if opts.Bucket != "" { + values.Set("bucket", opts.Bucket) + } + err = c.c.WithContext(ctx).GET("/stats/objects?"+values.Encode(), &osr) return } diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index b0de2946e..13903115d 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -697,24 +697,29 @@ func TestUploadDownloadExtended(t *testing.T) { } // check objects stats. - info, err := cluster.Bus.ObjectsStats() - tt.OK(err) - objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large)) - if info.TotalObjectsSize != objectsSize { - t.Error("wrong size", info.TotalObjectsSize, objectsSize) - } - sectorsSize := 15 * rhpv2.SectorSize - if info.TotalSectorsSize != uint64(sectorsSize) { - t.Error("wrong size", info.TotalSectorsSize, sectorsSize) - } - if info.TotalUploadedSize != uint64(sectorsSize) { - t.Error("wrong size", info.TotalUploadedSize, sectorsSize) - } - if info.NumObjects != 4 { - t.Error("wrong number of objects", info.NumObjects, 4) - } - if info.MinHealth != 1 { - t.Errorf("expected minHealth of 1, got %v", info.MinHealth) + for _, opts := range []api.ObjectsStatsOpts{ + {}, // any bucket + {Bucket: api.DefaultBucketName}, // specific bucket + } { + info, err := cluster.Bus.ObjectsStats(context.Background(), opts) + tt.OK(err) + objectsSize := uint64(len(file1) + len(file2) + len(small) + len(large)) + if info.TotalObjectsSize != objectsSize { + t.Error("wrong size", info.TotalObjectsSize, objectsSize) + } + sectorsSize := 15 * rhpv2.SectorSize + if info.TotalSectorsSize != uint64(sectorsSize) { + t.Error("wrong size", info.TotalSectorsSize, sectorsSize) + } + if info.TotalUploadedSize != uint64(sectorsSize) { + t.Error("wrong size", info.TotalUploadedSize, sectorsSize) + } + if info.NumObjects != 4 { + t.Error("wrong number of objects", info.NumObjects, 4) + } + if info.MinHealth != 1 { + t.Errorf("expected minHealth of 1, got %v", info.MinHealth) + } } // download the data @@ -1633,7 +1638,7 @@ func TestUploadPacking(t *testing.T) { download("file4", data4, 0, int64(len(data4))) // assert number of objects - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.NumObjects != 5 { t.Fatalf("expected 5 objects, got %v", os.NumObjects) @@ -1642,7 +1647,7 @@ func TestUploadPacking(t *testing.T) { // check the object size stats, we use a retry loop since packed slabs are // uploaded in a separate goroutine, so the object stats might lag a bit tt.Retry(60, time.Second, func() error { - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) if err != nil { t.Fatal(err) } @@ -1796,7 +1801,7 @@ func TestSlabBufferStats(t *testing.T) { tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data1), api.DefaultBucketName, "1", api.UploadObjectOptions{})) // assert number of objects - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.NumObjects != 1 { t.Fatalf("expected 1 object, got %d", os.NumObjects) @@ -1805,7 +1810,7 @@ func TestSlabBufferStats(t *testing.T) { // check the object size stats, we use a retry loop since packed slabs are // uploaded in a separate goroutine, so the object stats might lag a bit tt.Retry(60, time.Second, func() error { - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) if err != nil { t.Fatal(err) } @@ -1853,7 +1858,7 @@ func TestSlabBufferStats(t *testing.T) { tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data2), api.DefaultBucketName, "2", api.UploadObjectOptions{})) // assert number of objects - os, err = b.ObjectsStats() + os, err = b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.NumObjects != 2 { t.Fatalf("expected 1 object, got %d", os.NumObjects) @@ -1862,7 +1867,7 @@ func TestSlabBufferStats(t *testing.T) { // check the object size stats, we use a retry loop since packed slabs are // uploaded in a separate goroutine, so the object stats might lag a bit tt.Retry(60, time.Second, func() error { - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.TotalObjectsSize != uint64(len(data1)+len(data2)) { return fmt.Errorf("expected totalObjectSize of %d, got %d", len(data1)+len(data2), os.TotalObjectsSize) @@ -2006,7 +2011,7 @@ func TestMultipartUploads(t *testing.T) { } // Check objects stats. - os, err := b.ObjectsStats() + os, err := b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.NumObjects != 0 { t.Fatalf("expected 0 object, got %v", os.NumObjects) @@ -2065,7 +2070,7 @@ func TestMultipartUploads(t *testing.T) { } // Check objects stats. - os, err = b.ObjectsStats() + os, err = b.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) tt.OK(err) if os.NumObjects != 1 { t.Fatalf("expected 1 object, got %v", os.NumObjects) diff --git a/stores/metadata.go b/stores/metadata.go index f20f7dbb0..68947ed95 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -582,7 +582,15 @@ func (s *SQLStore) ListBuckets(ctx context.Context) ([]api.Bucket, error) { // ObjectsStats returns some info related to the objects stored in the store. To // reduce locking and make sure all results are consistent, everything is done // within a single transaction. -func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error) { +func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) { + // if no bucket is specified, we consider all objects + whereBucket := func(table string) clause.Expr { + if opts.Bucket == "" { + return exprTRUE + } + return sqlWhereBucket(table, opts.Bucket) + } + // number of objects var objInfo struct { NumObjects uint64 @@ -592,6 +600,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, err := s.db. Model(&dbObject{}). Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize"). + Where(whereBucket(dbObject{}.TableName())). Scan(&objInfo). Error if err != nil { @@ -603,6 +612,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, err = s.db. Model(&dbMultipartUpload{}). Select("COUNT(*)"). + Where(whereBucket(dbMultipartUpload{}.TableName())). Scan(&unfinishedObjects). Error if err != nil { @@ -613,13 +623,26 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, var totalUnfinishedObjectsSize uint64 err = s.db. Model(&dbMultipartPart{}). + Joins("INNER JOIN multipart_uploads mu ON multipart_parts.db_multipart_upload_id = mu.id"). Select("COALESCE(SUM(size), 0)"). + Where(whereBucket("mu")). Scan(&totalUnfinishedObjectsSize). Error if err != nil { return api.ObjectsStatsResponse{}, err } + fromContractSectors := gorm.Expr("contract_sectors cs") + if opts.Bucket != "" { + fromContractSectors = gorm.Expr(` + contract_sectors cs + INNER JOIN sectors s ON s.id = cs.db_sector_id + INNER JOIN slabs sla ON sla.id = s.db_slab_id + INNER JOIN slices sli ON sli.db_slab_id = sla.id + INNER JOIN objects o ON o.id = sli.db_object_id AND (?) + `, whereBucket("o")) + } + var totalSectors uint64 batchSize := 500000 @@ -631,7 +654,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, } res := s.db. Model(&dbSector{}). - Raw("SELECT COUNT(*) as Sectors, MAX(sectors.db_sector_id) as Marker FROM (SELECT cs.db_sector_id FROM contract_sectors cs WHERE cs.db_sector_id > ? GROUP BY cs.db_sector_id LIMIT ?) sectors", marker, batchSize). + Raw("SELECT COUNT(*) as Sectors, MAX(sectors.db_sector_id) as Marker FROM (SELECT cs.db_sector_id FROM ? WHERE cs.db_sector_id > ? GROUP BY cs.db_sector_id LIMIT ?) sectors", fromContractSectors, marker, batchSize). Scan(&result) if err := res.Error; err != nil { return api.ObjectsStatsResponse{}, err @@ -644,7 +667,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, var totalUploaded int64 err = s.db. - Model(&dbContractSector{}). + Table("?", fromContractSectors). Count(&totalUploaded). Error if err != nil { diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 96b06c4ec..a05c0be17 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2435,7 +2435,7 @@ func TestObjectsStats(t *testing.T) { defer ss.Close() // Fetch stats on clean database. - info, err := ss.ObjectsStats(context.Background()) + info, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) if err != nil { t.Fatal(err) } @@ -2499,21 +2499,37 @@ func TestObjectsStats(t *testing.T) { } // Check sizes. - info, err = ss.ObjectsStats(context.Background()) - if err != nil { - t.Fatal(err) - } - if info.TotalObjectsSize != objectsSize { - t.Fatal("wrong size", info.TotalObjectsSize, objectsSize) - } - if info.TotalSectorsSize != sectorsSize { - t.Fatal("wrong size", info.TotalSectorsSize, sectorsSize) - } - if info.TotalUploadedSize != sectorsSize*2 { - t.Fatal("wrong size", info.TotalUploadedSize, sectorsSize*2) + for _, opts := range []api.ObjectsStatsOpts{ + {}, // any bucket + {Bucket: api.DefaultBucketName}, // specific bucket + } { + info, err = ss.ObjectsStats(context.Background(), opts) + if err != nil { + t.Fatal(err) + } else if info.TotalObjectsSize != objectsSize { + t.Fatal("wrong size", info.TotalObjectsSize, objectsSize) + } else if info.TotalSectorsSize != sectorsSize { + t.Fatal("wrong size", info.TotalSectorsSize, sectorsSize) + } else if info.TotalUploadedSize != sectorsSize*2 { + t.Fatal("wrong size", info.TotalUploadedSize, sectorsSize*2) + } else if info.NumObjects != 2 { + t.Fatal("wrong number of objects", info.NumObjects, 2) + } } - if info.NumObjects != 2 { - t.Fatal("wrong number of objects", info.NumObjects, 2) + + // Check other bucket. + if err := ss.CreateBucket(context.Background(), "other", api.BucketPolicy{}); err != nil { + t.Fatal(err) + } else if info, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{Bucket: "other"}); err != nil { + t.Fatal(err) + } else if info.TotalObjectsSize != 0 { + t.Fatal("wrong size", info.TotalObjectsSize) + } else if info.TotalSectorsSize != 0 { + t.Fatal("wrong size", info.TotalSectorsSize) + } else if info.TotalUploadedSize != 0 { + t.Fatal("wrong size", info.TotalUploadedSize) + } else if info.NumObjects != 0 { + t.Fatal("wrong number of objects", info.NumObjects) } } @@ -2908,7 +2924,7 @@ func TestContractSizes(t *testing.T) { } // assert there's two objects - s, err := ss.ObjectsStats(context.Background()) + s, err := ss.ObjectsStats(context.Background(), api.ObjectsStatsOpts{}) if err != nil { t.Fatal(err) } From dff33cd61a457e553b40744ddc644860a5d6f211 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 14 Feb 2024 13:15:41 +0100 Subject: [PATCH 2/2] bus: fix jape --- bus/bus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bus/bus.go b/bus/bus.go index 10c610296..5c3aea504 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1349,7 +1349,7 @@ func (b *bus) slabbuffersHandlerGET(jc jape.Context) { func (b *bus) objectsStatshandlerGET(jc jape.Context) { opts := api.ObjectsStatsOpts{} - if jc.DecodeForm(("bucket"), &opts.Bucket) != nil { + if jc.DecodeForm("bucket", &opts.Bucket) != nil { return } info, err := b.ms.ObjectsStats(jc.Request.Context(), opts)