Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/SiaFoundation/renterd int…
Browse files Browse the repository at this point in the history
…o pj/contract-pruning
  • Loading branch information
peterjan committed Dec 1, 2023
2 parents 6350382 + 37b3513 commit 5b6d2ac
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 21 deletions.
59 changes: 59 additions & 0 deletions worker/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package worker

import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"lukechampine.com/frand"
)

func randomAlertID() types.Hash256 {
return frand.Entropy256()
}

func newDownloadFailedAlert(bucket, path, prefix, marker string, offset, length, contracts int64, err error) alerts.Alert {
return alerts.Alert{
ID: randomAlertID(),
Severity: alerts.SeverityError,
Message: "Download failed",
Data: map[string]any{
"bucket": bucket,
"path": path,
"prefix": prefix,
"marker": marker,
"offset": offset,
"length": length,
"contracts": contracts,
"err": err,
},
Timestamp: time.Now(),
}
}

func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert {
data := map[string]any{
"bucket": bucket,
"path": path,
"contractSet": contractSet,
"minShards": minShards,
"totalShards": totalShards,
"packing": packing,
"contracts": contracts,
"err": err,
}
if mimeType != "" {
data["mimeType"] = mimeType
}
if multipart {
data["multipart"] = true
}

return alerts.Alert{
ID: randomAlertID(),
Severity: alerts.SeverityError,
Message: "Upload failed",
Data: data,
Timestamp: time.Now(),
}
}
6 changes: 5 additions & 1 deletion worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
maxConcurrentSlabsPerDownload = 3
)

var (
errDownloadManagerStopped = errors.New("download manager stopped")
)

type (
// id is a unique identifier used for debugging
id [8]byte
Expand Down Expand Up @@ -323,7 +327,7 @@ outer:
for {
select {
case <-mgr.stopChan:
return errors.New("manager was stopped")
return errDownloadManagerStopped
case <-ctx.Done():
return errors.New("download timed out")
case resp := <-responseChan:
Expand Down
19 changes: 7 additions & 12 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (
)

var (
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadManagerStopped = errors.New("upload manager stopped")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
)

type uploadParameters struct {
Expand Down Expand Up @@ -257,7 +258,7 @@ func (w *worker) initUploadManager(mm *memoryManager, maxOverdrive uint64, overd
w.uploadManager = newUploadManager(w.bus, w, w, mm, maxOverdrive, overdriveTimeout, logger)
}

func (w *worker) upload(ctx context.Context, r io.Reader, up uploadParameters, opts ...UploadOption) (_ string, err error) {
func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (_ string, err error) {
// apply the options
for _, opt := range opts {
opt(&up)
Expand All @@ -277,7 +278,7 @@ func (w *worker) upload(ctx context.Context, r io.Reader, up uploadParameters, o
}

// perform the upload
bufferSizeLimitReached, eTag, err := w.uploadManager.Upload(ctx, r, up, lockingPriorityUpload)
bufferSizeLimitReached, eTag, err := w.uploadManager.Upload(ctx, r, contracts, up, lockingPriorityUpload)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -494,7 +495,7 @@ func (mgr *uploadManager) Stop() {
}
}

func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadParameters, lockPriority int) (bufferSizeLimitReached bool, eTag string, err error) {
func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, lockPriority int) (bufferSizeLimitReached bool, eTag string, err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -518,12 +519,6 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
return false, "", err
}

// fetch contracts
contracts, err := mgr.b.ContractSetContracts(ctx, up.contractSet)
if err != nil {
return false, "", fmt.Errorf("couldn't fetch contracts from bus: %w", err)
}

// create the upload
u, finishFn, err := mgr.newUpload(ctx, up.rs.TotalShards, contracts, up.bh, lockPriority)
if err != nil {
Expand Down Expand Up @@ -604,7 +599,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
for len(responses) < numSlabs {
select {
case <-mgr.stopChan:
return false, "", errors.New("manager was stopped")
return false, "", errUploadManagerStopped
case numSlabs = <-numSlabsChan:
case res := <-respChan:
if res.err != nil {
Expand Down
52 changes: 44 additions & 8 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,14 @@ func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, c
})
}

func (w *worker) registerAlert(a alerts.Alert) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := w.alerts.RegisterAlert(ctx, a); err != nil {
w.logger.Error("failed to register alert", err)
}
cancel()
}

func (w *worker) rhpScanHandler(jc jape.Context) {
var rsr api.RHPScanRequest
if jc.Decode(&rsr) != nil {
Expand Down Expand Up @@ -1044,9 +1052,13 @@ func (w *worker) objectsHandlerGET(jc jape.Context) {
}

// create a download function
downloadFn := func(wr io.Writer, offset, length int64) error {
downloadFn := func(wr io.Writer, offset, length int64) (err error) {
ctx = WithGougingChecker(ctx, w.bus, gp)
return w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), contracts)
err = w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), contracts)
if err != nil && !errors.Is(err, errDownloadManagerStopped) {
w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err))
}
return
}

// serve the content
Expand All @@ -1064,6 +1076,9 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
jc.Custom((*[]byte)(nil), nil)
ctx := jc.Request.Context()

// grab the path
path := jc.PathParam("path")

// fetch the upload parameters
up, err := w.bus.UploadParams(ctx)
if jc.Check("couldn't fetch upload parameters from bus", err) != nil {
Expand Down Expand Up @@ -1134,10 +1149,19 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
// attach gouging checker to the context
ctx = WithGougingChecker(ctx, w.bus, up.GougingParams)

// fetch contracts
contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet)
if jc.Check("couldn't fetch contracts from bus", err) != nil {
return
}

// upload the object
params := defaultParameters(bucket, jc.PathParam("path"))
eTag, err := w.upload(ctx, jc.Request.Body, params, opts...)
if jc.Check("couldn't upload object", err) != nil {
params := defaultParameters(bucket, path)
eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...)
if err := jc.Check("couldn't upload object", err); err != nil {
if !errors.Is(err, errUploadManagerStopped) {
w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err))
}
return
}

Expand All @@ -1149,6 +1173,9 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
jc.Custom((*[]byte)(nil), nil)
ctx := jc.Request.Context()

// grab the path
path := jc.PathParam("path")

// fetch the upload parameters
up, err := w.bus.UploadParams(ctx)
if jc.Check("couldn't fetch upload parameters from bus", err) != nil {
Expand Down Expand Up @@ -1255,10 +1282,19 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
// attach gouging checker to the context
ctx = WithGougingChecker(ctx, w.bus, up.GougingParams)

// fetch contracts
contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet)
if jc.Check("couldn't fetch contracts from bus", err) != nil {
return
}

// upload the multipart
params := multipartParameters(bucket, jc.PathParam("path"), uploadID, partNumber)
eTag, err := w.upload(ctx, jc.Request.Body, params, opts...)
if jc.Check("couldn't upload multipart", err) != nil {
params := multipartParameters(bucket, path, uploadID, partNumber)
eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...)
if jc.Check("couldn't upload object", err) != nil {
if !errors.Is(err, errUploadManagerStopped) {
w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err))
}
return
}

Expand Down

0 comments on commit 5b6d2ac

Please sign in to comment.