From fadd1892acea04fd6a9428dd488985bc9a152cfa Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 4 Apr 2024 14:36:04 +0200 Subject: [PATCH 01/14] worker: move s3 package to worker --- api/object.go | 19 ++- api/worker.go | 58 +++++++-- bus/bus.go | 2 +- cmd/renterd/main.go | 14 +-- internal/node/node.go | 18 ++- internal/test/e2e/cluster.go | 7 +- internal/test/e2e/metadata_test.go | 4 +- worker/client/client.go | 10 +- {s3 => worker/s3}/authentication.go | 0 {s3 => worker/s3}/backend.go | 9 +- {s3 => worker/s3}/s3.go | 6 +- worker/serve.go | 70 ++--------- worker/worker.go | 186 ++++++++++++++++++++-------- worker/worker_test.go | 9 ++ 14 files changed, 247 insertions(+), 165 deletions(-) rename {s3 => worker/s3}/authentication.go (100%) rename {s3 => worker/s3}/backend.go (99%) rename {s3 => worker/s3}/s3.go (97%) diff --git a/api/object.go b/api/object.go index 0382f69a7..b269baff2 100644 --- a/api/object.go +++ b/api/object.go @@ -9,6 +9,7 @@ import ( "net/url" "path/filepath" "strings" + "time" "go.sia.tech/renterd/object" ) @@ -91,12 +92,12 @@ type ( // HeadObjectResponse is the response type for the HEAD /worker/object endpoint. HeadObjectResponse struct { - ContentType string `json:"contentType"` - Etag string `json:"eTag"` - LastModified string `json:"lastModified"` - Range *DownloadRange `json:"range,omitempty"` - Size int64 `json:"size"` - Metadata ObjectUserMetadata `json:"metadata"` + ContentType string + Etag string + LastModified time.Time + Range *ContentRange + Size int64 + Metadata ObjectUserMetadata } // ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint. @@ -151,12 +152,6 @@ func ExtractObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadat return oum } -// LastModified returns the object's ModTime formatted for use in the -// 'Last-Modified' header -func (o ObjectMetadata) LastModified() string { - return o.ModTime.Std().Format(http.TimeFormat) -} - // ContentType returns the object's MimeType for use in the 'Content-Type' // header, if the object's mime type is empty we try and deduce it from the // extension in the object's name. diff --git a/api/worker.go b/api/worker.go index 6d0c0e9d2..68e80b80f 100644 --- a/api/worker.go +++ b/api/worker.go @@ -3,9 +3,12 @@ package api import ( "errors" "fmt" + "math" + "net/http" "strconv" "strings" + "github.com/gotd/contrib/http_range" rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" @@ -23,6 +26,10 @@ var ( // ErrHostOnPrivateNetwork is returned by the worker API when a host can't // be scanned since it is on a private network. ErrHostOnPrivateNetwork = errors.New("host is on a private network") + + // ErrMultiRangeNotSupported is returned by the worker API when a requesta + // tries to download multiple ranges at once. + ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported") ) type ( @@ -216,41 +223,74 @@ type ( } ) -type DownloadRange struct { +// ContentRange represents a content range returned via the "Content-Range" +// header. +type ContentRange struct { Offset int64 Length int64 Size int64 } -func ParseDownloadRange(contentRange string) (DownloadRange, error) { +// DownloadRange represents a requested range for a download via the "Range" +// header. +type DownloadRange struct { + Offset int64 + Length int64 +} + +func (r *DownloadRange) ContentRange(size int64) *ContentRange { + return &ContentRange{ + Offset: r.Offset, + Length: r.Length, + Size: size, + } +} + +func ParseContentRange(contentRange string) (ContentRange, error) { parts := strings.Split(contentRange, " ") if len(parts) != 2 || parts[0] != "bytes" { - return DownloadRange{}, errors.New("missing 'bytes' prefix in range header") + return ContentRange{}, errors.New("missing 'bytes' prefix in range header") } parts = strings.Split(parts[1], "/") if len(parts) != 2 { - return DownloadRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) + return ContentRange{}, fmt.Errorf("invalid Content-Range header: %s", contentRange) } rangeStr := parts[0] rangeParts := strings.Split(rangeStr, "-") if len(rangeParts) != 2 { - return DownloadRange{}, errors.New("invalid Content-Range header") + return ContentRange{}, errors.New("invalid Content-Range header") } start, err := strconv.ParseInt(rangeParts[0], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } end, err := strconv.ParseInt(rangeParts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } size, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { - return DownloadRange{}, err + return ContentRange{}, err } - return DownloadRange{ + return ContentRange{ Offset: start, Length: end - start + 1, Size: size, }, nil } + +func ParseDownloadRange(req *http.Request) (int64, int64, error) { + // parse the request range + // we pass math.MaxInt64 since a range header in a request doesn't have a + // size + ranges, err := http_range.ParseRange(req.Header.Get("Range"), math.MaxInt64) + if err != nil { + return 0, 0, err + } + + // extract requested offset and length + if len(ranges) > 1 { + return 0, 0, ErrMultiRangeNotSupported + } + return ranges[0].Start, ranges[0].Length, nil +} diff --git a/bus/bus.go b/bus/bus.go index a6b86c0e1..804184e43 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1290,7 +1290,7 @@ func (b *bus) objectsCopyHandlerPOST(jc jape.Context) { return } - jc.ResponseWriter.Header().Set("Last-Modified", om.LastModified()) + jc.ResponseWriter.Header().Set("Last-Modified", om.ModTime.Std().Format(http.TimeFormat)) jc.ResponseWriter.Header().Set("ETag", api.FormatETag(om.ETag)) jc.Encode(om) } diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 24b309b17..3005b4b34 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -28,9 +28,9 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/utils" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.sia.tech/web/renterd" "go.uber.org/zap" "golang.org/x/sys/cpu" @@ -573,7 +573,10 @@ func main() { var workers []autopilot.Worker if len(cfg.Worker.Remotes) == 0 { if cfg.Worker.Enabled { - w, fn, err := node.NewWorker(cfg.Worker, bc, seed, logger) + w, s3Handler, fn, err := node.NewWorker(cfg.Worker, s3.Opts{ + AuthDisabled: cfg.S3.DisableAuth, + HostBucketEnabled: cfg.S3.HostBucketEnabled, + }, bc, seed, logger) if err != nil { logger.Fatal("failed to create worker: " + err.Error()) } @@ -588,13 +591,6 @@ func main() { workers = append(workers, wc) if cfg.S3.Enabled { - s3Handler, err := s3.New(bc, wc, logger.Sugar(), s3.Opts{ - AuthDisabled: cfg.S3.DisableAuth, - HostBucketEnabled: cfg.S3.HostBucketEnabled, - }) - if err != nil { - log.Fatal("failed to create s3 client", err) - } s3Srv = &http.Server{ Addr: cfg.S3.Address, Handler: s3Handler, diff --git a/internal/node/node.go b/internal/node/node.go index 8d8a9816c..d247f199a 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -20,6 +20,7 @@ import ( "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/webhooks" "go.sia.tech/renterd/worker" + "go.sia.tech/renterd/worker/s3" "go.sia.tech/siad/modules" mconsensus "go.sia.tech/siad/modules/consensus" "go.sia.tech/siad/modules/gateway" @@ -31,6 +32,11 @@ import ( "gorm.io/gorm/logger" ) +type Bus interface { + worker.Bus + s3.Bus +} + type BusConfig struct { config.Bus Network *consensus.Network @@ -191,14 +197,18 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht return b.Handler(), shutdownFn, nil } -func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - - return w.Handler(), w.Shutdown, nil + s3Handler, err := s3.New(b, w, l.Named("s3").Sugar(), s3Opts) + if err != nil { + err = errors.Join(err, w.Shutdown(context.Background())) + return nil, nil, nil, fmt.Errorf("failed to create s3 handler: %w", err) + } + return w.Handler(), s3Handler, w.Shutdown, nil } func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 16b3acbfd..0a066cfb5 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -23,8 +23,8 @@ import ( "go.sia.tech/renterd/config" "go.sia.tech/renterd/internal/node" "go.sia.tech/renterd/internal/test" - "go.sia.tech/renterd/s3" "go.sia.tech/renterd/stores" + "go.sia.tech/renterd/worker/s3" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gorm.io/gorm" @@ -315,7 +315,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { busShutdownFns = append(busShutdownFns, bStopFn) // Create worker. - w, wShutdownFn, err := node.NewWorker(workerCfg, busClient, wk, logger) + w, s3Handler, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) tt.OK(err) workerAuth := jape.BasicAuth(workerPassword) @@ -328,9 +328,6 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { workerShutdownFns = append(workerShutdownFns, wShutdownFn) // Create S3 API. - s3Handler, err := s3.New(busClient, workerClient, logger.Sugar(), s3.Opts{}) - tt.OK(err) - s3Server := http.Server{ Handler: s3Handler, } diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index af924f847..8fa2bd4ae 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -66,8 +66,8 @@ func TestObjectMetadata(t *testing.T) { } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ ContentType: or.Object.ContentType(), Etag: gor.Etag, - LastModified: or.Object.LastModified(), - Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))}, + LastModified: or.Object.ModTime.Std(), + Range: &api.ContentRange{Offset: 1, Length: 1, Size: int64(len(data))}, Size: int64(len(data)), Metadata: gor.Metadata, }) { diff --git a/worker/client/client.go b/worker/client/client.go index fe284469f..aecad676b 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -303,9 +303,9 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } // parse range - var r *api.DownloadRange + var r *api.ContentRange if cr := header.Get("Content-Range"); cr != "" { - dr, err := api.ParseDownloadRange(cr) + dr, err := api.ParseContentRange(cr) if err != nil { return api.HeadObjectResponse{}, err } @@ -325,10 +325,14 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err } } + modTime, err := time.Parse(http.TimeFormat, header.Get("Last-Modified")) + if err != nil { + return api.HeadObjectResponse{}, fmt.Errorf("failed to parse Last-Modified header: %w", err) + } return api.HeadObjectResponse{ ContentType: header.Get("Content-Type"), Etag: trimEtag(header.Get("ETag")), - LastModified: header.Get("Last-Modified"), + LastModified: modTime, Range: r, Size: size, Metadata: api.ExtractObjectUserMetadataFrom(headers), diff --git a/s3/authentication.go b/worker/s3/authentication.go similarity index 100% rename from s3/authentication.go rename to worker/s3/authentication.go diff --git a/s3/backend.go b/worker/s3/backend.go similarity index 99% rename from s3/backend.go rename to worker/s3/backend.go index bb6e3ff7c..2eff3d713 100644 --- a/s3/backend.go +++ b/worker/s3/backend.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "io" + "net/http" "strings" "go.sia.tech/gofakes3" @@ -29,8 +30,8 @@ var ( ) type s3 struct { - b bus - w worker + b Bus + w Worker logger *zap.SugaredLogger } @@ -277,7 +278,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // decorate metadata res.Metadata["Content-Type"] = res.ContentType - res.Metadata["Last-Modified"] = res.LastModified + res.Metadata["Last-Modified"] = res.LastModified.Format(http.TimeFormat) // etag to bytes etag, err := hex.DecodeString(res.Etag) @@ -322,7 +323,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go // decorate metadata metadata["Content-Type"] = res.ContentType - metadata["Last-Modified"] = res.LastModified + metadata["Last-Modified"] = res.LastModified.Format(http.TimeFormat) // etag to bytes hash, err := hex.DecodeString(res.Etag) diff --git a/s3/s3.go b/worker/s3/s3.go similarity index 97% rename from s3/s3.go rename to worker/s3/s3.go index 0ac1dbd49..045fdf946 100644 --- a/s3/s3.go +++ b/worker/s3/s3.go @@ -23,7 +23,7 @@ type Opts struct { HostBucketEnabled bool } -type bus interface { +type Bus interface { Bucket(ctx context.Context, bucketName string) (api.Bucket, error) CreateBucket(ctx context.Context, bucketName string, opts api.CreateBucketOptions) error DeleteBucket(ctx context.Context, bucketName string) error @@ -46,7 +46,7 @@ type bus interface { UploadParams(ctx context.Context) (api.UploadParams, error) } -type worker interface { +type Worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) @@ -66,7 +66,7 @@ func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { } } -func New(b bus, w worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { +func New(b Bus, w Worker, logger *zap.SugaredLogger, opts Opts) (http.Handler, error) { namedLogger := logger.Named("s3") s3Backend := &s3{ b: b, diff --git a/worker/serve.go b/worker/serve.go index 25d0c0412..e4467d960 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -6,7 +6,6 @@ import ( "io" "net/http" - "github.com/gotd/contrib/http_range" "go.sia.tech/renterd/api" ) @@ -24,14 +23,12 @@ type ( } ) -var errMultiRangeNotSupported = errors.New("multipart ranges are not supported") - -func newContentReader(r io.Reader, obj api.Object, offset int64) io.ReadSeeker { +func newContentReader(r io.Reader, size int64, offset int64) io.ReadSeeker { return &contentReader{ r: r, dataOffset: offset, seekOffset: offset, - size: obj.Size, + size: size, } } @@ -58,67 +55,18 @@ func (cr *contentReader) Read(p []byte) (int, error) { return cr.r.Read(p) } -func serveContent(rw http.ResponseWriter, req *http.Request, obj api.Object, downloadFn func(w io.Writer, offset, length int64) error) (int, error) { - // parse offset and length from the request range header - offset, length, err := parseRangeHeader(req, obj) - if err != nil { - return http.StatusRequestedRangeNotSatisfiable, err - } - - // launch the download in a goroutine - pr, pw := io.Pipe() - defer pr.Close() - go func() { - if err := downloadFn(pw, offset, length); err != nil { - pw.CloseWithError(err) - } else { - pw.Close() - } - }() - - // fetch the content type, if not set and we can't infer it from object's - // name we default to application/octet-stream, that is important because we - // have to avoid http.ServeContent to sniff the content type as it would - // require a seek - contentType := obj.ContentType() - if contentType == "" { - contentType = "application/octet-stream" - } - rw.Header().Set("Content-Type", contentType) - - // set the response headers, no need to set Last-Modified header as - // serveContent does that for us - rw.Header().Set("ETag", api.FormatETag(obj.ETag)) +func serveContent(rw http.ResponseWriter, req *http.Request, name string, gor api.GetObjectResponse) { + // set content type and etag + rw.Header().Set("Content-Type", gor.ContentType) + rw.Header().Set("ETag", api.FormatETag(gor.Etag)) // set the user metadata headers - for k, v := range obj.Metadata { + for k, v := range gor.Metadata { rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetadataPrefix, k), v) } // create a content reader - rs := newContentReader(pr, obj, offset) + rs := newContentReader(gor.Content, gor.Size, gor.Range.Offset) - http.ServeContent(rw, req, obj.Name, obj.ModTime.Std(), rs) - return http.StatusOK, nil -} - -func parseRangeHeader(req *http.Request, obj api.Object) (int64, int64, error) { - // parse the request range - ranges, err := http_range.ParseRange(req.Header.Get("Range"), obj.Size) - if err != nil { - return 0, 0, err - } - - // extract requested offset and length - offset := int64(0) - length := obj.Size - if len(ranges) == 1 { - offset, length = ranges[0].Start, ranges[0].Length - if offset < 0 || length < 0 || offset+length > obj.Size { - return 0, 0, fmt.Errorf("%w: %v %v", http_range.ErrInvalid, offset, length) - } - } else if len(ranges) > 1 { - return 0, 0, errMultiRangeNotSupported - } - return offset, length, nil + http.ServeContent(rw, req, name, gor.LastModified, rs) } diff --git a/worker/worker.go b/worker/worker.go index 43450d933..d843c0dae 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,6 +1,7 @@ package worker import ( + "bytes" "context" "errors" "fmt" @@ -872,32 +873,45 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { return } + var off int + if jc.DecodeForm("offset", &off) != nil { + return + } + limit := -1 + if jc.DecodeForm("limit", &limit) != nil { + return + } + // fetch object metadata - res, err := w.bus.Object(jc.Request.Context(), bucket, path, api.GetObjectOptions{ + opts := api.GetObjectOptions{ + Prefix: "", // not relevant for HEAD request + Marker: "", // not relevant for HEAD request + Offset: 0, // not relevant for HEAD request + Limit: 0, // not relevant for HEAD request IgnoreDelim: ignoreDelim, OnlyMetadata: true, + SortBy: "", // not relevant for HEAD request + SortDir: "", // not relevant for HEAD reuqest + } + + gor, err := w.GetObject(jc.Request.Context(), bucket, path, api.DownloadObjectOptions{ + GetObjectOptions: opts, + Range: api.DownloadRange{}, // empty range for HEAD requests }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return - } else if err != nil { - jc.Error(err, http.StatusInternalServerError) + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) return - } else if res.Object == nil { - jc.Error(api.ErrObjectNotFound, http.StatusInternalServerError) // should never happen but checking because we deref. later + } else if jc.Check("couldn't get object", err) != nil { return } + defer gor.Content.Close() // serve the content to ensure we're setting the exact same headers as we // would for a GET request - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, func(io.Writer, int64, int64) error { return nil }) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, *gor) } func (w *worker) objectsHandlerGET(jc jape.Context) { @@ -949,60 +963,51 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { } path := jc.PathParam("path") - res, err := w.bus.Object(ctx, bucket, path, opts) - if utils.IsErr(err, api.ErrObjectNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't get object or entries", err) != nil { - return - } if path == "" || strings.HasSuffix(path, "/") { + // list directory + res, err := w.bus.Object(ctx, bucket, path, opts) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("couldn't get object or entries", err) != nil { + return + } jc.Encode(res.Entries) return } - // return early if the object is empty - if len(res.Object.Slabs) == 0 { + offset, length, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("couldn't fetch gouging parameters from bus", err) != nil { + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) return - } - - // fetch all contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) - if err != nil { + } else if err != nil { jc.Error(err, http.StatusInternalServerError) return } - // create a download function - downloadFn := func(wr io.Writer, offset, length int64) (err error) { - ctx = WithGougingChecker(ctx, w.bus, gp) - err = w.downloadManager.DownloadObject(ctx, wr, *res.Object.Object, uint64(offset), uint64(length), contracts) - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && - !errors.Is(err, errDownloadCancelled) && - !errors.Is(err, io.ErrClosedPipe) { - w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err)) - } - } + gor, err := w.GetObject(ctx, bucket, path, api.DownloadObjectOptions{ + GetObjectOptions: opts, + Range: api.DownloadRange{ + Offset: offset, + Length: length, + }, + }) + if utils.IsErr(err, api.ErrObjectNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if errors.Is(err, http_range.ErrInvalid) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't get object", err) != nil { return } + defer gor.Content.Close() // serve the content - status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, downloadFn) - if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) { - jc.Error(err, http.StatusBadRequest) - } else if errors.Is(err, http_range.ErrNoOverlap) { - jc.Error(err, http.StatusRequestedRangeNotSatisfiable) - } else if err != nil { - jc.Error(err, status) - } + serveContent(jc.ResponseWriter, jc.Request, path, *gor) } func (w *worker) objectsHandlerPUT(jc jape.Context) { @@ -1585,3 +1590,80 @@ func isErrHostUnreachable(err error) bool { func isErrDuplicateTransactionSet(err error) bool { return utils.IsErr(err, modules.ErrDuplicateTransactionSet) } + +func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + // fetch object + res, err := w.bus.Object(ctx, bucket, path, opts.GetObjectOptions) + if err != nil { + return nil, fmt.Errorf("couldn't fetch object: %w", err) + } else if res.Object == nil { + return nil, errors.New("object is a directory") + } + obj := *res.Object.Object + + // check size of object against range + if opts.Range.Offset+opts.Range.Length > res.Object.Size { + return nil, http_range.ErrInvalid + } + + // fetch gouging params + gp, err := w.bus.GougingParams(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't fetch gouging parameters from bus: %w", err) + } + + // fetch all contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // prepare the content + var content io.ReadCloser + if opts.Range.Length == 0 || obj.TotalSize() == 0 { + // if the object has no content or the requested range is 0, return an + // empty reader + content = io.NopCloser(bytes.NewReader(nil)) + } else { + // otherwise return a pipe reader + downloadFn := func(wr io.Writer, offset, length int64) error { + ctx = WithGougingChecker(ctx, w.bus, gp) + err = w.downloadManager.DownloadObject(ctx, wr, obj, uint64(offset), uint64(length), contracts) + if err != nil { + w.logger.Error(err) + if !errors.Is(err, ErrShuttingDown) && + !errors.Is(err, errDownloadCancelled) && + !errors.Is(err, io.ErrClosedPipe) { + w.registerAlert(newDownloadFailedAlert(bucket, path, opts.Prefix, opts.Marker, offset, length, int64(len(contracts)), err)) + } + return fmt.Errorf("failed to download object: %w", err) + } + return nil + } + pr, pw := io.Pipe() + go func() { + err := downloadFn(pw, opts.Range.Offset, opts.Range.Length) + pw.CloseWithError(err) + }() + content = pr + } + + return &api.GetObjectResponse{ + Content: content, + HeadObjectResponse: api.HeadObjectResponse{ + ContentType: res.Object.MimeType, + Etag: res.Object.ETag, + LastModified: res.Object.ModTime.Std(), + Range: opts.Range.ContentRange(res.Object.Size), + }, + }, nil +} +func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) { + panic("not implemented") +} +func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { + panic("not implemented") +} +func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { + panic("not implemented") +} diff --git a/worker/worker_test.go b/worker/worker_test.go index 706fae14e..637168950 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -3,8 +3,10 @@ package worker import ( "context" "fmt" + "testing" "time" + "github.com/gotd/contrib/http_range" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" @@ -134,3 +136,10 @@ func newTestSector() (*[rhpv2.SectorSize]byte, types.Hash256) { frand.Read(sector[:]) return §or, rhpv2.SectorRoot(§or) } + +func TestFoo(t *testing.T) { + fmt.Println(http_range.Range{ + Start: 1, + Length: 2, + }.ContentRange(100)) +} From 07cf7f6d117d6c44cdf616a500aba5d9f4128fc2 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 4 Apr 2024 15:11:43 +0200 Subject: [PATCH 02/14] e2e: fix TestObjectEntries --- api/worker.go | 7 +++++-- worker/worker.go | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/api/worker.go b/api/worker.go index 68e80b80f..ec8be15d3 100644 --- a/api/worker.go +++ b/api/worker.go @@ -289,8 +289,11 @@ func ParseDownloadRange(req *http.Request) (int64, int64, error) { } // extract requested offset and length - if len(ranges) > 1 { + start, length := int64(0), int64(-1) + if len(ranges) == 1 { + start, length = ranges[0].Start, ranges[0].Length + } else if len(ranges) > 1 { return 0, 0, ErrMultiRangeNotSupported } - return ranges[0].Start, ranges[0].Length, nil + return start, length, nil } diff --git a/worker/worker.go b/worker/worker.go index d843c0dae..a3922edd7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1601,6 +1601,11 @@ func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.Do } obj := *res.Object.Object + // adjust length + if opts.Range.Length == -1 { + opts.Range.Length = res.Object.Size - opts.Range.Offset + } + // check size of object against range if opts.Range.Offset+opts.Range.Length > res.Object.Size { return nil, http_range.ErrInvalid From 7f64f5e7c64a1c15564fa83b5d232423c2181f8b Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 4 Apr 2024 16:32:23 +0200 Subject: [PATCH 03/14] worker: implement HeadObject --- internal/test/e2e/cluster_test.go | 5 +- worker/serve.go | 12 ++--- worker/worker.go | 88 ++++++++++++++++++++----------- 3 files changed, 68 insertions(+), 37 deletions(-) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 754bf273e..c197bc7c9 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -410,8 +410,11 @@ func TestObjectEntries(t *testing.T) { } for _, entry := range got { if !strings.HasSuffix(entry.Name, "/") { - if err := w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { + buf := new(bytes.Buffer) + if err := w.DownloadObject(context.Background(), buf, api.DefaultBucketName, entry.Name, api.DownloadObjectOptions{}); err != nil { t.Fatal(err) + } else if buf.Len() != int(entry.Size) { + t.Fatal("unexpected", buf.Len(), entry.Size) } } } diff --git a/worker/serve.go b/worker/serve.go index e4467d960..9c02b9d5e 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -55,18 +55,18 @@ func (cr *contentReader) Read(p []byte) (int, error) { return cr.r.Read(p) } -func serveContent(rw http.ResponseWriter, req *http.Request, name string, gor api.GetObjectResponse) { +func serveContent(rw http.ResponseWriter, req *http.Request, name string, content io.Reader, hor api.HeadObjectResponse) { // set content type and etag - rw.Header().Set("Content-Type", gor.ContentType) - rw.Header().Set("ETag", api.FormatETag(gor.Etag)) + rw.Header().Set("Content-Type", hor.ContentType) + rw.Header().Set("ETag", api.FormatETag(hor.Etag)) // set the user metadata headers - for k, v := range gor.Metadata { + for k, v := range hor.Metadata { rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetadataPrefix, k), v) } // create a content reader - rs := newContentReader(gor.Content, gor.Size, gor.Range.Offset) + rs := newContentReader(content, hor.Size, hor.Range.Offset) - http.ServeContent(rw, req, name, gor.LastModified, rs) + http.ServeContent(rw, req, name, hor.LastModified, rs) } diff --git a/worker/worker.go b/worker/worker.go index a3922edd7..300b09d42 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -882,21 +882,25 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { return } - // fetch object metadata - opts := api.GetObjectOptions{ - Prefix: "", // not relevant for HEAD request - Marker: "", // not relevant for HEAD request - Offset: 0, // not relevant for HEAD request - Limit: 0, // not relevant for HEAD request - IgnoreDelim: ignoreDelim, - OnlyMetadata: true, - SortBy: "", // not relevant for HEAD request - SortDir: "", // not relevant for HEAD reuqest + offset, length, err := api.ParseDownloadRange(jc.Request) + if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { + jc.Error(err, http.StatusBadRequest) + return + } else if errors.Is(err, http_range.ErrNoOverlap) { + jc.Error(err, http.StatusRequestedRangeNotSatisfiable) + return + } else if err != nil { + jc.Error(err, http.StatusInternalServerError) + return } - gor, err := w.GetObject(jc.Request.Context(), bucket, path, api.DownloadObjectOptions{ - GetObjectOptions: opts, - Range: api.DownloadRange{}, // empty range for HEAD requests + // fetch object metadata + hor, err := w.HeadObject(jc.Request.Context(), bucket, path, api.HeadObjectOptions{ + IgnoreDelim: ignoreDelim, + Range: api.DownloadRange{ + Offset: offset, + Length: length, + }, }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) @@ -907,11 +911,10 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { } else if jc.Check("couldn't get object", err) != nil { return } - defer gor.Content.Close() // serve the content to ensure we're setting the exact same headers as we // would for a GET request - serveContent(jc.ResponseWriter, jc.Request, path, *gor) + serveContent(jc.ResponseWriter, jc.Request, path, bytes.NewReader(nil), *hor) } func (w *worker) objectsHandlerGET(jc jape.Context) { @@ -1007,7 +1010,7 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { defer gor.Content.Close() // serve the content - serveContent(jc.ResponseWriter, jc.Request, path, *gor) + serveContent(jc.ResponseWriter, jc.Request, path, gor.Content, gor.HeadObjectResponse) } func (w *worker) objectsHandlerPUT(jc jape.Context) { @@ -1591,15 +1594,17 @@ func isErrDuplicateTransactionSet(err error) bool { return utils.IsErr(err, modules.ErrDuplicateTransactionSet) } -func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { +func (w *worker) headObject(ctx context.Context, bucket, path string, onlyMetadata bool, opts api.HeadObjectOptions) (*api.HeadObjectResponse, api.ObjectsResponse, error) { // fetch object - res, err := w.bus.Object(ctx, bucket, path, opts.GetObjectOptions) + res, err := w.bus.Object(ctx, bucket, path, api.GetObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + OnlyMetadata: onlyMetadata, + }) if err != nil { - return nil, fmt.Errorf("couldn't fetch object: %w", err) + return nil, api.ObjectsResponse{}, fmt.Errorf("couldn't fetch object: %w", err) } else if res.Object == nil { - return nil, errors.New("object is a directory") + return nil, api.ObjectsResponse{}, errors.New("object is a directory") } - obj := *res.Object.Object // adjust length if opts.Range.Length == -1 { @@ -1608,9 +1613,34 @@ func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.Do // check size of object against range if opts.Range.Offset+opts.Range.Length > res.Object.Size { - return nil, http_range.ErrInvalid + return nil, api.ObjectsResponse{}, http_range.ErrInvalid } + return &api.HeadObjectResponse{ + ContentType: res.Object.MimeType, + Etag: res.Object.ETag, + LastModified: res.Object.ModTime.Std(), + Range: opts.Range.ContentRange(res.Object.Size), + Size: res.Object.Size, + Metadata: res.Object.Metadata, + }, res, nil +} + +func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + // head object + hor, res, err := w.headObject(ctx, bucket, path, false, api.HeadObjectOptions{ + IgnoreDelim: opts.IgnoreDelim, + Range: opts.Range, + }) + if err != nil { + return nil, fmt.Errorf("couldn't fetch object: %w", err) + } + obj := *res.Object.Object + + // adjust range + opts.Range.Offset = hor.Range.Offset + opts.Range.Length = hor.Range.Length + // fetch gouging params gp, err := w.bus.GougingParams(ctx) if err != nil { @@ -1654,18 +1684,16 @@ func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.Do } return &api.GetObjectResponse{ - Content: content, - HeadObjectResponse: api.HeadObjectResponse{ - ContentType: res.Object.MimeType, - Etag: res.Object.ETag, - LastModified: res.Object.ModTime.Std(), - Range: opts.Range.ContentRange(res.Object.Size), - }, + Content: content, + HeadObjectResponse: *hor, }, nil } + func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) { - panic("not implemented") + res, _, err := w.headObject(ctx, bucket, path, true, opts) + return res, err } + func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { panic("not implemented") } From 331b8359cd82c1f8db05d0db7ec9f1ac6dcd461b Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 4 Apr 2024 16:54:46 +0200 Subject: [PATCH 04/14] e2e: fix TestObjectMetadata --- api/object.go | 3 +-- internal/test/e2e/metadata_test.go | 4 +++- worker/client/client.go | 2 +- worker/s3/backend.go | 4 ++-- worker/serve.go | 2 +- worker/worker.go | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/api/object.go b/api/object.go index b269baff2..9ab1d6372 100644 --- a/api/object.go +++ b/api/object.go @@ -9,7 +9,6 @@ import ( "net/url" "path/filepath" "strings" - "time" "go.sia.tech/renterd/object" ) @@ -94,7 +93,7 @@ type ( HeadObjectResponse struct { ContentType string Etag string - LastModified time.Time + LastModified TimeRFC3339 Range *ContentRange Size int64 Metadata ObjectUserMetadata diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index 8fa2bd4ae..cc95413b4 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -5,6 +5,7 @@ import ( "context" "reflect" "testing" + "time" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" @@ -61,12 +62,13 @@ func TestObjectMetadata(t *testing.T) { // perform a HEAD request and assert the headers are all present hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}}) + hor.LastModified = api.TimeRFC3339(hor.LastModified.Std().Round(time.Second)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ ContentType: or.Object.ContentType(), Etag: gor.Etag, - LastModified: or.Object.ModTime.Std(), + LastModified: api.TimeRFC3339(or.Object.ModTime.Std().Round(time.Second)), Range: &api.ContentRange{Offset: 1, Length: 1, Size: int64(len(data))}, Size: int64(len(data)), Metadata: gor.Metadata, diff --git a/worker/client/client.go b/worker/client/client.go index aecad676b..71fd200ad 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -332,7 +332,7 @@ func parseObjectResponseHeaders(header http.Header) (api.HeadObjectResponse, err return api.HeadObjectResponse{ ContentType: header.Get("Content-Type"), Etag: trimEtag(header.Get("ETag")), - LastModified: modTime, + LastModified: api.TimeRFC3339(modTime), Range: r, Size: size, Metadata: api.ExtractObjectUserMetadataFrom(headers), diff --git a/worker/s3/backend.go b/worker/s3/backend.go index 2eff3d713..d69eaff17 100644 --- a/worker/s3/backend.go +++ b/worker/s3/backend.go @@ -278,7 +278,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range // decorate metadata res.Metadata["Content-Type"] = res.ContentType - res.Metadata["Last-Modified"] = res.LastModified.Format(http.TimeFormat) + res.Metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes etag, err := hex.DecodeString(res.Etag) @@ -323,7 +323,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, objectName string) (*go // decorate metadata metadata["Content-Type"] = res.ContentType - metadata["Last-Modified"] = res.LastModified.Format(http.TimeFormat) + metadata["Last-Modified"] = res.LastModified.Std().Format(http.TimeFormat) // etag to bytes hash, err := hex.DecodeString(res.Etag) diff --git a/worker/serve.go b/worker/serve.go index 9c02b9d5e..31c347ff7 100644 --- a/worker/serve.go +++ b/worker/serve.go @@ -68,5 +68,5 @@ func serveContent(rw http.ResponseWriter, req *http.Request, name string, conten // create a content reader rs := newContentReader(content, hor.Size, hor.Range.Offset) - http.ServeContent(rw, req, name, hor.LastModified, rs) + http.ServeContent(rw, req, name, hor.LastModified.Std(), rs) } diff --git a/worker/worker.go b/worker/worker.go index 300b09d42..68bb509e7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1619,7 +1619,7 @@ func (w *worker) headObject(ctx context.Context, bucket, path string, onlyMetada return &api.HeadObjectResponse{ ContentType: res.Object.MimeType, Etag: res.Object.ETag, - LastModified: res.Object.ModTime.Std(), + LastModified: res.Object.ModTime, Range: opts.Range.ContentRange(res.Object.Size), Size: res.Object.Size, Metadata: res.Object.Metadata, From 5e05f7751720fc30e30b1cefa98cf2580a2bd964 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 10:20:18 +0200 Subject: [PATCH 05/14] worker: implement UploadObject --- worker/upload.go | 3 +- worker/upload_params.go | 29 ++++---- worker/upload_test.go | 15 ++-- worker/worker.go | 151 ++++++++++++++++++++++------------------ 4 files changed, 111 insertions(+), 87 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index 4e82f533e..65322baa9 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -154,8 +154,9 @@ func (w *worker) initUploadManager(maxMemory, maxOverdrive uint64, overdriveTime w.uploadManager = newUploadManager(w.shutdownCtx, w, mm, w.bus, w.bus, w.bus, maxOverdrive, overdriveTimeout, w.contractLockingDuration, logger) } -func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, opts ...UploadOption) (_ string, err error) { +func (w *worker) upload(ctx context.Context, bucket, path string, r io.Reader, contracts []api.ContractMetadata, opts ...UploadOption) (_ string, err error) { // apply the options + up := defaultParameters(bucket, path) for _, opt := range opts { opt(&up) } diff --git a/worker/upload_params.go b/worker/upload_params.go index d3cca49e4..ae8baa8d0 100644 --- a/worker/upload_params.go +++ b/worker/upload_params.go @@ -38,22 +38,6 @@ func defaultParameters(bucket, path string) uploadParameters { } } -func multipartParameters(bucket, path, uploadID string, partNumber int) uploadParameters { - return uploadParameters{ - bucket: bucket, - path: path, - - multipart: true, - uploadID: uploadID, - partNumber: partNumber, - - ec: object.GenerateEncryptionKey(), // random key - encryptionOffset: 0, // from the beginning - - rs: build.DefaultRedundancySettings, - } -} - type UploadOption func(*uploadParameters) func WithBlockHeight(bh uint64) UploadOption { @@ -92,12 +76,25 @@ func WithPacking(packing bool) UploadOption { } } +func WithPartNumber(partNumber int) UploadOption { + return func(up *uploadParameters) { + up.partNumber = partNumber + } +} + func WithRedundancySettings(rs api.RedundancySettings) UploadOption { return func(up *uploadParameters) { up.rs = rs } } +func WithUploadID(uploadID string) UploadOption { + return func(up *uploadParameters) { + up.uploadID = uploadID + up.multipart = true + } +} + func WithObjectUserMetadata(metadata api.ObjectUserMetadata) UploadOption { return func(up *uploadParameters) { up.metadata = metadata diff --git a/worker/upload_test.go b/worker/upload_test.go index 0b6308ffe..4221a5414 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -220,7 +220,7 @@ func TestUploadPackedSlab(t *testing.T) { uploadBytes := func(n int) { t.Helper() params.path = fmt.Sprintf("%s_%d", t.Name(), c) - _, err := w.upload(context.Background(), bytes.NewReader(frand.Bytes(n)), w.Contracts(), params) + _, err := w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(frand.Bytes(n)), w.Contracts(), testOpts()...) if err != nil { t.Fatal(err) } @@ -505,7 +505,7 @@ func TestRefreshUploaders(t *testing.T) { // upload data contracts := w.Contracts() - _, err := w.upload(context.Background(), bytes.NewReader(data), contracts, params) + _, err := w.upload(context.Background(), params.bucket, t.Name(), bytes.NewReader(data), contracts) if err != nil { t.Fatal(err) } @@ -607,7 +607,7 @@ func TestUploadRegression(t *testing.T) { // upload data ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, err := w.upload(ctx, bytes.NewReader(data), w.Contracts(), params) + _, err := w.upload(ctx, params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if !errors.Is(err, errUploadInterrupted) { t.Fatal(err) } @@ -616,7 +616,7 @@ func TestUploadRegression(t *testing.T) { unblock() // upload data - _, err = w.upload(context.Background(), bytes.NewReader(data), w.Contracts(), params) + _, err = w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(data), w.Contracts(), testOpts()...) if err != nil { t.Fatal(err) } @@ -637,6 +637,13 @@ func TestUploadRegression(t *testing.T) { } } +func testOpts() []UploadOption { + return []UploadOption{ + WithContractSet(testContractSet), + WithRedundancySettings(testRedundancySettings), + } +} + func testParameters(path string) uploadParameters { return uploadParameters{ bucket: testBucket, diff --git a/worker/worker.go b/worker/worker.go index 68bb509e7..46047d091 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1020,18 +1020,10 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the mimetype from the query string @@ -1046,35 +1038,12 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { - return - } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } @@ -1086,40 +1055,28 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } } - // build options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithMimeType(mimeType), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithObjectUserMetadata(metadata), - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { - return - } - // upload the object - params := defaultParameters(bucket, path) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) - if err := jc.Check("couldn't upload object", err); err != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err)) - } - } + resp, err := w.UploadObject(ctx, jc.Request.Body, bucket, path, api.UploadObjectOptions{ + MinShards: minShards, + TotalShards: totalShards, + ContractSet: contractset, + ContentLength: jc.Request.ContentLength, + MimeType: mimeType, + Metadata: metadata, + }) + if utils.IsErr(err, api.ErrBucketNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { @@ -1224,6 +1181,8 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { WithPacking(up.UploadPacking), WithRedundancySettings(up.RedundancySettings), WithCustomKey(upload.Key), + WithPartNumber(partNumber), + WithUploadID(uploadID), } // make sure only one of the following is set @@ -1244,8 +1203,7 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { } // upload the multipart - params := multipartParameters(bucket, path, uploadID, partNumber) - eTag, err := w.upload(ctx, jc.Request.Body, contracts, params, opts...) + eTag, err := w.upload(ctx, bucket, path, jc.Request.Body, contracts, opts...) if jc.Check("couldn't upload object", err) != nil { if err != nil { w.logger.Error(err) @@ -1695,8 +1653,69 @@ func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.H } func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { - panic("not implemented") + // return early if the bucket does not exist + _, err := w.bus.Bucket(ctx, bucket) + if err != nil { + return nil, fmt.Errorf("bucket '%s' not found; %w", bucket, err) + } + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) + } else if opts.ContractSet != "" { + up.ContractSet = opts.ContractSet + } else if up.ContractSet == "" { + return nil, api.ErrContractSetNotSpecified + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + return nil, api.ErrConsensusNotSynced + } + + // allow overriding the redundancy settings + if opts.MinShards != 0 { + up.RedundancySettings.MinShards = opts.MinShards + } + if opts.TotalShards != 0 { + up.RedundancySettings.TotalShards = opts.TotalShards + } + err = api.RedundancySettings{MinShards: opts.MinShards, TotalShards: opts.TotalShards}.Validate() + if err != nil { + return nil, fmt.Errorf("invalid redundancy settings: %w", err) + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithMimeType(opts.MimeType), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithObjectUserMetadata(opts.Metadata), + }...) + if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, opts.MimeType, up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, false, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadObjectResponse{ + ETag: eTag, + }, nil } + func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { panic("not implemented") } From 1c01b3994d4cb3a30e142bcb9c31932324ea6890 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 10:42:44 +0200 Subject: [PATCH 06/14] worker: fix TestRefreshUploaders and TestUploadPackedSlabs --- worker/upload_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/worker/upload_test.go b/worker/upload_test.go index 4221a5414..477c1a72c 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -140,6 +140,8 @@ func TestUploadPackedSlab(t *testing.T) { // create upload params params := testParameters(t.Name()) params.packing = true + opts := testOpts() + opts = append(opts, WithPacking(true)) // create test data data := frand.Bytes(128) @@ -220,7 +222,7 @@ func TestUploadPackedSlab(t *testing.T) { uploadBytes := func(n int) { t.Helper() params.path = fmt.Sprintf("%s_%d", t.Name(), c) - _, err := w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(frand.Bytes(n)), w.Contracts(), testOpts()...) + _, err := w.upload(context.Background(), params.bucket, params.path, bytes.NewReader(frand.Bytes(n)), w.Contracts(), opts...) if err != nil { t.Fatal(err) } @@ -502,10 +504,11 @@ func TestRefreshUploaders(t *testing.T) { // create upload params params := testParameters(t.Name()) + opts := testOpts() // upload data contracts := w.Contracts() - _, err := w.upload(context.Background(), params.bucket, t.Name(), bytes.NewReader(data), contracts) + _, err := w.upload(context.Background(), params.bucket, t.Name(), bytes.NewReader(data), contracts, opts...) if err != nil { t.Fatal(err) } From 5cd675a7895339100891b826c55c354ea108f690 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 11:55:28 +0200 Subject: [PATCH 07/14] e2e: fix TestObjectEntries --- worker/worker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index 46047d091..1cc956484 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1073,6 +1073,8 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { } else if utils.IsErr(err, api.ErrConsensusNotSynced) { jc.Error(err, http.StatusServiceUnavailable) return + } else if jc.Check("couldn't upload object", err) != nil { + return } // set etag header @@ -1681,7 +1683,7 @@ func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path str if opts.TotalShards != 0 { up.RedundancySettings.TotalShards = opts.TotalShards } - err = api.RedundancySettings{MinShards: opts.MinShards, TotalShards: opts.TotalShards}.Validate() + err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() if err != nil { return nil, fmt.Errorf("invalid redundancy settings: %w", err) } From 055a6e0e04e2f938d7997a49ebf4f7dde58a5477 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 12:13:32 +0200 Subject: [PATCH 08/14] e2e: fix TestObjecdtMetadata NDF --- internal/test/e2e/metadata_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index cc95413b4..3894dbdee 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "context" + "net/http" "reflect" "testing" "time" @@ -60,15 +61,22 @@ func TestObjectMetadata(t *testing.T) { t.Fatal("missing etag") } + // HeadObject retrieves the modtime from a http header so it's not as + // accurate as the modtime from the object GET endpoint which returns it in + // the body. + orModtime, err := time.Parse(http.TimeFormat, or.Object.ModTime.Std().Format(http.TimeFormat)) + if err != nil { + t.Fatal(err) + } + // perform a HEAD request and assert the headers are all present hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}}) - hor.LastModified = api.TimeRFC3339(hor.LastModified.Std().Round(time.Second)) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ ContentType: or.Object.ContentType(), Etag: gor.Etag, - LastModified: api.TimeRFC3339(or.Object.ModTime.Std().Round(time.Second)), + LastModified: api.TimeRFC3339(orModtime), Range: &api.ContentRange{Offset: 1, Length: 1, Size: int64(len(data))}, Size: int64(len(data)), Metadata: gor.Metadata, From 26e299966d4c7b4d29e06e423755f5bd85fed1d4 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 13:25:43 +0200 Subject: [PATCH 09/14] e2e: fix TestS3Basic --- api/object.go | 8 ++++---- api/worker.go | 12 ++++++------ internal/test/e2e/cluster_test.go | 6 +++--- internal/test/e2e/metadata_test.go | 2 +- worker/s3/backend.go | 2 +- worker/worker.go | 20 ++++++++++---------- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/api/object.go b/api/object.go index 9ab1d6372..41ab7ae62 100644 --- a/api/object.go +++ b/api/object.go @@ -208,12 +208,12 @@ type ( HeadObjectOptions struct { IgnoreDelim bool - Range DownloadRange + Range *DownloadRange } DownloadObjectOptions struct { GetObjectOptions - Range DownloadRange + Range *DownloadRange } GetObjectOptions struct { @@ -291,7 +291,7 @@ func (opts DownloadObjectOptions) ApplyValues(values url.Values) { } func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) { - if opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { @@ -313,7 +313,7 @@ func (opts HeadObjectOptions) Apply(values url.Values) { } func (opts HeadObjectOptions) ApplyHeaders(h http.Header) { - if opts.Range != (DownloadRange{}) { + if opts.Range != nil { if opts.Range.Length == -1 { h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset)) } else { diff --git a/api/worker.go b/api/worker.go index ec8be15d3..c0249135f 100644 --- a/api/worker.go +++ b/api/worker.go @@ -279,21 +279,21 @@ func ParseContentRange(contentRange string) (ContentRange, error) { }, nil } -func ParseDownloadRange(req *http.Request) (int64, int64, error) { +func ParseDownloadRange(req *http.Request) (DownloadRange, error) { // parse the request range // we pass math.MaxInt64 since a range header in a request doesn't have a // size ranges, err := http_range.ParseRange(req.Header.Get("Range"), math.MaxInt64) if err != nil { - return 0, 0, err + return DownloadRange{}, err } // extract requested offset and length - start, length := int64(0), int64(-1) + dr := DownloadRange{Offset: 0, Length: -1} if len(ranges) == 1 { - start, length = ranges[0].Start, ranges[0].Length + dr.Offset, dr.Length = ranges[0].Start, ranges[0].Length } else if len(ranges) > 1 { - return 0, 0, ErrMultiRangeNotSupported + return DownloadRange{}, ErrMultiRangeNotSupported } - return start, length, nil + return dr, nil } diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index c197bc7c9..55db48c19 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -588,7 +588,7 @@ func TestUploadDownloadBasic(t *testing.T) { for i := int64(0); i < 4; i++ { offset := i * 32 var buffer bytes.Buffer - tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: 32}})) + tt.OK(w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: 32}})) if !bytes.Equal(data[offset:offset+32], buffer.Bytes()) { fmt.Println(data[offset : offset+32]) fmt.Println(buffer.Bytes()) @@ -1562,7 +1562,7 @@ func TestUploadPacking(t *testing.T) { &buffer, api.DefaultBucketName, path, - api.DownloadObjectOptions{Range: api.DownloadRange{Offset: offset, Length: length}}, + api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: offset, Length: length}}, ); err != nil { t.Fatal(err) } @@ -2131,7 +2131,7 @@ func TestMultipartUploads(t *testing.T) { } // Download a range of the object - gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: api.DownloadRange{Offset: 0, Length: 1}}) + gor, err = w.GetObject(context.Background(), api.DefaultBucketName, objPath, api.DownloadObjectOptions{Range: &api.DownloadRange{Offset: 0, Length: 1}}) tt.OK(err) if gor.Range == nil || gor.Range.Offset != 0 || gor.Range.Length != 1 { t.Fatal("unexpected range:", gor.Range) diff --git a/internal/test/e2e/metadata_test.go b/internal/test/e2e/metadata_test.go index 3894dbdee..4bb1ea2dd 100644 --- a/internal/test/e2e/metadata_test.go +++ b/internal/test/e2e/metadata_test.go @@ -70,7 +70,7 @@ func TestObjectMetadata(t *testing.T) { } // perform a HEAD request and assert the headers are all present - hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}}) + hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: &api.DownloadRange{Offset: 1, Length: 1}}) if err != nil { t.Fatal(err) } else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{ diff --git a/worker/s3/backend.go b/worker/s3/backend.go index d69eaff17..a8dd1cb22 100644 --- a/worker/s3/backend.go +++ b/worker/s3/backend.go @@ -250,7 +250,7 @@ func (s *s3) GetObject(ctx context.Context, bucketName, objectName string, range if rangeRequest.End >= 0 { length = rangeRequest.End - rangeRequest.Start + 1 } - opts.Range = api.DownloadRange{Offset: rangeRequest.Start, Length: length} + opts.Range = &api.DownloadRange{Offset: rangeRequest.Start, Length: length} } res, err := s.w.GetObject(ctx, bucketName, objectName, opts) diff --git a/worker/worker.go b/worker/worker.go index 1cc956484..ed6302f4a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -882,7 +882,7 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { return } - offset, length, err := api.ParseDownloadRange(jc.Request) + dr, err := api.ParseDownloadRange(jc.Request) if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { jc.Error(err, http.StatusBadRequest) return @@ -897,10 +897,7 @@ func (w *worker) objectsHandlerHEAD(jc jape.Context) { // fetch object metadata hor, err := w.HeadObject(jc.Request.Context(), bucket, path, api.HeadObjectOptions{ IgnoreDelim: ignoreDelim, - Range: api.DownloadRange{ - Offset: offset, - Length: length, - }, + Range: &dr, }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) @@ -979,7 +976,7 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { return } - offset, length, err := api.ParseDownloadRange(jc.Request) + dr, err := api.ParseDownloadRange(jc.Request) if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, api.ErrMultiRangeNotSupported) { jc.Error(err, http.StatusBadRequest) return @@ -993,10 +990,7 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { gor, err := w.GetObject(ctx, bucket, path, api.DownloadObjectOptions{ GetObjectOptions: opts, - Range: api.DownloadRange{ - Offset: offset, - Length: length, - }, + Range: &dr, }) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) @@ -1567,6 +1561,9 @@ func (w *worker) headObject(ctx context.Context, bucket, path string, onlyMetada } // adjust length + if opts.Range == nil { + opts.Range = &api.DownloadRange{Offset: 0, Length: -1} + } if opts.Range.Length == -1 { opts.Range.Length = res.Object.Size - opts.Range.Offset } @@ -1598,6 +1595,9 @@ func (w *worker) GetObject(ctx context.Context, bucket, path string, opts api.Do obj := *res.Object.Object // adjust range + if opts.Range == nil { + opts.Range = &api.DownloadRange{} + } opts.Range.Offset = hor.Range.Offset opts.Range.Length = hor.Range.Length From 6f352b1bf9e931648292822eac9616bddc90f53f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 14:16:09 +0200 Subject: [PATCH 10/14] worker: implement UploadMultipartUploadPart --- api/multipart.go | 5 ++ api/object.go | 16 +++- api/setting.go | 10 ++- worker/worker.go | 196 +++++++++++++++++++++++++++-------------------- 4 files changed, 135 insertions(+), 92 deletions(-) diff --git a/api/multipart.go b/api/multipart.go index ee26567b1..ecd19789f 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -7,6 +7,11 @@ import ( ) var ( + // ErrInvalidMultipartEncryptionSettings is returned if the multipart upload + // has an invalid combination of encryption params. e.g. when encryption is + // enabled but not offset is set. + ErrInvalidMultipartEncryptionSettings = errors.New("invalid multipart encryption settings") + // ErrMultipartUploadNotFound is returned if the specified multipart upload // wasn't found. ErrMultipartUploadNotFound = errors.New("multipart upload not found") diff --git a/api/object.go b/api/object.go index 41ab7ae62..91332eec7 100644 --- a/api/object.go +++ b/api/object.go @@ -241,7 +241,6 @@ type ( // UploadObjectOptions is the options type for the worker client. UploadObjectOptions struct { - Offset int MinShards int TotalShards int ContractSet string @@ -251,15 +250,15 @@ type ( } UploadMultipartUploadPartOptions struct { + ContractSet string + MinShards int + TotalShards int EncryptionOffset *int ContentLength int64 } ) func (opts UploadObjectOptions) ApplyValues(values url.Values) { - if opts.Offset != 0 { - values.Set("offset", fmt.Sprint(opts.Offset)) - } if opts.MinShards != 0 { values.Set("minshards", fmt.Sprint(opts.MinShards)) } @@ -284,6 +283,15 @@ func (opts UploadMultipartUploadPartOptions) Apply(values url.Values) { if opts.EncryptionOffset != nil { values.Set("offset", fmt.Sprint(*opts.EncryptionOffset)) } + if opts.MinShards != 0 { + values.Set("minshards", fmt.Sprint(opts.MinShards)) + } + if opts.TotalShards != 0 { + values.Set("totalshards", fmt.Sprint(opts.TotalShards)) + } + if opts.ContractSet != "" { + values.Set("contractset", opts.ContractSet) + } } func (opts DownloadObjectOptions) ApplyValues(values url.Values) { diff --git a/api/setting.go b/api/setting.go index 0c0057410..923863e58 100644 --- a/api/setting.go +++ b/api/setting.go @@ -24,6 +24,10 @@ const ( ) var ( + // ErrInvalidRedundancySettings is returned if the redundancy settings are + // not valid + ErrInvalidRedundancySettings = errors.New("invalid redundancy settings") + // ErrSettingNotFound is returned if a requested setting is not present in the // database. ErrSettingNotFound = errors.New("setting not found") @@ -136,13 +140,13 @@ func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { // valid. func (rs RedundancySettings) Validate() error { if rs.MinShards < 1 { - return errors.New("MinShards must be greater than 0") + return fmt.Errorf("%w: MinShards must be greater than 0", ErrInvalidRedundancySettings) } if rs.TotalShards < rs.MinShards { - return errors.New("TotalShards must be at least MinShards") + return fmt.Errorf("%w: TotalShards must be at least MinShards", ErrInvalidRedundancySettings) } if rs.TotalShards > 255 { - return errors.New("TotalShards must be less than 256") + return fmt.Errorf("%w: TotalShards must be less than 256", ErrInvalidRedundancySettings) } return nil } diff --git a/worker/worker.go b/worker/worker.go index ed6302f4a..ad3ae8a5e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1058,7 +1058,10 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) { MimeType: mimeType, Metadata: metadata, }) - if utils.IsErr(err, api.ErrBucketNotFound) { + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { jc.Error(err, http.StatusNotFound) return } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { @@ -1082,34 +1085,10 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { // grab the path path := jc.PathParam("path") - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if jc.Check("couldn't fetch upload parameters from bus", err) != nil { - return - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // cancel the upload if no contract set is specified - if up.ContractSet == "" { - jc.Error(api.ErrContractSetNotSpecified, http.StatusBadRequest) - return - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - w.logger.Errorf("upload cancelled, err: %v", api.ErrConsensusNotSynced) - jc.Error(api.ErrConsensusNotSynced, http.StatusServiceUnavailable) - return - } - // decode the contract set from the query string var contractset string if jc.DecodeForm("contractset", &contractset) != nil { return - } else if contractset != "" { - up.ContractSet = contractset } // decode the bucket from the query string @@ -1118,13 +1097,6 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } - // return early if the bucket does not exist - _, err = w.bus.Bucket(ctx, bucket) - if utils.IsErr(err, api.ErrBucketNotFound) { - jc.Error(fmt.Errorf("bucket '%s' not found; %w", bucket, err), http.StatusNotFound) - return - } - // decode the upload id var uploadID string if jc.DecodeForm("uploadid", &uploadID) != nil { @@ -1141,14 +1113,11 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { } // allow overriding the redundancy settings - rs := up.RedundancySettings - if jc.DecodeForm("minshards", &rs.MinShards) != nil { - return - } - if jc.DecodeForm("totalshards", &rs.TotalShards) != nil { + var minShards, totalShards int + if jc.DecodeForm("minshards", &minShards) != nil { return } - if jc.Check("invalid redundancy settings", rs.Validate()) != nil { + if jc.DecodeForm("totalshards", &totalShards) != nil { return } @@ -1156,62 +1125,40 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { var offset int if jc.DecodeForm("offset", &offset) != nil { return - } else if offset < 0 { - jc.Error(errors.New("offset must be positive"), http.StatusBadRequest) - return } - // fetch upload from bus - upload, err := w.bus.MultipartUpload(ctx, uploadID) - if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + // upload the multipart + resp, err := w.UploadMultipartUploadPart(ctx, jc.Request.Body, bucket, path, uploadID, partNumber, api.UploadMultipartUploadPartOptions{ + ContractSet: contractset, + MinShards: minShards, + TotalShards: totalShards, + EncryptionOffset: nil, + ContentLength: jc.Request.ContentLength, + }) + if utils.IsErr(err, api.ErrInvalidRedundancySettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if utils.IsErr(err, api.ErrBucketNotFound) { jc.Error(err, http.StatusNotFound) return - } else if jc.Check("failed to fetch multipart upload", err) != nil { + } else if utils.IsErr(err, api.ErrContractSetNotSpecified) { + jc.Error(err, http.StatusBadRequest) return - } - - // built options - opts := []UploadOption{ - WithBlockHeight(up.CurrentHeight), - WithContractSet(up.ContractSet), - WithPacking(up.UploadPacking), - WithRedundancySettings(up.RedundancySettings), - WithCustomKey(upload.Key), - WithPartNumber(partNumber), - WithUploadID(uploadID), - } - - // make sure only one of the following is set - if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && jc.Request.FormValue("offset") == "" { - jc.Error(errors.New("if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set"), http.StatusBadRequest) + } else if utils.IsErr(err, api.ErrConsensusNotSynced) { + jc.Error(err, http.StatusServiceUnavailable) return - } else if encryptionEnabled { - opts = append(opts, WithCustomEncryptionOffset(uint64(offset))) - } - - // attach gouging checker to the context - ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) - - // fetch contracts - contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) - if jc.Check("couldn't fetch contracts from bus", err) != nil { + } else if utils.IsErr(err, api.ErrMultipartUploadNotFound) { + jc.Error(err, http.StatusNotFound) return - } - - // upload the multipart - eTag, err := w.upload(ctx, bucket, path, jc.Request.Body, contracts, opts...) - if jc.Check("couldn't upload object", err) != nil { - if err != nil { - w.logger.Error(err) - if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) { - w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err)) - } - } + } else if utils.IsErr(err, api.ErrInvalidMultipartEncryptionSettings) { + jc.Error(err, http.StatusBadRequest) + return + } else if jc.Check("couldn't upload multipart part", err) != nil { return } // set etag header - jc.ResponseWriter.Header().Set("ETag", api.FormatETag(eTag)) + jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } func (w *worker) objectsHandlerDELETE(jc jape.Context) { @@ -1685,7 +1632,7 @@ func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path str } err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() if err != nil { - return nil, fmt.Errorf("invalid redundancy settings: %w", err) + return nil, err } // attach gouging checker to the context @@ -1719,5 +1666,84 @@ func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path str } func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { - panic("not implemented") + // return early if the bucket does not exist + _, err := w.bus.Bucket(ctx, bucket) + if err != nil { + return nil, fmt.Errorf("bucket '%s' not found; %w", bucket, err) + } + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) + } else if opts.ContractSet != "" { + up.ContractSet = opts.ContractSet + } else if up.ContractSet == "" { + return nil, api.ErrContractSetNotSpecified + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + return nil, api.ErrConsensusNotSynced + } + + // allow overriding the redundancy settings + if opts.MinShards != 0 { + up.RedundancySettings.MinShards = opts.MinShards + } + if opts.TotalShards != 0 { + up.RedundancySettings.TotalShards = opts.TotalShards + } + err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + if err != nil { + return nil, err + } + + // fetch upload from bus + upload, err := w.bus.MultipartUpload(ctx, uploadID) + if err != nil { + return nil, fmt.Errorf("couldn't fetch multipart upload: %w", err) + } + + // attach gouging checker to the context + ctx = WithGougingChecker(ctx, w.bus, up.GougingParams) + + // prepare opts + uploadOpts := []UploadOption{ + WithBlockHeight(up.CurrentHeight), + WithContractSet(up.ContractSet), + WithPacking(up.UploadPacking), + WithRedundancySettings(up.RedundancySettings), + WithCustomKey(upload.Key), + WithPartNumber(partNumber), + WithUploadID(uploadID), + } + + // make sure only one of the following is set + if encryptionEnabled := !upload.Key.IsNoopKey(); encryptionEnabled && opts.EncryptionOffset == nil { + return nil, fmt.Errorf("%w: if object encryption (pre-erasure coding) wasn't disabled by creating the multipart upload with the no-op key, the offset needs to be set", api.ErrInvalidMultipartEncryptionSettings) + } else if opts.EncryptionOffset != nil && *opts.EncryptionOffset < 0 { + return nil, fmt.Errorf("%w: encryption offset must be positive", api.ErrInvalidMultipartEncryptionSettings) + } else if encryptionEnabled { + uploadOpts = append(uploadOpts, WithCustomEncryptionOffset(uint64(*opts.EncryptionOffset))) + } + + // fetch contracts + contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: up.ContractSet}) + if err != nil { + return nil, fmt.Errorf("couldn't fetch contracts from bus: %w", err) + } + + // upload + eTag, err := w.upload(ctx, bucket, path, r, contracts, uploadOpts...) + if err != nil { + w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") + if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { + w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", up.RedundancySettings.MinShards, up.RedundancySettings.TotalShards, len(contracts), up.UploadPacking, false, err)) + } + return nil, fmt.Errorf("couldn't upload object: %w", err) + } + return &api.UploadMultipartUploadPartResponse{ + ETag: eTag, + }, nil } From 35d583d9e6d2de13ec368cf9ee1e97670f2d3e0f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 14:34:24 +0200 Subject: [PATCH 11/14] e2e: fix TestMultipartUploads --- worker/worker.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index ad3ae8a5e..7dbbd233e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1121,20 +1121,25 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { return } + // prepare options + opts := api.UploadMultipartUploadPartOptions{ + ContractSet: contractset, + MinShards: minShards, + TotalShards: totalShards, + EncryptionOffset: nil, + ContentLength: jc.Request.ContentLength, + } + // get the offset var offset int if jc.DecodeForm("offset", &offset) != nil { return + } else if jc.Request.FormValue("offset") != "" { + opts.EncryptionOffset = &offset } // upload the multipart - resp, err := w.UploadMultipartUploadPart(ctx, jc.Request.Body, bucket, path, uploadID, partNumber, api.UploadMultipartUploadPartOptions{ - ContractSet: contractset, - MinShards: minShards, - TotalShards: totalShards, - EncryptionOffset: nil, - ContentLength: jc.Request.ContentLength, - }) + resp, err := w.UploadMultipartUploadPart(ctx, jc.Request.Body, bucket, path, uploadID, partNumber, opts) if utils.IsErr(err, api.ErrInvalidRedundancySettings) { jc.Error(err, http.StatusBadRequest) return From 59727fbde7f3cbb04accc97eafb710297f847ba2 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 5 Apr 2024 14:45:27 +0200 Subject: [PATCH 12/14] go mod tidy --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 73169eae2..260117032 100644 --- a/go.sum +++ b/go.sum @@ -243,8 +243,6 @@ gitlab.com/NebulousLabs/threadgroup v0.0.0-20200608151952-38921fbef213/go.mod h1 gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20200618142844-c59a90f49130/go.mod h1:SxigdS5Q1ui+OMgGAXt1E/Fg3RB6PvKXMov2O3gvIzs= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= -go.sia.tech/core v0.2.2-0.20240325122830-e781eaa57d37 h1:jsiab6uAUkaeDL7XEseAxJw7NVhxLNoU2WaB0AHbgG8= -go.sia.tech/core v0.2.2-0.20240325122830-e781eaa57d37/go.mod h1:Zk7HaybEPgkPC1p6e6tTQr8PIeZClTgNcLNGYDLQJeE= go.sia.tech/core v0.2.2-0.20240404003127-f4248250d041 h1:3tgQlTmop/OU5dTHnBmAdNIPgae67wRijaknBhmAOCg= go.sia.tech/core v0.2.2-0.20240404003127-f4248250d041/go.mod h1:Zk7HaybEPgkPC1p6e6tTQr8PIeZClTgNcLNGYDLQJeE= go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y= From 3243333c27ba334ad61bf1f71328e2509809c984 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 15 Apr 2024 14:08:38 +0200 Subject: [PATCH 13/14] worker: address comments --- api/worker.go | 7 +++---- worker/worker.go | 4 ++-- worker/worker_test.go | 9 --------- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/api/worker.go b/api/worker.go index c0249135f..2908802f7 100644 --- a/api/worker.go +++ b/api/worker.go @@ -27,7 +27,7 @@ var ( // be scanned since it is on a private network. ErrHostOnPrivateNetwork = errors.New("host is on a private network") - // ErrMultiRangeNotSupported is returned by the worker API when a requesta + // ErrMultiRangeNotSupported is returned by the worker API when a request // tries to download multiple ranges at once. ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported") ) @@ -280,9 +280,8 @@ func ParseContentRange(contentRange string) (ContentRange, error) { } func ParseDownloadRange(req *http.Request) (DownloadRange, error) { - // parse the request range - // we pass math.MaxInt64 since a range header in a request doesn't have a - // size + // parse the request range we pass math.MaxInt64 since a range header in a + // request doesn't have a size ranges, err := http_range.ParseRange(req.Header.Get("Range"), math.MaxInt64) if err != nil { return DownloadRange{}, err diff --git a/worker/worker.go b/worker/worker.go index 7dbbd233e..a79eeb2f3 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1650,14 +1650,14 @@ func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path str } // upload - eTag, err := w.upload(ctx, bucket, path, r, contracts, []UploadOption{ + eTag, err := w.upload(ctx, bucket, path, r, contracts, WithBlockHeight(up.CurrentHeight), WithContractSet(up.ContractSet), WithMimeType(opts.MimeType), WithPacking(up.UploadPacking), WithRedundancySettings(up.RedundancySettings), WithObjectUserMetadata(opts.Metadata), - }...) + ) if err != nil { w.logger.With(zap.Error(err)).With("path", path).With("bucket", bucket).Error("failed to upload object") if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errUploadInterrupted) && !errors.Is(err, context.Canceled) { diff --git a/worker/worker_test.go b/worker/worker_test.go index 637168950..706fae14e 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -3,10 +3,8 @@ package worker import ( "context" "fmt" - "testing" "time" - "github.com/gotd/contrib/http_range" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" @@ -136,10 +134,3 @@ func newTestSector() (*[rhpv2.SectorSize]byte, types.Hash256) { frand.Read(sector[:]) return §or, rhpv2.SectorRoot(§or) } - -func TestFoo(t *testing.T) { - fmt.Println(http_range.Range{ - Start: 1, - Length: 2, - }.ContentRange(100)) -} From 7385035adbe05a01a34598c3e513e4f425ef898f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 15 Apr 2024 14:23:54 +0200 Subject: [PATCH 14/14] worker: remove code duplication --- worker/worker.go | 98 ++++++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 58 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index a79eeb2f3..cd3d545eb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1607,35 +1607,8 @@ func (w *worker) HeadObject(ctx context.Context, bucket, path string, opts api.H } func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { - // return early if the bucket does not exist - _, err := w.bus.Bucket(ctx, bucket) - if err != nil { - return nil, fmt.Errorf("bucket '%s' not found; %w", bucket, err) - } - - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if err != nil { - return nil, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) - } else if opts.ContractSet != "" { - up.ContractSet = opts.ContractSet - } else if up.ContractSet == "" { - return nil, api.ErrContractSetNotSpecified - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - return nil, api.ErrConsensusNotSynced - } - - // allow overriding the redundancy settings - if opts.MinShards != 0 { - up.RedundancySettings.MinShards = opts.MinShards - } - if opts.TotalShards != 0 { - up.RedundancySettings.TotalShards = opts.TotalShards - } - err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) if err != nil { return nil, err } @@ -1671,35 +1644,8 @@ func (w *worker) UploadObject(ctx context.Context, r io.Reader, bucket, path str } func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { - // return early if the bucket does not exist - _, err := w.bus.Bucket(ctx, bucket) - if err != nil { - return nil, fmt.Errorf("bucket '%s' not found; %w", bucket, err) - } - - // fetch the upload parameters - up, err := w.bus.UploadParams(ctx) - if err != nil { - return nil, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) - } else if opts.ContractSet != "" { - up.ContractSet = opts.ContractSet - } else if up.ContractSet == "" { - return nil, api.ErrContractSetNotSpecified - } - - // cancel the upload if consensus is not synced - if !up.ConsensusState.Synced { - return nil, api.ErrConsensusNotSynced - } - - // allow overriding the redundancy settings - if opts.MinShards != 0 { - up.RedundancySettings.MinShards = opts.MinShards - } - if opts.TotalShards != 0 { - up.RedundancySettings.TotalShards = opts.TotalShards - } - err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + // prepare upload params + up, err := w.prepareUploadParams(ctx, bucket, opts.ContractSet, opts.MinShards, opts.TotalShards) if err != nil { return nil, err } @@ -1752,3 +1698,39 @@ func (w *worker) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc ETag: eTag, }, nil } + +func (w *worker) prepareUploadParams(ctx context.Context, bucket string, contractSet string, minShards, totalShards int) (api.UploadParams, error) { + // return early if the bucket does not exist + _, err := w.bus.Bucket(ctx, bucket) + if err != nil { + return api.UploadParams{}, fmt.Errorf("bucket '%s' not found; %w", bucket, err) + } + + // fetch the upload parameters + up, err := w.bus.UploadParams(ctx) + if err != nil { + return api.UploadParams{}, fmt.Errorf("couldn't fetch upload parameters from bus: %w", err) + } else if contractSet != "" { + up.ContractSet = contractSet + } else if up.ContractSet == "" { + return api.UploadParams{}, api.ErrContractSetNotSpecified + } + + // cancel the upload if consensus is not synced + if !up.ConsensusState.Synced { + return api.UploadParams{}, api.ErrConsensusNotSynced + } + + // allow overriding the redundancy settings + if minShards != 0 { + up.RedundancySettings.MinShards = minShards + } + if totalShards != 0 { + up.RedundancySettings.TotalShards = totalShards + } + err = api.RedundancySettings{MinShards: up.RedundancySettings.MinShards, TotalShards: up.RedundancySettings.TotalShards}.Validate() + if err != nil { + return api.UploadParams{}, err + } + return up, nil +}