Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
Browse files Browse the repository at this point in the history
…o pj/health-sort-2
  • Loading branch information
peterjan committed Nov 22, 2023
2 parents 04ee171 + ff89809 commit 7d5c69d
Show file tree
Hide file tree
Showing 31 changed files with 895 additions and 756 deletions.
16 changes: 9 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
args: --timeout=30m
- name: Jape Analyzer
uses: SiaFoundation/action-golang-analysis@HEAD
with:
analyzers: |
go.sia.tech/jape.Analyzer@master
directories: |
autopilot
bus bus/client
worker worker/client
- name: Test
uses: n8maninger/action-golang-test@v1
with:
Expand All @@ -53,12 +62,5 @@ jobs:
with:
package: "./internal/testing/..."
args: "-race;-tags='testing';-timeout=30m"
- name: Check Endpoints
uses: SiaFoundation/action-golang-analysis@HEAD
with:
analyzers: |
go.sia.tech/jape.Analyzer
directories: |
autopilot
- name: Build
run: go build -o bin/ ./cmd/renterd
18 changes: 8 additions & 10 deletions api/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/object"
)

Expand Down Expand Up @@ -61,15 +60,14 @@ type (
}

MultipartAddPartRequest struct {
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
}

MultipartCompleteResponse struct {
Expand Down
12 changes: 5 additions & 7 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/object"
)

