Skip to content

Commit

Permalink
Merge branch 'chris/fix-s3-bucket-rewrite' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Jun 26, 2024
2 parents c03d012 + 1f5859b commit 5c645c1
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 82 deletions.
95 changes: 20 additions & 75 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,6 @@ func (dbSlab) TableName() string { return "slabs" }
// TableName implements the gorm.Tabler interface.
func (dbSlice) TableName() string { return "slices" }

// convert converts a dbContract to an ArchivedContract.
func (c dbArchivedContract) convert() api.ArchivedContract {
var revisionNumber uint64
_, _ = fmt.Sscan(c.RevisionNumber, &revisionNumber)
return api.ArchivedContract{
ID: types.FileContractID(c.FCID),
HostKey: types.PublicKey(c.Host),
RenewedTo: types.FileContractID(c.RenewedTo),

ProofHeight: c.ProofHeight,
RevisionHeight: c.RevisionHeight,
RevisionNumber: revisionNumber,
Size: c.Size,
StartHeight: c.StartHeight,
State: c.State.String(),
WindowStart: c.WindowStart,
WindowEnd: c.WindowEnd,

Spending: api.ContractSpending{
Uploads: types.Currency(c.UploadSpending),
Downloads: types.Currency(c.DownloadSpending),
FundAccount: types.Currency(c.FundAccountSpending),
Deletions: types.Currency(c.DeleteSpending),
SectorRoots: types.Currency(c.ListSpending),
},
}
}

// convert converts a dbContract to a ContractMetadata.
func (c dbContract) convert() api.ContractMetadata {
var revisionNumber uint64
Expand Down Expand Up @@ -628,19 +600,12 @@ func (s *SQLStore) AddRenewedContract(ctx context.Context, c rhpv2.ContractRevis
return renewed.convert(), nil
}

func (s *SQLStore) AncestorContracts(ctx context.Context, id types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error) {
var ancestors []dbArchivedContract
err := s.db.WithContext(ctx).Raw("WITH RECURSIVE ancestors AS (SELECT * FROM archived_contracts WHERE renewed_to = ? UNION ALL SELECT archived_contracts.* FROM ancestors, archived_contracts WHERE archived_contracts.renewed_to = ancestors.fcid) SELECT * FROM ancestors WHERE start_height >= ?", fileContractID(id), startHeight).
Scan(&ancestors).
Error
if err != nil {
return nil, err
}
contracts := make([]api.ArchivedContract, len(ancestors))
for i, ancestor := range ancestors {
contracts[i] = ancestor.convert()
}
return contracts, nil
func (s *SQLStore) AncestorContracts(ctx context.Context, id types.FileContractID, startHeight uint64) (ancestors []api.ArchivedContract, err error) {
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
ancestors, err = tx.AncestorContracts(ctx, id, startHeight)
return err
})
return
}

func (s *SQLStore) ArchiveContract(ctx context.Context, id types.FileContractID, reason string) error {
Expand Down Expand Up @@ -675,22 +640,14 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil
}

func (s *SQLStore) ArchiveAllContracts(ctx context.Context, reason string) error {
// fetch contract ids
var fcids []fileContractID
if err := s.db.
WithContext(ctx).
Model(&dbContract{}).
Pluck("fcid", &fcids).
Error; err != nil {
return err
contracts, err := s.Contracts(ctx, api.ContractsOpts{})
if err != nil {
return fmt.Errorf("failed to fetch contracts: %w", err)
}

// create map
toArchive := make(map[types.FileContractID]string)
for _, fcid := range fcids {
toArchive[types.FileContractID(fcid)] = reason
for _, c := range contracts {
toArchive[c.ID] = reason
}

return s.ArchiveContracts(ctx, toArchive)
}

Expand All @@ -707,30 +664,18 @@ func (s *SQLStore) ContractRoots(ctx context.Context, id types.FileContractID) (
return nil, api.ErrContractNotFound
}

var dbRoots []hash256
if err = s.db.
WithContext(ctx).
Raw(`
SELECT sec.root
FROM contracts c
INNER JOIN contract_sectors cs ON cs.db_contract_id = c.id
INNER JOIN sectors sec ON cs.db_sector_id = sec.id
WHERE c.fcid = ?
`, fileContractID(id)).
Scan(&dbRoots).
Error; err == nil {
for _, r := range dbRoots {
roots = append(roots, *(*types.Hash256)(&r))
}
}
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
roots, err = tx.ContractRoots(ctx, id)
return err
})
return
}

