Skip to content

Commit

Permalink
Merge pull request #182 from SiaFoundation/nate/add-location-compund-…
Browse files Browse the repository at this point in the history
…index

Add compound index for empty location query
  • Loading branch information
n8maninger authored Oct 5, 2023
2 parents 6d189a2 + ca66e14 commit a04acb1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 15 deletions.
3 changes: 2 additions & 1 deletion persist/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ CREATE TABLE storage_volumes (
read_only BOOLEAN NOT NULL,
available BOOLEAN NOT NULL DEFAULT false
);
CREATE INDEX storage_volumes_read_only_available ON storage_volumes(read_only, available);
CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only);
CREATE INDEX storage_volumes_read_only_available_used_sectors ON storage_volumes(available, read_only, used_sectors);

CREATE TABLE volume_sectors (
Expand All @@ -57,6 +57,7 @@ CREATE TABLE volume_sectors (
sector_id INTEGER UNIQUE REFERENCES stored_sectors (id),
UNIQUE (volume_id, volume_index)
);
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id);
CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id);
CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC);
Expand Down
12 changes: 12 additions & 0 deletions persist/sqlite/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"go.sia.tech/hostd/host/contracts"
)

// migrateVersion19 adds a compound index to the volume_sectors table
func migrateVersion19(tx txn) error {
const query = `
DROP INDEX storage_volumes_read_only_available;
CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only);
CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL;
`
_, err := tx.Exec(query)
return err
}

// migrateVersion18 adds an index to the volume_sectors table to speed up
// empty sector selection.
func migrateVersion18(tx txn) error {
Expand Down Expand Up @@ -493,4 +504,5 @@ var migrations = []func(tx txn) error{
migrateVersion16,
migrateVersion17,
migrateVersion18,
migrateVersion19,
}
2 changes: 1 addition & 1 deletion persist/sqlite/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func
} else if err != nil {
return fmt.Errorf("failed to get sector id: %w", err)
}
location, err = sectorLocation(tx, sectorID)
location, err = sectorLocation(tx, sectorID, root)
if err != nil {
return fmt.Errorf("failed to get sector location: %w", err)
}
Expand Down
47 changes: 34 additions & 13 deletions persist/sqlite/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
var exists bool

err := s.transaction(func(tx txn) error {
sectorID, err := sectorDBID(tx, root)
sectorID, err := insertSectorDBID(tx, root)
if err != nil {
return fmt.Errorf("failed to get sector id: %w", err)
}
Expand All @@ -217,8 +217,8 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati
return fmt.Errorf("failed to lock sector: %w", err)
}

// check if the sector already exists
location, err = sectorLocation(tx, sectorID)
// check if the sector is already stored on disk
location, err = sectorLocation(tx, sectorID, root)
exists = err == nil
if errors.Is(err, storage.ErrSectorNotFound) {
location, err = emptyLocation(tx)
Expand Down Expand Up @@ -418,34 +418,51 @@ func (s *Store) SetAvailable(volumeID int64, available bool) error {
return err
}

// sectorDBID returns the database ID of a sector.
// sectorDBID returns the ID of a sector root in the stored_sectors table.
func sectorDBID(tx txn, root types.Hash256) (id int64, err error) {
err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) ON CONFLICT (sector_root) DO UPDATE SET last_access_timestamp=EXCLUDED.last_access_timestamp RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id)
err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&id)
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrSectorNotFound
}
return
}

// insertSectorDBID inserts a sector root into the stored_sectors table if it
// does not already exist. If the sector root already exists, the ID is
// returned.
func insertSectorDBID(tx txn, root types.Hash256) (id int64, err error) {
id, err = sectorDBID(tx, root)
if errors.Is(err, storage.ErrSectorNotFound) {
// insert the sector root
err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id)
return
}
return
}

// sectorLocation returns the location of a sector.
func sectorLocation(tx txn, sectorID int64) (loc storage.SectorLocation, err error) {
const query = `SELECT v.id, v.volume_id, v.volume_index, s.sector_root
FROM volume_sectors v
INNER JOIN stored_sectors s ON (s.id=v.sector_id)
func sectorLocation(tx txn, sectorID int64, root types.Hash256) (loc storage.SectorLocation, err error) {
const query = `SELECT v.id, v.volume_id, v.volume_index
FROM volume_sectors v
WHERE v.sector_id=$1`
err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root))
err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index)
if errors.Is(err, sql.ErrNoRows) {
return storage.SectorLocation{}, storage.ErrSectorNotFound
}
// note: this is roundabout, but it saves an extra join since all calls to
// sectorLocation are preceded by a call to sectorDBID
loc.Root = root
return
}

// emptyLocationInVolume returns an empty location in the given volume. If
// there is no space available, ErrNotEnoughStorage is returned.
func emptyLocationInVolume(tx txn, volumeID int64) (loc storage.SectorLocation, err error) {
const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs
const query = `SELECT vs.id, vs.volume_id, vs.volume_index
FROM volume_sectors vs INDEXED BY volume_sectors_volume_id_sector_id_volume_index_compound
LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id)
WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 LIMIT 1;`
WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1
LIMIT 1;`
err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index)
if errors.Is(err, sql.ErrNoRows) {
err = storage.ErrNotEnoughStorage
Expand All @@ -456,8 +473,12 @@ WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1
// emptyLocation returns an empty location in a writable volume. If there is no
// space available, ErrNotEnoughStorage is returned.
func emptyLocation(tx txn) (storage.SectorLocation, error) {
const query = `SELECT id
FROM storage_volumes
WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0
ORDER BY used_sectors ASC LIMIT 1;`
var volumeID int64
err := tx.QueryRow(`SELECT id FROM storage_volumes WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 ORDER BY used_sectors ASC LIMIT 1;`).Scan(&volumeID)
err := tx.QueryRow(query).Scan(&volumeID)
if errors.Is(err, sql.ErrNoRows) {
return storage.SectorLocation{}, storage.ErrNotEnoughStorage
} else if err != nil {
Expand Down

0 comments on commit a04acb1

Please sign in to comment.