Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compound index for empty location query #182

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion persist/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ CREATE TABLE storage_volumes (
read_only BOOLEAN NOT NULL,
available BOOLEAN NOT NULL DEFAULT false
);
CREATE INDEX storage_volumes_read_only_available ON storage_volumes(read_only, available);
CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only);
CREATE INDEX storage_volumes_read_only_available_used_sectors ON storage_volumes(available, read_only, used_sectors);

CREATE TABLE volume_sectors (
Expand All @@ -57,6 +57,7 @@ CREATE TABLE volume_sectors (
sector_id INTEGER UNIQUE REFERENCES stored_sectors (id),
UNIQUE (volume_id, volume_index)
);
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);
CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id);
CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC);
Expand Down
12 changes: 12 additions & 0 deletions persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"go.sia.tech/hostd/host/contracts"
)

// migrateVersion19 adds a compound index to the volume_sectors table
func migrateVersion19(tx txn) error {
const query = `
DROP INDEX storage_volumes_read_only_available;
CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only);
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
`
_, err := tx.Exec(query)
return err
}

// migrateVersion18 adds an index to the volume_sectors table to speed up
// empty sector selection.
func migrateVersion18(tx txn) error {
Expand Down Expand Up @@ -493,4 +504,5 @@ var migrations = []func(tx txn) error{
migrateVersion16,
migrateVersion17,
migrateVersion18,
migrateVersion19,
}
2 changes: 1 addition & 1 deletion persist/sqlite/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func
} else if err != nil {
return fmt.Errorf("failed to get sector id: %w", err)
}
location, err = sectorLocation(tx, sectorID)
location, err = sectorLocation(tx, sectorID, root)
if err != nil {
return fmt.Errorf("failed to get sector location: %w", err)
}
Expand Down
47 changes: 34 additions & 13 deletions persist/sqlite/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
var exists bool

err := s.transaction(func(tx txn) error {
sectorID, err := sectorDBID(tx, root)
sectorID, err := insertSectorDBID(tx, root)
if err != nil {
return fmt.Errorf("failed to get sector id: %w", err)
}
Expand All @@ -217,8 +217,8 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
return fmt.Errorf("failed to lock sector: %w", err)
}

// check if the sector already exists
location, err = sectorLocation(tx, sectorID)
// check if the sector is already stored on disk
location, err = sectorLocation(tx, sectorID, root)
exists = err == nil
if errors.Is(err, storage.ErrSectorNotFound) {
location, err = emptyLocation(tx)
Expand Down Expand Up @@ -418,34 +418,51 @@ func (s *Store) SetAvailable(volumeID int64, available bool) error {
return err
}

// sectorDBID returns the database ID of a sector.
// sectorDBID returns the ID of a sector root in the stored_sectors table.
func sectorDBID(tx txn, root types.Hash256) (id int64, err error) {
err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) ON CONFLICT (sector_root) DO UPDATE SET last_access_timestamp=EXCLUDED.last_access_timestamp RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id)
err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&id)
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrSectorNotFound
}
return
}

// insertSectorDBID inserts a sector root into the stored_sectors table if it
// does not already exist. If the sector root already exists, the ID is
// returned.
func insertSectorDBID(tx txn, root types.Hash256) (id int64, err error) {
id, err = sectorDBID(tx, root)
if errors.Is(err, storage.ErrSectorNotFound) {
// insert the sector root
err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id)
return
}
return
}

// sectorLocation returns the location of a sector.
func sectorLocation(tx txn, sectorID int64) (loc storage.SectorLocation, err error) {
const query = `SELECT v.id, v.volume_id, v.volume_index, s.sector_root
FROM volume_sectors v
INNER JOIN stored_sectors s ON (s.id=v.sector_id)
func sectorLocation(tx txn, sectorID int64, root types.Hash256) (loc storage.SectorLocation, err error) {
const query = `SELECT v.id, v.volume_id, v.volume_index
FROM volume_sectors v
WHERE v.sector_id=$1`
err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root))
err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index)
if errors.Is(err, sql.ErrNoRows) {
return storage.SectorLocation{}, storage.ErrSectorNotFound
}
// note: this is roundabout, but it saves an extra join since all calls to
// sectorLocation are preceded by a call to sectorDBID
loc.Root = root
return
}

// emptyLocationInVolume returns an empty location in the given volume. If
// there is no space available, ErrNotEnoughStorage is returned.
func emptyLocationInVolume(tx txn, volumeID int64) (loc storage.SectorLocation, err error) {
const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs
const query = `SELECT vs.id, vs.volume_id, vs.volume_index
FROM volume_sectors vs INDEXED BY volume_sectors_volume_id_sector_id_volume_index_compound
LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id)
WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 LIMIT 1;`
WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1
LIMIT 1;`
err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index)
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrNotEnoughStorage
Expand All @@ -456,8 +473,12 @@ WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1
// emptyLocation returns an empty location in a writable volume. If there is no
// space available, ErrNotEnoughStorage is returned.
func emptyLocation(tx txn) (storage.SectorLocation, error) {
const query = `SELECT id
FROM storage_volumes
WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0
ORDER BY used_sectors ASC LIMIT 1;`
var volumeID int64
err := tx.QueryRow(`SELECT id FROM storage_volumes WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 ORDER BY used_sectors ASC LIMIT 1;`).Scan(&volumeID)
err := tx.QueryRow(query).Scan(&volumeID)
if errors.Is(err, sql.ErrNoRows) {
return storage.SectorLocation{}, storage.ErrNotEnoughStorage
} else if err != nil {
Expand Down