Skip to content

Commit

Permalink
Merge branch 'dev' into chris/network-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Aug 9, 2024
2 parents 186a895 + 050aff6 commit 6847e21
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 411 deletions.
7 changes: 2 additions & 5 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,8 @@ func (s *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time,
}

func (s *SQLStore) ResetLostSectors(ctx context.Context, hk types.PublicKey) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
return tx.Model(&dbHost{}).
Where("public_key", publicKey(hk)).
Update("lost_sectors", 0).
Error
return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.ResetLostSectors(ctx, hk)
})
}

Expand Down
185 changes: 2 additions & 183 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,63 +342,6 @@ func (c dbContract) convert() api.ContractMetadata {
}
}

func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) {
if len(raw) == 0 {
return object.SlabSlice{}, errors.New("no sectors found")
} else if raw[0].SlabBuffered && len(raw) != 1 {
return object.SlabSlice{}, errors.New("buffered slab with multiple sectors")
}

// unmarshal key
if err := slice.Slab.Key.UnmarshalBinary(raw[0].SlabKey); err != nil {
return object.SlabSlice{}, err
}

// handle partial slab
if raw[0].SlabBuffered {
slice.Offset = raw[0].SliceOffset
slice.Length = raw[0].SliceLength
slice.Slab.MinShards = raw[0].SlabMinShards
slice.Slab.Health = raw[0].SlabHealth
return
}

// hydrate all sectors
slabID := raw[0].SlabID
sectors := make([]object.Sector, 0, len(raw))
secIdx := uint(0)
for _, sector := range raw {
if sector.SlabID != slabID {
return object.SlabSlice{}, errors.New("sectors from different slabs") // developer error
}
latestHost := types.PublicKey(sector.LatestHost)
fcid := types.FileContractID(sector.FCID)

// next sector
if sector.SectorIndex != secIdx {
sectors = append(sectors, object.Sector{
Contracts: make(map[types.PublicKey][]types.FileContractID),
LatestHost: latestHost,
Root: *(*types.Hash256)(sector.SectorRoot),
})
secIdx = sector.SectorIndex
}

// add host+contract to sector
if fcid != (types.FileContractID{}) {
sectors[len(sectors)-1].Contracts[types.PublicKey(sector.HostKey)] = append(sectors[len(sectors)-1].Contracts[types.PublicKey(sector.HostKey)], fcid)
}
}

// hydrate all fields
slice.Slab.Health = raw[0].SlabHealth
slice.Slab.Shards = sectors
slice.Slab.MinShards = raw[0].SlabMinShards
slice.Offset = raw[0].SliceOffset
slice.Length = raw[0].SliceLength
return slice, nil
}

func (s *SQLStore) Bucket(ctx context.Context, bucket string) (b api.Bucket, err error) {
err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
b, err = tx.Bucket(ctx, bucket)
Expand Down Expand Up @@ -681,8 +624,8 @@ func (s *SQLStore) ObjectEntries(ctx context.Context, bucket, path, prefix, sort
}

func (s *SQLStore) Object(ctx context.Context, bucket, path string) (obj api.Object, err error) {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
obj, err = s.object(tx, bucket, path)
err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
obj, err = tx.Object(ctx, bucket, path)
return err
})
return
Expand Down Expand Up @@ -1031,99 +974,6 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set
return
}

// object retrieves an object from the store.
func (s *SQLStore) object(tx *gorm.DB, bucket, path string) (api.Object, error) {
// fetch raw object data
raw, err := s.objectRaw(tx, bucket, path)
if errors.Is(err, gorm.ErrRecordNotFound) || (err == nil && len(raw) == 0) {
return api.Object{}, api.ErrObjectNotFound
} else if err != nil {
return api.Object{}, err
}

// hydrate raw object data
return s.objectHydrate(tx, bucket, path, raw)
}

// objectHydrate hydrates a raw object and returns an api.Object.
func (s *SQLStore) objectHydrate(tx *gorm.DB, bucket, path string, obj rawObject) (api.Object, error) {
// parse object key
var key object.EncryptionKey
if err := key.UnmarshalBinary(obj[0].ObjectKey); err != nil {
return api.Object{}, err
}

// filter out slabs without slab ID and buffered slabs - this is expected
// for an empty object or objects that end with a partial slab.
var filtered rawObject
minHealth := math.MaxFloat64
for _, sector := range obj {
if sector.SlabID != 0 {
filtered = append(filtered, sector)
if sector.SlabHealth < minHealth {
minHealth = sector.SlabHealth
}
}
}

// hydrate all slabs
slabs := make([]object.SlabSlice, 0, len(filtered))
if len(filtered) > 0 {
var start int
// create a helper function to add a slab and update the state
addSlab := func(end int) error {
if slab, err := filtered[start:end].toSlabSlice(); err != nil {
return err
} else {
slabs = append(slabs, slab)
start = end
}
return nil
}

curr := filtered[0]
for j, sector := range filtered {
if sector.ObjectIndex == 0 {
return api.Object{}, api.ErrObjectCorrupted
} else if sector.SectorIndex == 0 && !sector.SlabBuffered {
return api.Object{}, api.ErrObjectCorrupted
}
if sector.ObjectIndex != curr.ObjectIndex {
if err := addSlab(j); err != nil {
return api.Object{}, err
}
curr = sector
}
}
if err := addSlab(len(filtered)); err != nil {
return api.Object{}, err
}
}

// fetch object metadata
metadata, err := s.objectMetadata(tx, bucket, path)
if err != nil {
return api.Object{}, err
}

// return object
return api.Object{
Metadata: metadata,
ObjectMetadata: newObjectMetadata(
obj[0].ObjectName,
obj[0].ObjectETag,
obj[0].ObjectMimeType,
obj[0].ObjectHealth,
obj[0].ObjectModTime,
obj[0].ObjectSize,
),
Object: &object.Object{
Key: key,
Slabs: slabs,
},
}, nil
}

