Skip to content

Commit

Permalink
Merge pull request #740 from SiaFoundation/chris/contracts-in-slabs
Browse files Browse the repository at this point in the history
Add contract information to object.Slab
  • Loading branch information
ChrisSchinnerl authored Nov 17, 2023
2 parents c2c6634 + c93b86f commit 2444635
Show file tree
Hide file tree
Showing 20 changed files with 532 additions and 627 deletions.
18 changes: 8 additions & 10 deletions api/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/object"
)

Expand Down Expand Up @@ -61,15 +60,14 @@ type (
}

MultipartAddPartRequest struct {
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
}

MultipartCompleteResponse struct {
Expand Down
12 changes: 5 additions & 7 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/object"
)

Expand Down Expand Up @@ -48,12 +47,11 @@ type (

// ObjectAddRequest is the request type for the /bus/object/*key endpoint.
ObjectAddRequest struct {
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
MimeType string `json:"mimeType"`
ETag string `json:"eTag"`
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
MimeType string `json:"mimeType"`
ETag string `json:"eTag"`
}

// ObjectsResponse is the response type for the /bus/objects endpoint.
Expand Down
12 changes: 7 additions & 5 deletions api/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type (
}

PackedSlabsRequestPOST struct {
Slabs []UploadedPackedSlab `json:"slabs"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
Slabs []UploadedPackedSlab `json:"slabs"`
}

// UploadSectorRequest is the request type for the /upload/:id/sector endpoint.
Expand All @@ -70,8 +69,11 @@ type (

// UpdateSlabRequest is the request type for the /slab endpoint.
UpdateSlabRequest struct {
ContractSet string `json:"contractSet"`
Slab object.Slab `json:"slab"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
ContractSet string `json:"contractSet"`
Slab object.Slab `json:"slab"`
}
)

func (s UploadedPackedSlab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} {
return object.ContractsFromShards(s.Shards)
}
16 changes: 8 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ type (
RenameObject(ctx context.Context, bucketName, from, to string) error
RenameObjects(ctx context.Context, bucketName, from, to string) error
SearchObjects(ctx context.Context, bucketName, substring string, offset, limit int) ([]api.ObjectMetadata, error)
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error

AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, usedContracts map[types.PublicKey]types.FileContractID) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error)
MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucketName, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)

Expand All @@ -166,7 +166,7 @@ type (
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
RefreshHealth(ctx context.Context) error
UnhealthySlabs(ctx context.Context, healthCutoff float64, set string, limit int) ([]api.UnhealthySlab, error)
UpdateSlab(ctx context.Context, s object.Slab, contractSet string, usedContracts map[types.PublicKey]types.FileContractID) error
UpdateSlab(ctx context.Context, s object.Slab, contractSet string) error
}

// An AutopilotStore stores autopilots.
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func (b *bus) objectsHandlerPUT(jc jape.Context) {
} else if aor.Bucket == "" {
aor.Bucket = api.DefaultBucketName
}
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object, aor.UsedContracts))
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object))
}

func (b *bus) objectsCopyHandlerPOST(jc jape.Context) {
Expand Down Expand Up @@ -1363,7 +1363,7 @@ func (b *bus) packedSlabsHandlerDonePOST(jc jape.Context) {
if jc.Decode(&psrp) != nil {
return
}
jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs, psrp.UsedContracts))
jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs))
}

func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) {
Expand Down Expand Up @@ -1415,7 +1415,7 @@ func (b *bus) slabHandlerGET(jc jape.Context) {
func (b *bus) slabHandlerPUT(jc jape.Context) {
var usr api.UpdateSlabRequest
if jc.Decode(&usr) == nil {
jc.Check("couldn't update slab", b.ms.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet, usr.UsedContracts))
jc.Check("couldn't update slab", b.ms.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet))
}
}

Expand Down Expand Up @@ -2118,7 +2118,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest)
return
}
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs, req.UsedContracts)
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs)
if jc.Check("failed to upload part", err) != nil {
return
}
Expand Down
20 changes: 9 additions & 11 deletions bus/client/multipart-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
)
Expand All @@ -20,17 +19,16 @@ func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string,
}

// AddMultipartPart adds a part to a multipart upload.
func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, usedContracts map[types.PublicKey]types.FileContractID) (err error) {
func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) {
err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{
Bucket: bucket,
ETag: eTag,
Path: path,
ContractSet: contractSet,
UploadID: uploadID,
PartNumber: partNumber,
Slices: slices,
PartialSlabs: partialSlab,
UsedContracts: usedContracts,
Bucket: bucket,
ETag: eTag,
Path: path,
ContractSet: contractSet,
UploadID: uploadID,
PartNumber: partNumber,
Slices: slices,
PartialSlabs: partialSlab,
})
return
}
Expand Down
14 changes: 6 additions & 8 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,19 @@ import (
"fmt"
"net/url"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
)

// AddObject stores the provided object under the given path.
func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID, opts api.AddObjectOptions) (err error) {
func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) {
path = api.ObjectPathEscape(path)
err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.ObjectAddRequest{
Bucket: bucket,
ContractSet: contractSet,
Object: o,
UsedContracts: usedContracts,
MimeType: opts.MimeType,
ETag: opts.ETag,
Bucket: bucket,
ContractSet: contractSet,
Object: o,
MimeType: opts.MimeType,
ETag: opts.ETag,
})
return
}
Expand Down
13 changes: 5 additions & 8 deletions bus/client/slabs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/url"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
)
Expand Down Expand Up @@ -83,10 +82,9 @@ func (c *Client) FetchPartialSlab(ctx context.Context, key object.EncryptionKey,
}