Expand Down Expand Up @@ -58,12 +57,11 @@ type (

// ObjectAddRequest is the request type for the /bus/object/*key endpoint.
ObjectAddRequest struct {
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
MimeType string `json:"mimeType"`
ETag string `json:"eTag"`
Bucket string `json:"bucket"`
ContractSet string `json:"contractSet"`
Object object.Object `json:"object"`
MimeType string `json:"mimeType"`
ETag string `json:"eTag"`
}

// ObjectsResponse is the response type for the /bus/objects endpoint.
Expand Down
12 changes: 7 additions & 5 deletions api/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type (
}

PackedSlabsRequestPOST struct {
Slabs []UploadedPackedSlab `json:"slabs"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
Slabs []UploadedPackedSlab `json:"slabs"`
}

// UploadSectorRequest is the request type for the /upload/:id/sector endpoint.
Expand All @@ -70,8 +69,11 @@ type (

// UpdateSlabRequest is the request type for the /slab endpoint.
UpdateSlabRequest struct {
ContractSet string `json:"contractSet"`
Slab object.Slab `json:"slab"`
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
ContractSet string `json:"contractSet"`
Slab object.Slab `json:"slab"`
}
)

func (s UploadedPackedSlab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} {
return object.ContractsFromShards(s.Shards)
}
43 changes: 43 additions & 0 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ func (ap *Autopilot) Run() error {
return
}

// block until the autopilot is funded
if funded, interrupted := ap.blockUntilFunded(ap.ticker.C); !funded {
if interrupted {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before wallet got funded")
return
}

// Trace/Log worker id chosen for this maintenance iteration.
workerID, err := w.ID(ctx)
if err != nil {
Expand Down Expand Up @@ -392,6 +402,39 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
}
}

func (ap *Autopilot) blockUntilFunded(interrupt <-chan time.Time) (funded, interrupted bool) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

var once sync.Once

for {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
wallet, err := ap.bus.Wallet(ctx)
funded := !wallet.Confirmed.Add(wallet.Unconfirmed).IsZero()
cancel()

// if an error occurred, or if we're not funded, we continue
if err != nil {
ap.logger.Errorf("failed to get wallet info, err: %v", err)
} else if wallet.Confirmed.Add(wallet.Unconfirmed).IsZero() {
once.Do(func() { ap.logger.Info("autopilot is waiting for wallet to get funded...") })
}

if err != nil || !funded {
select {
case <-ap.stopChan:
return false, false
case <-interrupt:
return false, true
case <-ticker.C:
continue
}
}
return true, false
}
}

func (ap *Autopilot) blockUntilOnline() (online bool) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down
9 changes: 6 additions & 3 deletions autopilot/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo
s.mu.Unlock()

s.logger.Infof("%s started", scanType)
hostCfg := s.ap.State().cfg.Hosts
maxDowntime := time.Duration(hostCfg.MaxDowntimeHours) * time.Hour
minRecentScanFailures := hostCfg.MinRecentScanFailures

s.wg.Add(1)
go func(st string) {
Expand All @@ -210,6 +207,12 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo
}
}

// fetch the config right before removing offline hosts to get the most
// recent settings in case they were updated while scanning.
hostCfg := s.ap.State().cfg.Hosts
maxDowntime := time.Duration(hostCfg.MaxDowntimeHours) * time.Hour
minRecentScanFailures := hostCfg.MinRecentScanFailures

if !interrupted && maxDowntime > 0 {
s.logger.Debugf("removing hosts that have been offline for more than %v and have failed at least %d scans", maxDowntime, minRecentScanFailures)
removed, err := s.bus.RemoveOfflineHosts(ctx, minRecentScanFailures, maxDowntime)
Expand Down
57 changes: 33 additions & 24 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ type (
RenameObject(ctx context.Context, bucketName, from, to string) error
RenameObjects(ctx context.Context, bucketName, from, to string) error
SearchObjects(ctx context.Context, bucketName, substring string, offset, limit int) ([]api.ObjectMetadata, error)
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error

AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, usedContracts map[types.PublicKey]types.FileContractID) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error)
MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucketName, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)

Expand All @@ -166,7 +166,7 @@ type (
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
RefreshHealth(ctx context.Context) error
UnhealthySlabs(ctx context.Context, healthCutoff float64, set string, limit int) ([]api.UnhealthySlab, error)
UpdateSlab(ctx context.Context, s object.Slab, contractSet string, usedContracts map[types.PublicKey]types.FileContractID) error
UpdateSlab(ctx context.Context, s object.Slab, contractSet string) error
}

// An AutopilotStore stores autopilots.
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func (b *bus) objectsHandlerPUT(jc jape.Context) {
} else if aor.Bucket == "" {
aor.Bucket = api.DefaultBucketName
}
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object, aor.UsedContracts))
jc.Check("couldn't store object", b.ms.UpdateObject(jc.Request.Context(), aor.Bucket, jc.PathParam("path"), aor.ContractSet, aor.ETag, aor.MimeType, aor.Object))
}

func (b *bus) objectsCopyHandlerPOST(jc jape.Context) {
Expand Down Expand Up @@ -1374,7 +1374,7 @@ func (b *bus) packedSlabsHandlerDonePOST(jc jape.Context) {
if jc.Decode(&psrp) != nil {
return
}
jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs, psrp.UsedContracts))
jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs))
}

func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) {
Expand Down Expand Up @@ -1426,7 +1426,7 @@ func (b *bus) slabHandlerGET(jc jape.Context) {
func (b *bus) slabHandlerPUT(jc jape.Context) {
var usr api.UpdateSlabRequest
if jc.Decode(&usr) == nil {
jc.Check("couldn't update slab", b.ms.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet, usr.UsedContracts))
jc.Check("couldn't update slab", b.ms.UpdateSlab(jc.Request.Context(), usr.Slab, usr.ContractSet))
}
}

Expand Down Expand Up @@ -2009,10 +2009,7 @@ func (b *bus) metricsHandlerPUT(jc jape.Context) {
}

func (b *bus) metricsHandlerGET(jc jape.Context) {
key := jc.PathParam("key")

// parse mandatory query parameters
var err error
var start time.Time
var n uint64
var interval time.Duration
Expand All @@ -2025,49 +2022,61 @@ func (b *bus) metricsHandlerGET(jc jape.Context) {
}

// parse optional query parameters
switch key {
switch key := jc.PathParam("key"); key {
case api.MetricContract:
var metrics []api.ContractMetric
var opts api.ContractMetricsQueryOpts
if jc.DecodeForm("fcid", &opts.ContractID) != nil {
return
} else if jc.DecodeForm("host", &opts.HostKey) != nil {
return
} else if metrics, err = b.mtrcs.ContractMetrics(jc.Request.Context(), start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil {
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
jc.Encode(metrics)
return
case api.MetricContractSet:
var metrics []api.ContractSetMetric
var opts api.ContractSetMetricsQueryOpts
if jc.DecodeForm("name", &opts.Name) != nil {
return
} else if metrics, err = b.mtrcs.ContractSetMetrics(jc.Request.Context(), start, n, interval, opts); jc.Check("failed to get contract set metrics", err) != nil {
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract set metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
jc.Encode(metrics)
return
case api.MetricContractSetChurn:
var metrics []api.ContractSetChurnMetric
var opts api.ContractSetChurnMetricsQueryOpts
if jc.DecodeForm("name", &opts.Name) != nil {
return
} else if jc.DecodeForm("direction", &opts.Direction) != nil {
return
} else if jc.DecodeForm("reason", &opts.Reason) != nil {
return
} else if metrics, err = b.mtrcs.ContractSetChurnMetrics(jc.Request.Context(), start, n, interval, opts); jc.Check("failed to get contract churn metrics", err) != nil {
} else if metrics, err := b.metrics(jc.Request.Context(), key, start, n, interval, opts); jc.Check("failed to get contract churn metrics", err) != nil {
return
} else {
jc.Encode(metrics)
return
}
jc.Encode(metrics)
return
default:
jc.Error(fmt.Errorf("unknown metric '%s'", key), http.StatusBadRequest)
return
}
}

func (b *bus) metrics(ctx context.Context, key string, start time.Time, n uint64, interval time.Duration, opts interface{}) (interface{}, error) {
switch key {
case api.MetricContract:
return b.mtrcs.ContractMetrics(ctx, start, n, interval, opts.(api.ContractMetricsQueryOpts))
case api.MetricContractSet:
return b.mtrcs.ContractSetMetrics(ctx, start, n, interval, opts.(api.ContractSetMetricsQueryOpts))
case api.MetricContractSetChurn:
return b.mtrcs.ContractSetChurnMetrics(ctx, start, n, interval, opts.(api.ContractSetChurnMetricsQueryOpts))
}
return nil, nil
}

func (b *bus) multipartHandlerCreatePOST(jc jape.Context) {
var req api.MultipartCreateRequest
if jc.Decode(&req) != nil {
Expand Down Expand Up @@ -2129,7 +2138,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest)
return
}
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs, req.UsedContracts)
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs)
if jc.Check("failed to upload part", err) != nil {
return
}
Expand Down
Loading

0 comments on commit 7d5c69d

Please sign in to comment.