func (s *SQLStore) ContractSets(ctx context.Context) ([]string, error) {
var sets []string
err := s.db.WithContext(ctx).Raw("SELECT name FROM contract_sets").
Scan(&sets).
Error
func (s *SQLStore) ContractSets(ctx context.Context) (sets []string, err error) {
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
sets, err = tx.ContractSets(ctx)
return err
})
return sets, err
}

Expand Down
15 changes: 8 additions & 7 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ func TestAncestorsContracts(t *testing.T) {
t.Fatal("wrong number of contracts returned", len(contracts))
}
for i := 0; i < len(contracts)-1; i++ {
if !reflect.DeepEqual(contracts[i], api.ArchivedContract{
expected := api.ArchivedContract{
ID: fcids[len(fcids)-2-i],
HostKey: hk,
RenewedTo: fcids[len(fcids)-1-i],
Expand All @@ -872,7 +872,9 @@ func TestAncestorsContracts(t *testing.T) {
State: api.ContractStatePending,
WindowStart: 400,
WindowEnd: 500,
}) {
}
if !reflect.DeepEqual(contracts[i], expected) {
t.Log(cmp.Diff(contracts[i], expected))
t.Fatal("wrong contract", i, contracts[i])
}
}
Expand Down Expand Up @@ -4913,11 +4915,10 @@ func TestDirectories(t *testing.T) {
}

now := time.Now()
time.Sleep(time.Millisecond) // force a different time
ss.triggerSlabPruning()
if err := ss.waitForPruneLoop(now); err != nil {
t.Fatal(err)
}
ss.Retry(100, 100*time.Millisecond, func() error {
ss.triggerSlabPruning()
return ss.waitForPruneLoop(now)
})

var n int64
if err := ss.db.Model(&dbDirectory{}).Count(&n).Error; err != nil {
Expand Down
10 changes: 10 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type (
// exists, it is updated.
AddWebhook(ctx context.Context, wh webhooks.Webhook) error

// AncestorContracts returns all ancestor contracts of the contract up
// until the given start height.
AncestorContracts(ctx context.Context, id types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error)

// ArchiveContract moves a contract from the regular contracts to the
// archived ones.
ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error
Expand All @@ -66,10 +70,16 @@ type (
// duplicates but can contain gaps.
CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (string, error)

// ContractRoots returns the roots of the contract with the given ID.
ContractRoots(ctx context.Context, fcid types.FileContractID) ([]types.Hash256, error)

// Contracts returns contract metadata for all active contracts. The
// opts argument can be used to filter the result.
Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error)

// ContractSets returns the names of all contract sets.
ContractSets(ctx context.Context) ([]string, error)

// ContractSize returns the size of the contract with the given ID as
// well as the estimated number of bytes that can be pruned from it.
ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error)
Expand Down
82 changes: 82 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,46 @@ func Accounts(ctx context.Context, tx sql.Tx) ([]api.Account, error) {
return accounts, nil
}

func AncestorContracts(ctx context.Context, tx sql.Tx, fcid types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error) {
rows, err := tx.Query(ctx, `
WITH RECURSIVE ancestors AS
(
SELECT *
FROM archived_contracts
WHERE renewed_to = ?
UNION ALL
SELECT archived_contracts.*
FROM ancestors, archived_contracts
WHERE archived_contracts.renewed_to = ancestors.fcid
)
SELECT fcid, host, renewed_to, upload_spending, download_spending, fund_account_spending, delete_spending,
proof_height, revision_height, revision_number, size, start_height, state, window_start, window_end
FROM ancestors
WHERE start_height >= ?
`, FileContractID(fcid), startHeight)
if err != nil {
return nil, fmt.Errorf("failed to fetch ancestor contracts: %w", err)
}
defer rows.Close()

var contracts []api.ArchivedContract
for rows.Next() {
var c api.ArchivedContract
var state ContractState
err := rows.Scan((*FileContractID)(&c.ID), (*PublicKey)(&c.HostKey), (*FileContractID)(&c.RenewedTo),
(*Currency)(&c.Spending.Uploads), (*Currency)(&c.Spending.Downloads), (*Currency)(&c.Spending.FundAccount),
(*Currency)(&c.Spending.Deletions), &c.ProofHeight,
&c.RevisionHeight, &c.RevisionNumber, &c.Size, &c.StartHeight, &state, &c.WindowStart,
&c.WindowEnd)
if err != nil {
return nil, fmt.Errorf("failed to scan contract: %w", err)
}
c.State = state.String()
contracts = append(contracts, c)
}
return contracts, nil
}

func ArchiveContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID, reason string) error {
_, err := tx.Exec(ctx, `
INSERT INTO archived_contracts (created_at, fcid, renewed_from, contract_price, state, total_cost,
Expand Down Expand Up @@ -140,6 +180,30 @@ func Bucket(ctx context.Context, tx sql.Tx, bucket string) (api.Bucket, error) {
return b, nil
}

func ContractRoots(ctx context.Context, tx sql.Tx, fcid types.FileContractID) ([]types.Hash256, error) {
rows, err := tx.Query(ctx, `
SELECT s.root
FROM contract_sectors cs
INNER JOIN sectors s ON s.id = cs.db_sector_id
INNER JOIN contracts c ON c.id = cs.db_contract_id
WHERE c.fcid = ?
`, FileContractID(fcid))
if err != nil {
return nil, fmt.Errorf("failed to fetch contract roots: %w", err)
}
defer rows.Close()

var roots []types.Hash256
for rows.Next() {
var root types.Hash256
if err := rows.Scan((*Hash256)(&root)); err != nil {
return nil, fmt.Errorf("failed to scan root: %w", err)
}
roots = append(roots, root)
}
return roots, nil
}

func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.ContractMetadata, error) {
var rows *sql.LoggedRows
var err error
Expand Down Expand Up @@ -218,6 +282,24 @@ func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.Co
return contracts, nil
}

func ContractSets(ctx context.Context, tx sql.Tx) ([]string, error) {
rows, err := tx.Query(ctx, "SELECT name FROM contract_sets")
if err != nil {
return nil, fmt.Errorf("failed to fetch contract sets: %w", err)
}
defer rows.Close()

var sets []string
for rows.Next() {
var cs string
if err := rows.Scan(&cs); err != nil {
return nil, fmt.Errorf("failed to scan contract set: %w", err)
}
sets = append(sets, cs)
}
return sets, nil
}

func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
// stmt to fetch bucket id
bucketIDStmt, err := tx.Prepare(ctx, "SELECT id FROM buckets WHERE name = ?")
Expand Down
12 changes: 12 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (tx *MainDatabaseTx) AddWebhook(ctx context.Context, wh webhooks.Webhook) e
return nil
}

func (tx *MainDatabaseTx) AncestorContracts(ctx context.Context, fcid types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error) {
return ssql.AncestorContracts(ctx, tx, fcid, startHeight)
}

func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
return ssql.ArchiveContract(ctx, tx, fcid, reason)
}
Expand Down Expand Up @@ -235,10 +239,18 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k
return eTag, nil
}

func (tx *MainDatabaseTx) ContractRoots(ctx context.Context, fcid types.FileContractID) ([]types.Hash256, error) {
return ssql.ContractRoots(ctx, tx, fcid)
}

func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) {
return ssql.Contracts(ctx, tx, opts)
}

func (tx *MainDatabaseTx) ContractSets(ctx context.Context) ([]string, error) {
return ssql.ContractSets(ctx, tx)
}

func (tx *MainDatabaseTx) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) {
return ssql.ContractSize(ctx, tx, id)
}
Expand Down
12 changes: 12 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (tx *MainDatabaseTx) AddWebhook(ctx context.Context, wh webhooks.Webhook) e
return nil
}

func (tx *MainDatabaseTx) AncestorContracts(ctx context.Context, fcid types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error) {
return ssql.AncestorContracts(ctx, tx, fcid, startHeight)
}

func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
return ssql.ArchiveContract(ctx, tx, fcid, reason)
}
Expand Down Expand Up @@ -239,10 +243,18 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k
return eTag, nil
}

func (tx *MainDatabaseTx) ContractRoots(ctx context.Context, fcid types.FileContractID) ([]types.Hash256, error) {
return ssql.ContractRoots(ctx, tx, fcid)
}

func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) {
return ssql.Contracts(ctx, tx, opts)
}

func (tx *MainDatabaseTx) ContractSets(ctx context.Context) ([]string, error) {
return ssql.ContractSets(ctx, tx)
}

func (tx *MainDatabaseTx) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) {
return ssql.ContractSize(ctx, tx, id)
}
Expand Down

0 comments on commit 5c645c1

Please sign in to comment.