Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if a directory table can be used as an index to speed up object queries #1198

Merged
merged 59 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3c2b19a
tmp
ChrisSchinnerl Apr 23, 2024
1802ac6
stores: start modifying ObjectEntries
ChrisSchinnerl Apr 24, 2024
8ca6243
stores: prefix is working in ObjectEntries
ChrisSchinnerl Apr 24, 2024
94e8d0d
stores: TestObjectEntries passing again
ChrisSchinnerl Apr 24, 2024
c713268
store: fix TestObjectBasic
ChrisSchinnerl Apr 24, 2024
36e09c9
stores: fix TestObjectsBySlabKey
ChrisSchinnerl Apr 24, 2024
eb9b44e
stores: fix TestSQLMetadataStore and TestSlabCleanup
ChrisSchinnerl Apr 24, 2024
bd2d7e7
stores: fix makeDirsForPath for MySQL
ChrisSchinnerl Apr 25, 2024
677e0d2
stores: fix ObjectEntries query for MySQL
ChrisSchinnerl Apr 25, 2024
3cff23d
stores: db agnostic concat
ChrisSchinnerl Apr 25, 2024
548d5e8
stores: handle char_length vs length
ChrisSchinnerl Apr 25, 2024
7b58af9
stores: add LIKE for index usage
ChrisSchinnerl Apr 25, 2024
7d49911
stores: make makeDirsForPath a function
ChrisSchinnerl Apr 25, 2024
5f36dd2
e2e: fix TestObjectEntries
ChrisSchinnerl Apr 25, 2024
e484a3f
stores: mysql migration
ChrisSchinnerl Apr 25, 2024
a8f19e4
e2e: fix TestUploadDownloadExtended
ChrisSchinnerl Apr 25, 2024
b5b2640
e2e: fix TestMultipartUploads
ChrisSchinnerl Apr 25, 2024
c8edef3
e2e: TestS3List
ChrisSchinnerl Apr 25, 2024
fdc70b6
e2e: fix TestObjectEntries on MySQL
ChrisSchinnerl Apr 25, 2024
065edd9
e2e: fix TestObjectEntries on MySQL
ChrisSchinnerl Apr 25, 2024
436d1e2
Merge branch 'chris/directories-table' of github.com:SiaFoundation/re…
ChrisSchinnerl Apr 25, 2024
5df151d
Merge branch 'dev' into chris/directories-table
ChrisSchinnerl Apr 25, 2024
beecd2b
stores: add TestDirectories
ChrisSchinnerl Apr 25, 2024
fa1dcab
stores: fix TestSlabCleanup
ChrisSchinnerl Apr 25, 2024
ea62d62
stores: deduplicate directories when migrating
ChrisSchinnerl Apr 26, 2024
639a715
stores: implement renaming
ChrisSchinnerl Apr 26, 2024
4673b2b
stores: fix NDF in RenameObjectBlocking
ChrisSchinnerl Apr 26, 2024
d04ea6b
stores: recursive solution
ChrisSchinnerl Apr 29, 2024
e768f13
Revert "stores: recursive solution"
ChrisSchinnerl Apr 29, 2024
2f1f427
stores: add migration files
ChrisSchinnerl Apr 29, 2024
75fcc62
Revert "Revert "stores: recursive solution""
ChrisSchinnerl Apr 29, 2024
2f745df
stores: pre-fetch bucket
ChrisSchinnerl Apr 29, 2024
bff67dd
stores: use full paths for dirs
ChrisSchinnerl Apr 30, 2024
8a1c65d
Merge branch 'chris/directories-table' of github.com:SiaFoundation/re…
ChrisSchinnerl Apr 30, 2024
a4db91c
stores: fix TestRenameObjects
ChrisSchinnerl Apr 30, 2024
25ae076
e2e: fix TestObjectEntries
ChrisSchinnerl Apr 30, 2024
f810b86
e2e: fix TestUploadDownloadExtended
ChrisSchinnerl Apr 30, 2024
ec3802f
stores: fix index creation in mysql migration
ChrisSchinnerl Apr 30, 2024
68bc545
stores: union all instead of union
ChrisSchinnerl May 2, 2024
e95384f
Merge branch 'dev' into chris/directories-table
ChrisSchinnerl May 2, 2024
7ba5064
stores: only store dir name
ChrisSchinnerl May 3, 2024
44f0ad0
stores: sqlConcat variable params
ChrisSchinnerl May 3, 2024
c8c65c5
stores: fix TestDirectories and TestRenameObjects
ChrisSchinnerl May 3, 2024
585a5fc
e2e: fix TestObjectEntries
ChrisSchinnerl May 3, 2024
eef1955
Merge branch 'dev' into chris/directories-table
ChrisSchinnerl May 3, 2024
7a5b2d5
stores: address comments
ChrisSchinnerl May 3, 2024
a28ebc3
stores: sqlite migration
ChrisSchinnerl May 7, 2024
7891201
Merge branch 'dev' into chris/directories-table
ChrisSchinnerl May 7, 2024
430c863
stores: rootDirID constant
ChrisSchinnerl May 7, 2024
7fce58a
stores: delay foreign keys
ChrisSchinnerl May 7, 2024
a32b630
Revert "stores: only store dir name"
ChrisSchinnerl May 8, 2024
d2f5ac1
Revert "stores: fix TestDirectories and TestRenameObjects"
ChrisSchinnerl May 8, 2024
7098ff0
stores: fix tests
ChrisSchinnerl May 8, 2024
3bdf734
Merge branch 'dev' into chris/directories-table
ChrisSchinnerl May 14, 2024
591aeb3
mysql/sqlite: move common code to sql/common.go
ChrisSchinnerl May 15, 2024
1426f8a
sql: get rid of INIT_SCHEMA
ChrisSchinnerl May 15, 2024
97894fb
sql: add Migrator interface
ChrisSchinnerl May 15, 2024
c2dec0c
sql: skip integrity check if migration didn't run
ChrisSchinnerl May 15, 2024
b0e3a32
sql: address comments
ChrisSchinnerl May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/sql/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ func (lt *loggedTxn) Prepare(query string) (*loggedStmt, error) {
return nil, err
}
return &loggedStmt{
Stmt: stmt,
query: query,
log: lt.log.Named("statement"),
Stmt: stmt,
query: query,
log: lt.log.Named("statement"),
longQueryDuration: lt.longQueryDuration,
}, nil
}

