Skip to content

Commit

Permalink
stores: fix RenameDirectories
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Nov 12, 2024
1 parent 4b0f42a commit b6e6ba3
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 13 deletions.
69 changes: 69 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,75 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00026_key_prefix", log)
},
},
{
ID: "00027_recreate_directories",
Migrate: func(tx Tx) error {
// truncate the directories table
if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00027_recreate_directories_1", log); err != nil {
return fmt.Errorf("failed to migrate: %v", err)
}

// fetch all objects
type obj struct {
ID int64
Path string
Bucket string
}

rows, err := tx.Query(ctx, "SELECT o.id, o.object_id, b.name FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id")
if err != nil {
return fmt.Errorf("failed to fetch objects: %w", err)
}
defer rows.Close()

var objects []obj
for rows.Next() {
var o obj
if err := rows.Scan(&o.ID, &o.Path, &o.Bucket); err != nil {
return fmt.Errorf("failed to scan object: %w", err)
}
objects = append(objects, o)
}

// re-insert directories and collect object updates
memo := make(map[string]int64)
updates := make(map[int64]int64)
for _, o := range objects {
// build path directories
dirs := object.Directories(o.Path)
last := dirs[len(dirs)-1]
if _, ok := memo[last]; ok {
updates[o.ID] = memo[last]
continue
}

// insert directories
dirID, err := m.InsertDirectories(ctx, tx, o.Bucket, o.Path)
if err != nil {
return fmt.Errorf("failed to create directory %s in bucket %s: %w", o.Path, o.Bucket, err)
}
updates[o.ID] = dirID
memo[last] = dirID
}

// prepare an update statement
stmt, err := tx.Prepare(ctx, "UPDATE objects SET db_directory_id = ? WHERE id = ?")
if err != nil {
return fmt.Errorf("failed to prepare update statement: %w", err)
}
defer stmt.Close()

// update all objects
for id, dirID := range updates {
if _, err := stmt.Exec(ctx, dirID, id); err != nil {
return fmt.Errorf("failed to update object %d: %w", id, err)
}
}

// re-add the foreign key check (only for MySQL)
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00027_recreate_directories_2", log)
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
9 changes: 9 additions & 0 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,15 @@ func TestRenameObjectsRegression(t *testing.T) {
assertNumObjects("/video/", 2)
assertNumObjects("/video/thriller/", 2)
assertNumObjects("/", 4)

// assert we move a folder the parent id remains correct
if err := ss.RenameObjects(ctx, testBucket, "/video/thriller/", "/thriller/", true); err != nil {
t.Fatal(err)
}

assertNumObjects("/", 5)
assertNumObjects("/thriller/", 2)
assertNumObjects("/video/", 1)
}

// TestObjectsStats is a unit test for ObjectsStats.
Expand Down
45 changes: 34 additions & 11 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type (

CharLengthExpr() string

InsertDirectories(ctx context.Context, bucket, path string) (int64, error)

// ScanObjectMetadata scans the object metadata from the given scanner.
// The columns required to scan the metadata are returned by the
// SelectObjectMetadataExpr helper method. Additional fields can be
Expand Down Expand Up @@ -1803,7 +1805,7 @@ WHERE h.recent_downtime >= ? AND h.recent_scan_failures >= ?`, DurationMS(maxDow

// RenameDirectories renames all directories in the database with the given
// prefix to the new prefix.
func RenameDirectories(ctx context.Context, tx sql.Tx, bucket, prefixOld, prefixNew string) (int64, error) {
func RenameDirectories(ctx context.Context, tx Tx, bucket, prefixOld, prefixNew string) (int64, error) {
// sanity check input
if !strings.HasPrefix(prefixNew, "/") {
return 0, errors.New("paths has to have a leading slash")
Expand All @@ -1824,17 +1826,17 @@ func RenameDirectories(ctx context.Context, tx sql.Tx, bucket, prefixOld, prefix
}
defer insertStmt.Close()

updateNameStmt, err := tx.Prepare(ctx, "UPDATE directories SET name = ? WHERE id = ?")
updateExistingStmt, err := tx.Prepare(ctx, "UPDATE directories SET name = ?, db_parent_id = ? WHERE id = ?")
if err != nil {
return 0, err
}
defer updateNameStmt.Close()
defer updateExistingStmt.Close()

updateParentStmt, err := tx.Prepare(ctx, "UPDATE directories SET db_parent_id = ? WHERE db_parent_id = ?")
updateChildrenStmt, err := tx.Prepare(ctx, "UPDATE directories SET db_parent_id = ? WHERE db_parent_id = ?")
if err != nil {
return 0, err
}
defer updateParentStmt.Close()
defer updateChildrenStmt.Close()

// fetch bucket id
var bucketID int64
Expand All @@ -1846,32 +1848,53 @@ func RenameDirectories(ctx context.Context, tx sql.Tx, bucket, prefixOld, prefix
}

// fetch destination directories
directories := make(map[int64]string)
type directory struct {
id int64
name string
}
rows, err := tx.Query(ctx, "SELECT id, name FROM directories WHERE name LIKE ? AND db_bucket_id = ? ORDER BY LENGTH(name) - LENGTH(REPLACE(name, '/', '')) ASC", prefixOld+"%", bucketID)
if err != nil {
return 0, err
}
defer rows.Close()

var directories []directory
for rows.Next() {
var id int64
var name string
if err := rows.Scan(&id, &name); err != nil {
return 0, err
}
directories[id] = strings.Replace(name, prefixOld, prefixNew, 1)
directories = append(directories, directory{
id: id,
name: strings.Replace(name, prefixOld, prefixNew, 1),
})
}

// update existing directories
for id, name := range directories {
for _, d := range directories {
var existingID int64
if err := queryDirStmt.QueryRow(ctx, name, bucketID).Scan(&existingID); err != nil && !errors.Is(err, dsql.ErrNoRows) {
if err := queryDirStmt.QueryRow(ctx, d.name, bucketID).Scan(&existingID); err != nil && !errors.Is(err, dsql.ErrNoRows) {
return 0, err
} else if existingID > 0 {
if _, err := updateParentStmt.Exec(ctx, existingID, id); err != nil {
// case 1: the target directory already exists, in this case we want
// to update all of the directories that were pointing to the
// directory we're renaming, to point to the existing target
// directory
if _, err := updateChildrenStmt.Exec(ctx, existingID, d.id); err != nil {
return 0, err
}
} else {
if _, err := updateNameStmt.Exec(ctx, name, id); err != nil {
// case 2: the target directory does not exist, but we do have a
// directory we want to rename, in this case we want to update the
// name of the directory but also ensure its pointing to the correct
// parent, e.g. we might be moving a folder up or downwards in the
// directory tree structure
parentID, err := tx.InsertDirectories(ctx, bucket, d.name)
if err != nil {
return 0, err
}
if _, err := updateExistingStmt.Exec(ctx, d.name, parentID, d.id); err != nil {
return 0, err
}
}
Expand Down
12 changes: 11 additions & 1 deletion stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,15 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
}
}

// delete directories that match the old prefix
_, err := tx.Exec(ctx, `
DELETE
FROM objects
WHERE db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND object_id = ? AND size = 0`, bucket, prefixOld)
if err != nil {
return err
}

// update objects where bucket matches, where the object_id is prefixed by
// the old prefix (case sensitive) and it doesn't exactly match the new
// prefix, we update the object_id at all times but only update directory_id
Expand All @@ -864,7 +873,8 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
WHERE object_id LIKE ?
AND SUBSTR(object_id, 1, ?) = ?
AND object_id != ?
AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`
AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)
AND size != 0`

args := []any{
prefixNew, utf8.RuneCountInString(prefixOld) + 1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE objects DROP FOREIGN KEY fk_objects_db_directory_id;
TRUNCATE TABLE directories;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE objects
ADD CONSTRAINT fk_objects_db_directory_id
FOREIGN KEY (db_directory_id) REFERENCES directories(id);
12 changes: 11 additions & 1 deletion stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,15 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
}
}

// delete directories that match the old prefix
_, err := tx.Exec(ctx, `
DELETE
FROM objects
WHERE db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND object_id = ? AND size = 0`, bucket, prefixOld)
if err != nil {
return err
}

// update objects where bucket matches, where the object_id is prefixed by
// the old prefix (case sensitive) and it doesn't exactly match the new
// prefix, we update the object_id at all times but only update directory_id
Expand All @@ -873,7 +882,8 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld,
WHERE object_id LIKE ?
AND SUBSTR(object_id, 1, ?) = ?
AND object_id != ?
AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`
AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)
AND size != 0`

args := []any{
prefixNew, utf8.RuneCountInString(prefixOld) + 1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM directories;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- nothing to do

0 comments on commit b6e6ba3

Please sign in to comment.