Skip to content

Commit

Permalink
Migrate ListObjects to raw SQL (#1336)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 27, 2024
1 parent bf2d616 commit 115402c
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 179 deletions.
1 change: 1 addition & 0 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

var (
// ErrMarkerNotFound is the sentinel error for a list request whose marker
// cannot be found.
ErrMarkerNotFound = errors.New("marker not found")
// ErrMaxFundAmountExceeded is the sentinel error reporting that a renewal
// exceeds the max fund amount.
// NOTE(review): the exact condition under which it is returned is not
// visible here - confirm against the callers.
ErrMaxFundAmountExceeded = errors.New("renewal exceeds max fund amount")
)

Expand Down
8 changes: 5 additions & 3 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ type (
}

// ListObjectOptions holds the optional parameters of a ListObjects call: the
// prefix to filter on, the pagination marker, the maximum number of results
// and the sort column and direction.
ListObjectOptions struct {
Prefix string
Marker string
Limit int
Prefix string
Marker string
Limit int
SortBy string
SortDir string
}

SearchObjectOptions struct {
Expand Down
5 changes: 4 additions & 1 deletion bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,10 @@ func (b *bus) objectsListHandlerPOST(jc jape.Context) {
req.Bucket = api.DefaultBucketName
}
resp, err := b.ms.ListObjects(jc.Request.Context(), req.Bucket, req.Prefix, req.SortBy, req.SortDir, req.Marker, req.Limit)
if jc.Check("couldn't list objects", err) != nil {
if errors.Is(err, api.ErrMarkerNotFound) {
jc.Error(err, http.StatusBadRequest)
return
} else if jc.Check("couldn't list objects", err) != nil {
return
}
jc.Encode(resp)
Expand Down
10 changes: 6 additions & 4 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ func (c *Client) DeleteObject(ctx context.Context, bucket, path string, opts api
// ListObjects lists objects in the given bucket. It POSTs an
// api.ObjectsListRequest to the "/objects/list" route, forwarding the limit,
// prefix, marker and sort options found in opts, and passes resp as the
// destination for the response.
func (c *Client) ListObjects(ctx context.Context, bucket string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) {
err = c.c.WithContext(ctx).POST("/objects/list", api.ObjectsListRequest{
Bucket: bucket,
Limit: opts.Limit,
Prefix: opts.Prefix,
Marker: opts.Marker,
Bucket: bucket,
Limit: opts.Limit,
Prefix: opts.Prefix,
Marker: opts.Marker,
SortBy: opts.SortBy,
SortDir: opts.SortDir,
}, &resp)
return
}
Expand Down
138 changes: 138 additions & 0 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,150 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
"go.uber.org/zap"
"lukechampine.com/frand"
)

// TestListObjects uploads a handful of objects through the worker and verifies
// that listing them through the bus honours the prefix, the sort column and
// direction, and marker-based pagination. It also checks that an unknown
// marker is rejected with api.ErrMarkerNotFound.
func TestListObjects(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	// checkAndClearMetadata asserts ModTime, MimeType and ETag are populated
	// as expected and resets them afterwards, that way listings can be
	// compared against expectations that only specify Name, Size and Health
	began := time.Now()
	checkAndClearMetadata := func(objs []api.ObjectMetadata) {
		for idx := range objs {
			obj := &objs[idx]
			endsInSlash := strings.HasSuffix(obj.Name, "/")

			// files must have a mod time later than the start of the test
			if !endsInSlash && !obj.ModTime.Std().After(began.UTC()) {
				t.Fatal("mod time should be set")
			}
			obj.ModTime = api.TimeRFC3339{}

			// only files have a mime type, "//double/" counts as a file
			isDir := endsInSlash && obj.Name != "//double/"
			if isDir == (obj.MimeType != "") {
				t.Fatal("unexpected mime type", obj.MimeType)
			}
			obj.MimeType = ""

			// only files have an etag
			if isDir == (obj.ETag != "") {
				t.Fatal("etag should be set for files and empty for dirs")
			}
			obj.ETag = ""
		}
	}

	// spin up the cluster
	cluster := newTestCluster(t, testClusterOptions{
		hosts: test.RedundancySettings.TotalShards,
	})
	defer cluster.Shutdown()

	var (
		busClient    = cluster.Bus
		workerClient = cluster.Worker
		tt           = cluster.tt
		ctx          = context.Background()
	)

	// upload the objects, every object has a distinct size so sorting by
	// size is unambiguous
	for _, upload := range []struct {
		path string
		size int
	}{
		{"/foo/bar", 1},
		{"/foo/bat", 2},
		{"/foo/baz/quux", 3},
		{"/foo/baz/quuz", 4},
		{"/gab/guub", 5},
		{"/FOO/bar", 6}, // test case sensitivity
	} {
		// a zero size results in a nil payload
		var data []byte
		if upload.size != 0 {
			data = make([]byte, upload.size)
			frand.Read(data)
		}
		tt.OKAll(workerClient.UploadObject(ctx, bytes.NewReader(data), api.DefaultBucketName, upload.path, api.UploadObjectOptions{}))
	}

	// expected metadata per uploaded object
	var (
		upperFooBar = api.ObjectMetadata{Name: "/FOO/bar", Size: 6, Health: 1}
		fooBar      = api.ObjectMetadata{Name: "/foo/bar", Size: 1, Health: 1}
		fooBat      = api.ObjectMetadata{Name: "/foo/bat", Size: 2, Health: 1}
		fooBazQuux  = api.ObjectMetadata{Name: "/foo/baz/quux", Size: 3, Health: 1}
		fooBazQuuz  = api.ObjectMetadata{Name: "/foo/baz/quuz", Size: 4, Health: 1}
		gabGuub     = api.ObjectMetadata{Name: "/gab/guub", Size: 5, Health: 1}
	)

	// expected listings, these are only ever read so they can be shared
	// between test cases
	var (
		allByNameAsc  = []api.ObjectMetadata{upperFooBar, fooBar, fooBat, fooBazQuux, fooBazQuuz, gabGuub}
		allByNameDesc = []api.ObjectMetadata{gabGuub, fooBazQuuz, fooBazQuux, fooBat, fooBar, upperFooBar}
		fooAsc        = []api.ObjectMetadata{fooBar, fooBat, fooBazQuux, fooBazQuuz}
		fooDesc       = []api.ObjectMetadata{fooBazQuuz, fooBazQuux, fooBat, fooBar}
	)

	cases := []struct {
		prefix  string
		sortBy  string
		sortDir string
		want    []api.ObjectMetadata
	}{
		{"/", "", "", allByNameAsc},
		{"/", "", api.ObjectSortDirAsc, allByNameAsc},
		{"/", "", api.ObjectSortDirDesc, allByNameDesc},
		// all objects have the same health, so both directions are expected
		// to come back ordered by name
		{"/", api.ObjectSortByHealth, api.ObjectSortDirAsc, allByNameAsc},
		{"/", api.ObjectSortByHealth, api.ObjectSortDirDesc, allByNameAsc},
		{"/foo/b", "", "", fooAsc},
		{"o/baz/quu", "", "", []api.ObjectMetadata{}},
		{"/foo", "", "", fooAsc},
		{"/foo", api.ObjectSortBySize, api.ObjectSortDirAsc, fooAsc},
		{"/foo", api.ObjectSortBySize, api.ObjectSortDirDesc, fooDesc},
	}
	for _, tc := range cases {
		// list everything in one go using the bus client
		res, err := busClient.ListObjects(ctx, api.DefaultBucketName, api.ListObjectOptions{
			Prefix:  tc.prefix,
			SortBy:  tc.sortBy,
			SortDir: tc.sortDir,
			Limit:   -1,
		})
		if err != nil {
			t.Fatal(err, tc.prefix)
		}
		checkAndClearMetadata(res.Objects)

		got := res.Objects
		bothEmpty := len(got) == 0 && len(tc.want) == 0
		if !bothEmpty && !reflect.DeepEqual(got, tc.want) {
			t.Log(cmp.Diff(got, tc.want, cmp.Comparer(api.CompareTimeRFC3339)))
			t.Fatalf("\nkey: %v\ngot: %v\nwant: %v\nsortBy: %v\nsortDir: %v", tc.prefix, got, tc.want, tc.sortBy, tc.sortDir)
		}
		if len(got) == 0 {
			continue
		}

		// list again one object at a time, following the marker
		marker := ""
		for offset, expected := range tc.want {
			page, err := busClient.ListObjects(ctx, api.DefaultBucketName, api.ListObjectOptions{
				Prefix:  tc.prefix,
				SortBy:  tc.sortBy,
				SortDir: tc.sortDir,
				Marker:  marker,
				Limit:   1,
			})
			if err != nil {
				t.Fatal(err)
			}

			// assert the metadata & clear it afterwards so we can compare
			checkAndClearMetadata(page.Objects)

			switch objs := page.Objects; {
			case len(objs) != 1:
				t.Fatalf("expected 1 object, got %v", len(objs))
			case objs[0].Name != expected.Name:
				t.Fatalf("expected %v, got %v, offset %v, marker %v, sortBy %v, sortDir %v", expected.Name, objs[0].Name, offset, marker, tc.sortBy, tc.sortDir)
			}
			marker = page.NextMarker
		}
	}

	// a marker that does not exist should be rejected
	_, err := busClient.ListObjects(ctx, api.DefaultBucketName, api.ListObjectOptions{
		Marker: "invalid",
		SortBy: api.ObjectSortByHealth,
	})
	if !utils.IsErr(err, api.ErrMarkerNotFound) {
		t.Fatal(err)
	}
}

// TestNewTestCluster is a test for creating a cluster of Nodes for testing,
// making sure that it forms contracts, renews contracts and shuts down.
func TestNewTestCluster(t *testing.T) {
Expand Down
173 changes: 6 additions & 167 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,62 +2002,12 @@ func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types
// TODO: we can use ObjectEntries instead of ListObject if we want to use '/' as
// a delimiter for now (see backend.go) but it would be interesting to have
// arbitrary 'delim' support in ListObjects.
func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) {
// fetch one more to see if there are more entries
if limit <= -1 {
limit = math.MaxInt
} else {
limit++
}

// build prefix expr
prefixExpr := buildPrefixExpr(prefix)

// build order clause
orderBy, err := buildOrderClause(sortBy, sortDir)
if err != nil {
return api.ObjectsListResponse{}, err
}

// build marker expr
markerExpr, markerOrderBy, err := buildMarkerExpr(s.db, bucket, prefix, marker, sortBy, sortDir)
if err != nil {
return api.ObjectsListResponse{}, err
}
var rows []rawObjectMetadata
if err := s.db.
Select("o.object_id as ObjectName, o.size as Size, o.health as Health, o.mime_type as MimeType, o.created_at as ModTime, o.etag as ETag").
Model(&dbObject{}).
Table("objects o").
Where("o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?)", bucket).
Where("?", prefixExpr).
Where("?", markerExpr).
Order(orderBy).
Order(markerOrderBy).
Order("ObjectName ASC").
Limit(int(limit)).
Scan(&rows).Error; err != nil {
return api.ObjectsListResponse{}, err
}

var hasMore bool
var nextMarker string
if len(rows) == limit {
hasMore = true
rows = rows[:len(rows)-1]
nextMarker = rows[len(rows)-1].ObjectName
}

var objects []api.ObjectMetadata
for _, row := range rows {
objects = append(objects, row.convert())
}

return api.ObjectsListResponse{
HasMore: hasMore,
NextMarker: nextMarker,
Objects: objects,
}, nil
// ListObjects lists the objects of the given bucket. The listing itself is
// performed by the transaction's ListObjects, which is invoked within a
// transaction started on s.bMain; its response and error are passed through.
func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) {
	listInTx := func(tx sql.DatabaseTx) error {
		var txErr error
		resp, txErr = tx.ListObjects(ctx, bucket, prefix, sortBy, sortDir, marker, limit)
		return txErr
	}
	err = s.bMain.Transaction(ctx, listInTx)
	return resp, err
}

func (ss *SQLStore) processConsensusChangeContracts(cc modules.ConsensusChange) {
Expand Down Expand Up @@ -2149,117 +2099,6 @@ func (ss *SQLStore) processConsensusChangeContracts(cc modules.ConsensusChange)
}
}

// buildMarkerExpr builds the WHERE expression that skips everything up to and
// including the object identified by marker, together with the additional
// ORDER BY on object_id that is required for marker pagination to be stable.
//
// Without a marker it returns exprTRUE and an empty order. When sorting by
// name the marker is compared against object_id directly. When sorting by
// health or size the marker object's value for that column is looked up first;
// if no object with that exact object_id exists within the bucket and prefix,
// api.ErrMarkerNotFound is returned. An unknown sortBy results in an error.
func buildMarkerExpr(db *gorm.DB, bucket, prefix, marker, sortBy, sortDir string) (markerExpr clause.Expr, orderBy clause.OrderBy, err error) {
	// no marker
	if marker == "" {
		return exprTRUE, clause.OrderBy{}, nil
	}

	desc := strings.EqualFold(sortDir, api.ObjectSortDirDesc)
	switch sortBy {
	case "", api.ObjectSortByName:
		if desc {
			markerExpr = gorm.Expr("object_id < ?", marker)
		} else {
			markerExpr = gorm.Expr("object_id > ?", marker)
		}
	case api.ObjectSortByHealth:
		// fetch marker health
		var markerHealth float64
		markerHealth, err = fetchMarkerValue(db, bucket, prefix, marker, "o.health")
		if err != nil {
			return exprTRUE, clause.OrderBy{}, err
		}

		// objects sharing the marker's health are ordered by object_id
		// ascending regardless of the sort direction
		if desc {
			markerExpr = gorm.Expr("(Health <= ? AND object_id > ?) OR Health < ?", markerHealth, marker, markerHealth)
		} else {
			markerExpr = gorm.Expr("Health > ? OR (Health >= ? AND object_id > ?)", markerHealth, markerHealth, marker)
		}
	case api.ObjectSortBySize:
		// fetch marker size
		var markerSize float64
		markerSize, err = fetchMarkerValue(db, bucket, prefix, marker, "o.size")
		if err != nil {
			return exprTRUE, clause.OrderBy{}, err
		}

		// objects sharing the marker's size are ordered by object_id
		// ascending regardless of the sort direction
		if desc {
			markerExpr = gorm.Expr("(Size <= ? AND object_id > ?) OR Size < ?", markerSize, marker, markerSize)
		} else {
			markerExpr = gorm.Expr("Size > ? OR (Size >= ? AND object_id > ?)", markerSize, markerSize, marker)
		}
	default:
		return exprTRUE, clause.OrderBy{}, fmt.Errorf("unhandled sortBy parameter '%s'", sortBy)
	}

	// for markers to work we need to order by object_id
	orderBy = clause.OrderBy{
		Columns: []clause.OrderByColumn{
			{
				Column: clause.Column{Name: "object_id"},
				Desc:   false,
			},
		},
	}
	return markerExpr, orderBy, nil
}

// fetchMarkerValue returns the value of the given column for the object whose
// object_id equals marker within the given bucket and prefix. The column is
// always one of the fixed names passed in by buildMarkerExpr, never user
// input. If no such object exists api.ErrMarkerNotFound is returned.
func fetchMarkerValue(db *gorm.DB, bucket, prefix, marker, column string) (float64, error) {
	var value float64
	res := db.
		Select(column).
		Model(&dbObject{}).
		Table("objects o").
		Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
		// match the marker exactly, a range comparison combined with an
		// unordered LIMIT 1 would pick the value of an arbitrary object
		Where("b.name = ? AND ? AND ?", bucket, buildPrefixExpr(prefix), gorm.Expr("o.object_id = ?", marker)).
		Limit(1).
		Scan(&value)
	if res.Error != nil {
		return 0, res.Error
	}
	// Scan does not report an error when no row matched
	if res.RowsAffected == 0 {
		return 0, api.ErrMarkerNotFound
	}
	return value, nil
}

// buildOrderClause translates the sortBy and sortDir parameters into the
// ORDER BY column used when listing objects. Both parameters are validated
// first and a validation error is returned as is. An empty sortBy sorts by
// name, the direction is descending only if sortDir equals
// api.ObjectSortDirDesc (case-insensitive).
func buildOrderClause(sortBy, sortDir string) (clause.OrderByColumn, error) {
	err := validateSort(sortBy, sortDir)
	if err != nil {
		return clause.OrderByColumn{}, err
	}

	// map the sort parameter onto the column to order by
	var column string
	switch sortBy {
	case "", api.ObjectSortByName:
		column = "object_id"
	case api.ObjectSortByHealth:
		column = "Health"
	case api.ObjectSortBySize:
		column = "Size"
	}

	descending := strings.EqualFold(sortDir, api.ObjectSortDirDesc)
	return clause.OrderByColumn{
		Column: clause.Column{Name: column},
		Desc:   descending,
	}, nil
}

// buildPrefixExpr returns the expression that restricts a query to objects
// whose object_id starts with the given prefix. An empty prefix matches
// everything and yields exprTRUE.
func buildPrefixExpr(prefix string) clause.Expr {
	if prefix == "" {
		return exprTRUE
	}

	// the LIKE is combined with a comparison of the first n characters of
	// the object_id against the prefix, where n is the prefix's rune count
	// NOTE(review): presumably this keeps the match case-sensitive on
	// databases where LIKE is not - confirm
	pattern := prefix + "%"
	runeCount := utf8.RuneCountInString(prefix)
	return gorm.Expr("o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ?", pattern, runeCount, prefix)
}

// updateAllObjectsHealth sets the health of every object to the minimum health
// of the slabs referenced by the object's slices. Objects for which that
// minimum is NULL get a health of 1.
func updateAllObjectsHealth(tx *gorm.DB) error {
	const query = `
UPDATE objects
SET health = (
SELECT COALESCE(MIN(slabs.health), 1)
FROM slabs
INNER JOIN slices sli ON sli.db_slab_id = slabs.id
WHERE sli.db_object_id = objects.id)
`
	res := tx.Exec(query)
	return res.Error
}

func validateSort(sortBy, sortDir string) error {
allowed := func(s string, allowed ...string) bool {
for _, a := range allowed {
Expand Down
Loading

0 comments on commit 115402c

Please sign in to comment.