diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index 0d55dd6c..7cd147ec 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -47,7 +47,7 @@ CREATE TABLE storage_volumes ( read_only BOOLEAN NOT NULL, available BOOLEAN NOT NULL DEFAULT false ); -CREATE INDEX storage_volumes_read_only_available ON storage_volumes(read_only, available); +CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only); CREATE INDEX storage_volumes_read_only_available_used_sectors ON storage_volumes(available, read_only, used_sectors); CREATE TABLE volume_sectors ( @@ -57,6 +57,7 @@ CREATE TABLE volume_sectors ( sector_id INTEGER UNIQUE REFERENCES stored_sectors (id), UNIQUE (volume_id, volume_index) ); +CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL; CREATE INDEX volume_sectors_volume_id_sector_id ON volume_sectors(volume_id, sector_id); CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id); CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC); diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 62587e09..bea64182 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -9,6 +9,17 @@ import ( "go.sia.tech/hostd/host/contracts" ) +// migrateVersion19 adds a compound index to the volume_sectors table +func migrateVersion19(tx txn) error { + const query = ` +DROP INDEX storage_volumes_read_only_available; +CREATE INDEX storage_volumes_id_available_read_only ON storage_volumes(id, available, read_only); +CREATE INDEX volume_sectors_volume_id_sector_id_volume_index_compound ON volume_sectors(volume_id, sector_id, volume_index) WHERE sector_id IS NULL; +` + _, err := tx.Exec(query) + return err +} + // migrateVersion18 adds an index to the volume_sectors table to speed up // empty sector selection. func migrateVersion18(tx txn) error { @@ -493,4 +504,5 @@ var migrations = []func(tx txn) error{ migrateVersion16, migrateVersion17, migrateVersion18, + migrateVersion19, } diff --git a/persist/sqlite/sectors.go b/persist/sqlite/sectors.go index c2c069ca..e6092e4d 100644 --- a/persist/sqlite/sectors.go +++ b/persist/sqlite/sectors.go @@ -99,7 +99,7 @@ func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func } else if err != nil { return fmt.Errorf("failed to get sector id: %w", err) } - location, err = sectorLocation(tx, sectorID) + location, err = sectorLocation(tx, sectorID, root) if err != nil { return fmt.Errorf("failed to get sector location: %w", err) } diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go index fecfc533..68e93201 100644 --- a/persist/sqlite/volumes.go +++ b/persist/sqlite/volumes.go @@ -206,7 +206,7 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati var exists bool err := s.transaction(func(tx txn) error { - sectorID, err := sectorDBID(tx, root) + sectorID, err := insertSectorDBID(tx, root) if err != nil { return fmt.Errorf("failed to get sector id: %w", err) } @@ -217,8 +217,8 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati return fmt.Errorf("failed to lock sector: %w", err) } - // check if the sector already exists - location, err = sectorLocation(tx, sectorID) + // check if the sector is already stored on disk + location, err = sectorLocation(tx, sectorID, root) exists = err == nil if errors.Is(err, storage.ErrSectorNotFound) { location, err = emptyLocation(tx) @@ -418,34 +418,51 @@ func (s *Store) SetAvailable(volumeID int64, available bool) error { return err } -// sectorDBID returns the database ID of a sector. +// sectorDBID returns the ID of a sector root in the stored_sectors table. func sectorDBID(tx txn, root types.Hash256) (id int64, err error) { - err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) ON CONFLICT (sector_root) DO UPDATE SET last_access_timestamp=EXCLUDED.last_access_timestamp RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id) + err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&id) if errors.Is(err, sql.ErrNoRows) { err = storage.ErrSectorNotFound } return } +// insertSectorDBID inserts a sector root into the stored_sectors table if it +// does not already exist. If the sector root already exists, the ID is +// returned. +func insertSectorDBID(tx txn, root types.Hash256) (id int64, err error) { + id, err = sectorDBID(tx, root) + if errors.Is(err, storage.ErrSectorNotFound) { + // insert the sector root + err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) RETURNING id`, sqlHash256(root), sqlTime(time.Now())).Scan(&id) + return + } + return +} + // sectorLocation returns the location of a sector. -func sectorLocation(tx txn, sectorID int64) (loc storage.SectorLocation, err error) { - const query = `SELECT v.id, v.volume_id, v.volume_index, s.sector_root -FROM volume_sectors v -INNER JOIN stored_sectors s ON (s.id=v.sector_id) +func sectorLocation(tx txn, sectorID int64, root types.Hash256) (loc storage.SectorLocation, err error) { + const query = `SELECT v.id, v.volume_id, v.volume_index +FROM volume_sectors v WHERE v.sector_id=$1` - err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index, (*sqlHash256)(&loc.Root)) + err = tx.QueryRow(query, sectorID).Scan(&loc.ID, &loc.Volume, &loc.Index) if errors.Is(err, sql.ErrNoRows) { return storage.SectorLocation{}, storage.ErrSectorNotFound } + // note: this is roundabout, but it saves an extra join since all calls to + // sectorLocation are preceded by a call to sectorDBID + loc.Root = root return } // emptyLocationInVolume returns an empty location in the given volume. If // there is no space available, ErrNotEnoughStorage is returned. func emptyLocationInVolume(tx txn, volumeID int64) (loc storage.SectorLocation, err error) { - const query = `SELECT vs.id, vs.volume_id, vs.volume_index FROM volume_sectors vs + const query = `SELECT vs.id, vs.volume_id, vs.volume_index +FROM volume_sectors vs INDEXED BY volume_sectors_volume_id_sector_id_volume_index_compound LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id) -WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 LIMIT 1;` +WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 +LIMIT 1;` err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index) if errors.Is(err, sql.ErrNoRows) { err = storage.ErrNotEnoughStorage @@ -456,8 +473,12 @@ WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND vs.volume_id=$1 // emptyLocation returns an empty location in a writable volume. If there is no // space available, ErrNotEnoughStorage is returned. func emptyLocation(tx txn) (storage.SectorLocation, error) { + const query = `SELECT id +FROM storage_volumes +WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 +ORDER BY used_sectors ASC LIMIT 1;` var volumeID int64 - err := tx.QueryRow(`SELECT id FROM storage_volumes WHERE available=true AND read_only=false AND total_sectors-used_sectors > 0 ORDER BY used_sectors ASC LIMIT 1;`).Scan(&volumeID) + err := tx.QueryRow(query).Scan(&volumeID) if errors.Is(err, sql.ErrNoRows) { return storage.SectorLocation{}, storage.ErrNotEnoughStorage } else if err != nil {