From 573a25b9832dabca81fa7c483c07be0159ed4816 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 5 Aug 2024 15:54:44 +0200 Subject: [PATCH] stores: update MarkPackedSlabsUploaded --- api/slab.go | 2 +- object/slab.go | 13 ++++--- stores/metadata.go | 71 ++++------------------------------- stores/sql/database.go | 5 +++ stores/sql/main.go | 78 +++++++++++++++++++++++++++++++++++++++ stores/sql/mysql/main.go | 4 ++ stores/sql/sqlite/main.go | 4 ++ 7 files changed, 106 insertions(+), 71 deletions(-) diff --git a/api/slab.go b/api/slab.go index 1a5d3fc79..65d19788d 100644 --- a/api/slab.go +++ b/api/slab.go @@ -74,6 +74,6 @@ type ( } ) -func (s UploadedPackedSlab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} { +func (s UploadedPackedSlab) Contracts() []types.FileContractID { return object.ContractsFromShards(s.Shards) } diff --git a/object/slab.go b/object/slab.go index 770df9ef6..e52e7bd7b 100644 --- a/object/slab.go +++ b/object/slab.go @@ -53,15 +53,16 @@ func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab { // ContractsFromShards is a helper to extract all contracts used by a set of // shards. -func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} { - usedContracts := make(map[types.PublicKey]map[types.FileContractID]struct{}) +func ContractsFromShards(shards []Sector) []types.FileContractID { + var usedContracts []types.FileContractID + usedMap := make(map[types.FileContractID]struct{}) for _, shard := range shards { - for h, fcids := range shard.Contracts { + for _, fcids := range shard.Contracts { for _, fcid := range fcids { - if _, exists := usedContracts[h]; !exists { - usedContracts[h] = make(map[types.FileContractID]struct{}) + if _, exists := usedMap[fcid]; !exists { + usedContracts = append(usedContracts, fcid) } - usedContracts[h][fcid] = struct{}{} + usedMap[fcid] = struct{}{} } } } diff --git a/stores/metadata.go b/stores/metadata.go index 21dca732a..acb824e8a 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1226,14 +1226,15 @@ func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Uplo } } } - var fileName string - err := s.retryTransaction(ctx, func(tx *gorm.DB) error { - for _, slab := range slabs { - var err error - fileName, err = s.markPackedSlabUploaded(tx, slab) + var fileNames []string + err := s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { + fileNames = make([]string, len(slabs)) + for i, slab := range slabs { + fileName, err := tx.MarkPackedSlabUploaded(ctx, slab) if err != nil { return err } + fileNames[i] = fileName } return nil }) @@ -1242,68 +1243,10 @@ func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Uplo } // Delete buffer from disk. - s.slabBufferMgr.RemoveBuffers(fileName) + s.slabBufferMgr.RemoveBuffers(fileNames...) return nil } -func (s *SQLStore) markPackedSlabUploaded(tx *gorm.DB, slab api.UploadedPackedSlab) (string, error) { - // collect all used contracts - usedContracts := slab.Contracts() - contracts, err := fetchUsedContracts(tx, usedContracts) - if err != nil { - return "", err - } - - // find the slab - var sla dbSlab - if err := tx.Where("db_buffered_slab_id", slab.BufferID). - Take(&sla).Error; err != nil { - return "", err - } - - // update the slab - if err := tx.Model(&dbSlab{}). - Where("id", sla.ID). - Updates(map[string]interface{}{ - "db_buffered_slab_id": nil, - }).Error; err != nil { - return "", fmt.Errorf("failed to set buffered slab NULL: %w", err) - } - - // delete buffer - var fileName string - if err := tx.Raw("SELECT filename FROM buffered_slabs WHERE id = ?", slab.BufferID). - Scan(&fileName).Error; err != nil { - return "", err - } - if err := tx.Exec("DELETE FROM buffered_slabs WHERE id = ?", slab.BufferID).Error; err != nil { - return "", err - } - - // add the shards to the slab - var shards []dbSector - for i := range slab.Shards { - sector := dbSector{ - DBSlabID: sla.ID, - SlabIndex: i + 1, - LatestHost: publicKey(slab.Shards[i].LatestHost), - Root: slab.Shards[i].Root[:], - } - for _, fcids := range slab.Shards[i].Contracts { - for _, fcid := range fcids { - if c, ok := contracts[fcid]; ok { - sector.Contracts = append(sector.Contracts, c) - } - } - } - shards = append(shards, sector) - } - if err := tx.Create(&shards).Error; err != nil { - return "", fmt.Errorf("failed to create shards: %w", err) - } - return fileName, nil -} - func (s *SQLStore) pruneSlabsLoop() { for { select { diff --git a/stores/sql/database.go b/stores/sql/database.go index 5ab57c073..7e2b1aa4b 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -186,6 +186,11 @@ type ( // MakeDirsForPath creates all directories for a given object's path. MakeDirsForPath(ctx context.Context, path string) (int64, error) + // MarkPackedSlabUploaded marks the packed slab as uploaded in the + // database, causing the provided shards to be associated with the slab. + // The returned string contains the filename of the slab buffer on disk. + MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) + // MultipartUpload returns the multipart upload with the given ID or // api.ErrMultipartUploadNotFound if the upload doesn't exist. MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error) diff --git a/stores/sql/main.go b/stores/sql/main.go index 8b99b1f60..f37e30c59 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2690,3 +2690,81 @@ func ObjectsBySlabKey(ctx context.Context, tx Tx, bucket string, slabKey object. } return objects, nil } + +func MarkPackedSlabUploaded(ctx context.Context, tx Tx, slab api.UploadedPackedSlab) (string, error) { + // fetch relevant slab info + var slabID, bufferedSlabID int64 + var bufferFileName string + if err := tx.QueryRow(ctx, ` + SELECT sla.id, bs.id, bs.filename + FROM slabs sla + INNER JOIN buffered_slabs bs ON buffered_slabs.id = sla.db_buffered_slab_id + WHERE sla.db_buffered_slab_id = ? + `, slab.BufferID). + Scan(&slabID, &bufferedSlabID, &bufferFileName); err != nil { + return "", fmt.Errorf("failed to fetch slab id: %w", err) + } + + // set 'db_buffered_slab_id' to NULL + if _, err := tx.Exec(ctx, "UPDATE slabs SET db_buffered_slab_id = NULL WHERE id = ?", slabID); err != nil { + return "", fmt.Errorf("failed to update slab: %w", err) + } + + // delete buffer slab + if _, err := tx.Exec(ctx, "DELETE FROM buffered_slabs WHERE id = ?", bufferedSlabID); err != nil { + return "", fmt.Errorf("failed to delete buffered slab: %w", err) + } + + // stmt to add sector + sectorStmt, err := tx.Prepare(ctx, "INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?)") + if err != nil { + return "", fmt.Errorf("failed to prepare statement to insert sectors: %w", err) + } + defer sectorStmt.Close() + + // stmt to get contrat id from fcid + contractIDStmt, err := tx.Prepare(ctx, "SELECT id FROM contracts WHERE contracts.fcid = ?") + if err != nil { + return "", fmt.Errorf("failed to prepare statement to fetch contract id: %w", err) + } + defer contractIDStmt.Close() + + // stmt to insert contract_sector + contractSectorStmt, err := tx.Prepare(ctx, "INSERT INTO contract_sectors (db_contract_id, db_sector_id) VALUES (?, ?)") + if err != nil { + return "", fmt.Errorf("failed to prepare statement to insert contract sectors: %w", err) + } + defer contractSectorStmt.Close() + + // insert shards + for i := range slab.Shards { + // insert shard + res, err := sectorStmt.Exec(ctx, slabID, i+1, PublicKey(slab.Shards[i].LatestHost), slab.Shards[i].Root[:]) + if err != nil { + return "", fmt.Errorf("failed to insert sector: %w", err) + } + sectorID, err := res.LastInsertId() + if err != nil { + return "", fmt.Errorf("failed to get sector id: %w", err) + } + + // insert contracts for shard + for _, fcids := range slab.Shards[i].Contracts { + for _, fcid := range fcids { + // fetch contract id + var contractID int64 + err := contractIDStmt.QueryRow(ctx, FileContractID(fcid)).Scan(&contractID) + if errors.Is(err, dsql.ErrNoRows) { + continue + } else if err != nil { + return "", fmt.Errorf("failed to fetch contract id: %w", err) + } + // insert contract sector + if _, err := contractSectorStmt.Exec(ctx, contractID, sectorID); err != nil { + return "", fmt.Errorf("failed to insert contract sector: %w", err) + } + } + } + } + return bufferFileName, nil +} diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 6a883b8bf..928387e98 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -514,6 +514,10 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int return dirID, nil } +func (tx *MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) { + return ssql.MarkPackedSlabUploaded(ctx, tx, slab) +} + func (tx *MainDatabaseTx) MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error) { return ssql.MultipartUpload(ctx, tx, uploadID) } diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 32abf6460..c4de59e10 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -511,6 +511,10 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int return dirID, nil } +func (tx *MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) { + return ssql.MarkPackedSlabUploaded(ctx, tx, slab) +} + func (tx *MainDatabaseTx) MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error) { return ssql.MultipartUpload(ctx, tx, uploadID) }