Expand Down
261 changes: 261 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package sql

import (
"embed"
"fmt"
"strings"
"unicode/utf8"

"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

type (
	// Migration describes a single database migration. ID uniquely
	// identifies the migration within the migrations table and Migrate
	// applies it inside the given transaction.
	Migration struct {
		ID      string
		Migrate func(tx Tx) error
	}

	// Migrator is an interface for defining database-specific helper methods
	// required during migrations
	Migrator interface {
		// ApplyMigration invokes the given callback; the callback reports
		// whether the migration was actually applied (false means it was
		// skipped, e.g. because it already ran).
		ApplyMigration(func(tx Tx) (bool, error)) error
		// CreateMigrationTable ensures the 'migrations' table exists.
		CreateMigrationTable() error
		// DB returns the underlying database handle.
		DB() *DB
	}

	// MainMigrator extends Migrator with helpers that are only needed when
	// migrating the main database.
	MainMigrator interface {
		Migrator
		// MakeDirsForPath creates all directories for the given object path
		// and returns the id of the innermost one.
		MakeDirsForPath(tx Tx, path string) (uint, error)
	}
)

var (
	// MainMigrations returns the ordered list of migrations for the main
	// database. m provides database-specific helpers, migrationsFs holds the
	// embedded SQL files and log reports progress.
	MainMigrations = func(m MainMigrator, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
		dbIdentifier := "main"
		return []Migration{
			{
				ID: "00001_init",
				// databases predating the initial migration can't be
				// migrated directly - ErrRunV072 tells the user to upgrade
				// through an older release first
				Migrate: func(tx Tx) error { return ErrRunV072 },
			},
			{
				ID: "00001_object_metadata",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00001_object_metadata", log)
				},
			},
			{
				ID: "00002_prune_slabs_trigger",
				Migrate: func(tx Tx) error {
					err := performMigration(tx, migrationsFs, dbIdentifier, "00002_prune_slabs_trigger", log)
					// registering triggers on MySQL requires elevated
					// privileges - surface a helpful warning in that case
					if utils.IsErr(err, ErrMySQLNoSuperPrivilege) {
						log.Warn("migration 00002_prune_slabs_trigger requires the user to have the SUPER privilege to register triggers")
					}
					return err
				},
			},
			{
				ID: "00003_idx_objects_size",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00003_idx_objects_size", log)
				},
			},
			{
				ID: "00004_prune_slabs_cascade",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00004_prune_slabs_cascade", log)
				},
			},
			{
				ID: "00005_zero_size_object_health",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00005_zero_size_object_health", log)
				},
			},
			{
				ID: "00006_idx_objects_created_at",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00006_idx_objects_created_at", log)
				},
			},
			{
				ID: "00007_host_checks",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00007_host_checks", log)
				},
			},
			{
				ID: "00008_directories",
				Migrate: func(tx Tx) error {
					// apply the first part of the migration
					if err := performMigration(tx, migrationsFs, dbIdentifier, "00008_directories_1", log); err != nil {
						return fmt.Errorf("failed to migrate: %v", err)
					}
					// helper type
					type obj struct {
						ID       uint
						ObjectID string
					}
					// loop over all objects and deduplicate dirs to create
					log.Info("beginning post-migration directory creation, this might take a while")
					batchSize := 10000
					processedDirs := make(map[string]struct{})

					// fetchBatch fetches the next page of objects. The rows
					// are always closed before returning and iteration
					// errors are surfaced rather than silently dropped.
					fetchBatch := func(offset int) ([]obj, error) {
						rows, err := tx.Query("SELECT id, object_id FROM objects ORDER BY id LIMIT ? OFFSET ?", batchSize, offset)
						if err != nil {
							return nil, fmt.Errorf("failed to fetch objects: %v", err)
						}
						defer rows.Close()
						var batch []obj
						for rows.Next() {
							var o obj
							if err := rows.Scan(&o.ID, &o.ObjectID); err != nil {
								return nil, fmt.Errorf("failed to scan object: %v", err)
							}
							batch = append(batch, o)
						}
						if err := rows.Err(); err != nil {
							return nil, fmt.Errorf("failed to iterate over objects: %v", err)
						}
						return batch, nil
					}

					for offset := 0; ; offset += batchSize {
						// offset is always a multiple of batchSize, so any
						// non-zero offset marks a completed batch
						if offset > 0 {
							log.Infof("processed %v objects", offset)
						}
						objBatch, err := fetchBatch(offset)
						if err != nil {
							return err
						}
						if len(objBatch) == 0 {
							break // done
						}
						for _, obj := range objBatch {
							// derive the object's dir - everything up to and
							// including the last slash, "" being the root
							dir := "" // root
							if i := strings.LastIndex(obj.ObjectID, "/"); i > -1 {
								dir = obj.ObjectID[:i+1]
							}
							// skip dirs that were already processed
							if _, exists := processedDirs[dir]; exists {
								continue
							}
							processedDirs[dir] = struct{}{}

							// create the dir including all of its parents
							dirID, err := m.MakeDirsForPath(tx, obj.ObjectID)
							if err != nil {
								return fmt.Errorf("failed to create directory %s: %w", obj.ObjectID, err)
							}

							// assign all objects directly inside the dir to
							// it; the LIKE clause is redundant with the
							// SUBSTR check but lets the planner use an index
							// on object_id
							if _, err := tx.Exec(`
UPDATE objects
SET db_directory_id = ?
WHERE object_id LIKE ? AND
SUBSTR(object_id, 1, ?) = ? AND
INSTR(SUBSTR(object_id, ?), '/') = 0
`,
								dirID,
								dir+"%",
								utf8.RuneCountInString(dir), dir,
								utf8.RuneCountInString(dir)+1); err != nil {
								return fmt.Errorf("failed to update object %s: %w", obj.ObjectID, err)
							}
						}
					}
					log.Info("post-migration directory creation complete")
					// apply the second part of the migration
					if err := performMigration(tx, migrationsFs, dbIdentifier, "00008_directories_2", log); err != nil {
						return fmt.Errorf("failed to migrate: %v", err)
					}
					return nil
				},
			},
		}
	}
	// MetricsMigrations returns the ordered list of migrations for the
	// metrics database.
	MetricsMigrations = func(migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
		dbIdentifier := "metrics"
		return []Migration{
			{
				ID: "00001_init",
				// see MainMigrations - pre-migration databases must be
				// upgraded through an older release first
				Migrate: func(tx Tx) error { return ErrRunV072 },
			},
			{
				ID: "00001_idx_contracts_fcid_timestamp",
				Migrate: func(tx Tx) error {
					return performMigration(tx, migrationsFs, dbIdentifier, "00001_idx_contracts_fcid_timestamp", log)
				},
			},
		}
	}
)

