Merge branch 'dev' into chris/refresh-forgiveness
ChrisSchinnerl authored Mar 26, 2024
2 parents b30999c + 447b3d1 commit 5bf554c
Showing 6 changed files with 259 additions and 77 deletions.
56 changes: 45 additions & 11 deletions .github/workflows/publish.yml
@@ -1,13 +1,13 @@
name: Publish

# Controls when the action will run.
on:
# Triggers the workflow on new SemVer tags
push:
branches:
- master
- dev
tags:
- 'v[0-9]+.[0-9]+.[0-9]+'
- 'v[0-9]+.[0-9]+.[0-9]+-**'

@@ -116,7 +116,7 @@ jobs:
with:
name: renterd
path: release/
build-mac:
runs-on: macos-latest
strategy:
matrix:
@@ -212,7 +212,7 @@ jobs:
with:
name: renterd
path: release/
build-windows:
runs-on: windows-latest
strategy:
matrix:
@@ -253,28 +253,62 @@ jobs:
with:
name: renterd
path: release/
dispatch:

dispatch-homebrew: # only runs on full releases
if: startsWith(github.ref, 'refs/tags/v') && !contains(github.ref, '-')
needs: [docker, build-linux, build-mac, build-windows]
strategy:
matrix:
repo: ['siafoundation/homebrew-sia', 'siafoundation/linux']
runs-on: ubuntu-latest
steps:
- name: Extract Tag Name
id: get_tag
run: echo "tag_name=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV

- name: Repository Dispatch
- name: Dispatch
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.PAT_REPOSITORY_DISPATCH }}
repository: ${{ matrix.repo }}
repository: siafoundation/homebrew-sia
event-type: release-tagged
client-payload: >
{
"description": "Renterd: The Next-Gen Sia Renter",
"tag": "${{ env.tag_name }}",
"project": "renterd",
"workflow_id": "${{ github.run_id }}"
}
dispatch-linux: # run on full releases, release candidates, and master branch
if: startsWith(github.ref, 'refs/tags/v') || endsWith(github.ref, 'master')
needs: [docker, build-linux, build-mac, build-windows]
runs-on: ubuntu-latest
steps:
- name: Build Dispatch Payload
id: get_payload
uses: actions/github-script@v7
with:
script: |
const isRelease = context.ref.startsWith('refs/tags/v'),
isBeta = isRelease && context.ref.includes('-beta'),
tag = isRelease ? context.ref.replace('refs/tags/', '') : 'master';
let component = 'nightly';
if (isBeta) {
component = 'beta';
} else if (isRelease) {
component = 'main';
}
return {
description: "renterd: The Next-Gen Sia Renter",
tag: tag,
project: "renterd",
workflow_id: context.runId,
component: component
};
- name: Dispatch
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.PAT_REPOSITORY_DISPATCH }}
repository: siafoundation/linux
event-type: release-tagged
client-payload: ${{ steps.get_payload.outputs.result }}
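
Both dispatch jobs above send a repository_dispatch event whose client-payload is a small JSON object. As a reading aid only (not part of this commit), here is a Go sketch of that payload; the struct and field names are assumptions, the JSON keys mirror the workflow, and component is only set by dispatch-linux. Note that dispatch-homebrew passes workflow_id as a quoted string while dispatch-linux passes context.runId as a number; the sketch uses a string for simplicity.

package main

import (
	"encoding/json"
	"fmt"
)

// dispatchPayload models the client-payload sent via repository_dispatch.
// Illustrative only: type and field names are assumptions; the JSON keys
// come from the workflow above.
type dispatchPayload struct {
	Description string `json:"description"`
	Tag         string `json:"tag"`
	Project     string `json:"project"`
	WorkflowID  string `json:"workflow_id"`
	Component   string `json:"component,omitempty"` // dispatch-linux only: "main", "beta" or "nightly"
}

func main() {
	// hypothetical payload for a full release tag
	p := dispatchPayload{
		Description: "renterd: The Next-Gen Sia Renter",
		Tag:         "v1.0.0",
		Project:     "renterd",
		WorkflowID:  "1234567890",
		Component:   "main",
	}
	b, _ := json.MarshalIndent(p, "", "  ")
	fmt.Println(string(b))
}
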
4 changes: 2 additions & 2 deletions worker/migrations.go
@@ -10,7 +10,7 @@ import (
"go.sia.tech/renterd/object"
)

func (w *worker) migrate(ctx context.Context, s *object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
func (w *worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
// make a map of good hosts
goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool)
for _, c := range ulContracts {
@@ -86,7 +86,7 @@ SHARDS:
defer mem.Release()

// download the slab
shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, *s, dlContracts)
shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts)
if err != nil {
return 0, false, fmt.Errorf("failed to download slab for migration: %w", err)
}
44 changes: 41 additions & 3 deletions worker/mocks_test.go
Expand Up @@ -393,6 +393,21 @@ func (os *objectStoreMock) TrackUpload(ctx context.Context, uID api.UploadID) er
func (os *objectStoreMock) FinishUpload(ctx context.Context, uID api.UploadID) error { return nil }

func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
os.mu.Lock()
defer os.mu.Unlock()

for _, objects := range os.objects {
for _, object := range objects {
for _, slab := range object.Slabs {
for _, shard := range slab.Slab.Shards {
if shard.Root == root {
delete(shard.Contracts, hk)
}
}
}
}
}

return nil
}

@@ -508,10 +523,33 @@ func (os *objectStoreMock) UpdateSlab(ctx context.Context, s object.Slab, contra

os.forEachObject(func(bucket, path string, o object.Object) {
for i, slab := range o.Slabs {
if slab.Key.String() == s.Key.String() {
os.objects[bucket][path].Slabs[i].Slab = s
return
if slab.Key.String() != s.Key.String() {
continue
}
// update slab
shards := os.objects[bucket][path].Slabs[i].Slab.Shards
for sI := range shards {
// overwrite latest host
shards[sI].LatestHost = s.Shards[sI].LatestHost

// merge contracts for each shard
existingContracts := make(map[types.FileContractID]struct{})
for _, fcids := range shards[sI].Contracts {
for _, fcid := range fcids {
existingContracts[fcid] = struct{}{}
}
}
for hk, fcids := range s.Shards[sI].Contracts {
for _, fcid := range fcids {
if _, exists := existingContracts[fcid]; exists {
continue
}
shards[sI].Contracts[hk] = append(shards[sI].Contracts[hk], fcids...)
}
}
}
os.objects[bucket][path].Slabs[i].Slab.Shards = shards
return
}
})

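
The UpdateSlab mock above now merges newly reported contracts into the contracts already recorded for each shard instead of overwriting the slab wholesale. For illustration only, here is a standalone Go sketch of that merge idea; it uses plain strings instead of types.PublicKey and types.FileContractID, and appends contract IDs one at a time, which is slightly stricter than the mock's append of the whole slice.

package main

import "fmt"

// mergeContracts adds contract IDs from update into existing, keyed by host
// public key, skipping any file contract ID already recorded under any host.
// Simplified sketch: plain strings stand in for the renterd key types.
func mergeContracts(existing, update map[string][]string) {
	seen := make(map[string]struct{})
	for _, fcids := range existing {
		for _, fcid := range fcids {
			seen[fcid] = struct{}{}
		}
	}
	for hk, fcids := range update {
		for _, fcid := range fcids {
			if _, ok := seen[fcid]; ok {
				continue
			}
			existing[hk] = append(existing[hk], fcid)
			seen[fcid] = struct{}{}
		}
	}
}

func main() {
	existing := map[string][]string{"hostA": {"fcid1"}}
	update := map[string][]string{"hostA": {"fcid1"}, "hostB": {"fcid2"}}
	mergeContracts(existing, update)
	fmt.Println(existing) // map[hostA:[fcid1] hostB:[fcid2]]
}
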
21 changes: 4 additions & 17 deletions worker/upload.go
Expand Up @@ -604,7 +604,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc
return nil
}

func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -642,27 +642,14 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shar
// overwrite the shards with the newly uploaded ones
for i, si := range shardIndices {
s.Shards[si].LatestHost = uploaded[i].LatestHost

knownContracts := make(map[types.FileContractID]struct{})
for _, fcids := range s.Shards[si].Contracts {
for _, fcid := range fcids {
knownContracts[fcid] = struct{}{}
}
}
s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
for hk, fcids := range uploaded[i].Contracts {
for _, fcid := range fcids {
if _, exists := knownContracts[fcid]; !exists {
if s.Shards[si].Contracts == nil {
s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
}
s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcid)
}
}
s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcids...)
}
}

// update the slab
return mgr.os.UpdateSlab(ctx, *s, contractSet)
return mgr.os.UpdateSlab(ctx, s, contractSet)
}

func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) {
110 changes: 108 additions & 2 deletions worker/upload_test.go
Expand Up @@ -263,6 +263,107 @@ func TestUploadPackedSlab(t *testing.T) {
}
}

func TestMigrateLostSector(t *testing.T) {
// create test worker
w := newTestWorker(t)

// add hosts to worker
w.AddHosts(testRedundancySettings.TotalShards * 2)

// convenience variables
os := w.os
mm := w.ulmm
dl := w.downloadManager
ul := w.uploadManager

// create test data
data := frand.Bytes(128)

// create upload params
params := testParameters(t.Name())

// upload data
_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload)
if err != nil {
t.Fatal(err)
}

// grab the slab
o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
if err != nil {
t.Fatal(err)
} else if len(o.Object.Object.Slabs) != 1 {
t.Fatal("expected 1 slab")
}
slab := o.Object.Object.Slabs[0]

// build usedHosts map
usedHosts := make(map[types.PublicKey]struct{})
for _, shard := range slab.Shards {
usedHosts[shard.LatestHost] = struct{}{}
}

// assume the host of the first shard lost its sector
badHost := slab.Shards[0].LatestHost
badContract := slab.Shards[0].Contracts[badHost][0]
err = os.DeleteHostSector(context.Background(), badHost, slab.Shards[0].Root)
if err != nil {
t.Fatal(err)
}

// download the slab
shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts())
if err != nil {
t.Fatal(err)
}

// encrypt the shards
o.Object.Object.Slabs[0].Slab.Encrypt(shards)

// filter it down to the shards we need to migrate
shards = shards[:1]

// recreate upload contracts
contracts := make([]api.ContractMetadata, 0)
for _, c := range w.Contracts() {
_, used := usedHosts[c.HostKey]
if !used && c.HostKey != badHost {
contracts = append(contracts, c)
}
}

// migrate the shard away from the bad host
mem := mm.AcquireMemory(context.Background(), rhpv2.SectorSize)
err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, []int{0}, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
if err != nil {
t.Fatal(err)
}

// re-grab the slab
o, err = os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
if err != nil {
t.Fatal(err)
} else if len(o.Object.Object.Slabs) != 1 {
t.Fatal("expected 1 slab")
}
slab = o.Object.Object.Slabs[0]

// assert the bad shard is on a good host now
shard := slab.Shards[0]
if shard.LatestHost == badHost {
t.Fatal("latest host is bad")
} else if len(shard.Contracts) != 1 {
t.Fatal("expected 1 contract")
}
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if fcid == badContract {
t.Fatal("contract belongs to bad host")
}
}
}
}

func TestUploadShards(t *testing.T) {
// create test worker
w := newTestWorker(t)
@@ -340,7 +441,7 @@ func TestUploadShards(t *testing.T) {

// migrate those shards away from bad hosts
mem := mm.AcquireMemory(context.Background(), uint64(len(badIndices))*rhpv2.SectorSize)
err = ul.UploadShards(context.Background(), &o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
if err != nil {
t.Fatal(err)
}
@@ -355,7 +456,12 @@
slab = o.Object.Object.Slabs[0]

// assert none of the shards are on bad hosts
for _, shard := range slab.Shards {
for i, shard := range slab.Shards {
if i%2 == 0 && len(shard.Contracts) != 1 {
t.Fatalf("expected 1 contract, got %v", len(shard.Contracts))
} else if i%2 != 0 && len(shard.Contracts) != 2 {
t.Fatalf("expected 2 contracts, got %v", len(shard.Contracts))
}
if _, bad := badHosts[shard.LatestHost]; bad {
t.Fatal("shard is on bad host", shard.LatestHost)
}