diff --git a/stores/metadata.go b/stores/metadata.go index 2fa897ae7..db5d771e2 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -544,21 +544,16 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) } func (s *SQLStore) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) { - // Slab buffer info from the database. - var bufferedSlabs []dbBufferedSlab - err := s.db.Model(&dbBufferedSlab{}). - Joins("DBSlab"). - Joins("DBSlab.DBContractSet"). - Find(&bufferedSlabs). - Error + var err error + var fileNameToContractSet map[string]string + err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + fileNameToContractSet, err = tx.SlabBuffers(ctx) + return err + }) if err != nil { - return nil, err - } - // Translate buffers to contract set. - fileNameToContractSet := make(map[string]string) - for _, slab := range bufferedSlabs { - fileNameToContractSet[slab.Filename] = slab.DBSlab.DBContractSet.Name + return nil, fmt.Errorf("failed to fetch slab buffers: %w", err) } + // Fetch in-memory buffer info and fill in contract set name. buffers := s.slabBufferMgr.SlabBuffers() for i := range buffers { @@ -1869,14 +1864,12 @@ func (s *SQLStore) markPackedSlabUploaded(tx *gorm.DB, slab api.UploadedPackedSl } // delete buffer - var buffer dbBufferedSlab - if err := tx.Take(&buffer, "id = ?", slab.BufferID).Error; err != nil { + var fileName string + if err := tx.Raw("SELECT filename FROM buffered_slabs WHERE id = ?", slab.BufferID). + Scan(&fileName).Error; err != nil { return "", err } - fileName := buffer.Filename - err = tx.Delete(&buffer). - Error - if err != nil { + if err := tx.Exec("DELETE FROM buffered_slabs WHERE id = ?", slab.BufferID).Error; err != nil { return "", err } diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index 87bd024e5..5afca267e 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -16,7 +16,7 @@ import ( "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" - "gorm.io/gorm" + sql "go.sia.tech/renterd/stores/sql" "lukechampine.com/frand" ) @@ -204,8 +204,8 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m // If there is still data left, create a new buffer. if len(data) > 0 { var sb *SlabBuffer - err = mgr.s.retryTransaction(ctx, func(tx *gorm.DB) error { - sb, err = createSlabBuffer(tx, contractSet, mgr.dir, minShards, totalShards) + err = mgr.s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + sb, err = createSlabBuffer(ctx, tx, contractSet, mgr.dir, minShards, totalShards) return err }) if err != nil { @@ -472,31 +472,21 @@ func bufferedSlabSize(minShards uint8) int { return int(rhpv2.SectorSize) * int(minShards) } -func createSlabBuffer(tx *gorm.DB, contractSetID uint, dir string, minShards, totalShards uint8) (*SlabBuffer, error) { - ec := object.GenerateEncryptionKey() - key, err := ec.MarshalBinary() - if err != nil { - return nil, err - } +func createSlabBuffer(ctx context.Context, tx sql.DatabaseTx, contractSetID uint, dir string, minShards, totalShards uint8) (*SlabBuffer, error) { // Create a new buffer and slab. fileName := bufferFilename(contractSetID, minShards, totalShards) file, err := os.Create(filepath.Join(dir, fileName)) if err != nil { return nil, err } - createdSlab := dbBufferedSlab{ - DBSlab: dbSlab{ - DBContractSetID: contractSetID, - Key: key, - MinShards: minShards, - TotalShards: totalShards, - }, - Filename: fileName, + + ec := object.GenerateEncryptionKey() + bufferedSlabID, err := tx.InsertBufferedSlab(ctx, fileName, int64(contractSetID), ec, minShards, totalShards) + if err != nil { + return nil, fmt.Errorf("failed to insert buffered slab: %w", err) } - err = tx.Create(&createdSlab). - Error return &SlabBuffer{ - dbID: createdSlab.ID, + dbID: uint(bufferedSlabID), filename: fileName, slabKey: ec, maxSize: int64(bufferedSlabSize(minShards)), diff --git a/stores/sql/database.go b/stores/sql/database.go index 18a870745..a54937664 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -81,6 +81,12 @@ type ( // contains the root, latest_host is updated to that host. DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) + // InsertBufferedSlab inserts a buffered slab into the database. This + // includes the creation of a buffered slab as well as the corresponding + // regular slab it is linked to. It returns the ID of the buffered slab + // that was created. + InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) + // InsertMultipartUpload creates a new multipart upload and returns a // unique upload ID. InsertMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) @@ -190,6 +196,10 @@ type ( // 'false' and also marks them as requiring a resync. SetUncleanShutdown(ctx context.Context) error + // SlabBuffers returns the filenames and associated contract sets of all + // slab buffers. + SlabBuffers(ctx context.Context) (map[string]string, error) + // UpdateAutopilot updates the autopilot with the provided one or // creates a new one if it doesn't exist yet. UpdateAutopilot(ctx context.Context, ap api.Autopilot) error diff --git a/stores/sql/main.go b/stores/sql/main.go index 9d8175500..ee9641767 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -437,6 +437,32 @@ func HostsForScanning(ctx context.Context, tx sql.Tx, maxLastScan time.Time, off return hosts, nil } +func InsertBufferedSlab(ctx context.Context, tx sql.Tx, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) { + // insert buffered slab + res, err := tx.Exec(ctx, `INSERT INTO buffered_slabs (created_at, filename) VALUES (?, ?)`, + time.Now(), fileName) + if err != nil { + return 0, fmt.Errorf("failed to insert buffered slab: %w", err) + } + bufferedSlabID, err := res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("failed to fetch buffered slab id: %w", err) + } + + key, err := ec.MarshalBinary() + if err != nil { + return 0, err + } + _, err = tx.Exec(ctx, ` + INSERT INTO slabs (created_at, db_contract_set_id, db_buffered_slab_id, `+"`key`"+`, min_shards, total_shards) + VALUES (?, ?, ?, ?, ?, ?)`, + time.Now(), contractSetID, bufferedSlabID, SecretKey(key), minShards, totalShards) + if err != nil { + return 0, fmt.Errorf("failed to insert slab: %w", err) + } + return bufferedSlabID, nil +} + func InsertMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) { // fetch bucket id var bucketID int64 @@ -1327,6 +1353,30 @@ func SetUncleanShutdown(ctx context.Context, tx sql.Tx) error { return err } +func SlabBuffers(ctx context.Context, tx sql.Tx) (map[string]string, error) { + rows, err := tx.Query(ctx, ` + SELECT buffered_slabs.filename, cs.name + FROM buffered_slabs + INNER JOIN slabs sla ON sla.db_buffered_slab_id = buffered_slabs.id + INNER JOIN contract_sets cs ON cs.id = sla.db_contract_set_id + `) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract sets") + } + defer rows.Close() + + fileNameToContractSet := make(map[string]string) + for rows.Next() { + var fileName string + var contractSetName string + if err := rows.Scan(&fileName, &contractSetName); err != nil { + return nil, fmt.Errorf("failed to scan contract set: %w", err) + } + fileNameToContractSet[fileName] = contractSetName + } + return fileNameToContractSet, nil +} + func UpdateBucketPolicy(ctx context.Context, tx sql.Tx, bucket string, bp api.BucketPolicy) error { policy, err := json.Marshal(bp) if err != nil { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 3f46a8cb8..91e0f52d4 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -246,6 +246,10 @@ func (tx *MainDatabaseTx) DeleteHostSector(ctx context.Context, hk types.PublicK return ssql.DeleteHostSector(ctx, tx, hk, root) } +func (tx *MainDatabaseTx) InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) { + return ssql.InsertBufferedSlab(ctx, tx, fileName, contractSetID, ec, minShards, totalShards) +} + func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) { return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata) } @@ -597,6 +601,10 @@ func (tx *MainDatabaseTx) SetUncleanShutdown(ctx context.Context) error { return ssql.SetUncleanShutdown(ctx, tx) } +func (tx *MainDatabaseTx) SlabBuffers(ctx context.Context) (map[string]string, error) { + return ssql.SlabBuffers(ctx, tx) +} + func (tx *MainDatabaseTx) UpdateAutopilot(ctx context.Context, ap api.Autopilot) error { res, err := tx.Exec(ctx, ` INSERT INTO autopilots (created_at, identifier, config, current_period) diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 673b989c9..48caa57ed 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -250,6 +250,10 @@ func (tx *MainDatabaseTx) DeleteHostSector(ctx context.Context, hk types.PublicK return ssql.DeleteHostSector(ctx, tx, hk, root) } +func (tx *MainDatabaseTx) InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) { + return ssql.InsertBufferedSlab(ctx, tx, fileName, contractSetID, ec, minShards, totalShards) +} + func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) { return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata) } @@ -595,6 +599,10 @@ func (tx *MainDatabaseTx) SetUncleanShutdown(ctx context.Context) error { return ssql.SetUncleanShutdown(ctx, tx) } +func (tx *MainDatabaseTx) SlabBuffers(ctx context.Context) (map[string]string, error) { + return ssql.SlabBuffers(ctx, tx) +} + func (tx *MainDatabaseTx) UpdateAutopilot(ctx context.Context, ap api.Autopilot) error { res, err := tx.Exec(ctx, ` INSERT INTO autopilots (created_at, identifier, config, current_period)