diff --git a/api/host.go b/api/host.go index e4d472495..4ad1f87a1 100644 --- a/api/host.go +++ b/api/host.go @@ -158,6 +158,7 @@ type ( Scanned bool `json:"scanned"` Blocked bool `json:"blocked"` Checks map[string]HostCheck `json:"checks"` + StoredData uint64 `json:"storedData"` } HostAddress struct { diff --git a/api/multipart.go b/api/multipart.go index ee26567b1..ecd19789f 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -7,6 +7,11 @@ import ( ) var ( + // ErrInvalidMultipartEncryptionSettings is returned if the multipart upload + // has an invalid combination of encryption params. e.g. when encryption is + // enabled but no offset is set. + ErrInvalidMultipartEncryptionSettings = errors.New("invalid multipart encryption settings") + // ErrMultipartUploadNotFound is returned if the specified multipart upload // wasn't found. ErrMultipartUploadNotFound = errors.New("multipart upload not found") diff --git a/api/object.go b/api/object.go index 0382f69a7..91332eec7 100644 --- a/api/object.go +++ b/api/object.go @@ -91,12 +91,12 @@ type ( // HeadObjectResponse is the response type for the HEAD /worker/object endpoint. HeadObjectResponse struct { - ContentType string `json:"contentType"` - Etag string `json:"eTag"` - LastModified string `json:"lastModified"` - Range *DownloadRange `json:"range,omitempty"` - Size int64 `json:"size"` - Metadata ObjectUserMetadata `json:"metadata"` + ContentType string + Etag string + LastModified TimeRFC3339 + Range *ContentRange + Size int64 + Metadata ObjectUserMetadata } // ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint. 
@@ -151,12 +151,6 @@ func ExtractObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadat return oum } -// LastModified returns the object's ModTime formatted for use in the -// 'Last-Modified' header -func (o ObjectMetadata) LastModified() string { - return o.ModTime.Std().Format(http.TimeFormat) -} - // ContentType returns the object's MimeType for use in the 'Content-Type' // header, if the object's mime type is empty we try and deduce it from the // extension in the object's name. @@ -214,12 +208,12 @@ type ( HeadObjectOptions struct { IgnoreDelim bool - Range DownloadRange + Range *DownloadRange } DownloadObjectOptions struct { GetObjectOptions - Range DownloadRange + Range *DownloadRange } GetObjectOptions struct { @@ -247,7 +241,6 @@ type ( // UploadObjectOptions is the options type for the worker client. UploadObjectOptions struct { - Offset int MinShards int TotalShards int ContractSet string @@ -257,15 +250,15 @@ type ( } UploadMultipartUploadPartOptions struct { + ContractSet string + MinShards int + TotalShards int EncryptionOffset *int ContentLength int64 } ) func (opts UploadObjectOptions) ApplyValues(values url.Values) { - if opts.Offset != 0 { - values.Set("offset", fmt.Sprint(opts.Offset)) - } if opts.MinShards != 0 { values.Set("minshards", fmt.Sprint(opts.MinShards)) } @@ -290,6 +283,15 @@ func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) { if opts.EncryptionOffset != nil { values.Set("offset", fmt.Sprint(*opts.EncryptionOffset)) } + if opts.MinShards != 0 { + values.Set("minshards", fmt.Sprint(opts.MinShards)) + } + if opts.TotalShards != 0 { + values.Set("totalshards", fmt.Sprint(opts.TotalShards)) + } + if opts.ContractSet != "" { + values.Set("contractset", opts.ContractSet) + } } func (opts DownloadObjectOptions) ApplyValues(values url.Values) { @@ -297,7 +299,7 @@ func (opts DownloadObjectOptions) ApplyValues(values url.Values) { } func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) { - if 
opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { @@ -319,7 +321,7 @@ func (opts HeadObjectOptions) Apply(values url.Values) { } func (opts HeadObjectOptions) ApplyHeaders(h http.Header) { - if opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { diff --git a/api/setting.go b/api/setting.go index 0c0057410..923863e58 100644 --- a/api/setting.go +++ b/api/setting.go @@ -24,6 +24,10 @@ const ( ) var ( + // ErrInvalidRedundancySettings is returned if the redundancy settings are + // not valid + ErrInvalidRedundancySettings = errors.New("invalid redundancy settings") + // ErrSettingNotFound is returned if a requested setting is not present in the // database. ErrSettingNotFound = errors.New("setting not found") @@ -136,13 +140,13 @@ func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { // valid. 
func (rs RedundancySettings) Validate() error { if rs.MinShards < 1 { - return errors.New("MinShards must be greater than 0") + return fmt.Errorf("%w: MinShards must be greater than 0", ErrInvalidRedundancySettings) } if rs.TotalShards < rs.MinShards { - return errors.New("TotalShards must be at least MinShards") + return fmt.Errorf("%w: TotalShards must be at least MinShards", ErrInvalidRedundancySettings) } if rs.TotalShards > 255 { - return errors.New("TotalShards must be less than 256") + return fmt.Errorf("%w: TotalShards must be less than 256", ErrInvalidRedundancySettings) } return nil } diff --git a/api/worker.go b/api/worker.go index 6d0c0e9d2..2908802f7 100644 --- a/api/worker.go +++ b/api/worker.go @@ -3,9 +3,12 @@ package api import ( "errors" "fmt" + "math" + "net/http" "strconv" "strings" + "github.com/gotd/contrib/http_range" rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" @@ -23,6 +26,10 @@ var ( // ErrHostOnPrivateNetwork is returned by the worker API when a host can't // be scanned since it is on a private network. ErrHostOnPrivateNetwork = errors.New("host is on a private network") + + // ErrMultiRangeNotSupported is returned by the worker API when a request + // tries to download multiple ranges at once. + ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported") ) type ( @@ -216,41 +223,76 @@ type ( } ) -type DownloadRange struct { +// ContentRange represents a content range returned via the "Content-Range" +// header. +type ContentRange struct { Offset int64 Length int64 Size int64 } -func ParseDownloadRange(contentRange string) (DownloadRange, error) { +// DownloadRange represents a requested range for a download via the "Range" +// header. 
+type DownloadRange struct { + Offset int64 + Length int64 +} + +func (r *DownloadRange) ContentRange(size int64) *ContentRange { + return &ContentRange{ + Offset: r.Offset, + Length: r.Length, + Size: size, + } +} + +func ParseContentRange(contentRange string) (ContentRange, error) { parts := strings.Split(contentRange, " ") if len(parts) != 2 || parts[0] != "bytes" { - return DownloadRange{}, errors.New("missing 'bytes' prefix in range header") + return ContentRange{}, errors.New("missing 'bytes' prefix in range header") } parts = strings.Split(parts[1], "/") if len(parts) != 2 { - return DownloadRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) + return ContentRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) } rangeStr := parts[0] rangeParts := strings.Split(rangeStr, "-") if len(rangeParts) != 2 { - return DownloadRange{}, errors.New("invalid Content-Range header") + return ContentRange{}, errors.New("invalid Content-Range header") } start, err := strconv.ParseInt(rangeParts[0], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } end, err := strconv.ParseInt(rangeParts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } size, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } - return DownloadRange{ + return ContentRange{ Offset: start, Length: end - start + 1, Size: size, }, nil } + +func ParseDownloadRange(req *http.Request) (DownloadRange, error) { + // parse the request range we pass math.MaxInt64 since a range header in a + // request doesn't have a size + ranges, err := http_range.ParseRange(req.Header.Get("Range"), math.MaxInt64) + if err != nil { + return DownloadRange{}, err + } + + // extract requested offset and length + dr := DownloadRange{Offset: 0, Length: -1} + if len(ranges) == 1 { + dr.Offset, dr.Length = ranges[0].Start, ranges[0].Length + } else if len(ranges) > 1 { 
+ return DownloadRange{}, ErrMultiRangeNotSupported + } + return dr, nil +} diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index 98105afd1..74637e0e8 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -282,12 +282,10 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro usedHosts[contract.HostKey] = struct{}{} } - // compile map of stored data per host + // compile map of stored data per contract contractData := make(map[types.FileContractID]uint64) - hostData := make(map[types.PublicKey]uint64) for _, c := range contracts { contractData[c.ID] = c.FileSize() - hostData[c.HostKey] += c.FileSize() } // fetch all hosts @@ -310,7 +308,7 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro } // fetch candidate hosts - candidates, unusableHosts, err := c.candidateHosts(mCtx, hosts, usedHosts, hostData, minValidScore) // avoid 0 score hosts + candidates, unusableHosts, err := c.candidateHosts(mCtx, hosts, usedHosts, minValidScore) // avoid 0 score hosts if err != nil { return false, err } @@ -324,7 +322,7 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro } // run host checks - checks, err := c.runHostChecks(mCtx, hosts, hostData, minScore) + checks, err := c.runHostChecks(mCtx, hosts, minScore) if err != nil { return false, fmt.Errorf("failed to run host checks, err: %v", err) } @@ -742,7 +740,7 @@ LOOP: return toKeep, toArchive, toStopUsing, toRefresh, toRenew } -func (c *Contractor) runHostChecks(ctx *mCtx, hosts []api.Host, hostData map[types.PublicKey]uint64, minScore float64) (map[types.PublicKey]*api.HostCheck, error) { +func (c *Contractor) runHostChecks(ctx *mCtx, hosts []api.Host, minScore float64) (map[types.PublicKey]*api.HostCheck, error) { // fetch consensus state cs, err := c.bus.ConsensusState(ctx) if err != nil { @@ -756,7 +754,7 @@ func (c *Contractor) runHostChecks(ctx *mCtx, hosts 
[]api.Host, hostData map[typ checks := make(map[types.PublicKey]*api.HostCheck) for _, h := range hosts { h.PriceTable.HostBlockHeight = cs.BlockHeight // ignore HostBlockHeight - checks[h.PublicKey] = checkHost(ctx.AutopilotConfig(), ctx.state.RS, gc, h, minScore, hostData[h.PublicKey]) + checks[h.PublicKey] = checkHost(ctx.AutopilotConfig(), ctx.state.RS, gc, h, minScore) } return checks, nil } @@ -984,7 +982,16 @@ func (c *Contractor) runContractRenewals(ctx *mCtx, w Worker, toRenew []contract contract := toRenew[i].contract.ContractMetadata renewed, proceed, err := c.renewContract(ctx, w, toRenew[i], budget) if err != nil { - c.alerter.RegisterAlert(ctx, newContractRenewalFailedAlert(contract, !proceed, err)) + // don't register an alert for hosts that are out of funds since the + // user can't do anything about it + if !(worker.IsErrHost(err) && utils.IsErr(err, wallet.ErrNotEnoughFunds)) { + c.alerter.RegisterAlert(ctx, newContractRenewalFailedAlert(contract, !proceed, err)) + } + c.logger.With(zap.Error(err)). + With("fcid", toRenew[i].contract.ID). + With("hostKey", toRenew[i].contract.HostKey). + With("proceed", proceed). 
+ Errorw("failed to renew contract") if toRenew[i].usable { toKeep = append(toKeep, toRenew[i].contract.ContractMetadata) } @@ -1220,7 +1227,7 @@ func (c *Contractor) calculateMinScore(candidates []scoredHost, numContracts uin return minScore } -func (c *Contractor) candidateHosts(ctx *mCtx, hosts []api.Host, usedHosts map[types.PublicKey]struct{}, storedData map[types.PublicKey]uint64, minScore float64) ([]scoredHost, unusableHostsBreakdown, error) { +func (c *Contractor) candidateHosts(ctx *mCtx, hosts []api.Host, usedHosts map[types.PublicKey]struct{}, minScore float64) ([]scoredHost, unusableHostsBreakdown, error) { start := time.Now() // fetch consensus state @@ -1273,7 +1280,7 @@ func (c *Contractor) candidateHosts(ctx *mCtx, hosts []api.Host, usedHosts map[t // NOTE: ignore the pricetable's HostBlockHeight by setting it to our // own blockheight h.PriceTable.HostBlockHeight = cs.BlockHeight - hc := checkHost(ctx.AutopilotConfig(), ctx.state.RS, gc, h, minScore, storedData[h.PublicKey]) + hc := checkHost(ctx.AutopilotConfig(), ctx.state.RS, gc, h, minScore) if hc.Usability.IsUsable() { candidates = append(candidates, scoredHost{h, hc.Score.Score()}) continue diff --git a/autopilot/contractor/evaluate.go b/autopilot/contractor/evaluate.go index cc964b3d4..685cb4b70 100644 --- a/autopilot/contractor/evaluate.go +++ b/autopilot/contractor/evaluate.go @@ -9,7 +9,7 @@ import ( func countUsableHosts(cfg api.AutopilotConfig, cs api.ConsensusState, fee types.Currency, currentPeriod uint64, rs api.RedundancySettings, gs api.GougingSettings, hosts []api.Host) (usables uint64) { gc := worker.NewGougingChecker(gs, cs, fee, currentPeriod, cfg.Contracts.RenewWindow) for _, host := range hosts { - hc := checkHost(cfg, rs, gc, host, minValidScore, 0) + hc := checkHost(cfg, rs, gc, host, minValidScore) if hc.Usability.IsUsable() { usables++ } @@ -25,7 +25,7 @@ func EvaluateConfig(cfg api.AutopilotConfig, cs api.ConsensusState, fee types.Cu resp.Hosts = uint64(len(hosts)) for 
_, host := range hosts { - hc := checkHost(cfg, rs, gc, host, 0, 0) + hc := checkHost(cfg, rs, gc, host, 0) if hc.Usability.IsUsable() { resp.Usable++ continue diff --git a/autopilot/contractor/hostfilter.go b/autopilot/contractor/hostfilter.go index bfc11b903..dc95b1386 100644 --- a/autopilot/contractor/hostfilter.go +++ b/autopilot/contractor/hostfilter.go @@ -236,7 +236,7 @@ func isUpForRenewal(cfg api.AutopilotConfig, r types.FileContractRevision, block } // checkHost performs a series of checks on the host. -func checkHost(cfg api.AutopilotConfig, rs api.RedundancySettings, gc worker.GougingChecker, h api.Host, minScore float64, storedData uint64) *api.HostCheck { +func checkHost(cfg api.AutopilotConfig, rs api.RedundancySettings, gc worker.GougingChecker, h api.Host, minScore float64) *api.HostCheck { if rs.Validate() != nil { panic("invalid redundancy settings were supplied - developer error") } @@ -278,7 +278,7 @@ func checkHost(cfg api.AutopilotConfig, rs api.RedundancySettings, gc worker.Gou // not gouging, this because the core package does not have overflow // checks in its cost calculations needed to calculate the period // cost - sb = hostScore(cfg, h, storedData, rs.Redundancy()) + sb = hostScore(cfg, h, rs.Redundancy()) if sb.Score() < minScore { ub.LowScore = true } diff --git a/autopilot/contractor/hostscore.go b/autopilot/contractor/hostscore.go index 3a05a947a..51d8275fc 100644 --- a/autopilot/contractor/hostscore.go +++ b/autopilot/contractor/hostscore.go @@ -22,7 +22,7 @@ const ( minValidScore = math.SmallestNonzeroFloat64 ) -func hostScore(cfg api.AutopilotConfig, h api.Host, storedData uint64, expectedRedundancy float64) api.HostScoreBreakdown { +func hostScore(cfg api.AutopilotConfig, h api.Host, expectedRedundancy float64) api.HostScoreBreakdown { cCfg := cfg.Contracts // idealDataPerHost is the amount of data that we would have to put on each // host assuming that our storage requirements were spread evenly across @@ -44,7 +44,7 @@ func 
hostScore(cfg api.AutopilotConfig, h api.Host, storedData uint64, expectedR Collateral: collateralScore(cCfg, h.PriceTable.HostPriceTable, uint64(allocationPerHost)), Interactions: interactionScore(h), Prices: priceAdjustmentScore(hostPeriodCost, cCfg), - StorageRemaining: storageRemainingScore(h.Settings, storedData, allocationPerHost), + StorageRemaining: storageRemainingScore(h.Settings, h.StoredData, allocationPerHost), Uptime: uptimeScore(h), Version: versionScore(h.Settings, cfg.Hosts.MinProtocolVersion), } diff --git a/autopilot/contractor/hostscore_test.go b/autopilot/contractor/hostscore_test.go index 84f964692..ae1b7668e 100644 --- a/autopilot/contractor/hostscore_test.go +++ b/autopilot/contractor/hostscore_test.go @@ -42,13 +42,13 @@ func TestHostScore(t *testing.T) { // assert both hosts score equal redundancy := 3.0 - if hostScore(cfg, h1, 0, redundancy) != hostScore(cfg, h2, 0, redundancy) { + if hostScore(cfg, h1, redundancy) != hostScore(cfg, h2, redundancy) { t.Fatal("unexpected") } // assert age affects the score h1.KnownSince = time.Now().Add(-1 * day) - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } @@ -57,21 +57,21 @@ func TestHostScore(t *testing.T) { settings.Collateral = settings.Collateral.Div64(2) settings.MaxCollateral = settings.MaxCollateral.Div64(2) h1 = newHost(settings) // reset - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } // assert interactions affect the score h1 = newHost(test.NewHostSettings()) // reset h1.Interactions.SuccessfulInteractions++ - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, 
redundancy).Score() { t.Fatal("unexpected") } // assert uptime affects the score h2 = newHost(test.NewHostSettings()) // reset h2.Interactions.SecondToLastScanSuccess = false - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() || ageScore(h1) != ageScore(h2) { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() || ageScore(h1) != ageScore(h2) { t.Fatal("unexpected") } @@ -79,28 +79,28 @@ func TestHostScore(t *testing.T) { h2Settings := test.NewHostSettings() h2Settings.Version = "1.5.6" // lower h2 = newHost(h2Settings) // reset - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } // asseret remaining storage affects the score. h1 = newHost(test.NewHostSettings()) // reset h2.Settings.RemainingStorage = 100 - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } // assert MaxCollateral affects the score. h2 = newHost(test.NewHostSettings()) // reset h2.PriceTable.MaxCollateral = types.ZeroCurrency - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } // assert price affects the score. 
h2 = newHost(test.NewHostSettings()) // reset h2.PriceTable.WriteBaseCost = types.Siacoins(1) - if hostScore(cfg, h1, 0, redundancy).Score() <= hostScore(cfg, h2, 0, redundancy).Score() { + if hostScore(cfg, h1, redundancy).Score() <= hostScore(cfg, h2, redundancy).Score() { t.Fatal("unexpected") } } diff --git a/bus/bus.go b/bus/bus.go index a3b283a54..0add147bd 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1362,7 +1362,7 @@ func (b *bus) objectsCopyHandlerPOST(jc jape.Context) { return } - jc.ResponseWriter.Header().Set("Last-Modified", om.LastModified()) + jc.ResponseWriter.Header().Set("Last-Modified", om.ModTime.Std().Format(http.TimeFormat)) jc.ResponseWriter.Header().Set("ETag", api.FormatETag(om.ETag)) jc.Encode(om) } diff --git a/bus/client/client_test.go b/bus/client/client_test.go index ce84c8986..c5b887e8e 100644 --- a/bus/client/client_test.go +++ b/bus/client/client_test.go @@ -81,7 +81,6 @@ func newTestClient(dir string) (*client.Client, func() error, func(context.Conte Network: network, Genesis: genesis, SlabPruningInterval: time.Minute, - SlabPruningCooldown: time.Minute, }, filepath.Join(dir, "bus"), types.GeneratePrivateKey(), zap.New(zapcore.NewNopCore())) if err != nil { return nil, nil, nil, err diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 2b0a99605..e418c67fb 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -28,9 +28,9 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/utils" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.sia.tech/web/renterd" "go.uber.org/zap" "golang.org/x/sys/cpu" @@ -437,7 +437,6 @@ func main() { Network: network, Genesis: genesis, SlabPruningInterval: time.Hour, - SlabPruningCooldown: 30 * time.Second, } // Init db dialector if cfg.Database.MySQL.URI != "" { @@ -574,7 +573,10 @@ func main() { var workers []autopilot.Worker if len(cfg.Worker.Remotes) == 0 { if 
cfg.Worker.Enabled { - w, fn, err := node.NewWorker(cfg.Worker, bc, seed, logger) + w, s3Handler, fn, err := node.NewWorker(cfg.Worker, s3.Opts{ + AuthDisabled: cfg.S3.DisableAuth, + HostBucketEnabled: cfg.S3.HostBucketEnabled, + }, bc, seed, logger) if err != nil { logger.Fatal("failed to create worker: " + err.Error()) } @@ -589,13 +591,6 @@ func main() { workers = append(workers, wc) if cfg.S3.Enabled { - s3Handler, err := s3.New(bc, wc, logger.Sugar(), s3.Opts{ - AuthDisabled: cfg.S3.DisableAuth, - HostBucketEnabled: cfg.S3.HostBucketEnabled, - }) - if err != nil { - log.Fatal("failed to create s3 client", err) - } s3Srv = &http.Server{ Addr: cfg.S3.Address, Handler: s3Handler, diff --git a/go.mod b/go.mod index 62cfb250e..34b7b9e58 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640 go.sia.tech/mux v1.2.0 go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca - go.sia.tech/web/renterd v0.50.0 + go.sia.tech/web/renterd v0.51.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.22.0 golang.org/x/sys v0.19.0 diff --git a/go.sum b/go.sum index 1dafb2cb7..c1f52b1fc 100644 --- a/go.sum +++ b/go.sum @@ -261,8 +261,8 @@ go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQf go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y= go.sia.tech/web v0.0.0-20240403135501-82ff3a2a3e7c h1:os2ZFJojHi0ckCNbr8c2GnWGm0ftvHkQUJOfBRGGIfk= go.sia.tech/web v0.0.0-20240403135501-82ff3a2a3e7c/go.mod h1:nGEhGmI8zV/BcC3LOCC5JLVYpidNYJIvLGIqVRWQBCg= -go.sia.tech/web/renterd v0.50.0 h1:Q955SDKAIej3vEr+P9nOjpgxCKaO+noTnOSUF30SGsc= -go.sia.tech/web/renterd v0.50.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM= +go.sia.tech/web/renterd v0.51.0 h1:hQfq6vOMll2lseQMaK9tUtc6RscO3zgLOzhCk9myHTk= +go.sia.tech/web/renterd v0.51.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM= go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= diff --git a/internal/node/node.go b/internal/node/node.go index b3de44774..c4b5d36f4 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -24,6 +24,7 @@ import ( "go.sia.tech/renterd/stores" "go.sia.tech/renterd/webhooks" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.uber.org/zap" "golang.org/x/crypto/blake2b" "gorm.io/gorm" @@ -34,6 +35,11 @@ import ( // - add wallet metrics // - add UPNP support +type Bus interface { + worker.Bus + s3.Bus +} + type BusConfig struct { config.Bus Network *consensus.Network @@ -42,7 +48,6 @@ type BusConfig struct { DBDialector gorm.Dialector DBMetricsDialector gorm.Dialector SlabPruningInterval time.Duration - SlabPruningCooldown time.Duration } type AutopilotConfig struct { @@ -203,14 +208,18 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger return b.Handler(), shutdownFn, cm, nil } -func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err + } + s3Handler, err := s3.New(b, w, l.Named("s3").Sugar(), s3Opts) + if err != nil { + err = errors.Join(err, w.Shutdown(context.Background())) + return nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err) } - - 
return w.Handler(), w.Shutdown, nil + return w.Handler(), s3Handler, w.Shutdown, nil } func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 35f53d421..a16bba524 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -23,12 +23,13 @@ import ( "go.sia.tech/jape" "go.sia.tech/renterd/api" "go.sia.tech/renterd/autopilot" + "go.sia.tech/renterd/build" "go.sia.tech/renterd/bus" "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/test" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" + "go.sia.tech/renterd/worker/s3" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gorm.io/gorm" @@ -36,6 +37,8 @@ import ( "go.sia.tech/renterd/worker" stypes "go.sia.tech/siad/types" + gormlogger "gorm.io/gorm/logger" + "moul.io/zapgorm2" ) const ( @@ -169,7 +172,7 @@ type testClusterOptions struct { // newTestLogger creates a console logger used for testing. func newTestLogger() *zap.Logger { - return newTestLoggerCustom(zapcore.ErrorLevel) + return newTestLoggerCustom(zapcore.DebugLevel) } // newTestLoggerCustom creates a console logger used for testing and allows @@ -241,6 +244,18 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { apSettings = *opts.autopilotSettings } + // default database logger + if busCfg.DBLogger == nil { + busCfg.DBLogger = zapgorm2.Logger{ + ZapLogger: logger.Named("SQL"), + LogLevel: gormlogger.Warn, + SlowThreshold: 100 * time.Millisecond, + SkipCallerLookup: false, + IgnoreRecordNotFoundError: true, + Context: nil, + } + } + // Check if we are testing against an external database. If so, we create a // database with a random name first. 
uri, user, password, _ := stores.DBConfigFromEnv() @@ -314,7 +329,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { busShutdownFns = append(busShutdownFns, bShutdownFn) // Create worker. - w, wShutdownFn, err := node.NewWorker(workerCfg, busClient, wk, logger) + w, s3Handler, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) tt.OK(err) workerAuth := jape.BasicAuth(workerPassword) @@ -327,9 +342,6 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { workerShutdownFns = append(workerShutdownFns, wShutdownFn) // Create S3 API. - s3Handler, err := s3.New(busClient, workerClient, logger.Sugar(), s3.Opts{}) - tt.OK(err) - s3Server := http.Server{ Handler: s3Handler, } @@ -420,7 +432,10 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { tt.OK(busClient.UpdateSetting(ctx, api.SettingS3Authentication, api.S3AuthenticationSettings{ V4Keypairs: map[string]string{test.S3AccessKeyID: test.S3SecretAccessKey}, })) - tt.OK(busClient.UpdateSetting(ctx, api.SettingUploadPacking, api.UploadPackingSettings{Enabled: enableUploadPacking})) + tt.OK(busClient.UpdateSetting(ctx, api.SettingUploadPacking, api.UploadPackingSettings{ + Enabled: enableUploadPacking, + SlabBufferMaxSizeSoft: build.DefaultUploadPackingSettings.SlabBufferMaxSizeSoft, + })) // Fund the bus. 
if funding { @@ -898,7 +913,6 @@ func testBusCfg() node.BusConfig { Network: network, Genesis: genesis, SlabPruningInterval: time.Second, - SlabPruningCooldown: 10 * time.Millisecond, } } diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 8b47cb4cc..d6a0a10e3 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -27,7 +27,6 @@ import ( "go.sia.tech/renterd/internal/test" "go.sia.tech/renterd/object" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "lukechampine.com/frand" ) @@ -410,8 +409,11 @@ func TestObjectEntries(t *testing.T) { } for _, entry := range got { if !strings.HasSuffix(entry.Name, "/") { - if err := w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { + buf := new(bytes.Buffer) + if err := w.DownloadObject(context.Background(), buf, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { t.Fatal(err) + } else if buf.Len() != int(entry.Size) { + t.Fatal("unexpected", buf.Len(), entry.Size) } } } @@ -581,17 +583,26 @@ func TestUploadDownloadBasic(t *testing.T) { t.Fatal("unexpected", len(data), buffer.Len()) } - // download again, 32 bytes at a time. 
+ // download again, 32 bytes at a time for i := int64(0); i < 4; i++ { offset := i * 32 var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: 32}})) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: 32}})) if !bytes.Equal(data[offset:offset+32], buffer.Bytes()) { fmt.Println(data[offset : offset+32]) fmt.Println(buffer.Bytes()) t.Fatalf("mismatch for offset %v", offset) } } + + // check that stored data on hosts was updated + hosts, err := cluster.Bus.Hosts(context.Background(), api.GetHostsOptions{}) + tt.OK(err) + for _, host := range hosts { + if host.StoredData != rhpv2.SectorSize { + t.Fatalf("stored data should be %v, got %v", rhpv2.SectorSize, host.StoredData) + } + } } // TestUploadDownloadExtended is an integration test that verifies objects can @@ -1450,9 +1461,7 @@ func TestWalletTransactions(t *testing.T) { t.SkipNow() } - cluster := newTestCluster(t, testClusterOptions{ - logger: newTestLoggerCustom(zapcore.DebugLevel), - }) + cluster := newTestCluster(t, clusterOptsDefault) defer cluster.Shutdown() b := cluster.Bus tt := cluster.tt @@ -1570,7 +1579,7 @@ func TestUploadPacking(t *testing.T) { &buffer, api.DefaultBucketName, path, - api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: length}}, + api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: length}}, ); err != nil { t.Fatal(err) } @@ -1706,9 +1715,7 @@ func TestWallet(t *testing.T) { t.SkipNow() } - cluster := newTestCluster(t, testClusterOptions{ - logger: newTestLoggerCustom(zapcore.DebugLevel), - }) + cluster := newTestCluster(t, clusterOptsDefault) defer cluster.Shutdown() b := cluster.Bus tt := cluster.tt @@ -1913,9 +1920,7 @@ func TestAlerts(t *testing.T) { t.SkipNow() } - cluster := newTestCluster(t, 
testClusterOptions{ - logger: newTestLoggerCustom(zapcore.DebugLevel), - }) + cluster := newTestCluster(t, clusterOptsDefault) defer cluster.Shutdown() b := cluster.Bus tt := cluster.tt @@ -2139,7 +2144,7 @@ func TestMultipartUploads(t *testing.T) { } // Download a range of the object - gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: 0, Length: 1}}) + gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: 0, Length: 1}}) tt.OK(err) if gor.Range == nil || gor.Range.Offset != 0 || gor.Range.Length != 1 { t.Fatal("unexpected range:", gor.Range) diff --git a/internal/test/e2e/gouging_test.go b/internal/test/e2e/gouging_test.go index f5fa2d7fa..657ef6722 100644 --- a/internal/test/e2e/gouging_test.go +++ b/internal/test/e2e/gouging_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "testing" "time" @@ -90,11 +91,8 @@ func TestGouging(t *testing.T) { // again, this is necessary for the host to be considered price gouging time.Sleep(defaultHostSettings.PriceTableValidity) - // download the data - should fail - buffer.Reset() - if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { - t.Fatal("expected download to fail", err) - } + // download the data - should still work + tt.OKAll(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, path, api.DownloadObjectOptions{})) // try optimising gouging settings resp, err := cluster.Autopilot.EvaluateConfig(context.Background(), test.AutopilotConfig, gs, test.RedundancySettings) diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index af924f847..4bb1ea2dd 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -3,8 +3,10 @@ package e2e import ( "bytes" "context" + "net/http" "reflect" 
"testing" + "time" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" @@ -59,15 +61,23 @@ func TestObjectMetadata(t *testing.T) { t.Fatal("missing etag") } + // HeadObject retrieves the modtime from a http header so it's not as + // accurate as the modtime from the object GET endpoint which returns it in + // the body. + orModtime, err := time.Parse(http.TimeFormat, or.Object.ModTime.Std().Format(http.TimeFormat)) + if err != nil { + t.Fatal(err) + } + // perform a HEAD request and assert the headers are all present - hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}}) + hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: &api.DownloadRange{Offset: 1, Length: 1}}) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ ContentType: or.Object.ContentType(), Etag: gor.Etag, - LastModified: or.Object.LastModified(), - Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))}, + LastModified: api.TimeRFC3339(orModtime), + Range: &api.ContentRange{Offset: 1, Length: 1, Size: int64(len(data))}, Size: int64(len(data)), Metadata: gor.Metadata, }) { diff --git a/stores/hostdb.go b/stores/hostdb.go index a0865c813..5ca6b3c34 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -251,7 +251,7 @@ func (dbAllowlistEntry) TableName() string { return "host_allowlist_entries" } func (dbBlocklistEntry) TableName() string { return "host_blocklist_entries" } // convert converts a host into a api.HostInfo -func (h dbHost) convert(blocked bool) api.Host { +func (h dbHost) convert(blocked bool, storedData uint64) api.Host { var lastScan time.Time if h.LastScan > 0 { lastScan = time.Unix(0, h.LastScan) @@ -279,11 +279,12 @@ func (h dbHost) convert(blocked bool) api.Host { HostPriceTable: h.PriceTable.convert(), Expiry: h.PriceTableExpiry.Time, }, - PublicKey: 
types.PublicKey(h.PublicKey), - Scanned: h.Scanned, - Settings: rhpv2.HostSettings(h.Settings), - Blocked: blocked, - Checks: checks, + PublicKey: types.PublicKey(h.PublicKey), + Scanned: h.Scanned, + Settings: rhpv2.HostSettings(h.Settings), + Blocked: blocked, + Checks: checks, + StoredData: storedData, } } @@ -572,9 +573,25 @@ func (ss *SQLStore) SearchHosts(ctx context.Context, autopilotID, filterMode, us Preload("Blocklist") } + // fetch stored data for each host + var storedData []struct { + HostID uint + StoredData uint64 + } + err := ss.db.Raw("SELECT host_id, SUM(size) as StoredData FROM contracts GROUP BY host_id"). + Scan(&storedData). + Error + if err != nil { + return nil, fmt.Errorf("failed to fetch stored data: %w", err) + } + storedDataMap := make(map[uint]uint64) + for _, host := range storedData { + storedDataMap[host.HostID] = host.StoredData + } + var hosts []api.Host var fullHosts []dbHost - err := query. + err = query. Offset(offset). Limit(limit). FindInBatches(&fullHosts, hostRetrievalBatchSize, func(tx *gorm.DB, batch int) error { @@ -585,7 +602,7 @@ func (ss *SQLStore) SearchHosts(ctx context.Context, autopilotID, filterMode, us } else { blocked = filterMode == api.HostFilterModeBlocked } - hosts = append(hosts, fh.convert(blocked)) + hosts = append(hosts, fh.convert(blocked, storedDataMap[fh.ID])) } return nil }). 
diff --git a/stores/metadata.go b/stores/metadata.go index 85c242d26..403236380 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1447,13 +1447,16 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con return nil } -func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]map[types.FileContractID]struct{}) (map[types.FileContractID]dbContract, error) { - fcids := make([]fileContractID, 0, len(usedContracts)) - for _, hostFCIDs := range usedContracts { +func fetchUsedContracts(tx *gorm.DB, usedContractsByHost map[types.PublicKey]map[types.FileContractID]struct{}) (map[types.FileContractID]dbContract, error) { + // flatten map to get all used contract ids + fcids := make([]fileContractID, 0, len(usedContractsByHost)) + for _, hostFCIDs := range usedContractsByHost { for fcid := range hostFCIDs { fcids = append(fcids, fileContractID(fcid)) } } + + // fetch all contracts, take into account renewals var contracts []dbContract err := tx.Model(&dbContract{}). Joins("Host"). @@ -1462,17 +1465,19 @@ func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]map[types if err != nil { return nil, err } - fetchedContracts := make(map[types.FileContractID]dbContract, len(contracts)) + + // build map of used contracts + usedContracts := make(map[types.FileContractID]dbContract, len(contracts)) for _, c := range contracts { - // If a contract has been renewed, we add the renewed contract to the - // map using the old contract's id. 
- if _, renewed := usedContracts[types.PublicKey(c.Host.PublicKey)][types.FileContractID(c.RenewedFrom)]; renewed { - fetchedContracts[types.FileContractID(c.RenewedFrom)] = c - } else { - fetchedContracts[types.FileContractID(c.FCID)] = c + if _, used := usedContractsByHost[types.PublicKey(c.Host.PublicKey)][types.FileContractID(c.FCID)]; used { + usedContracts[types.FileContractID(c.FCID)] = c + } + if _, used := usedContractsByHost[types.PublicKey(c.Host.PublicKey)][types.FileContractID(c.RenewedFrom)]; used { + usedContracts[types.FileContractID(c.RenewedFrom)] = c } } - return fetchedContracts, nil + + return usedContracts, nil } func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error { diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 9b93fd6e7..4b3395dcd 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -4561,3 +4561,83 @@ func TestUpdateObjectParallel(t *testing.T) { close(c) wg.Wait() } + +// TestFetchUsedContracts is a unit test that verifies the functionality of +// fetchUsedContracts +func TestFetchUsedContracts(t *testing.T) { + // create store + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // add test host + hk1 := types.PublicKey{1} + err := ss.addTestHost(hk1) + if err != nil { + t.Fatal(err) + } + + // add test contract + fcid1 := types.FileContractID{1} + _, err = ss.addTestContract(fcid1, hk1) + if err != nil { + t.Fatal(err) + } + + // assert empty map returns no contracts + usedContracts := make(map[types.PublicKey]map[types.FileContractID]struct{}) + contracts, err := fetchUsedContracts(ss.db, usedContracts) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 0 { + t.Fatal("expected 0 contracts", len(contracts)) + } + + // add an entry for fcid1 + usedContracts[hk1] = make(map[types.FileContractID]struct{}) + usedContracts[hk1][types.FileContractID{1}] = struct{}{} + + // assert we get the used contract + contracts, err 
= fetchUsedContracts(ss.db, usedContracts) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 1 { + t.Fatal("expected 1 contract", len(contracts)) + } else if _, ok := contracts[fcid1]; !ok { + t.Fatal("contract not found") + } + + // renew the contract + fcid2 := types.FileContractID{2} + _, err = ss.addTestRenewedContract(fcid2, fcid1, hk1, 1) + if err != nil { + t.Fatal(err) + } + + // assert used contracts contains one entry and it points to the renewal + contracts, err = fetchUsedContracts(ss.db, usedContracts) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 1 { + t.Fatal("expected 1 contract", len(contracts)) + } else if contract, ok := contracts[fcid1]; !ok { + t.Fatal("contract not found") + } else if contract.convert().ID != fcid2 { + t.Fatal("contract should point to the renewed contract") + } + + // add an entry for fcid2 + usedContracts[hk1][types.FileContractID{2}] = struct{}{} + + // assert used contracts now contains an entry for both contracts and both + // point to the renewed contract + contracts, err = fetchUsedContracts(ss.db, usedContracts) + if err != nil { + t.Fatal(err) + } else if len(contracts) != 2 { + t.Fatal("expected 2 contracts", len(contracts)) + } else if !reflect.DeepEqual(contracts[types.FileContractID{1}], contracts[types.FileContractID{2}]) { + t.Fatal("contracts should match") + } else if contracts[types.FileContractID{1}].convert().ID != fcid2 { + t.Fatal("contracts should point to the renewed contract") + } +} diff --git a/stores/sql.go b/stores/sql.go index 2cfcdc7dd..3ac622104 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -395,13 +395,24 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er func retryTransaction(ctx context.Context, db *gorm.DB, logger *zap.SugaredLogger, intervals []time.Duration, fn func(tx *gorm.DB) error, abortFn func(error) bool) error { var err error - for i := 0; i < len(intervals); i++ { + attempts := len(intervals) + 1 + for i := 0; i < 
attempts; i++ { + // execute the transaction err = db.WithContext(ctx).Transaction(fn) if abortFn(err) { return err } - logger.Warn(fmt.Sprintf("transaction attempt %d/%d failed, retry in %v, err: %v", i+1, len(intervals), intervals[i], err)) - time.Sleep(intervals[i]) + + // if this was the last attempt, return the error + if i == len(intervals) { + logger.Warn(fmt.Sprintf("transaction attempt %d/%d failed, err: %v", i+1, attempts, err)) + return err + } + + // log the failed attempt and sleep before retrying + interval := intervals[i] + logger.Warn(fmt.Sprintf("transaction attempt %d/%d failed, retry in %v, err: %v", i+1, attempts, interval, err)) + time.Sleep(interval) } return fmt.Errorf("retryTransaction failed: %w", err) } diff --git a/stores/sql_test.go b/stores/sql_test.go index 6203f1163..2dea1756b 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -3,19 +3,23 @@ package stores import ( "context" "encoding/hex" + "errors" "fmt" "os" "path/filepath" + "reflect" "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "gorm.io/gorm" "gorm.io/gorm/logger" "lukechampine.com/frand" @@ -334,3 +338,58 @@ func TestQueryPlan(t *testing.T) { } } } + +func TestRetryTransaction(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // create custom logger to capture logs + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + ss.logger = zap.New(observedZapCore).Sugar() + + // collectLogs returns all logs + collectLogs := func() (logs []string) { + t.Helper() + for _, entry := range observedLogs.All() { + logs = append(logs, entry.Message) + } + return + } + + // disable retries and retry a transaction that fails + ss.retryTransactionIntervals = nil + ss.retryTransaction(context.Background(), func(tx *gorm.DB) error { 
return errors.New("database locked") }) + + // assert transaction is attempted once and not retried + got := collectLogs() + want := []string{"transaction attempt 1/1 failed, err: database locked"} + if !reflect.DeepEqual(got, want) { + t.Fatal("unexpected logs", cmp.Diff(got, want)) + } + + // enable retries and retry the same transaction + ss.retryTransactionIntervals = []time.Duration{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 15 * time.Millisecond, + } + ss.retryTransaction(context.Background(), func(tx *gorm.DB) error { return errors.New("database locked") }) + + // assert transaction is retried 4 times in total + got = collectLogs() + want = append(want, + "transaction attempt 1/4 failed, retry in 5ms, err: database locked", + "transaction attempt 2/4 failed, retry in 10ms, err: database locked", + "transaction attempt 3/4 failed, retry in 15ms, err: database locked", + "transaction attempt 4/4 failed, err: database locked", + ) + if !reflect.DeepEqual(got, want) { + t.Fatal("unexpected logs", cmp.Diff(got, want)) + } + + // retry transaction that aborts, assert no logs were added + ss.retryTransaction(context.Background(), func(tx *gorm.DB) error { return context.Canceled }) + if len(observedLogs.All()) != len(want) { + t.Fatal("expected no logs") + } +} diff --git a/worker/client/client.go b/worker/client/client.go index fe284469f..71fd200ad 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -303,9 +303,9 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } // parse range - var r *api.DownloadRange + var r *api.ContentRange if cr := header.Get("Content-Range"); cr != "" { - dr, err := api.ParseDownloadRange(cr) + dr, err := api.ParseContentRange(cr) if err != nil { return api.HeadObjectResponse{}, err } @@ -325,10 +325,14 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } } + modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) + if err != nil 
{ + return api.HeadObjectResponse{}, fmt.Errorf("failed to parse Last-Modified header: %w", err) + } return api.HeadObjectResponse{ ContentType: header.Get("Content-Type"), Etag: trimEtag(header.Get("ETag")), - LastModified: header.Get("Last-Modified"), + LastModified: api.TimeRFC3339(modTime), Range: r, Size: size, Metadata: api.ExtractObjectUserMetadataFrom(headers), diff --git a/worker/host.go b/worker/host.go index 4f4e97496..f092534a0 100644 --- a/worker/host.go +++ b/worker/host.go @@ -88,8 +88,8 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 if err != nil { return err } - if breakdown := gc.Check(nil, &hpt); breakdown.Gouging() { - return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) + if breakdown := gc.Check(nil, &hpt); breakdown.DownloadErr != "" { + return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown.DownloadErr) } // return errBalanceInsufficient if balance insufficient @@ -235,10 +235,11 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ } // check whether we have money left in the contract - if pt.FundAccountCost.Cmp(rev.ValidRenterPayout()) >= 0 { - return fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), pt.FundAccountCost) + cost := types.NewCurrency64(1) + if cost.Cmp(rev.ValidRenterPayout()) >= 0 { + return fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), cost) } - availableFunds := rev.ValidRenterPayout().Sub(pt.FundAccountCost) + availableFunds := rev.ValidRenterPayout().Sub(cost) // cap the deposit amount by the money that's left in the contract if deposit.Cmp(availableFunds) > 0 { @@ -246,7 +247,7 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ } // create the payment - amount := deposit.Add(pt.FundAccountCost) + amount := deposit.Add(cost) payment, err := payByContract(rev, amount, rhpv3.Account{}, h.renterKey) // no account needed for funding if err != 
nil { return err @@ -254,7 +255,7 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ // fund the account if err := RPCFundAccount(ctx, t, &payment, h.acc.id, pt.UID); err != nil { - return fmt.Errorf("failed to fund account with %v (excluding cost %v);%w", deposit, pt.FundAccountCost, err) + return fmt.Errorf("failed to fund account with %v (excluding cost %v);%w", deposit, cost, err) } // record the spend @@ -277,7 +278,7 @@ func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) return h.acc.WithSync(ctx, func() (types.Currency, error) { var balance types.Currency err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - payment, err := payByContract(rev, pt.AccountBalanceCost, h.acc.id, h.renterKey) + payment, err := payByContract(rev, types.NewCurrency64(1), h.acc.id, h.renterKey) if err != nil { return err } diff --git a/worker/host_test.go b/worker/host_test.go index 3d124e9aa..dcc089154 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -21,7 +21,8 @@ type ( testHost struct { *hostMock *contractMock - hptFn func() api.HostPriceTable + hptFn func() api.HostPriceTable + uploadDelay time.Duration } testHostManager struct { @@ -91,7 +92,14 @@ func (h *testHost) DownloadSector(ctx context.Context, w io.Writer, root types.H } func (h *testHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error { - h.AddSector(sector) + h.AddSector(sectorRoot, sector) + if h.uploadDelay > 0 { + select { + case <-time.After(h.uploadDelay): + case <-ctx.Done(): + return ctx.Err() + } + } return nil } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 0e45b80df..897d96cdb 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -124,8 +124,7 @@ func newContractMock(hk types.PublicKey, fcid types.FileContractID) *contractMoc } } -func (c *contractMock) 
AddSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) { - root = rhpv2.SectorRoot(sector) +func (c *contractMock) AddSector(root types.Hash256, sector *[rhpv2.SectorSize]byte) { c.mu.Lock() c.sectors[root] = sector c.mu.Unlock() diff --git a/s3/authentication.go b/worker/s3/authentication.go similarity index 100% rename from s3/authentication.go rename to worker/s3/authentication.go diff --git a/s3/backend.go b/worker/s3/backend.go similarity index 98% rename from s3/backend.go rename to worker/s3/backend.go index bb6e3ff7c..a8dd1cb22 100644 --- a/s3/backend.go +++ b/worker/s3/backend.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "io" + "net/http" "strings" "go.sia.tech/gofakes3" @@ -29,8 +30,8 @@ var ( ) type s3 struct { - b bus - w worker + b Bus + w Worker logger *zap.SugaredLogger } @@ -249,7 +250,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range if rangeRequest.End >= 0 { length = rangeRequest.End - rangeRequest.Start + 1 } - opts.Range = api.DownloadRange{Offset: rangeRequest.Start, Length: length} + opts.Range = &api.DownloadRange{Offset: rangeRequest.Start, Length: length} } res, err := s.w.GetObject(ctx, bucketName, objectName, opts) @@ -277,7 +278,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // decorate metadata res.Metadata["Content-Type"] = res.ContentType - res.Metadata["Last-Modified"] = res.LastModified + res.Metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes etag, err := hex.DecodeString(res.Etag) @@ -322,7 +323,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go // decorate metadata metadata["Content-Type"] = res.ContentType - metadata["Last-Modified"] = res.LastModified + metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes hash, err := hex.DecodeString(res.Etag) diff --git a/s3/s3.go b/worker/s3/s3.go similarity index 97% rename from s3/s3.go 
rename to worker/s3/s3.go index 0ac1dbd49..045fdf946 100644 --- a/s3/s3.go +++ b/worker/s3/s3.go @@ -23,7 +23,7 @@ type Opts struct { HostBucketEnabled bool } -type bus interface { +type Bus interface { Bucket(ctx context.Context, bucketName string) (api.Bucket, error) CreateBucket(ctx context.Context, bucketName string, opts api.CreateBucketOptions) error DeleteBucket(ctx context.Context, bucketName string) error @@ -46,7 +46,7 @@ type bus interface { UploadParams(ctx context.Context) (api.UploadParams, error) } -type worker interface { +type Worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) @@ -66,7 +66,7 @@ func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { } } -func New(b bus, w worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { +func New(b Bus, w Worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { namedLogger := logger.Named("s3") s3Backend := &s3{ b: b, diff --git a/worker/serve.go b/worker/serve.go index 25d0c0412..31c347ff7 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -6,7 +6,6 @@ import ( "io" "net/http" - "github.com/gotd/contrib/http_range" "go.sia.tech/renterd/api" ) @@ -24,14 +23,12 @@ type ( } ) -var errMultiRangeNotSupported = errors.New("multipart ranges are not supported") - -func newContentReader(r io.Reader, obj api.Object, offset int64) io.ReadSeeker { +func newContentReader(r io.Reader, size int64, offset int64) io.ReadSeeker { return &contentReader{ r: r, dataOffset: offset, seekOffset: offset, - size: obj.Size, + size: size, } } @@ -58,67 +55,18 @@ func (cr *contentReader) Read(p []byte) (int, error) { return cr.r.Read(p) } -func serveContent(rw 
http.ResponseWriter, req *http.Request, obj api.Object, downloadFn func(w io.Writer, offset, length int64) error) (int, error) { - // parse offset and length from the request range header - offset, length, err := parseRangeHeader(req, obj) - if err != nil { - return http.StatusRequestedRangeNotSatisfiable, err - } - - // launch the download in a goroutine - pr, pw := io.Pipe() - defer pr.Close() - go func() { - if err := downloadFn(pw, offset, length); err != nil { - pw.CloseWithError(err) - } else { - pw.Close() - } - }() - - // fetch the content type, if not set and we can't infer it from object's - // name we default to application/octet-stream, that is important because we - // have to avoid http.ServeContent to sniff the content type as it would - // require a seek - contentType := obj.ContentType() - if contentType == "" { - contentType = "application/octet-stream" - } - rw.Header().Set("Content-Type", contentType) - - // set the response headers, no need to set Last-Modified header as - // serveContent does that for us - rw.Header().Set("ETag", api.FormatETag(obj.ETag)) +func serveContent(rw http.ResponseWriter, req *http.Request, name string, content io.Reader, hor api.HeadObjectResponse) { + // set content type and etag + rw.Header().Set("Content-Type", hor.ContentType) + rw.Header().Set("ETag", api.FormatETag(hor.Etag)) // set the user metadata headers - for k, v := range obj.Metadata { + for k, v := range hor.Metadata { rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetadataPrefix, k), v) } // create a content reader - rs := newContentReader(pr, obj, offset) + rs := newContentReader(content, hor.Size, hor.Range.Offset) - http.ServeContent(rw, req, obj.Name, obj.ModTime.Std(), rs) - return http.StatusOK, nil -} - -func parseRangeHeader(req *http.Request, obj api.Object) (int64, int64, error) { - // parse the request range - ranges, err := http_range.ParseRange(req.Header.Get("Range"), obj.Size) - if err != nil { - return 0, 0, err - } - - // extract 
requested offset and length - offset := int64(0) - length := obj.Size - if len(ranges) == 1 { - offset, length = ranges[0].Start, ranges[0].Length - if offset < 0 || length < 0 || offset+length > obj.Size { - return 0, 0, fmt.Errorf("%w: %v %v", http_range.ErrInvalid, offset, length) - } - } else if len(ranges) > 1 { - return 0, 0, errMultiRangeNotSupported - } - return offset, length, nil + http.ServeContent(rw, req, name, hor.LastModified.Std(), rs) } diff --git a/worker/upload.go b/worker/upload.go index 00c0a924b..718d717ab 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -154,8 +154,9 @@ func (w *worker) initUploadManager(maxMemory, maxOverdrive uint64, overdriveTime w.uploadManager = newUploadManager(w.shutdownCtx, w, mm, w.bus, w.bus, w.bus, maxOverdrive, overdriveTimeout, w.contractLockingDuration, logger) } -func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (_ string, err error) { +func (w *worker) upload(ctx context.Context, bucket, path string, r io.Reader, contracts []api.ContractMetadata, opts ...UploadOption) (_ string, err error) { // apply the options + up := defaultParameters(bucket, path) for _, opt := range opts { opt(&up) } @@ -202,11 +203,11 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra } } } - - // make sure there's a goroutine uploading the remainder of the packed slabs - go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload) } + // make sure there's a goroutine uploading any packed slabs + go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload) + return eTag, nil } @@ -950,7 +951,7 @@ loop: func (s *slabUpload) canOverdrive(overdriveTimeout time.Duration) bool { // overdrive is not kicking in yet remaining := s.numSectors - s.numUploaded - if remaining >= s.maxOverdrive { + if remaining > s.maxOverdrive { return false } @@ -1046,14 +1047,23 @@ func (s 
*slabUpload) receive(resp sectorUploadResp) (bool, bool) { } s.numInflight-- - // failed reqs can't complete the upload - if resp.err != nil { - s.errs[req.hk] = resp.err + // redundant sectors can't complete the upload + if sector.isUploaded() { + // release the candidate + for _, candidate := range s.candidates { + if candidate.req == req { + candidate.req = nil + break + } + } return false, false } - // redundant sectors can't complete the upload - if sector.uploaded.Root != (types.Hash256{}) { + // failed reqs can't complete the upload, we do this after the isUploaded + // check since any error returned for a redundant sector is probably a + // result of the sector ctx being closed + if resp.err != nil { + s.errs[req.hk] = resp.err return false, false } @@ -1067,13 +1077,6 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { // update uploaded sectors s.numUploaded++ - // release all other candidates for this sector - for _, candidate := range s.candidates { - if candidate.req != nil && candidate.req != req && candidate.req.sector.index == sector.index { - candidate.req = nil - } - } - // release memory s.mem.ReleaseSome(rhpv2.SectorSize) diff --git a/worker/upload_params.go b/worker/upload_params.go index d3cca49e4..ae8baa8d0 100644 --- a/worker/upload_params.go +++ b/worker/upload_params.go @@ -38,22 +38,6 @@ func defaultParameters(bucket, path string) uploadParameters { } } -func multipartParameters(bucket, path, uploadID string, partNumber int) uploadParameters { - return uploadParameters{ - bucket: bucket, - path: path, - - multipart: true, - uploadID: uploadID, - partNumber: partNumber, - - ec: object.GenerateEncryptionKey(), // random key - encryptionOffset: 0, // from the beginning - - rs: build.DefaultRedundancySettings, - } -} - type UploadOption func(*uploadParameters) func WithBlockHeight(bh uint64) UploadOption { @@ -92,12 +76,25 @@ func WithPacking(packing bool) UploadOption { } } +func WithPartNumber(partNumber int) UploadOption 
{ + return func(up *uploadParameters) { + up.partNumber = partNumber + } +} + func WithRedundancySettings(rs api.RedundancySettings) UploadOption { return func(up *uploadParameters) { up.rs = rs } } +func WithUploadID(uploadID string) UploadOption { + return func(up *uploadParameters) { + up.uploadID = uploadID + up.multipart = true + } +} + func WithObjectUserMetadata(metadata api.ObjectUserMetadata) UploadOption { return func(up *uploadParameters) { up.metadata = metadata diff --git a/worker/upload_test.go b/worker/upload_test.go index 0b6308ffe..b9cc05ba2 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -140,6 +140,8 @@ func TestUploadPackedSlab(t *testing.T) { // create upload params params := testParameters(t.Name()) params.packing = true + opts := testOpts() + opts = append(opts, WithPacking(true)) // create test data data := frand.Bytes(128) @@ -220,7 +222,7 @@ func TestUploadPackedSlab(t *testing.T) { uploadBytes := func(n int) { t.Helper() params.path = fmt.Sprintf("%s_%d", t.Name(), c) - _, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params) + _, err := w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(frand.Bytes(n)), w.Contracts(), opts...) if err != nil { t.Fatal(err) } @@ -502,10 +504,11 @@ func TestRefreshUploaders(t *testing.T) { // create upload params params := testParameters(t.Name()) + opts := testOpts() // upload data contracts := w.Contracts() - _, err := w.upload(context.Background(), bytes.NewReader(data), contracts, params) + _, err := w.upload(context.Background(), params.bucket, t.Name(), bytes.NewReader(data), contracts, opts...) 
if err != nil { t.Fatal(err) } @@ -607,7 +610,7 @@ func TestUploadRegression(t *testing.T) { // upload data ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, err := w.upload(ctx, bytes.NewReader(data), w.Contracts(), params) + _, err := w.upload(ctx, params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if !errors.Is(err, errUploadInterrupted) { t.Fatal(err) } @@ -616,7 +619,7 @@ func TestUploadRegression(t *testing.T) { unblock() // upload data - _, err = w.upload(context.Background(), bytes.NewReader(data), w.Contracts(), params) + _, err = w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if err != nil { t.Fatal(err) } @@ -637,6 +640,37 @@ func TestUploadRegression(t *testing.T) { } } +func TestUploadSingleSectorSlowHosts(t *testing.T) { + // create test worker + w := newTestWorker(t) + + // add hosts to worker + minShards := 10 + totalShards := 30 + slowHosts := 5 + w.uploadManager.maxOverdrive = uint64(slowHosts) + w.uploadManager.overdriveTimeout = time.Second + hosts := w.AddHosts(totalShards + slowHosts) + + for i := 0; i < slowHosts; i++ { + hosts[i].uploadDelay = time.Hour + } + + // create test data + data := frand.Bytes(rhpv2.SectorSize * minShards) + + // create upload params + params := testParameters(t.Name()) + params.rs.MinShards = minShards + params.rs.TotalShards = totalShards + + // upload data + _, _, err := w.uploadManager.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload) + if err != nil { + t.Fatal(err) + } +} + func testParameters(path string) uploadParameters { return uploadParameters{ bucket: testBucket, @@ -649,3 +683,10 @@ func testParameters(path string) uploadParameters { rs: testRedundancySettings, } } + +func testOpts() []UploadOption { + return []UploadOption{ + WithContractSet(testContractSet), + WithRedundancySettings(testRedundancySettings), + } +} diff 
--git a/worker/uploader.go b/worker/uploader.go index 403accbc8..80bd2393b 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -139,6 +139,8 @@ outer: canceledOverdrive := req.done() && req.overdrive && err != nil if !canceledOverdrive && !isClosedStream(err) { u.trackSectorUpload(err, elapsed) + } else { + u.logger.Debugw("not tracking sector upload metric", zap.Error(err)) } } } diff --git a/worker/worker.go b/worker/worker.go index 813067e17..b2d157b1e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,6 +1,7 @@ package worker import ( + "bytes" "context" "errors" "fmt" @@ -872,32 +873,45 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { return } + var off int + if jc.DecodeForm("offset", &off) != nil { + return + } + limit := -1 + if jc.DecodeForm("limit", &limit) != nil { + return + } + + dr, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) + return + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) + return + } else if err != nil { + jc.Error(err, http.StatusInternalServerError) + return + } + // fetch object metadata - res, err := w.bus.Object(jc.Request.Context(), bucket, path, api.GetObjectOptions{ - IgnoreDelim: ignoreDelim, - OnlyMetadata: true, + hor, err := w.HeadObject(jc.Request.Context(), bucket, path, api.HeadObjectOptions{ + IgnoreDelim: ignoreDelim, + Range: &dr, }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return - } else if err != nil { - jc.Error(err, http.StatusInternalServerError) + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) return - } else if res.Object == nil { - jc.Error(api.ErrObjectNotFound, http.StatusInternalServerError) // should never happen but checking because we deref. 
later + } else if jc.Check("couldn't get object", err) != nil { return } // serve the content to ensure we're setting the exact same headers as we // would for a GET request - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, func(io.Writer, int64, int64) error { return nil }) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, bytes.NewReader(nil), *hor) } func (w *worker) objectsHandlerGET(jc jape.Context) { @@ -949,60 +963,48 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { } path := jc.PathParam("path") - res, err := w.bus.Object(ctx, bucket, path, opts) - if utils.IsErr(err, api.ErrObjectNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't get object or entries", err) != nil { - return - } if path == "" || strings.HasSuffix(path, "/") { + // list directory + res, err := w.bus.Object(ctx, bucket, path, opts) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("couldn't get object or entries", err) != nil { + return + } jc.Encode(res.Entries) return } - // return early if the object is empty - if len(res.Object.Slabs) == 0 { + dr, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("couldn't fetch gouging parameters from bus", err) != nil { + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) return - } - - // fetch all contracts - contracts, err := w.bus.Contracts(ctx, 
api.ContractsOpts{}) - if err != nil { + } else if err != nil { jc.Error(err, http.StatusInternalServerError) return } - // create a download function - downloadFn := func(wr io.Writer, offset, length int64) (err error) { - ctx = WithGougingChecker(ctx, w.bus, gp) - err = w.downloadManager.DownloadObject(ctx, wr, *res.Object.Object, uint64(offset), uint64(length), contracts) - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && - !errors.Is(err, errDownloadCancelled) && - !errors.Is(err, io.ErrClosedPipe) { - w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) - } - } + gor, err := w.GetObject(ctx, bucket, path, api.DownloadObjectOptions{ + GetObjectOptions: opts, + Range: &dr, + }) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't get object", err) != nil { return } + defer gor.Content.Close() // serve the content - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, downloadFn) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, gor.Content, gor.HeadObjectResponse) } func (w *worker) objectsHandlerPUT(jc jape.Context) { @@ -1012,18 +1014,10 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", 
&contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the mimetype from the query string @@ -1038,35 +1032,12 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { - return - } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } @@ -1078,40 +1049,33 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } } - // build options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithMimeType(mimeType), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithObjectUserMetadata(metadata), - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { - return - } - // upload the object - params := defaultParameters(bucket, 
path) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) - if err := jc.Check("couldn't upload object", err); err != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err)) - } - } + resp, err := w.UploadObject(ctx, jc.Request.Body, bucket, path, api.UploadObjectOptions{ + MinShards: minShards, + TotalShards: totalShards, + ContractSet: contractset, + ContentLength: jc.Request.ContentLength, + MimeType: mimeType, + Metadata: metadata, + }) + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) + return + } else if jc.Check("couldn't upload object", err) != nil { return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { @@ -1121,34 +1085,10 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } 
- - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the bucket from the query string @@ -1157,13 +1097,6 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - // decode the upload id var uploadID string if jc.DecodeForm("uploadid", &uploadID) != nil { @@ -1180,76 +1113,57 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { } // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { - return + + // prepare options + opts := api.UploadMultipartUploadPartOptions{ + ContractSet: contractset, + MinShards: minShards, + TotalShards: totalShards, + EncryptionOffset: nil, + ContentLength: jc.Request.ContentLength, } // get the offset var offset int if jc.DecodeForm("offset", &offset) != nil { return - } else if offset < 0 { - jc.Error(errors.New("offset must be positive"), http.StatusBadRequest) - return + } else if jc.Request.FormValue("offset") != "" { + opts.EncryptionOffset = &offset } - // fetch upload from bus - upload, err := w.bus.MultipartUpload(ctx, 
uploadID) - if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + // upload the multipart + resp, err := w.UploadMultipartUploadPart(ctx, jc.Request.Body, bucket, path, uploadID, partNumber, opts) + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { jc.Error(err, http.StatusNotFound) return - } else if jc.Check("failed to fetch multipart upload", err) != nil { + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) return - } - - // built options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithCustomKey(upload.Key), - } - - // make sure only one of the following is set - if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && jc.Request.FormValue("offset") == "" { - jc.Error(errors.New("if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set"), http.StatusBadRequest) + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) return - } else if encryptionEnabled { - opts = append(opts, WithCustomEncryptionOffset(uint64(offset))) - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { + } else if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + jc.Error(err, http.StatusNotFound) return - } - - // upload the multipart - params := multipartParameters(bucket, path, uploadID, partNumber) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) 
- if jc.Check("couldn't upload object", err) != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err)) - } - } + } else if utils.IsErr(err, api.ErrInvalidMultipartEncryptionSettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't upload multipart part", err) != nil { return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) objectsHandlerDELETE(jc jape.Context) { @@ -1581,3 +1495,238 @@ func isErrHostUnreachable(err error) bool { utils.IsErr(err, errors.New("unknown port")) || utils.IsErr(err, errors.New("cannot assign requested address")) } + +func (w *worker) headObject(ctx context.Context, bucket, path string, onlyMetadata bool, opts api.HeadObjectOptions) (*api.HeadObjectResponse, api.ObjectsResponse, error) { + // fetch object + res, err := w.bus.Object(ctx, bucket, path, api.GetObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + OnlyMetadata: onlyMetadata, + }) + if err != nil { + return nil, api.ObjectsResponse{}, fmt.Errorf("couldn't fetch object: %w", err) + } else if res.Object == nil { + return nil, api.ObjectsResponse{}, errors.New("object is a directory") + } + + // adjust length + if opts.Range == nil { + opts.Range = &api.DownloadRange{Offset: 0, Length: -1} + } + if opts.Range.Length == -1 { + opts.Range.Length = res.Object.Size - opts.Range.Offset + } + + // check size of object against range + if opts.Range.Offset+opts.Range.Length > res.Object.Size { + return nil, api.ObjectsResponse{}, http_range.ErrInvalid + } + + return &api.HeadObjectResponse{ + ContentType: res.Object.MimeType, + Etag: res.Object.ETag, + LastModified: res.Object.ModTime, + Range: 
opts.Range.ContentRange(res.Object.Size), + Size: res.Object.Size, + Metadata: res.Object.Metadata, + }, res, nil +} + +func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + // head object + hor, res, err := w.headObject(ctx, bucket, path, false, api.HeadObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + Range: opts.Range, + }) + if err != nil { + return nil, fmt.Errorf("couldn't fetch object: %w", err) + } + obj := *res.Object.Object + + // adjust range + if opts.Range == nil { + opts.Range = &api.DownloadRange{} + } + opts.Range.Offset = hor.Range.Offset + opts.Range.Length = hor.Range.Length + + // fetch gouging params + gp, err := w.bus.GougingParams(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't fetch gouging parameters from bus: %w", err) + } + + // fetch all contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // prepare the content + var content io.ReadCloser + if opts.Range.Length == 0 || obj.TotalSize() == 0 { + // if the object has no content or the requested range is 0, return an + // empty reader + content = io.NopCloser(bytes.NewReader(nil)) + } else { + // otherwise return a pipe reader + downloadFn := func(wr io.Writer, offset, length int64) error { + ctx = WithGougingChecker(ctx, w.bus, gp) + err = w.downloadManager.DownloadObject(ctx, wr, obj, uint64(offset), uint64(length), contracts) + if err != nil { + w.logger.Error(err) + if !errors.Is(err, ErrShuttingDown) && + !errors.Is(err, errDownloadCancelled) && + !errors.Is(err, io.ErrClosedPipe) { + w.registerAlert(newDownloadFailedAlert(bucket, path, opts.Prefix, opts.Marker, offset, length, int64(len(contracts)), err)) + } + return fmt.Errorf("failed to download object: %w", err) + } + return nil + } + pr, pw := io.Pipe() + go func() { + err := downloadFn(pw, opts.Range.Offset, 
opts.Range.Length) + pw.CloseWithError(err) + }() + content = pr + } + + return &api.GetObjectResponse{ + Content: content, + HeadObjectResponse: *hor, + }, nil +} + +func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) { + res, _, err := w.headObject(ctx, bucket, path, true, opts) + return res, err +} + +func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) + if err != nil { + return nil, err + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithMimeType(opts.MimeType), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithObjectUserMetadata(opts.Metadata), + ) + if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, opts.MimeType, up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, false, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadObjectResponse{ + ETag: eTag, + }, nil +} + +func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, 
partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) + if err != nil { + return nil, err + } + + // fetch upload from bus + upload, err := w.bus.MultipartUpload(ctx, uploadID) + if err != nil { + return nil, fmt.Errorf("couldn't fetch multipart upload: %w", err) + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // prepare opts + uploadOpts := []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithCustomKey(upload.Key), + WithPartNumber(partNumber), + WithUploadID(uploadID), + } + + // make sure only one of the following is set + if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && opts.EncryptionOffset == nil { + return nil, fmt.Errorf("%w: if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set", api.ErrInvalidMultipartEncryptionSettings) + } else if opts.EncryptionOffset != nil && *opts.EncryptionOffset < 0 { + return nil, fmt.Errorf("%w: encryption offset must be positive", api.ErrInvalidMultipartEncryptionSettings) + } else if encryptionEnabled { + uploadOpts = append(uploadOpts, WithCustomEncryptionOffset(uint64(*opts.EncryptionOffset))) + } + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, uploadOpts...) 
+ if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, true, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadMultipartUploadPartResponse{ + ETag: eTag, + }, nil +} + +func (w *worker) prepareUploadParams(ctx context.Context, bucket string, contractSet string, minShards, totalShards int) (api.UploadParams, error) { + // return early if the bucket does not exist + _, err := w.bus.Bucket(ctx, bucket) + if err != nil { + return api.UploadParams{}, fmt.Errorf("bucket '%s' not found; %w", bucket, err) + } + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if err != nil { + return api.UploadParams{}, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) + } else if contractSet != "" { + up.ContractSet = contractSet + } else if up.ContractSet == "" { + return api.UploadParams{}, api.ErrContractSetNotSpecified + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + return api.UploadParams{}, api.ErrConsensusNotSynced + } + + // allow overriding the redundancy settings + if minShards != 0 { + up.RedundancySettings.MinShards = minShards + } + if totalShards != 0 { + up.RedundancySettings.TotalShards = totalShards + } + err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + if err != nil { + return api.UploadParams{}, err + } + return up, nil +}