// PerformMigrations brings the database up to date by applying every
// migration that hasn't been applied yet. It first makes sure the migrations
// table exists; if that table turns out to be empty, the database is
// considered fresh and the full schema is initialised instead.
func PerformMigrations(m Migrator, fs embed.FS, identifier string, migrations []Migration) error {
	// the migrations table must exist before we can query it
	if err := m.CreateMigrationTable(); err != nil {
		return fmt.Errorf("failed to create migrations table: %w", err)
	}

	// an empty migrations table indicates a fresh database - init the schema
	var isEmpty bool
	err := m.DB().QueryRow("SELECT COUNT(*) = 0 FROM migrations").Scan(&isEmpty)
	if err != nil {
		return fmt.Errorf("failed to count rows in migrations table: %w", err)
	}
	if isEmpty {
		return initSchema(m.DB(), fs, identifier, migrations)
	}

	// apply missing migrations one by one
	for _, mig := range migrations {
		apply := func(tx Tx) (bool, error) {
			// skip migrations that already ran
			var applied bool
			if err := tx.QueryRow("SELECT EXISTS (SELECT 1 FROM migrations WHERE id = ?)", mig.ID).Scan(&applied); err != nil {
				return false, fmt.Errorf("failed to check if migration '%s' was already applied: %w", mig.ID, err)
			}
			if applied {
				return false, nil
			}
			// run the migration
			if err := mig.Migrate(tx); err != nil {
				return false, fmt.Errorf("migration '%s' failed: %w", mig.ID, err)
			}
			// record it as applied
			if _, err := tx.Exec("INSERT INTO migrations (id) VALUES (?)", mig.ID); err != nil {
				return false, fmt.Errorf("failed to insert migration '%s': %w", mig.ID, err)
			}
			return true, nil
		}
		if err := m.ApplyMigration(apply); err != nil {
			return fmt.Errorf("migration '%s' failed: %w", mig.ID, err)
		}
	}
	return nil
}