// ObjectMetadata returns an object's metadata
func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (obj api.Object, err error) {
err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
Expand All @@ -1133,37 +983,6 @@ func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (obj
return
}

func (s *SQLStore) objectMetadata(tx *gorm.DB, bucket, path string) (api.ObjectUserMetadata, error) {
var rows []dbObjectUserMetadata
err := tx.
Model(&dbObjectUserMetadata{}).
Table("object_user_metadata oum").
Joins("INNER JOIN objects o ON oum.db_object_id = o.id").
Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
Where("o.object_id = ? AND b.name = ?", path, bucket).
Find(&rows).
Error
if err != nil {
return nil, err
}
metadata := make(api.ObjectUserMetadata)
for _, row := range rows {
metadata[row.Key] = row.Value
}
return metadata, nil
}

func newObjectMetadata(name, etag, mimeType string, health float64, modTime time.Time, size int64) api.ObjectMetadata {
return api.ObjectMetadata{
ETag: etag,
Health: health,
ModTime: api.TimeRFC3339(modTime.UTC()),
Name: name,
Size: size,
MimeType: mimeType,
}
}

func (s *SQLStore) objectRaw(txn *gorm.DB, bucket string, path string) (rows rawObject, err error) {
// NOTE: we LEFT JOIN here because empty objects are valid and need to be
// included in the result set, when we convert the rawObject before
Expand Down
14 changes: 5 additions & 9 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,13 @@ func TestObjectBasic(t *testing.T) {
t.Fatal(err)
}
if !reflect.DeepEqual(*got.Object, want) {
t.Fatal("object mismatch", cmp.Diff(got.Object, want))
t.Fatal("object mismatch", got.Object, want)
}

// delete a sector
var sectors []dbSector
if err := ss.gormDB.Find(&sectors).Error; err != nil {
t.Fatal(err)
} else if len(sectors) != 2 {
t.Fatal("unexpected number of sectors")
} else if tx := ss.gormDB.Delete(sectors[0]); tx.Error != nil || tx.RowsAffected != 1 {
t.Fatal("unexpected number of sectors deleted", tx.Error, tx.RowsAffected)
// update the sector to have a non-consecutive slab index
_, err = ss.DB().Exec(context.Background(), "UPDATE sectors SET slab_index = 100 WHERE slab_index = 1")
if err != nil {
t.Fatalf("failed to update sector: %v", err)
}

// fetch the object again and assert we receive an indication it was corrupted
Expand Down
49 changes: 0 additions & 49 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/stores/sql/mysql"
"go.sia.tech/renterd/stores/sql/sqlite"
Expand Down Expand Up @@ -245,49 +242,3 @@ func (s *SQLStore) Close() error {
s.mu.Unlock()
return nil
}

func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) error) error {
return retryTransaction(ctx, s.gormDB, s.logger, s.retryTransactionIntervals, fc, s.retryAbortFn)
}

func (s *SQLStore) retryAbortFn(err error) bool {
return err == nil ||
utils.IsErr(err, context.Canceled) ||
utils.IsErr(err, context.DeadlineExceeded) ||
utils.IsErr(err, gorm.ErrRecordNotFound) ||
utils.IsErr(err, api.ErrContractNotFound) ||
utils.IsErr(err, api.ErrObjectNotFound) ||
utils.IsErr(err, api.ErrObjectCorrupted) ||
utils.IsErr(err, api.ErrBucketExists) ||
utils.IsErr(err, api.ErrBucketNotFound) ||
utils.IsErr(err, api.ErrBucketNotEmpty) ||
utils.IsErr(err, api.ErrMultipartUploadNotFound) ||
utils.IsErr(err, api.ErrObjectExists) ||
utils.IsErr(err, api.ErrPartNotFound) ||
utils.IsErr(err, api.ErrSlabNotFound) ||
utils.IsErr(err, syncer.ErrPeerNotFound)
}

func retryTransaction(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger, intervals []time.Duration, fn func(tx *gorm.DB) error, abortFn func(error) bool) error {
var err error
attempts := len(intervals) + 1
for i := 0; i < attempts; i++ {
// execute the transaction
err = db.WithContext(ctx).Transaction(fn)
if abortFn(err) {
return err
}

// if this was the last attempt, return the error
if i == len(intervals) {
logger.Warn(fmt.Sprintf("transaction attempt %d/%d failed, err: %v", i+1, attempts, err))
return err
}

// log the failed attempt and sleep before retrying
interval := intervals[i]
logger.Warn(fmt.Sprintf("transaction attempt %d/%d failed, retry in %v, err: %v", i+1, attempts, interval, err))
time.Sleep(interval)
}
return fmt.Errorf("retryTransaction failed: %w", err)
}
3 changes: 3 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type (
// MultipartUploads returns a list of all multipart uploads.
MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, limit int) (api.MultipartListUploadsResponse, error)

// Object returns an object from the database.
Object(ctx context.Context, bucket, key string) (api.Object, error)

// ObjectEntries queries the database for objects in a given dir.
ObjectEntries(ctx context.Context, bucket, key, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error)

Expand Down
Loading

0 comments on commit 6847e21

Please sign in to comment.