Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'health' column to objects to speed up object listing #771

Merged
merged 16 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ type (

// ObjectsStatsResponse is the response type for the /bus/stats/objects endpoint.
ObjectsStatsResponse struct {
NumObjects uint64 `json:"numObjects"` // number of objects
TotalObjectsSize uint64 `json:"totalObjectsSize"` // size of all objects
TotalSectorsSize uint64 `json:"totalSectorsSize"` // uploaded size of all objects
TotalUploadedSize uint64 `json:"totalUploadedSize"` // uploaded size of all objects including redundant sectors
NumObjects uint64 `json:"numObjects"` // number of objects
MinHealth float64 `json:"minHealth"` // minimum health of all objects
TotalObjectsSize uint64 `json:"totalObjectsSize"` // size of all objects
TotalSectorsSize uint64 `json:"totalSectorsSize"` // uploaded size of all objects
TotalUploadedSize uint64 `json:"totalUploadedSize"` // uploaded size of all objects including redundant sectors
}
)

Expand Down
2 changes: 1 addition & 1 deletion autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ OUTER:
start := time.Now()
if err := b.RefreshHealth(ctx); err != nil {
m.ap.RegisterAlert(ctx, newRefreshHealthFailedAlert(err))
m.logger.Errorf("failed to recompute cached health before migration", err)
m.logger.Errorf("failed to recompute cached health before migration: %v", err)
return
}
m.logger.Debugf("recomputed slab health in %v", time.Since(start))
Expand Down
12 changes: 12 additions & 0 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ func TestUploadDownloadExtended(t *testing.T) {
if info.NumObjects != 4 {
t.Error("wrong number of objects", info.NumObjects, 4)
}
if info.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", info.MinHealth)
}

// download the data
for _, data := range [][]byte{small, large} {
Expand Down Expand Up @@ -1688,6 +1691,9 @@ func TestUploadPacking(t *testing.T) {
if os.TotalUploadedSize != uint64(totalRedundantSize) {
return fmt.Errorf("expected totalUploadedSize of %v, got %v", totalRedundantSize, os.TotalUploadedSize)
}
if os.MinHealth != 1 {
return fmt.Errorf("expected minHealth of 1, got %v", os.MinHealth)
}
return nil
})

Expand Down Expand Up @@ -1845,6 +1851,9 @@ func TestSlabBufferStats(t *testing.T) {
if os.TotalUploadedSize != 0 {
return fmt.Errorf("expected totalUploadedSize of 0, got %d", os.TotalUploadedSize)
}
if os.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", os.MinHealth)
}
return nil
})

Expand Down Expand Up @@ -1897,6 +1906,9 @@ func TestSlabBufferStats(t *testing.T) {
if os.TotalUploadedSize != 3*rhpv2.SectorSize {
return fmt.Errorf("expected totalUploadedSize of %d, got %d", 3*rhpv2.SectorSize, os.TotalUploadedSize)
}
if os.MinHealth != 1 {
t.Errorf("expected minHealth of 1, got %v", os.MinHealth)
}
return nil
})