// execSQLFile loads 'migrations/<folder>/<filename>.sql' from the embedded
// filesystem and executes its contents within the given transaction.
func execSQLFile(tx Tx, fs embed.FS, folder, filename string) error {
	path := fmt.Sprintf("migrations/%s/%s.sql", folder, filename)

	// load the statement from the embedded filesystem
	stmt, err := fs.ReadFile(path)
	if err != nil {
		return fmt.Errorf("failed to read %s: %w", path, err)
	}

	// run it
	if _, err = tx.Exec(string(stmt)); err != nil {
		return fmt.Errorf("failed to execute %s: %w", path, err)
	}
	return nil
}

// initSchema initialises a fresh database in a single transaction by
// executing the full schema file and recording every known migration as
// applied, since the schema already reflects them.
func initSchema(db *DB, fs embed.FS, identifier string, migrations []Migration) error {
	return db.Transaction(func(tx Tx) error {
		// create all tables from scratch
		if err := execSQLFile(tx, fs, identifier, "schema"); err != nil {
			return fmt.Errorf("failed to execute schema: %w", err)
		}
		// mark every migration as applied
		for i := range migrations {
			_, err := tx.Exec("INSERT INTO migrations (id) VALUES (?)", migrations[i].ID)
			if err != nil {
				return fmt.Errorf("failed to insert migration '%s': %w", migrations[i].ID, err)
			}
		}
		return nil
	})
}

