Skip to content

Commit

Permalink
stores: update MarkPackedSlabsUploaded
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Aug 5, 2024
1 parent 2bb2190 commit 573a25b
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 71 deletions.
2 changes: 1 addition & 1 deletion api/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ type (
}
)

func (s UploadedPackedSlab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} {
func (s UploadedPackedSlab) Contracts() []types.FileContractID {
return object.ContractsFromShards(s.Shards)
}
13 changes: 7 additions & 6 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab {

// ContractsFromShards is a helper to extract all contracts used by a set of
// shards.
func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} {
usedContracts := make(map[types.PublicKey]map[types.FileContractID]struct{})
func ContractsFromShards(shards []Sector) []types.FileContractID {
var usedContracts []types.FileContractID
usedMap := make(map[types.FileContractID]struct{})
for _, shard := range shards {
for h, fcids := range shard.Contracts {
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, exists := usedContracts[h]; !exists {
usedContracts[h] = make(map[types.FileContractID]struct{})
if _, exists := usedMap[fcid]; !exists {
usedContracts = append(usedContracts, fcid)
}
usedContracts[h][fcid] = struct{}{}
usedMap[fcid] = struct{}{}
}
}
}
Expand Down
71 changes: 7 additions & 64 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,14 +1226,15 @@ func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Uplo
}
}
}
var fileName string
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
for _, slab := range slabs {
var err error
fileName, err = s.markPackedSlabUploaded(tx, slab)
var fileNames []string
err := s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
fileNames = make([]string, len(slabs))
for i, slab := range slabs {
fileName, err := tx.MarkPackedSlabUploaded(ctx, slab)
if err != nil {
return err
}
fileNames[i] = fileName
}
return nil
})
Expand All @@ -1242,68 +1243,10 @@ func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Uplo
}

// Delete buffer from disk.
s.slabBufferMgr.RemoveBuffers(fileName)
s.slabBufferMgr.RemoveBuffers(fileNames...)
return nil
}

func (s *SQLStore) markPackedSlabUploaded(tx *gorm.DB, slab api.UploadedPackedSlab) (string, error) {
// collect all used contracts
usedContracts := slab.Contracts()
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
return "", err
}

// find the slab
var sla dbSlab
if err := tx.Where("db_buffered_slab_id", slab.BufferID).
Take(&sla).Error; err != nil {
return "", err
}

// update the slab
if err := tx.Model(&dbSlab{}).
Where("id", sla.ID).
Updates(map[string]interface{}{
"db_buffered_slab_id": nil,
}).Error; err != nil {
return "", fmt.Errorf("failed to set buffered slab NULL: %w", err)
}

// delete buffer
var fileName string
if err := tx.Raw("SELECT filename FROM buffered_slabs WHERE id = ?", slab.BufferID).
Scan(&fileName).Error; err != nil {
return "", err
}
if err := tx.Exec("DELETE FROM buffered_slabs WHERE id = ?", slab.BufferID).Error; err != nil {
return "", err
}

// add the shards to the slab
var shards []dbSector
for i := range slab.Shards {
sector := dbSector{
DBSlabID: sla.ID,
SlabIndex: i + 1,
LatestHost: publicKey(slab.Shards[i].LatestHost),
Root: slab.Shards[i].Root[:],
}
for _, fcids := range slab.Shards[i].Contracts {
for _, fcid := range fcids {
if c, ok := contracts[fcid]; ok {
sector.Contracts = append(sector.Contracts, c)
}
}
}
shards = append(shards, sector)
}
if err := tx.Create(&shards).Error; err != nil {
return "", fmt.Errorf("failed to create shards: %w", err)
}
return fileName, nil
}