Expand Down
159 changes: 98 additions & 61 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"regexp"
"strings"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -106,9 +107,10 @@ type (
DBBucket dbBucket
ObjectID string `gorm:"index;uniqueIndex:idx_object_bucket"`

Key secretKey
Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too
Size int64
Key secretKey
Slabs []dbSlice `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete slices too
Health float64 `gorm:"index;default:1.0; NOT NULL"`
Size int64

MimeType string `json:"index"`
Etag string `gorm:"index"`
Expand Down Expand Up @@ -458,8 +460,6 @@ func (raw rawObject) convert() (api.Object, error) {
if err := addSlab(len(filtered)); err != nil {
return api.Object{}, err
}
} else {
minHealth = 1 // empty object
}

// fetch a potential partial slab from the buffer.
Expand All @@ -480,7 +480,7 @@ func (raw rawObject) convert() (api.Object, error) {
return api.Object{
ObjectMetadata: api.ObjectMetadata{
ETag: raw[0].ObjectETag,
Health: minHealth,
Health: raw[0].ObjectHealth,
MimeType: raw[0].ObjectMimeType,
ModTime: raw[0].ObjectModTime.UTC(),
Name: raw[0].ObjectName,
Expand Down Expand Up @@ -649,11 +649,12 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
// Number of objects.
var objInfo struct {
NumObjects uint64
MinHealth float64
TotalObjectsSize uint64
}
err := s.db.
Model(&dbObject{}).
Select("COUNT(*) AS NumObjects, SUM(size) AS TotalObjectsSize").
Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize").
Scan(&objInfo).
Error
if err != nil {
Expand Down Expand Up @@ -692,6 +693,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,
}

return api.ObjectsStatsResponse{
MinHealth: objInfo.MinHealth,
NumObjects: objInfo.NumObjects,
TotalObjectsSize: objInfo.TotalObjectsSize,
TotalSectorsSize: totalSectors * rhpv2.SectorSize,
Expand Down Expand Up @@ -1085,14 +1087,12 @@ func (s *SQLStore) SearchObjects(ctx context.Context, bucket, substring string,

var objects []api.ObjectMetadata
err := s.db.
Select("o.object_id as name, MAX(o.size) as size, MIN(sla.health) as health").
Select("o.object_id as name, o.size as size, o.health as health").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Joins("LEFT JOIN slices sli ON o.id = sli.`db_object_id`").
Joins("LEFT JOIN slabs sla ON sli.db_slab_id = sla.`id`").
Where("INSTR(o.object_id, ?) > 0 AND b.name = ?", substring, bucket).
Group("o.object_id").
Order("o.object_id ASC").
Offset(offset).
Limit(limit).
Scan(&objects).Error
Expand All @@ -1103,6 +1103,11 @@ func (s *SQLStore) SearchObjects(ctx context.Context, bucket, substring string,
return objects, nil
}

// anyValueRe matches an ANY_VALUE(...) call and captures its argument. It is
// compiled once at package initialisation instead of on every call.
var anyValueRe = regexp.MustCompile(`ANY_VALUE\((.*?)\)`)

// replaceAnyValue returns query with every ANY_VALUE(expr) call replaced by
// the bare expr.
//
// The match is non-greedy and ends at the first closing parenthesis, so the
// wrapped expression must not itself contain ')'; nested calls inside
// ANY_VALUE are not handled.
func replaceAnyValue(query string) string {
	return anyValueRe.ReplaceAllString(query, "$1")
}

func (s *SQLStore) ObjectEntries(ctx context.Context, bucket, path, prefix, sortBy, sortDir, marker string, offset, limit int) (metadata []api.ObjectMetadata, hasMore bool, err error) {
// sanity check we are passing a directory
if !strings.HasSuffix(path, "/") {
Expand Down Expand Up @@ -1153,45 +1158,55 @@ func (s *SQLStore) ObjectEntries(ctx context.Context, bucket, path, prefix, sort
offset = 0
}

onameExpr := fmt.Sprintf("CASE INSTR(SUBSTR(object_id, ?), '/') WHEN 0 THEN %s ELSE %s END",
sqlConcat(s.db, "?", "SUBSTR(object_id, ?)"),
sqlConcat(s.db, "?", "substr(SUBSTR(object_id, ?), 1, INSTR(SUBSTR(object_id, ?), '/'))"),
)

// build objects query & parameters
objectsQuery := fmt.Sprintf(`
SELECT
MAX(etag) AS ETag,
MAX(created_at) AS ModTime,
CASE slashindex WHEN 0 THEN %s ELSE %s END AS name,
SUM(size) AS size,
MIN(health) as health,
MAX(mimeType) as MimeType
SELECT ETag, ModTime, oname as Name, Size, Health, MimeType
FROM (
SELECT MAX(etag) AS etag, MAX(objects.created_at) AS created_at, MAX(size) AS size, MIN(slabs.health) as health, MAX(objects.mime_type) as mimeType, SUBSTR(object_id, ?) AS trimmed , INSTR(SUBSTR(object_id, ?), "/") AS slashindex
SELECT ANY_VALUE(etag) AS ETag,
MAX(objects.created_at) AS ModTime,
%s AS oname,
SUM(size) AS Size,
MIN(health) as Health,
ANY_VALUE(mime_type) as MimeType
FROM objects
INNER JOIN buckets b ON objects.db_bucket_id = b.id
LEFT JOIN slices ON objects.id = slices.db_object_id
LEFT JOIN slabs ON slices.db_slab_id = slabs.id
WHERE SUBSTR(object_id, 1, ?) = ? AND b.name = ?
GROUP BY object_id
) AS m
GROUP BY name
HAVING SUBSTR(name, 1, ?) = ? AND name != ?
WHERE SUBSTR(object_id, 1, ?) = ? AND b.name = ? AND SUBSTR(%s, 1, ?) = ? AND %s != ?
GROUP BY oname
) baseQuery
`,
sqlConcat(s.db, "?", "trimmed"),
sqlConcat(s.db, "?", "substr(trimmed, 1, slashindex)"),
onameExpr,
onameExpr,
onameExpr,
)

if isSQLite(s.db) {
objectsQuery = replaceAnyValue(objectsQuery)
}

objectsQueryParams := []interface{}{
path, // sqlConcat(s.db, "?", "trimmed"),
path, // sqlConcat(s.db, "?", "substr(trimmed, 1, slashindex)")
utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, utf8.RuneCountInString(path) + 1, // onameExpr

utf8.RuneCountInString(path) + 1, // SUBSTR(object_id, ?)
utf8.RuneCountInString(path) + 1, // INSTR(SUBSTR(object_id, ?), "/")
utf8.RuneCountInString(path), // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?
path, // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?
bucket, // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?

utf8.RuneCountInString(path), // WHERE SUBSTR(object_id, 1, ?) = ? AND b.name = ?
path, // WHERE SUBSTR(object_id, 1, ?) = ? AND b.name = ?
bucket, // WHERE SUBSTR(object_id, 1, ?) = ? AND b.name = ?
utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, utf8.RuneCountInString(path) + 1, // onameExpr

utf8.RuneCountInString(path + prefix), // HAVING SUBSTR(name, 1, ?) = ? AND name != ?
path + prefix, // HAVING SUBSTR(name, 1, ?) = ? AND name != ?
path, // HAVING SUBSTR(name, 1, ?) = ? AND name != ?
utf8.RuneCountInString(path + prefix), // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?
path + prefix, // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?
utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, // onameExpr
path, utf8.RuneCountInString(path) + 1, utf8.RuneCountInString(path) + 1, // onameExpr
path, // WHERE SUBSTR(%s, 1, ?) = ? AND %s != ? AND b.name = ?
}

// build marker expr
Expand All @@ -1202,24 +1217,24 @@ HAVING SUBSTR(name, 1, ?) = ? AND name != ?
case api.ObjectSortByHealth:
var markerHealth float64
if err = s.db.
Raw(fmt.Sprintf(`SELECT health FROM (%s AND name >= ? ORDER BY name LIMIT 1) as n`, objectsQuery), append(objectsQueryParams, marker)...).
Raw(fmt.Sprintf(`SELECT Health FROM (%s WHERE Name >= ? ORDER BY Name LIMIT 1) as n`, objectsQuery), append(objectsQueryParams, marker)...).
Scan(&markerHealth).
Error; err != nil {
return
}

if sortDir == api.ObjectSortDirAsc {
markerExpr = "(health > ? OR (health = ? AND name > ?))"
markerExpr = "(Health > ? OR (Health = ? AND Name > ?))"
markerParams = []interface{}{markerHealth, markerHealth, marker}
} else {
markerExpr = "(health = ? AND name > ?) OR health < ?"
markerExpr = "(Health = ? AND Name > ?) OR Health < ?"
markerParams = []interface{}{markerHealth, marker, markerHealth}
}
case api.ObjectSortByName:
if sortDir == api.ObjectSortDirAsc {
markerExpr = "name > ?"
markerExpr = "Name > ?"
} else {
markerExpr = "name < ?"
markerExpr = "Name < ?"
}
markerParams = []interface{}{marker}
default:
Expand All @@ -1230,7 +1245,7 @@ HAVING SUBSTR(name, 1, ?) = ? AND name != ?
// build order clause
orderByClause := fmt.Sprintf("%s %s", sortBy, sortDir)
if sortBy == api.ObjectSortByHealth {
orderByClause += ", name"
orderByClause += ", Name"
}

var rows []rawObjectMetadata
Expand Down Expand Up @@ -1907,17 +1922,35 @@ LIMIT ?
s.objectsMu.Lock()
defer s.objectsMu.Unlock()

// create temp table from the health query since we will reuse it
if err := tx.Exec("DROP TABLE IF EXISTS src").Error; err != nil {
return err
} else if err = tx.Exec("CREATE TEMPORARY TABLE src AS ?", healthQuery).Error; err != nil {
return err
}

var res *gorm.DB
if isSQLite(s.db) {
res = tx.Exec("UPDATE slabs SET health = src.health, health_valid_until = (?) FROM (?) AS src WHERE slabs.id=src.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity), healthQuery)
res = tx.Exec("UPDATE slabs SET health = src.health, health_valid_until = (?) FROM src WHERE slabs.id=src.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
} else {
res = tx.Exec("UPDATE slabs sla INNER JOIN (?) h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", healthQuery, sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
res = tx.Exec("UPDATE slabs sla INNER JOIN src h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
}
if res.Error != nil {
return res.Error
}
rowsAffected = res.RowsAffected
return nil

// Update the health of the objects associated with the updated slabs.
if isSQLite(s.db) {
return tx.Exec(`UPDATE objects SET health = src.health FROM src
INNER JOIN slices ON slices.db_slab_id = src.id
WHERE slices.db_object_id = objects.id`).Error
} else {
return tx.Exec(`UPDATE objects
INNER JOIN slices sli ON sli.db_object_id = objects.id
INNER JOIN src s ON s.id = sli.db_slab_id
SET objects.health = s.health`).Error
}
})
if err != nil {
return err
Expand Down Expand Up @@ -2083,7 +2116,7 @@ func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path
// accordingly
var rows rawObject
tx := s.db.
Select("o.id as ObjectID, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey").
Select("o.id as ObjectID, o.health as ObjectHealth, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Expand All @@ -2107,12 +2140,10 @@ func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path

func (s *SQLStore) objectHealth(ctx context.Context, tx *gorm.DB, objectID uint) (health float64, err error) {
if err = tx.
Select("COALESCE(MIN(sla.health), 1)").
Select("objects.health").
Model(&dbObject{}).
Table("objects o").
Joins("LEFT JOIN slices sli ON o.id = sli.`db_object_id`").
Joins("LEFT JOIN slabs sla ON sli.db_slab_id = sla.`id`").
Where("o.id = ?", objectID).
Table("objects").
Where("id", objectID).
Scan(&health).
Error; errors.Is(err, gorm.ErrRecordNotFound) {
err = api.ErrObjectNotFound
Expand Down Expand Up @@ -2527,16 +2558,14 @@ func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sort

var rows []rawObjectMetadata
if err := s.db.
Select("o.object_id as Name, MAX(o.size) as Size, MIN(sla.health) as Health, MAX(o.mime_type) as mimeType, MAX(o.created_at) as ModTime").
Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as mimeType, o.created_at as ModTime").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Joins("LEFT JOIN slices sli ON o.id = sli.`db_object_id`").
Joins("LEFT JOIN slabs sla ON sli.db_slab_id = sla.`id`").
Where("b.name = ? AND ? AND ?", bucket, prefixExpr, markerExpr).
Group("o.object_id").
Order(orderBy).
Order(markerOrderBy).
Order("Name ASC").
Limit(int(limit)).
Scan(&rows).Error; err != nil {
return api.ObjectsListResponse{}, err
Expand Down Expand Up @@ -2680,14 +2709,11 @@ func buildMarkerExpr(db *gorm.DB, bucket, prefix, marker, sortBy, sortDir string
var markerHealth float64
if marker != "" && sortBy == api.ObjectSortByHealth {
if err := db.
Select("MIN(sla.health)").
Select("o.health").
Model(&dbObject{}).
Table("objects o").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Joins("LEFT JOIN slices sli ON o.id = sli.`db_object_id`").
Joins("LEFT JOIN slabs sla ON sli.db_slab_id = sla.`id`").
Where("b.name = ? AND ? AND ?", bucket, buildPrefixExpr(prefix), gorm.Expr("o.object_id >= ?", marker)).
Group("o.object_id").
Limit(1).
Scan(&markerHealth).
Error; err != nil {
Expand Down Expand Up @@ -2731,6 +2757,17 @@ func buildPrefixExpr(prefix string) clause.Expr {
}
}

// updateAllObjectsHealth rewrites the health column of every row in the
// objects table. Each object's health becomes the minimum health of the slabs
// referenced by its slices; an object without any slices gets a health of 1.
// The statement runs on tx and any execution error is returned.
func updateAllObjectsHealth(tx *gorm.DB) error {
	const query = `
UPDATE objects
SET health = (
SELECT COALESCE(MIN(slabs.health), 1)
FROM slabs
INNER JOIN slices sli ON sli.db_slab_id = slabs.id
WHERE sli.db_object_id = objects.id)
`
	res := tx.Exec(query)
	return res.Error
}

func validateSort(sortBy, sortDir string) error {
allowed := func(s string, allowed ...string) bool {
for _, a := range allowed {
Expand Down
Loading