// performMigration executes the SQL file belonging to the named migration of
// the given kind ("main" or "metrics"), logging progress before and after.
func performMigration(tx Tx, fs embed.FS, kind, migration string, logger *zap.SugaredLogger) error {
	logger.Infof("performing %s migration '%s'", kind, migration)
	err := execSQLFile(tx, fs, kind, fmt.Sprintf("migration_%s", migration))
	if err != nil {
		return err
	}
	logger.Infof("migration '%s' complete", migration)
	return nil
}
14 changes: 8 additions & 6 deletions internal/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
factor = 1.8 // factor ^ retryAttempts = backoff time in milliseconds
maxBackoff = 15 * time.Second

SCHEMA_INIT = "SCHEMA_INIT"
DirectoriesRootID = 1
)

var (
Expand Down Expand Up @@ -93,9 +93,10 @@ func (s *DB) Prepare(query string) (*loggedStmt, error) {
return nil, err
}
return &loggedStmt{
Stmt: stmt,
query: query,
log: s.log.Named("statement"),
Stmt: stmt,
query: query,
log: s.log.Named("statement"),
longQueryDuration: s.longQueryDuration,
}, nil
}

Expand Down Expand Up @@ -185,8 +186,9 @@ func (s *DB) transaction(db *sql.DB, log *zap.Logger, fn func(tx Tx) error) erro
}()

ltx := &loggedTxn{
Tx: tx,
log: log,
Tx: tx,
log: log,
longQueryDuration: s.longQueryDuration,
}
if err := fn(ltx); err != nil {
return err
Expand Down
21 changes: 12 additions & 9 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,15 @@ func TestObjectEntries(t *testing.T) {
entries[i].ModTime = api.TimeRFC3339{}

// assert mime type
if entries[i].MimeType == "" {
t.Fatal("mime type should be set", entries[i].MimeType, entries[i].Name)
isDir := strings.HasSuffix(entries[i].Name, "/") && entries[i].Name != "//double/" // double is a file
if (isDir && entries[i].MimeType != "") || (!isDir && entries[i].MimeType == "") {
t.Fatal("unexpected mime type", entries[i].MimeType)
}
entries[i].MimeType = ""

// assert etag
if entries[i].ETag == "" {
t.Fatal("ETag should be set")
if isDir != (entries[i].ETag == "") {
t.Fatal("etag should be set for files and empty for dirs")
}
entries[i].ETag = ""
}
Expand Down Expand Up @@ -670,14 +671,16 @@ func TestUploadDownloadExtended(t *testing.T) {
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file2), api.DefaultBucketName, "fileś/file2", api.UploadObjectOptions{}))

// fetch all entries from the worker
entries, err := cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "", api.GetObjectOptions{})
entries, err := cluster.Worker.ObjectEntries(context.Background(), api.DefaultBucketName, "fileś/", api.GetObjectOptions{})
tt.OK(err)

if len(entries) != 1 {
t.Fatal("expected one entry to be returned", len(entries))
if len(entries) != 2 {
t.Fatal("expected two entries to be returned", len(entries))
}
if entries[0].MimeType != "application/octet-stream" {
t.Fatal("wrong mime type", entries[0].MimeType)
for _, entry := range entries {
if entry.MimeType != "application/octet-stream" {
t.Fatal("wrong mime type", entry.MimeType)
}
}

// fetch entries with "file" prefix
Expand Down
Loading
Loading