diff --git a/api/multipart.go b/api/multipart.go index ee26567b1..ecd19789f 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -7,6 +7,11 @@ import ( ) var ( + // ErrInvalidMultipartEncryptionSettings is returned if the multipart upload + // has an invalid combination of encryption params. e.g. when encryption is + // enabled but not offset is set. + ErrInvalidMultipartEncryptionSettings = errors.New("invalid multipart encryption settings") + // ErrMultipartUploadNotFound is returned if the specified multipart upload // wasn't found. ErrMultipartUploadNotFound = errors.New("multipart upload not found") diff --git a/api/object.go b/api/object.go index 0382f69a7..91332eec7 100644 --- a/api/object.go +++ b/api/object.go @@ -91,12 +91,12 @@ type ( // HeadObjectResponse is the response type for the HEAD /worker/object endpoint. HeadObjectResponse struct { - ContentType string `json:"contentType"` - Etag string `json:"eTag"` - LastModified string `json:"lastModified"` - Range *DownloadRange `json:"range,omitempty"` - Size int64 `json:"size"` - Metadata ObjectUserMetadata `json:"metadata"` + ContentType string + Etag string + LastModified TimeRFC3339 + Range *ContentRange + Size int64 + Metadata ObjectUserMetadata } // ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint. @@ -151,12 +151,6 @@ func ExtractObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadat return oum } -// LastModified returns the object's ModTime formatted for use in the -// 'Last-Modified' header -func (o ObjectMetadata) LastModified() string { - return o.ModTime.Std().Format(http.TimeFormat) -} - // ContentType returns the object's MimeType for use in the 'Content-Type' // header, if the object's mime type is empty we try and deduce it from the // extension in the object's name. @@ -214,12 +208,12 @@ type ( HeadObjectOptions struct { IgnoreDelim bool - Range DownloadRange + Range *DownloadRange } DownloadObjectOptions struct { GetObjectOptions - Range DownloadRange + Range *DownloadRange } GetObjectOptions struct { @@ -247,7 +241,6 @@ type ( // UploadObjectOptions is the options type for the worker client. UploadObjectOptions struct { - Offset int MinShards int TotalShards int ContractSet string @@ -257,15 +250,15 @@ type ( } UploadMultipartUploadPartOptions struct { + ContractSet string + MinShards int + TotalShards int EncryptionOffset *int ContentLength int64 } ) func (opts UploadObjectOptions) ApplyValues(values url.Values) { - if opts.Offset != 0 { - values.Set("offset", fmt.Sprint(opts.Offset)) - } if opts.MinShards != 0 { values.Set("minshards", fmt.Sprint(opts.MinShards)) } @@ -290,6 +283,15 @@ func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) { if opts.EncryptionOffset != nil { values.Set("offset", fmt.Sprint(*opts.EncryptionOffset)) } + if opts.MinShards != 0 { + values.Set("minshards", fmt.Sprint(opts.MinShards)) + } + if opts.TotalShards != 0 { + values.Set("totalshards", fmt.Sprint(opts.TotalShards)) + } + if opts.ContractSet != "" { + values.Set("contractset", opts.ContractSet) + } } func (opts DownloadObjectOptions) ApplyValues(values url.Values) { @@ -297,7 +299,7 @@ func (opts DownloadObjectOptions) ApplyValues(values url.Values) { } func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) { - if opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { @@ -319,7 +321,7 @@ func (opts HeadObjectOptions) Apply(values url.Values) { } func (opts HeadObjectOptions) ApplyHeaders(h http.Header) { - if opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { diff --git a/api/setting.go b/api/setting.go index 0c0057410..923863e58 100644 --- a/api/setting.go +++ b/api/setting.go @@ -24,6 +24,10 @@ const ( ) var ( + // ErrInvalidRedundancySettings is returned if the redundancy settings are + // not valid + ErrInvalidRedundancySettings = errors.New("invalid redundancy settings") + // ErrSettingNotFound is returned if a requested setting is not present in the // database. ErrSettingNotFound = errors.New("setting not found") @@ -136,13 +140,13 @@ func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { // valid. func (rs RedundancySettings) Validate() error { if rs.MinShards < 1 { - return errors.New("MinShards must be greater than 0") + return fmt.Errorf("%w: MinShards must be greater than 0", ErrInvalidRedundancySettings) } if rs.TotalShards < rs.MinShards { - return errors.New("TotalShards must be at least MinShards") + return fmt.Errorf("%w: TotalShards must be at least MinShards", ErrInvalidRedundancySettings) } if rs.TotalShards > 255 { - return errors.New("TotalShards must be less than 256") + return fmt.Errorf("%w: TotalShards must be less than 256", ErrInvalidRedundancySettings) } return nil } diff --git a/api/worker.go b/api/worker.go index 6d0c0e9d2..2908802f7 100644 --- a/api/worker.go +++ b/api/worker.go @@ -3,9 +3,12 @@ package api import ( "errors" "fmt" + "math" + "net/http" "strconv" "strings" + "github.com/gotd/contrib/http_range" rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" @@ -23,6 +26,10 @@ var ( // ErrHostOnPrivateNetwork is returned by the worker API when a host can't // be scanned since it is on a private network. ErrHostOnPrivateNetwork = errors.New("host is on a private network") + + // ErrMultiRangeNotSupported is returned by the worker API when a request + // tries to download multiple ranges at once. + ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported") ) type ( @@ -216,41 +223,76 @@ type ( } ) -type DownloadRange struct { +// ContentRange represents a content range returned via the "Content-Range" +// header. +type ContentRange struct { Offset int64 Length int64 Size int64 } -func ParseDownloadRange(contentRange string) (DownloadRange, error) { +// DownloadRange represents a requested range for a download via the "Range" +// header. +type DownloadRange struct { + Offset int64 + Length int64 +} + +func (r *DownloadRange) ContentRange(size int64) *ContentRange { + return &ContentRange{ + Offset: r.Offset, + Length: r.Length, + Size: size, + } +} + +func ParseContentRange(contentRange string) (ContentRange, error) { parts := strings.Split(contentRange, " ") if len(parts) != 2 || parts[0] != "bytes" { - return DownloadRange{}, errors.New("missing 'bytes' prefix in range header") + return ContentRange{}, errors.New("missing 'bytes' prefix in range header") } parts = strings.Split(parts[1], "/") if len(parts) != 2 { - return DownloadRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) + return ContentRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) } rangeStr := parts[0] rangeParts := strings.Split(rangeStr, "-") if len(rangeParts) != 2 { - return DownloadRange{}, errors.New("invalid Content-Range header") + return ContentRange{}, errors.New("invalid Content-Range header") } start, err := strconv.ParseInt(rangeParts[0], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } end, err := strconv.ParseInt(rangeParts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } size, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } - return DownloadRange{ + return ContentRange{ Offset: start, Length: end - start + 1, Size: size, }, nil } + +func ParseDownloadRange(req *http.Request) (DownloadRange, error) { + // parse the request range we pass math.MaxInt64 since a range header in a + // request doesn't have a size + ranges, err := http_range.ParseRange(req.Header.Get("Range"), math.MaxInt64) + if err != nil { + return DownloadRange{}, err + } + + // extract requested offset and length + dr := DownloadRange{Offset: 0, Length: -1} + if len(ranges) == 1 { + dr.Offset, dr.Length = ranges[0].Start, ranges[0].Length + } else if len(ranges) > 1 { + return DownloadRange{}, ErrMultiRangeNotSupported + } + return dr, nil +} diff --git a/bus/bus.go b/bus/bus.go index a6b86c0e1..804184e43 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1290,7 +1290,7 @@ func (b *bus) objectsCopyHandlerPOST(jc jape.Context) { return } - jc.ResponseWriter.Header().Set("Last-Modified", om.LastModified()) + jc.ResponseWriter.Header().Set("Last-Modified", om.ModTime.Std().Format(http.TimeFormat)) jc.ResponseWriter.Header().Set("ETag", api.FormatETag(om.ETag)) jc.Encode(om) } diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 24b309b17..3005b4b34 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -28,9 +28,9 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/utils" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.sia.tech/web/renterd" "go.uber.org/zap" "golang.org/x/sys/cpu" @@ -573,7 +573,10 @@ func main() { var workers []autopilot.Worker if len(cfg.Worker.Remotes) == 0 { if cfg.Worker.Enabled { - w, fn, err := node.NewWorker(cfg.Worker, bc, seed, logger) + w, s3Handler, fn, err := node.NewWorker(cfg.Worker, s3.Opts{ + AuthDisabled: cfg.S3.DisableAuth, + HostBucketEnabled: cfg.S3.HostBucketEnabled, + }, bc, seed, logger) if err != nil { logger.Fatal("failed to create worker: " + err.Error()) } @@ -588,13 +591,6 @@ func main() { workers = append(workers, wc) if cfg.S3.Enabled { - s3Handler, err := s3.New(bc, wc, logger.Sugar(), s3.Opts{ - AuthDisabled: cfg.S3.DisableAuth, - HostBucketEnabled: cfg.S3.HostBucketEnabled, - }) - if err != nil { - log.Fatal("failed to create s3 client", err) - } s3Srv = &http.Server{ Addr: cfg.S3.Address, Handler: s3Handler, diff --git a/internal/node/node.go b/internal/node/node.go index 8d8a9816c..d247f199a 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -20,6 +20,7 @@ import ( "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/webhooks" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.sia.tech/siad/modules" mconsensus "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" @@ -31,6 +32,11 @@ import ( "gorm.io/gorm/logger" ) +type Bus interface { + worker.Bus + s3.Bus +} + type BusConfig struct { config.Bus Network *consensus.Network @@ -191,14 +197,18 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht return b.Handler(), shutdownFn, nil } -func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - - return w.Handler(), w.Shutdown, nil + s3Handler, err := s3.New(b, w, l.Named("s3").Sugar(), s3Opts) + if err != nil { + err = errors.Join(err, w.Shutdown(context.Background())) + return nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err) + } + return w.Handler(), s3Handler, w.Shutdown, nil } func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 1a47a4e9a..8b310e4e7 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -24,8 +24,8 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/test" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" + "go.sia.tech/renterd/worker/s3" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gorm.io/gorm" @@ -316,7 +316,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { busShutdownFns = append(busShutdownFns, bStopFn) // Create worker. - w, wShutdownFn, err := node.NewWorker(workerCfg, busClient, wk, logger) + w, s3Handler, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) tt.OK(err) workerAuth := jape.BasicAuth(workerPassword) @@ -329,9 +329,6 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { workerShutdownFns = append(workerShutdownFns, wShutdownFn) // Create S3 API. - s3Handler, err := s3.New(busClient, workerClient, logger.Sugar(), s3.Opts{}) - tt.OK(err) - s3Server := http.Server{ Handler: s3Handler, } diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 754bf273e..55db48c19 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -410,8 +410,11 @@ func TestObjectEntries(t *testing.T) { } for _, entry := range got { if !strings.HasSuffix(entry.Name, "/") { - if err := w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { + buf := new(bytes.Buffer) + if err := w.DownloadObject(context.Background(), buf, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { t.Fatal(err) + } else if buf.Len() != int(entry.Size) { + t.Fatal("unexpected", buf.Len(), entry.Size) } } } @@ -585,7 +588,7 @@ func TestUploadDownloadBasic(t *testing.T) { for i := int64(0); i < 4; i++ { offset := i * 32 var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: 32}})) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: 32}})) if !bytes.Equal(data[offset:offset+32], buffer.Bytes()) { fmt.Println(data[offset : offset+32]) fmt.Println(buffer.Bytes()) @@ -1559,7 +1562,7 @@ func TestUploadPacking(t *testing.T) { &buffer, api.DefaultBucketName, path, - api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: length}}, + api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: length}}, ); err != nil { t.Fatal(err) } @@ -2128,7 +2131,7 @@ func TestMultipartUploads(t *testing.T) { } // Download a range of the object - gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: 0, Length: 1}}) + gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: 0, Length: 1}}) tt.OK(err) if gor.Range == nil || gor.Range.Offset != 0 || gor.Range.Length != 1 { t.Fatal("unexpected range:", gor.Range) diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index af924f847..4bb1ea2dd 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -3,8 +3,10 @@ package e2e import ( "bytes" "context" + "net/http" "reflect" "testing" + "time" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" @@ -59,15 +61,23 @@ func TestObjectMetadata(t *testing.T) { t.Fatal("missing etag") } + // HeadObject retrieves the modtime from a http header so it's not as + // accurate as the modtime from the object GET endpoint which returns it in + // the body. + orModtime, err := time.Parse(http.TimeFormat, or.Object.ModTime.Std().Format(http.TimeFormat)) + if err != nil { + t.Fatal(err) + } + // perform a HEAD request and assert the headers are all present - hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}}) + hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: &api.DownloadRange{Offset: 1, Length: 1}}) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ ContentType: or.Object.ContentType(), Etag: gor.Etag, - LastModified: or.Object.LastModified(), - Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))}, + LastModified: api.TimeRFC3339(orModtime), + Range: &api.ContentRange{Offset: 1, Length: 1, Size: int64(len(data))}, Size: int64(len(data)), Metadata: gor.Metadata, }) { diff --git a/worker/client/client.go b/worker/client/client.go index fe284469f..71fd200ad 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -303,9 +303,9 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } // parse range - var r *api.DownloadRange + var r *api.ContentRange if cr := header.Get("Content-Range"); cr != "" { - dr, err := api.ParseDownloadRange(cr) + dr, err := api.ParseContentRange(cr) if err != nil { return api.HeadObjectResponse{}, err } @@ -325,10 +325,14 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } } + modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) + if err != nil { + return api.HeadObjectResponse{}, fmt.Errorf("failed to parse Last-Modified header: %w", err) + } return api.HeadObjectResponse{ ContentType: header.Get("Content-Type"), Etag: trimEtag(header.Get("ETag")), - LastModified: header.Get("Last-Modified"), + LastModified: api.TimeRFC3339(modTime), Range: r, Size: size, Metadata: api.ExtractObjectUserMetadataFrom(headers), diff --git a/s3/authentication.go b/worker/s3/authentication.go similarity index 100% rename from s3/authentication.go rename to worker/s3/authentication.go diff --git a/s3/backend.go b/worker/s3/backend.go similarity index 98% rename from s3/backend.go rename to worker/s3/backend.go index bb6e3ff7c..a8dd1cb22 100644 --- a/s3/backend.go +++ b/worker/s3/backend.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "io" + "net/http" "strings" "go.sia.tech/gofakes3" @@ -29,8 +30,8 @@ var ( ) type s3 struct { - b bus - w worker + b Bus + w Worker logger *zap.SugaredLogger } @@ -249,7 +250,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range if rangeRequest.End >= 0 { length = rangeRequest.End - rangeRequest.Start + 1 } - opts.Range = api.DownloadRange{Offset: rangeRequest.Start, Length: length} + opts.Range = &api.DownloadRange{Offset: rangeRequest.Start, Length: length} } res, err := s.w.GetObject(ctx, bucketName, objectName, opts) @@ -277,7 +278,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // decorate metadata res.Metadata["Content-Type"] = res.ContentType - res.Metadata["Last-Modified"] = res.LastModified + res.Metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes etag, err := hex.DecodeString(res.Etag) @@ -322,7 +323,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go // decorate metadata metadata["Content-Type"] = res.ContentType - metadata["Last-Modified"] = res.LastModified + metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes hash, err := hex.DecodeString(res.Etag) diff --git a/s3/s3.go b/worker/s3/s3.go similarity index 97% rename from s3/s3.go rename to worker/s3/s3.go index 0ac1dbd49..045fdf946 100644 --- a/s3/s3.go +++ b/worker/s3/s3.go @@ -23,7 +23,7 @@ type Opts struct { HostBucketEnabled bool } -type bus interface { +type Bus interface { Bucket(ctx context.Context, bucketName string) (api.Bucket, error) CreateBucket(ctx context.Context, bucketName string, opts api.CreateBucketOptions) error DeleteBucket(ctx context.Context, bucketName string) error @@ -46,7 +46,7 @@ type bus interface { UploadParams(ctx context.Context) (api.UploadParams, error) } -type worker interface { +type Worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) @@ -66,7 +66,7 @@ func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { } } -func New(b bus, w worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { +func New(b Bus, w Worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { namedLogger := logger.Named("s3") s3Backend := &s3{ b: b, diff --git a/worker/serve.go b/worker/serve.go index 25d0c0412..31c347ff7 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -6,7 +6,6 @@ import ( "io" "net/http" - "github.com/gotd/contrib/http_range" "go.sia.tech/renterd/api" ) @@ -24,14 +23,12 @@ type ( } ) -var errMultiRangeNotSupported = errors.New("multipart ranges are not supported") - -func newContentReader(r io.Reader, obj api.Object, offset int64) io.ReadSeeker { +func newContentReader(r io.Reader, size int64, offset int64) io.ReadSeeker { return &contentReader{ r: r, dataOffset: offset, seekOffset: offset, - size: obj.Size, + size: size, } } @@ -58,67 +55,18 @@ func (cr *contentReader) Read(p []byte) (int, error) { return cr.r.Read(p) } -func serveContent(rw http.ResponseWriter, req *http.Request, obj api.Object, downloadFn func(w io.Writer, offset, length int64) error) (int, error) { - // parse offset and length from the request range header - offset, length, err := parseRangeHeader(req, obj) - if err != nil { - return http.StatusRequestedRangeNotSatisfiable, err - } - - // launch the download in a goroutine - pr, pw := io.Pipe() - defer pr.Close() - go func() { - if err := downloadFn(pw, offset, length); err != nil { - pw.CloseWithError(err) - } else { - pw.Close() - } - }() - - // fetch the content type, if not set and we can't infer it from object's - // name we default to application/octet-stream, that is important because we - // have to avoid http.ServeContent to sniff the content type as it would - // require a seek - contentType := obj.ContentType() - if contentType == "" { - contentType = "application/octet-stream" - } - rw.Header().Set("Content-Type", contentType) - - // set the response headers, no need to set Last-Modified header as - // serveContent does that for us - rw.Header().Set("ETag", api.FormatETag(obj.ETag)) +func serveContent(rw http.ResponseWriter, req *http.Request, name string, content io.Reader, hor api.HeadObjectResponse) { + // set content type and etag + rw.Header().Set("Content-Type", hor.ContentType) + rw.Header().Set("ETag", api.FormatETag(hor.Etag)) // set the user metadata headers - for k, v := range obj.Metadata { + for k, v := range hor.Metadata { rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetadataPrefix, k), v) } // create a content reader - rs := newContentReader(pr, obj, offset) + rs := newContentReader(content, hor.Size, hor.Range.Offset) - http.ServeContent(rw, req, obj.Name, obj.ModTime.Std(), rs) - return http.StatusOK, nil -} - -func parseRangeHeader(req *http.Request, obj api.Object) (int64, int64, error) { - // parse the request range - ranges, err := http_range.ParseRange(req.Header.Get("Range"), obj.Size) - if err != nil { - return 0, 0, err - } - - // extract requested offset and length - offset := int64(0) - length := obj.Size - if len(ranges) == 1 { - offset, length = ranges[0].Start, ranges[0].Length - if offset < 0 || length < 0 || offset+length > obj.Size { - return 0, 0, fmt.Errorf("%w: %v %v", http_range.ErrInvalid, offset, length) - } - } else if len(ranges) > 1 { - return 0, 0, errMultiRangeNotSupported - } - return offset, length, nil + http.ServeContent(rw, req, name, hor.LastModified.Std(), rs) } diff --git a/worker/upload.go b/worker/upload.go index 377db3c86..718d717ab 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -154,8 +154,9 @@ func (w *worker) initUploadManager(maxMemory, maxOverdrive uint64, overdriveTime w.uploadManager = newUploadManager(w.shutdownCtx, w, mm, w.bus, w.bus, w.bus, maxOverdrive, overdriveTimeout, w.contractLockingDuration, logger) } -func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (_ string, err error) { +func (w *worker) upload(ctx context.Context, bucket, path string, r io.Reader, contracts []api.ContractMetadata, opts ...UploadOption) (_ string, err error) { // apply the options + up := defaultParameters(bucket, path) for _, opt := range opts { opt(&up) } diff --git a/worker/upload_params.go b/worker/upload_params.go index d3cca49e4..ae8baa8d0 100644 --- a/worker/upload_params.go +++ b/worker/upload_params.go @@ -38,22 +38,6 @@ func defaultParameters(bucket, path string) uploadParameters { } } -func multipartParameters(bucket, path, uploadID string, partNumber int) uploadParameters { - return uploadParameters{ - bucket: bucket, - path: path, - - multipart: true, - uploadID: uploadID, - partNumber: partNumber, - - ec: object.GenerateEncryptionKey(), // random key - encryptionOffset: 0, // from the beginning - - rs: build.DefaultRedundancySettings, - } -} - type UploadOption func(*uploadParameters) func WithBlockHeight(bh uint64) UploadOption { @@ -92,12 +76,25 @@ func WithPacking(packing bool) UploadOption { } } +func WithPartNumber(partNumber int) UploadOption { + return func(up *uploadParameters) { + up.partNumber = partNumber + } +} + func WithRedundancySettings(rs api.RedundancySettings) UploadOption { return func(up *uploadParameters) { up.rs = rs } } +func WithUploadID(uploadID string) UploadOption { + return func(up *uploadParameters) { + up.uploadID = uploadID + up.multipart = true + } +} + func WithObjectUserMetadata(metadata api.ObjectUserMetadata) UploadOption { return func(up *uploadParameters) { up.metadata = metadata diff --git a/worker/upload_test.go b/worker/upload_test.go index 0b2488f32..b9cc05ba2 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -140,6 +140,8 @@ func TestUploadPackedSlab(t *testing.T) { // create upload params params := testParameters(t.Name()) params.packing = true + opts := testOpts() + opts = append(opts, WithPacking(true)) // create test data data := frand.Bytes(128) @@ -220,7 +222,7 @@ func TestUploadPackedSlab(t *testing.T) { uploadBytes := func(n int) { t.Helper() params.path = fmt.Sprintf("%s_%d", t.Name(), c) - _, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params) + _, err := w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(frand.Bytes(n)), w.Contracts(), opts...) if err != nil { t.Fatal(err) } @@ -502,10 +504,11 @@ func TestRefreshUploaders(t *testing.T) { // create upload params params := testParameters(t.Name()) + opts := testOpts() // upload data contracts := w.Contracts() - _, err := w.upload(context.Background(), bytes.NewReader(data), contracts, params) + _, err := w.upload(context.Background(), params.bucket, t.Name(), bytes.NewReader(data), contracts, opts...) if err != nil { t.Fatal(err) } @@ -607,7 +610,7 @@ func TestUploadRegression(t *testing.T) { // upload data ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, err := w.upload(ctx, bytes.NewReader(data), w.Contracts(), params) + _, err := w.upload(ctx, params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if !errors.Is(err, errUploadInterrupted) { t.Fatal(err) } @@ -616,7 +619,7 @@ func TestUploadRegression(t *testing.T) { unblock() // upload data - _, err = w.upload(context.Background(), bytes.NewReader(data), w.Contracts(), params) + _, err = w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if err != nil { t.Fatal(err) } @@ -680,3 +683,10 @@ func testParameters(path string) uploadParameters { rs: testRedundancySettings, } } + +func testOpts() []UploadOption { + return []UploadOption{ + WithContractSet(testContractSet), + WithRedundancySettings(testRedundancySettings), + } +} diff --git a/worker/worker.go b/worker/worker.go index 43450d933..cd3d545eb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,6 +1,7 @@ package worker import ( + "bytes" "context" "errors" "fmt" @@ -872,32 +873,45 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { return } + var off int + if jc.DecodeForm("offset", &off) != nil { + return + } + limit := -1 + if jc.DecodeForm("limit", &limit) != nil { + return + } + + dr, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) + return + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) + return + } else if err != nil { + jc.Error(err, http.StatusInternalServerError) + return + } + // fetch object metadata - res, err := w.bus.Object(jc.Request.Context(), bucket, path, api.GetObjectOptions{ - IgnoreDelim: ignoreDelim, - OnlyMetadata: true, + hor, err := w.HeadObject(jc.Request.Context(), bucket, path, api.HeadObjectOptions{ + IgnoreDelim: ignoreDelim, + Range: &dr, }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return - } else if err != nil { - jc.Error(err, http.StatusInternalServerError) + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) return - } else if res.Object == nil { - jc.Error(api.ErrObjectNotFound, http.StatusInternalServerError) // should never happen but checking because we deref. later + } else if jc.Check("couldn't get object", err) != nil { return } // serve the content to ensure we're setting the exact same headers as we // would for a GET request - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, func(io.Writer, int64, int64) error { return nil }) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, bytes.NewReader(nil), *hor) } func (w *worker) objectsHandlerGET(jc jape.Context) { @@ -949,60 +963,48 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { } path := jc.PathParam("path") - res, err := w.bus.Object(ctx, bucket, path, opts) - if utils.IsErr(err, api.ErrObjectNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't get object or entries", err) != nil { - return - } if path == "" || strings.HasSuffix(path, "/") { + // list directory + res, err := w.bus.Object(ctx, bucket, path, opts) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("couldn't get object or entries", err) != nil { + return + } jc.Encode(res.Entries) return } - // return early if the object is empty - if len(res.Object.Slabs) == 0 { + dr, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("couldn't fetch gouging parameters from bus", err) != nil { + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) return - } - - // fetch all contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) - if err != nil { + } else if err != nil { jc.Error(err, http.StatusInternalServerError) return } - // create a download function - downloadFn := func(wr io.Writer, offset, length int64) (err error) { - ctx = WithGougingChecker(ctx, w.bus, gp) - err = w.downloadManager.DownloadObject(ctx, wr, *res.Object.Object, uint64(offset), uint64(length), contracts) - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && - !errors.Is(err, errDownloadCancelled) && - !errors.Is(err, io.ErrClosedPipe) { - w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) - } - } + gor, err := w.GetObject(ctx, bucket, path, api.DownloadObjectOptions{ + GetObjectOptions: opts, + Range: &dr, + }) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't get object", err) != nil { return } + defer gor.Content.Close() // serve the content - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, downloadFn) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, gor.Content, gor.HeadObjectResponse) } func (w *worker) objectsHandlerPUT(jc jape.Context) { @@ -1012,18 +1014,10 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the mimetype from the query string @@ -1038,35 +1032,12 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { - return - } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } @@ -1078,40 +1049,33 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } } - // build options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithMimeType(mimeType), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithObjectUserMetadata(metadata), - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { - return - } - // upload the object - params := defaultParameters(bucket, path) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) - if err := jc.Check("couldn't upload object", err); err != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err)) - } - } + resp, err := w.UploadObject(ctx, jc.Request.Body, bucket, path, api.UploadObjectOptions{ + MinShards: minShards, + TotalShards: totalShards, + ContractSet: contractset, + ContentLength: jc.Request.ContentLength, + MimeType: mimeType, + Metadata: metadata, + }) + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) + return + } else if jc.Check("couldn't upload object", err) != nil { return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { @@ -1121,34 +1085,10 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the bucket from the query string @@ -1157,13 +1097,6 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - // decode the upload id var uploadID string if jc.DecodeForm("uploadid", &uploadID) != nil { @@ -1180,76 +1113,57 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { } // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { - return + + // prepare options + opts := api.UploadMultipartUploadPartOptions{ + ContractSet: contractset, + MinShards: minShards, + TotalShards: totalShards, + EncryptionOffset: nil, + ContentLength: jc.Request.ContentLength, } // get the offset var offset int if jc.DecodeForm("offset", &offset) != nil { return - } else if offset < 0 { - jc.Error(errors.New("offset must be positive"), http.StatusBadRequest) - return + } else if jc.Request.FormValue("offset") != "" { + opts.EncryptionOffset = &offset } - // fetch upload from bus - upload, err := w.bus.MultipartUpload(ctx, uploadID) - if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + // upload the multipart + resp, err := w.UploadMultipartUploadPart(ctx, jc.Request.Body, bucket, path, uploadID, partNumber, opts) + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { jc.Error(err, http.StatusNotFound) return - } else if jc.Check("failed to fetch multipart upload", err) != nil { + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) return - } - - // built options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithCustomKey(upload.Key), - } - - // make sure only one of the following is set - if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && jc.Request.FormValue("offset") == "" { - jc.Error(errors.New("if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set"), http.StatusBadRequest) + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) return - } else if encryptionEnabled { - opts = append(opts, WithCustomEncryptionOffset(uint64(offset))) - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { + } else if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + jc.Error(err, http.StatusNotFound) return - } - - // upload the multipart - params := multipartParameters(bucket, path, uploadID, partNumber) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) - if jc.Check("couldn't upload object", err) != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err)) - } - } + } else if utils.IsErr(err, api.ErrInvalidMultipartEncryptionSettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't upload multipart part", err) != nil { return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) objectsHandlerDELETE(jc jape.Context) { @@ -1585,3 +1499,238 @@ func isErrHostUnreachable(err error) bool { func isErrDuplicateTransactionSet(err error) bool { return utils.IsErr(err, modules.ErrDuplicateTransactionSet) } + +func (w *worker) headObject(ctx context.Context, bucket, path string, onlyMetadata bool, opts api.HeadObjectOptions) (*api.HeadObjectResponse, api.ObjectsResponse, error) { + // fetch object + res, err := w.bus.Object(ctx, bucket, path, api.GetObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + OnlyMetadata: onlyMetadata, + }) + if err != nil { + return nil, api.ObjectsResponse{}, fmt.Errorf("couldn't fetch object: %w", err) + } else if res.Object == nil { + return nil, api.ObjectsResponse{}, errors.New("object is a directory") + } + + // adjust length + if opts.Range == nil { + opts.Range = &api.DownloadRange{Offset: 0, Length: -1} + } + if opts.Range.Length == -1 { + opts.Range.Length = res.Object.Size - opts.Range.Offset + } + + // check size of object against range + if opts.Range.Offset+opts.Range.Length > res.Object.Size { + return nil, api.ObjectsResponse{}, http_range.ErrInvalid + } + + return &api.HeadObjectResponse{ + ContentType: res.Object.MimeType, + Etag: res.Object.ETag, + LastModified: res.Object.ModTime, + Range: opts.Range.ContentRange(res.Object.Size), + Size: res.Object.Size, + Metadata: res.Object.Metadata, + }, res, nil +} + +func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + // head object + hor, res, err := w.headObject(ctx, bucket, path, false, api.HeadObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + Range: opts.Range, + }) + if err != nil { + return nil, fmt.Errorf("couldn't fetch object: %w", err) + } + obj := *res.Object.Object + + // adjust range + if opts.Range == nil { + opts.Range = &api.DownloadRange{} + } + opts.Range.Offset = hor.Range.Offset + opts.Range.Length = hor.Range.Length + + // fetch gouging params + gp, err := w.bus.GougingParams(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't fetch gouging parameters from bus: %w", err) + } + + // fetch all contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // prepare the content + var content io.ReadCloser + if opts.Range.Length == 0 || obj.TotalSize() == 0 { + // if the object has no content or the requested range is 0, return an + // empty reader + content = io.NopCloser(bytes.NewReader(nil)) + } else { + // otherwise return a pipe reader + downloadFn := func(wr io.Writer, offset, length int64) error { + ctx = WithGougingChecker(ctx, w.bus, gp) + err = w.downloadManager.DownloadObject(ctx, wr, obj, uint64(offset), uint64(length), contracts) + if err != nil { + w.logger.Error(err) + if !errors.Is(err, ErrShuttingDown) && + !errors.Is(err, errDownloadCancelled) && + !errors.Is(err, io.ErrClosedPipe) { + w.registerAlert(newDownloadFailedAlert(bucket, path, opts.Prefix, opts.Marker, offset, length, int64(len(contracts)), err)) + } + return fmt.Errorf("failed to download object: %w", err) + } + return nil + } + pr, pw := io.Pipe() + go func() { + err := downloadFn(pw, opts.Range.Offset, opts.Range.Length) + pw.CloseWithError(err) + }() + content = pr + } + + return &api.GetObjectResponse{ + Content: content, + HeadObjectResponse: *hor, + }, nil +} + +func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) { + res, _, err := w.headObject(ctx, bucket, path, true, opts) + return res, err +} + +func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) + if err != nil { + return nil, err + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithMimeType(opts.MimeType), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithObjectUserMetadata(opts.Metadata), + ) + if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, opts.MimeType, up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, false, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadObjectResponse{ + ETag: eTag, + }, nil +} + +func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) + if err != nil { + return nil, err + } + + // fetch upload from bus + upload, err := w.bus.MultipartUpload(ctx, uploadID) + if err != nil { + return nil, fmt.Errorf("couldn't fetch multipart upload: %w", err) + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // prepare opts + uploadOpts := []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithCustomKey(upload.Key), + WithPartNumber(partNumber), + WithUploadID(uploadID), + } + + // make sure only one of the following is set + if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && opts.EncryptionOffset == nil { + return nil, fmt.Errorf("%w: if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set", api.ErrInvalidMultipartEncryptionSettings) + } else if opts.EncryptionOffset != nil && *opts.EncryptionOffset < 0 { + return nil, fmt.Errorf("%w: encryption offset must be positive", api.ErrInvalidMultipartEncryptionSettings) + } else if encryptionEnabled { + uploadOpts = append(uploadOpts, WithCustomEncryptionOffset(uint64(*opts.EncryptionOffset))) + } + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, uploadOpts...) + if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, false, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadMultipartUploadPartResponse{ + ETag: eTag, + }, nil +} + +func (w *worker) prepareUploadParams(ctx context.Context, bucket string, contractSet string, minShards, totalShards int) (api.UploadParams, error) { + // return early if the bucket does not exist + _, err := w.bus.Bucket(ctx, bucket) + if err != nil { + return api.UploadParams{}, fmt.Errorf("bucket '%s' not found; %w", bucket, err) + } + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if err != nil { + return api.UploadParams{}, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) + } else if contractSet != "" { + up.ContractSet = contractSet + } else if up.ContractSet == "" { + return api.UploadParams{}, api.ErrContractSetNotSpecified + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + return api.UploadParams{}, api.ErrConsensusNotSynced + } + + // allow overriding the redundancy settings + if minShards != 0 { + up.RedundancySettings.MinShards = minShards + } + if totalShards != 0 { + up.RedundancySettings.TotalShards = totalShards + } + err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + if err != nil { + return api.UploadParams{}, err + } + return up, nil +}