// MarkPackedSlabsUploaded marks the given slabs as uploaded.
func (c *Client) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) (err error) {
func (c *Client) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) (err error) {
err = c.c.WithContext(ctx).POST("/slabbuffer/done", api.PackedSlabsRequestPOST{
Slabs: slabs,
UsedContracts: usedContracts,
Slabs: slabs,
}, nil)
return
}
Expand Down Expand Up @@ -133,11 +131,10 @@ func (c *Client) SlabsForMigration(ctx context.Context, healthCutoff float64, se
}

// UpdateSlab updates the given slab in the database.
func (c *Client) UpdateSlab(ctx context.Context, slab object.Slab, contractSet string, usedContracts map[types.PublicKey]types.FileContractID) (err error) {
func (c *Client) UpdateSlab(ctx context.Context, slab object.Slab, contractSet string) (err error) {
err = c.c.WithContext(ctx).PUT("/slab", api.UpdateSlabRequest{
ContractSet: contractSet,
Slab: slab,
UsedContracts: usedContracts,
ContractSet: contractSet,
Slab: slab,
})
return
}
31 changes: 30 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewTestCluster(t *testing.T) {
Length: 0,
},
},
}, map[types.PublicKey]types.FileContractID{}, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag})
}, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag})
tt.OK(err)

// Try talking to the worker and request the object.
Expand Down Expand Up @@ -537,6 +537,35 @@ func TestUploadDownloadBasic(t *testing.T) {
path := fmt.Sprintf("data_%v", len(data))
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{}))

// fetch object and check its slabs
resp, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, path, api.GetObjectOptions{})
tt.OK(err)
for _, slab := range resp.Object.Slabs {
hosts := make(map[types.PublicKey]struct{})
roots := make(map[types.Hash256]struct{})
if len(slab.Shards) != testRedundancySettings.TotalShards {
t.Fatal("wrong amount of shards", len(slab.Shards), testRedundancySettings.TotalShards)
}
for _, shard := range slab.Shards {
if shard.LatestHost == (types.PublicKey{}) {
t.Fatal("latest host should be set")
} else if len(shard.Contracts) != 1 {
t.Fatal("each shard should have a host")
} else if _, found := roots[shard.Root]; found {
t.Fatal("each root should only exist once per slab")
}
for hpk, contracts := range shard.Contracts {
if len(contracts) != 1 {
t.Fatal("each host should have one contract")
} else if _, found := hosts[hpk]; found {
t.Fatal("each host should only be used once per slab")
}
hosts[hpk] = struct{}{}
}
roots[shard.Root] = struct{}{}
}
}

// download data
var buffer bytes.Buffer
tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}))
Expand Down
61 changes: 36 additions & 25 deletions internal/testing/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMigrations(t *testing.T) {
used := make(map[types.PublicKey]struct{})
for _, slab := range res.Object.Slabs {
for _, sector := range slab.Shards {
used[sector.Host] = struct{}{}
used[sector.LatestHost] = struct{}{}
}
}
return used
Expand All @@ -55,10 +55,11 @@ func TestMigrations(t *testing.T) {
// add an object
data := make([]byte, rhpv2.SectorSize)
frand.Read(data)
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{}))
path := "foo"
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{}))

// assert amount of hosts used
used := usedHosts("foo")
used := usedHosts(path)
if len(used) != testRedundancySettings.TotalShards {
t.Fatal("unexpected amount of hosts used", len(used), testRedundancySettings.TotalShards)
}
Expand All @@ -69,36 +70,46 @@ func TestMigrations(t *testing.T) {
if _, ok := used[h.PublicKey()]; ok {
cluster.RemoveHost(h)
removed = h.PublicKey()

// find the contract
contracts, err := cluster.Bus.Contracts(context.Background())
tt.OK(err)
var contract *api.ContractMetadata
for _, c := range contracts {
if c.HostKey == removed {
contract = &c
break
}
}
if contract == nil {
t.Fatal("contract not found")
}

// mine until we archive the contract
endHeight := contract.WindowEnd
cs, err := cluster.Bus.ConsensusState(context.Background())
tt.OK(err)
cluster.MineBlocks(int(endHeight - cs.BlockHeight + 1))
break
}
}

// assert we migrated away from the bad host
tt.Retry(300, 100*time.Millisecond, func() error {
if _, used := usedHosts("foo")[removed]; used {
cluster.MineBlocks(1)
if _, used := usedHosts(path)[removed]; used {
return errors.New("host is still used")
}
return nil
})

res, err := cluster.Bus.Object(context.Background(), api.DefaultBucketName, path, api.GetObjectOptions{})
tt.OK(err)

// check slabs
shardHosts := 0
for _, slab := range res.Object.Slabs {
hosts := make(map[types.PublicKey]struct{})
roots := make(map[types.Hash256]struct{})
for _, shard := range slab.Shards {
if shard.LatestHost == (types.PublicKey{}) {
t.Fatal("latest host should be set")
} else if len(shard.Contracts) == 0 {
t.Fatal("each shard should have > 0 hosts")
}
for hpk, contracts := range shard.Contracts {
if len(contracts) != 1 {
t.Fatal("each host should have one contract")
} else if _, found := hosts[hpk]; found {
t.Fatal("each host should only be used once per slab")
}
hosts[hpk] = struct{}{}
}
roots[shard.Root] = struct{}{}
shardHosts += len(shard.Contracts)
}
}
// all shards should have 1 host except for 1. So we end up with 4 in total.
if shardHosts != 4 {
t.Fatalf("expected 4 shard hosts, got %v", shardHosts)
}
}
Loading

0 comments on commit 2444635

Please sign in to comment.