func (s *SQLStore) pruneSlabsLoop() {
for {
select {
Expand Down
5 changes: 5 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ type (
// MakeDirsForPath creates all directories for a given object's path.
MakeDirsForPath(ctx context.Context, path string) (int64, error)

// MarkPackedSlabUploaded marks the packed slab as uploaded in the
// database, causing the provided shards to be associated with the slab.
// The returned string contains the filename of the slab buffer on disk.
MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error)

// MultipartUpload returns the multipart upload with the given ID or
// api.ErrMultipartUploadNotFound if the upload doesn't exist.
MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error)
Expand Down
78 changes: 78 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2690,3 +2690,81 @@ func ObjectsBySlabKey(ctx context.Context, tx Tx, bucket string, slabKey object.
}
return objects, nil
}

func MarkPackedSlabUploaded(ctx context.Context, tx Tx, slab api.UploadedPackedSlab) (string, error) {
// fetch relevant slab info
var slabID, bufferedSlabID int64
var bufferFileName string
if err := tx.QueryRow(ctx, `
SELECT sla.id, bs.id, bs.filename
FROM slabs sla
INNER JOIN buffered_slabs bs ON buffered_slabs.id = sla.db_buffered_slab_id
WHERE sla.db_buffered_slab_id = ?
`, slab.BufferID).
Scan(&slabID, &bufferedSlabID, &bufferFileName); err != nil {
return "", fmt.Errorf("failed to fetch slab id: %w", err)
}

// set 'db_buffered_slab_id' to NULL
if _, err := tx.Exec(ctx, "UPDATE slabs SET db_buffered_slab_id = NULL WHERE id = ?", slabID); err != nil {
return "", fmt.Errorf("failed to update slab: %w", err)
}

// delete buffer slab
if _, err := tx.Exec(ctx, "DELETE FROM buffered_slabs WHERE id = ?", bufferedSlabID); err != nil {
return "", fmt.Errorf("failed to delete buffered slab: %w", err)
}

// stmt to add sector
sectorStmt, err := tx.Prepare(ctx, "INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?)")
if err != nil {
return "", fmt.Errorf("failed to prepare statement to insert sectors: %w", err)
}
defer sectorStmt.Close()

// stmt to get contrat id from fcid
contractIDStmt, err := tx.Prepare(ctx, "SELECT id FROM contracts WHERE contracts.fcid = ?")
if err != nil {
return "", fmt.Errorf("failed to prepare statement to fetch contract id: %w", err)
}
defer contractIDStmt.Close()

// stmt to insert contract_sector
contractSectorStmt, err := tx.Prepare(ctx, "INSERT INTO contract_sectors (db_contract_id, db_sector_id) VALUES (?, ?)")
if err != nil {
return "", fmt.Errorf("failed to prepare statement to insert contract sectors: %w", err)
}
defer contractSectorStmt.Close()

// insert shards
for i := range slab.Shards {
// insert shard
res, err := sectorStmt.Exec(ctx, slabID, i+1, PublicKey(slab.Shards[i].LatestHost), slab.Shards[i].Root[:])
if err != nil {
return "", fmt.Errorf("failed to insert sector: %w", err)
}
sectorID, err := res.LastInsertId()
if err != nil {
return "", fmt.Errorf("failed to get sector id: %w", err)
}

// insert contracts for shard
for _, fcids := range slab.Shards[i].Contracts {
for _, fcid := range fcids {
// fetch contract id
var contractID int64
err := contractIDStmt.QueryRow(ctx, FileContractID(fcid)).Scan(&contractID)
if errors.Is(err, dsql.ErrNoRows) {
continue
} else if err != nil {
return "", fmt.Errorf("failed to fetch contract id: %w", err)
}
// insert contract sector
if _, err := contractSectorStmt.Exec(ctx, contractID, sectorID); err != nil {
return "", fmt.Errorf("failed to insert contract sector: %w", err)
}
}
}
}
return bufferFileName, nil
}
4 changes: 4 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int
return dirID, nil
}

func (tx *MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) {
return ssql.MarkPackedSlabUploaded(ctx, tx, slab)
}

func (tx *MainDatabaseTx) MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error) {
return ssql.MultipartUpload(ctx, tx, uploadID)
}
Expand Down
4 changes: 4 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,10 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int
return dirID, nil
}

func (tx *MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) {
return ssql.MarkPackedSlabUploaded(ctx, tx, slab)
}

func (tx *MainDatabaseTx) MultipartUpload(ctx context.Context, uploadID string) (api.MultipartUpload, error) {
return ssql.MultipartUpload(ctx, tx, uploadID)
}
Expand Down

0 comments on commit 573a25b

Please sign in to comment.